lf_queue/queue.rs
//! A lock-free multi-producer multi-consumer unbounded queue.

use crate::cache_pad::CachePad;
use crate::node::{Node, NODE_CAPACITY, NODE_SIZE};
use crate::slot::{DRAINING, FILLED, READING};
use crate::variant::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering};
use crate::variant::sync::Arc;
use crate::variant::thread;

use std::mem::MaybeUninit;

/// A lock-free multi-producer multi-consumer unbounded queue.
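///
/// # Examples
///
/// A minimal multi-threaded sketch, assuming a `Queue<T>` handle can be
/// cloned and shared across threads (i.e. it is `Send` and `Sync` for
/// `T: Send`), which is what an MPMC queue is meant for:
///
/// ```
/// use lf_queue::Queue;
/// use std::thread;
///
/// let queue = Queue::<usize>::new();
///
/// // Two producers push four items each through their own clone of the
/// // queue handle (assumes the handle is safe to send across threads).
/// let producers: Vec<_> = (0..2)
///     .map(|t| {
///         let queue = queue.clone();
///         thread::spawn(move || {
///             for i in 0..4 {
///                 queue.push(t * 4 + i);
///             }
///         })
///     })
///     .collect();
///
/// for producer in producers {
///     producer.join().unwrap();
/// }
///
/// // A consumer drains everything that was pushed.
/// let mut count = 0;
/// while queue.pop().is_some() {
///     count += 1;
/// }
/// assert_eq!(count, 8);
/// ```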
#[derive(Clone, Debug)]
pub struct Queue<T> {
    inner: Arc<Inner<T>>,
}

impl<T> Queue<T> {
    /// Creates a new [`Queue`].
    ///
    /// # Examples
    ///
    /// ```
    /// use lf_queue::Queue;
    ///
    /// let queue = Queue::<usize>::new();
    /// ```
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Inner::new()),
        }
    }

    /// Pushes an item into the [`Queue`].
    ///
    /// # Examples
    ///
    /// ```
    /// use lf_queue::Queue;
    ///
    /// let queue = Queue::<usize>::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    /// ```
    pub fn push(&self, item: T) {
        self.inner.push(item)
    }

    /// Pops an item from the [`Queue`]. Returns `None` if the [`Queue`] is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use lf_queue::Queue;
    ///
    /// let queue = Queue::<usize>::new();
    /// for i in 0..8 {
    ///     queue.push(i);
    /// }
    ///
    /// for i in 0..8 {
    ///     assert_eq!(i, queue.pop().unwrap());
    /// }
    ///
    /// assert!(queue.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        self.inner.pop()
    }
}

impl<T> Default for Queue<T> {
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Debug)]
struct Inner<T> {
    head: CachePad<Cursor<T>>,
    tail: CachePad<Cursor<T>>,
}

impl<T> Inner<T> {
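    /// Creates the inner state: both cursors start at index 0 and point to
    /// the same, initially empty node.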
    fn new() -> Self {
        #[cfg(not(loom))]
        let node: Node<T> = Node::UNINIT;
        #[cfg(loom)]
        let node: Node<T> = Node::new();

        let first_node: *mut CachePad<Node<T>> = Box::into_raw(Box::new(CachePad::new(node)));

        Self {
            head: CachePad::new(Cursor {
                index: AtomicUsize::new(0),
                node: AtomicPtr::new(first_node),
            }),
            tail: CachePad::new(Cursor {
                index: AtomicUsize::new(0),
                node: AtomicPtr::new(first_node),
            }),
        }
    }

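    /// Reserves a slot by advancing the tail index with a CAS, installs a new
    /// node whenever the current container fills up, then writes the item and
    /// marks its slot as `FILLED`.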
    fn push(&self, item: T) {
        let mut tail_index = self.tail.index.load(Ordering::Acquire);
        let mut tail_node = self.tail.node.load(Ordering::Acquire);

        loop {
            // Computes the offset within the node container of the slot where
            // the provided item should be stored.
            let offset = (tail_index >> MARK_BIT_SHIFT) % NODE_SIZE;

            // If the node container is full, we wait until the next node is
            // installed before moving forward, and update our local references.
            if offset == NODE_CAPACITY {
                thread::yield_now();
                tail_index = self.tail.index.load(Ordering::Acquire);
                tail_node = self.tail.node.load(Ordering::Acquire);
                continue;
            }

            // Increments the tail index.
            let next_tail_index = tail_index + (1 << MARK_BIT_SHIFT);
            match self.tail.index.compare_exchange_weak(
                tail_index,
                next_tail_index,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                // The tail index has been updated successfully, so we can now use
                // the offset to store the item in the next available slot.
                Ok(_) => unsafe {
                    // If we're filling the last available slot of the node container,
                    // we install a new node, point the tail cursor at it, and link it
                    // from the current node.
                    if offset + 1 == NODE_CAPACITY {
                        #[cfg(not(loom))]
                        let node: Node<T> = Node::UNINIT;
                        #[cfg(loom)]
                        let node: Node<T> = Node::new();

                        let next_node = Box::into_raw(Box::new(CachePad::new(node)));
                        self.tail.node.store(next_node, Ordering::Release);
                        let _ = self
                            .tail
                            .index
                            .fetch_add(1 << MARK_BIT_SHIFT, Ordering::Release);
                        (*tail_node).next.store(next_node, Ordering::Release);
                    }

                    // We can now safely store the provided item into the slot.
                    let slot = (*tail_node).container.get_unchecked(offset);
                    slot.item.with_mut(|p| p.write(MaybeUninit::new(item)));
                    let _ = slot.state.fetch_or(FILLED, Ordering::Release);

                    return;
                },
                // While trying to push the next item, the tail index has been
                // updated by another thread. We update our local references with
                // the index returned by the failed exchange and with what is now
                // the current tail node.
                Err(current_tail_index) => {
                    tail_index = current_tail_index;
                    tail_node = self.tail.node.load(Ordering::Acquire);
                }
            }
        }
    }

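    /// Claims a slot by advancing the head index with a CAS, waits for the
    /// slot to be filled, reads the item out, and drains the node once its
    /// last slot has been consumed (or finishes a drain started by another
    /// thread).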
    fn pop(&self) -> Option<T> {
        let mut head_index = self.head.index.load(Ordering::Acquire);
        let mut head_node = self.head.node.load(Ordering::Acquire);

        loop {
            // Computes the offset of the slot from which the next item should
            // be gathered.
            let offset = (head_index >> MARK_BIT_SHIFT) % NODE_SIZE;

            // If we reach the end of the node container, we wait until the next
            // one is installed.
            if offset == NODE_CAPACITY {
                thread::yield_now();
                head_index = self.head.index.load(Ordering::Acquire);
                head_node = self.head.node.load(Ordering::Acquire);
                continue;
            }

            // Increments the head index.
            let mut next_head_index = head_index + (1 << MARK_BIT_SHIFT);

            // If the mark bit is not set in the head index, we check whether
            // there is a pending item in the queue.
            if next_head_index & MARK_BIT == 0 {
                // Synchronizes with other threads and loads the current tail index.
                fence(Ordering::SeqCst);
                let tail_index = self.tail.index.load(Ordering::Acquire);

                // If the head index equals the tail index, the queue is empty.
                if head_index >> MARK_BIT_SHIFT == tail_index >> MARK_BIT_SHIFT {
                    return None;
                }

                // If the head and the tail are not pointing to the same node,
                // we set the `MARK_BIT` in the head to skip checking if there
                // is any item pending on the next iteration.
                if (head_index >> MARK_BIT_SHIFT) / NODE_SIZE
                    != (tail_index >> MARK_BIT_SHIFT) / NODE_SIZE
                {
                    next_head_index |= MARK_BIT;
                }
            }

            // Tries to update the head index.
            match self.head.index.compare_exchange_weak(
                head_index,
                next_head_index,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                // The head index has been updated successfully, so we can now use
                // the offset to pop the next item.
                Ok(_) => unsafe {
                    // If we're returning the last item of the node container, we
                    // update the head cursor to point to the next node.
                    if offset + 1 == NODE_CAPACITY {
                        let next_node = (*head_node).wait_next();

                        // Removes the mark bit, if any, and increments the index.
                        let mut next_index =
                            (next_head_index & !MARK_BIT).wrapping_add(1 << MARK_BIT_SHIFT);

                        // If the next node already points to yet another node, we can
                        // set the mark bit right away to record that the node we are
                        // moving to is not the last one.
                        if !(*next_node).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.node.store(next_node, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Reads and returns the item.
                    let slot = (*head_node).container.get_unchecked(offset);
                    slot.wait_filled();
                    let item = slot.item.with(|p| p.read().assume_init());

                    // Drains and drops the node if we've reached the end of its
                    // container, or if another thread wanted to do so but couldn't
                    // because this thread was still busy reading from the slot.
                    if offset + 1 == NODE_CAPACITY {
                        Node::drain(head_node, 0);
                    } else if slot.state.fetch_or(READING, Ordering::AcqRel) & DRAINING != 0 {
                        Node::drain(head_node, offset + 1);
                    }

                    return Some(item);
                },
                // While trying to pop the next item, the head index has been
                // updated by another thread. We update our local references with
                // the index returned by the failed exchange and with what is now
                // the current head node.
                Err(current_head_index) => {
                    head_index = current_head_index;
                    head_node = self.head.node.load(Ordering::Acquire);
                }
            }
        }
    }
}

#[derive(Debug)]
struct Cursor<T> {
    /// Reports the index of the next [`Slot`].
    ///
    /// Its value, shifted right by [`MARK_BIT_SHIFT`] and taken modulo
    /// [`NODE_SIZE`], gives the offset of the slot within the current
    /// [`Node`] container.
    ///
    /// [`Slot`]: crate::slot::Slot
    index: AtomicUsize,

    /// Points to the current [`Node`].
    node: AtomicPtr<CachePad<Node<T>>>,
}

/// Defines how many lower bits are reserved for metadata.
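///
/// An index therefore packs a slot position and the mark bit as
/// `index = (position << MARK_BIT_SHIFT) | mark`: shifting right by
/// `MARK_BIT_SHIFT` recovers the position, and `index & MARK_BIT` tests
/// the mark.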
const MARK_BIT_SHIFT: usize = 1;

/// The [`MARK_BIT`] indicates that the [`Node`] is not the last one.
///
/// The [`MARK_BIT`] helps to avoid loading both the tail and the head on every
/// call to the `pop` method just to check whether or not the queue is empty.
///
/// [`Node`]: crate::node::Node
const MARK_BIT: usize = 1;
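
#[cfg(test)]
mod tests {
    use super::*;

    // A small sanity sketch: pushing more items than a single node holds
    // forces the queue to install at least one extra node, exercising the
    // node hand-off paths in both `push` and `pop`. It assumes
    // `NODE_CAPACITY` is the per-node slot count imported from `crate::node`.
    #[test]
    fn push_pop_across_node_boundary() {
        let queue = Queue::<usize>::new();

        // Fill two nodes' worth of items so at least one node boundary is crossed.
        for i in 0..NODE_CAPACITY * 2 {
            queue.push(i);
        }

        // Items come back in FIFO order, after which the queue is empty.
        for i in 0..NODE_CAPACITY * 2 {
            assert_eq!(Some(i), queue.pop());
        }
        assert!(queue.pop().is_none());
    }
}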