lf_queue/queue.rs

//! A lock-free multi-producer multi-consumer unbounded queue.

use crate::cache_pad::CachePad;
use crate::node::{Node, NODE_CAPACITY, NODE_SIZE};
use crate::slot::{DRAINING, FILLED, READING};
use crate::variant::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering};
use crate::variant::sync::Arc;
use crate::variant::thread;

use std::mem::MaybeUninit;

/// A lock-free multi-producer multi-consumer unbounded queue.
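///
/// Handles can be cloned to share the same underlying queue. Assuming
/// `Queue<T>` is `Send` and `Sync` for `T: Send` (as its `Arc`-based
/// internals suggest), clones can be moved to other threads to produce
/// and consume concurrently, as in the sketch below.
///
/// # Examples
///
/// ```
/// use lf_queue::Queue;
/// use std::thread;
///
/// let queue = Queue::<usize>::new();
///
/// let producer = {
///     let queue = queue.clone();
///     thread::spawn(move || {
///         for i in 0..16 {
///             queue.push(i);
///         }
///     })
/// };
/// producer.join().unwrap();
///
/// let mut count = 0;
/// while queue.pop().is_some() {
///     count += 1;
/// }
/// assert_eq!(count, 16);
/// ```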
#[derive(Clone, Debug)]
pub struct Queue<T> {
    inner: Arc<Inner<T>>,
}

impl<T> Queue<T> {
    /// Creates a new [`Queue`].
    ///
    /// # Examples
    ///
    /// ```
    /// use lf_queue::Queue;
    ///
    /// let queue = Queue::<usize>::new();
    /// ```
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Inner::new()),
        }
    }

    /// Pushes an item into the [`Queue`].
    ///
    /// # Examples
    ///
    /// ```
    /// use lf_queue::Queue;
    ///
    /// let queue = Queue::<usize>::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    /// ```
    pub fn push(&self, item: T) {
        self.inner.push(item)
    }

    /// Pops an item from the [`Queue`], returning `None` if the [`Queue`] is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use lf_queue::Queue;
    ///
    /// let queue = Queue::<usize>::new();
    /// for i in 0..8 {
    ///     queue.push(i);
    /// }
    ///
    /// for i in 0..8 {
    ///     assert_eq!(i, queue.pop().unwrap());
    /// }
    ///
    /// assert!(queue.pop().is_none());
    /// ```
    pub fn pop(&self) -> Option<T> {
        self.inner.pop()
    }
}

impl<T> Default for Queue<T> {
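    /// Creates an empty [`Queue`], equivalent to [`Queue::new`].
    ///
    /// # Examples
    ///
    /// ```
    /// use lf_queue::Queue;
    ///
    /// let queue: Queue<usize> = Queue::default();
    /// assert!(queue.pop().is_none());
    /// ```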
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Debug)]
struct Inner<T> {
    head: CachePad<Cursor<T>>,
    tail: CachePad<Cursor<T>>,
}

impl<T> Inner<T> {
    fn new() -> Self {
        #[cfg(not(loom))]
        let node: Node<T> = Node::UNINIT;
        #[cfg(loom)]
        let node: Node<T> = Node::new();

        let first_node: *mut CachePad<Node<T>> = Box::into_raw(Box::new(CachePad::new(node)));

        Self {
            head: CachePad::new(Cursor {
                index: AtomicUsize::new(0),
                node: AtomicPtr::new(first_node),
            }),
            tail: CachePad::new(Cursor {
                index: AtomicUsize::new(0),
                node: AtomicPtr::new(first_node),
            }),
        }
    }

    fn push(&self, item: T) {
        let mut tail_index = self.tail.index.load(Ordering::Acquire);
        let mut tail_node = self.tail.node.load(Ordering::Acquire);

        loop {
            // Defines the node container offset of the slot where the provided item should be stored.
            let offset = (tail_index >> MARK_BIT_SHIFT) % NODE_SIZE;
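            // For illustration (NODE_SIZE's actual value lives in `node.rs` and is
            // assumed here): if NODE_SIZE were 32, a tail index of `70 << MARK_BIT_SHIFT`
            // would map to offset 70 % 32 == 6 within the current node.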

            // If the node container is full, we wait until the next node is
            // installed before moving forward and update our local references.
            if offset == NODE_CAPACITY {
                thread::yield_now();
                tail_index = self.tail.index.load(Ordering::Acquire);
                tail_node = self.tail.node.load(Ordering::Acquire);
                continue;
            }

            // Increments the tail index.
            let next_tail_index = tail_index + (1 << MARK_BIT_SHIFT);
            match self.tail.index.compare_exchange_weak(
                tail_index,
                next_tail_index,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                // The tail index has been updated successfully so we can now use
                // the offset to store the item in the next available slot.
                Ok(_) => unsafe {
                    // If we're filling the last available slot of the node container,
                    // we install a new node, point the tail at it, and link it as the
                    // current node's successor.
                    if offset + 1 == NODE_CAPACITY {
                        #[cfg(not(loom))]
                        let node: Node<T> = Node::UNINIT;
                        #[cfg(loom)]
                        let node: Node<T> = Node::new();

                        let next_node = Box::into_raw(Box::new(CachePad::new(node)));
                        self.tail.node.store(next_node, Ordering::Release);
                        let _ = self
                            .tail
                            .index
                            .fetch_add(1 << MARK_BIT_SHIFT, Ordering::Release);
                        (*tail_node).next.store(next_node, Ordering::Release);
                    }

                    // We can now safely store the provided item into the slot.
                    let slot = (*tail_node).container.get_unchecked(offset);
                    slot.item.with_mut(|p| p.write(MaybeUninit::new(item)));
                    let _ = slot.state.fetch_or(FILLED, Ordering::Release);

                    return;
                },
                // The compare-exchange failed because another thread updated the
                // tail index in the meantime. We refresh our local references with
                // the index returned by the failed exchange and with the current
                // tail node.
                Err(current_tail_index) => {
                    tail_index = current_tail_index;
                    tail_node = self.tail.node.load(Ordering::Acquire);
                }
            }
        }
    }

    fn pop(&self) -> Option<T> {
        let mut head_index = self.head.index.load(Ordering::Acquire);
        let mut head_node = self.head.node.load(Ordering::Acquire);

        loop {
            // Defines the offset of the slot from which the next item should be gathered.
            let offset = (head_index >> MARK_BIT_SHIFT) % NODE_SIZE;

            // If we reach the end of the node container, we wait until the next
            // one is installed.
            if offset == NODE_CAPACITY {
                thread::yield_now();
                head_index = self.head.index.load(Ordering::Acquire);
                head_node = self.head.node.load(Ordering::Acquire);
                continue;
            }

            // Increments the head index.
            let mut next_head_index = head_index + (1 << MARK_BIT_SHIFT);

            // If the mark bit is not set in the head index, we check whether
            // there is a pending item in the queue.
            if next_head_index & MARK_BIT == 0 {
                // Synchronizes with concurrent pushes and loads the current tail index.
                fence(Ordering::SeqCst);
                let tail_index = self.tail.index.load(Ordering::Acquire);

                // If the head and tail indices are equal (ignoring the mark bit),
                // the queue is empty.
                if head_index >> MARK_BIT_SHIFT == tail_index >> MARK_BIT_SHIFT {
                    return None;
                }

                // If the head and the tail are not pointing to the same node,
                // we set the `MARK_BIT` in the head so that subsequent pops can
                // skip checking whether any item is pending.
                if (head_index >> MARK_BIT_SHIFT) / NODE_SIZE
                    != (tail_index >> MARK_BIT_SHIFT) / NODE_SIZE
                {
                    next_head_index |= MARK_BIT;
                }
            }

            // Tries to update the head index.
            match self.head.index.compare_exchange_weak(
                head_index,
                next_head_index,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                // The head index has been updated successfully so we can now use
                // the offset to pop the next item.
                Ok(_) => unsafe {
                    // If we're returning the last item of the node container, we
                    // update the head cursor to point to the next node.
                    if offset + 1 == NODE_CAPACITY {
                        let next_node = (*head_node).wait_next();

                        // Removes the mark bit, if set, and increments the index.
                        let mut next_index =
                            (next_head_index & !MARK_BIT).wrapping_add(1 << MARK_BIT_SHIFT);

                        // If the next node already has a successor installed, we can
                        // set the mark bit right away to record that the node we are
                        // moving to is not the last one.
                        if !(*next_node).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.node.store(next_node, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Reads and returns the item.
                    let slot = (*head_node).container.get_unchecked(offset);
                    slot.wait_filled();
                    let item = slot.item.with(|p| p.read().assume_init());

                    // Drains and drops the node if we've reached the end of its container,
                    // or if another thread wanted to do so but couldn't because this thread
                    // was busy reading from the slot.
                    if offset + 1 == NODE_CAPACITY {
                        Node::drain(head_node, 0);
                    } else if slot.state.fetch_or(READING, Ordering::AcqRel) & DRAINING != 0 {
                        Node::drain(head_node, offset + 1);
                    }

                    return Some(item);
                },
                // The compare-exchange failed because another thread updated the
                // head index in the meantime. We refresh our local references with
                // the index returned by the failed exchange and with the current
                // head node.
                Err(current_head_index) => {
                    head_index = current_head_index;
                    head_node = self.head.node.load(Ordering::Acquire);
                }
            }
        }
    }
}

#[derive(Debug)]
struct Cursor<T> {
    /// Reports the index of the next [`Slot`].
    ///
    /// Its value, shifted right by [`MARK_BIT_SHIFT`], is taken modulo
    /// [`NODE_SIZE`] to obtain the offset of the slot within the current
    /// [`Node`] container.
    ///
    /// [`Slot`]: crate::slot::Slot
    index: AtomicUsize,

    /// Points to the current [`Node`].
    node: AtomicPtr<CachePad<Node<T>>>,
}

/// Defines how many lower bits of an index are reserved for metadata.
const MARK_BIT_SHIFT: usize = 1;

/// The [`MARK_BIT`], when set in the head index, indicates that the head
/// [`Node`] is not the last one.
///
/// The [`MARK_BIT`] helps `pop` avoid loading the tail index just to check
/// whether the queue is empty.
///
/// [`Node`]: crate::node::Node
const MARK_BIT: usize = 1;
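
// A minimal sanity-check sketch: it exercises the public `push`/`pop` API across
// a node boundary, relying only on the `NODE_CAPACITY` constant imported above,
// and assumes the standard (non-loom) test build.
#[cfg(all(test, not(loom)))]
mod node_boundary_tests {
    use super::*;

    #[test]
    fn fifo_order_across_node_boundary() {
        let queue = Queue::<usize>::new();

        // Push enough items to force at least one new node to be installed.
        let total = NODE_CAPACITY * 2 + 3;
        for i in 0..total {
            queue.push(i);
        }

        // Items must come back in FIFO order, and the queue must then be empty.
        for i in 0..total {
            assert_eq!(Some(i), queue.pop());
        }
        assert!(queue.pop().is_none());
    }
}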