// lock_freedom/queue.rs

1use crate::owned_alloc::OwnedAlloc;
2use crate::{
3    incin::Pause,
4    ptr::{bypass_null, check_null_align},
5    removable::Removable,
6};
7use core::{
8    fmt,
9    iter::FromIterator,
10    ptr::{null_mut, NonNull},
11    sync::atomic::{AtomicPtr, Ordering::*},
12};
13
/// A lock-free general-purpose queue. FIFO semantics are fully respected.
/// It can be used as a multi-producer and multi-consumer channel.
pub struct Queue<T> {
    // Oldest end of the queue; never null — an item-less sentinel node is
    // kept so `front` and `back` always point at a valid node.
    front: AtomicPtr<Node<T>>,
    // Newest end of the queue; never null. New nodes are linked here.
    back: AtomicPtr<Node<T>>,
    // Shared incinerator: node deallocation is deferred through it so
    // concurrent `pop`s never free memory another thread may still read.
    incin: SharedIncin<T>,
}
21
impl<T> Queue<T> {
    /// Creates a new empty queue.
    pub fn new() -> Self {
        check_null_align::<Node<T>>();
        Self::with_incin(SharedIncin::new())
    }

    /// Creates an empty queue using the passed shared incinerator.
    pub fn with_incin(incin: SharedIncin<T>) -> Self {
        // Allocate a sentinel node with an empty item so that `front` and
        // `back` are never null, even while the queue holds no items.
        let node = Node::new(Removable::empty());
        let sentinel = OwnedAlloc::new(node).into_raw().as_ptr();
        Self {
            front: AtomicPtr::new(sentinel),
            back: AtomicPtr::new(sentinel),
            incin,
        }
    }

    /// Returns the shared incinerator used by this [`Queue`].
    pub fn incin(&self) -> SharedIncin<T> {
        self.incin.clone()
    }

    /// Creates an iterator over `T`s, based on the [`pop`](Queue::pop)
    /// operation of the [`Queue`].
    pub fn pop_iter(&self) -> PopIter<'_, T> {
        PopIter { queue: self }
    }

    /// Pushes a value into the back of the queue. This operation is also
    /// wait-free.
    pub fn push(&self, item: T) {
        // Pretty simple: create a node from the item.
        let node = Node::new(Removable::new(item));
        let alloc = OwnedAlloc::new(node);
        let node_ptr = alloc.into_raw().as_ptr();
        // Swap with the previously stored back. After this swap the new node
        // is the back, but is not yet reachable from the front.
        let prev_back = self.back.swap(node_ptr, AcqRel);
        unsafe {
            // Update the previous back's next field to our newly allocated
            // node. This may delay the visibility of the insertion (between
            // the swap above and this store, a popper may briefly not see
            // the new node).
            (*prev_back).next.store(node_ptr, Release);
        }
    }

    /// Takes a value from the front of the queue, if it is available.
    pub fn pop(&self) -> Option<T> {
        // Pausing because of the ABA problem involving removal from linked
        // lists: while paused, nodes are never freed, so dereferencing a
        // loaded front pointer stays sound.
        let pause = self.incin.get_unchecked().pause();
        let mut front_nnptr = unsafe {
            // The pointer stored in front and back must never be null. The
            // queue always has at least one node (the sentinel). Front and
            // back are always connected.
            bypass_null(self.front.load(Relaxed))
        };

        loop {
            // This dereference is safe because we paused the incinerator and
            // only delete nodes via the incinerator.
            //
            // We first remove the node's item logically (take it out of the
            // `Removable`); physically unlinking the node is a separate,
            // best-effort cleanup step.
            match unsafe { front_nnptr.as_ref().item.take(AcqRel) } {
                Some(val) => {
                    // Safe to call because we passed a pointer from the front
                    // which was loaded during the very same pause we are
                    // passing.
                    unsafe { self.try_clear_first(front_nnptr, &pause) };
                    break Some(val);
                }

                // Front node already emptied by another popper: try to
                // unlink it and advance. A `None` from `try_clear_first`
                // means only one node is left, i.e. the queue is empty.
                //
                // Safe to call because we passed a pointer from the front
                // which was loaded during the very same pause we are
                // passing.
                None => unsafe {
                    front_nnptr = self.try_clear_first(front_nnptr, &pause)?;
                },
            }
        }
    }

    /// Pushes elements from the given iterable. Acts just like
    /// [`Extend::extend`] but does not require mutability.
    pub fn extend<I>(&self, iterable: I)
    where
        I: IntoIterator<Item = T>,
    {
        for elem in iterable {
            self.push(elem);
        }
    }

    // Returns an `Option` so we can use the try operator (?) with the
    // function: `Some(new_front)` when `expected` had a successor, `None`
    // when it was the only node (kept in place, see below).
    //
    // This function is unsafe because passing the wrong pointer will lead to
    // undefined behavior. The pointer must have been loaded from the front
    // during the passed pause.
    unsafe fn try_clear_first(
        &self,
        expected: NonNull<Node<T>>,
        pause: &Pause<OwnedAlloc<Node<T>>>,
    ) -> Option<NonNull<Node<T>>> {
        let next = expected.as_ref().next.load(Acquire);

        // If this is the only node, we will not remove it. We want front and
        // back to share the same node rather than having to set both to null
        // when the queue is empty.
        NonNull::new(next).map(|next_nnptr| {
            let ptr = expected.as_ptr();

            // We are not obliged to succeed. This is just cleanup and some
            // other thread might do it.
            match self.front.compare_exchange(ptr, next, Relaxed, Relaxed) {
                Ok(_) => {
                    // Only deleting nodes via incinerator due to the ABA
                    // problem and use-after-frees: actual deallocation is
                    // deferred until no thread is paused.
                    pause.add_to_incin(OwnedAlloc::from_raw(expected));
                    next_nnptr
                }

                Err(found) => {
                    // Another thread advanced the front first; continue from
                    // what it installed. Safe to by-pass the null check since
                    // we only store non-null pointers on the front.
                    bypass_null(found)
                }
            }
        })
    }
}
149
150impl<T> Default for Queue<T> {
151    fn default() -> Self {
152        Self::new()
153    }
154}
155
impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so non-atomic access via
        // `get_mut` is sound and no incinerator pause is needed.
        let front = self.front.get_mut();
        // Walk the list from the front, freeing every node (including the
        // sentinel, whose item is already empty).
        while let Some(nnptr) = NonNull::new(*front) {
            // This is safe because we only store pointers allocated via
            // `OwnedAlloc`. Also, we have exclusive access to this pointer.
            let mut node = unsafe { OwnedAlloc::from_raw(nnptr) };
            // Read the successor before `node` is dropped at the end of this
            // iteration (and its memory freed).
            *front = *node.next.get_mut();
        }
    }
}
167
168impl<T> FromIterator<T> for Queue<T> {
169    fn from_iter<I>(iterable: I) -> Self
170    where
171        I: IntoIterator<Item = T>,
172    {
173        let this = Self::new();
174        this.extend(iterable);
175        this
176    }
177}
178
179impl<T> Extend<T> for Queue<T> {
180    fn extend<I>(&mut self, iterable: I)
181    where
182        I: IntoIterator<Item = T>,
183    {
184        (*self).extend(iterable)
185    }
186}
187
impl<T> Iterator for Queue<T> {
    type Item = T;

    // Exclusive-access iteration: `&mut self` means no other thread can
    // touch the queue, so nodes may be freed immediately without pausing
    // the incinerator.
    fn next(&mut self) -> Option<T> {
        let front = self.front.get_mut();
        // Safe to by-pass the null check because the queue always has at
        // least one node.
        let mut front_node = unsafe { NonNull::new_unchecked(*front) };
        loop {
            // Take the item (if any) and read the successor pointer. Safe
            // because we allocated everything properly.
            let (item, next) = unsafe {
                let node_ref = front_node.as_mut();
                (node_ref.item.replace(None), *node_ref.next.get_mut())
            };

            match (item, NonNull::new(next)) {
                // Got an item. Free the node only if it has a successor;
                // otherwise it must stay as the single remaining node.
                (Some(item), maybe_next) => {
                    if let Some(next) = maybe_next {
                        // Ok to drop it like this because we have exclusive
                        // reference to the queue.
                        unsafe { OwnedAlloc::from_raw(front_node) };
                        *front = next.as_ptr();
                    }

                    break Some(item);
                }

                // Empty node and no successor: the queue is exhausted.
                (None, None) => break None,

                // Empty node with a successor: free it and keep walking.
                (None, Some(next)) => {
                    // Ok to drop it like this because we have exclusive
                    // reference to the queue.
                    unsafe { OwnedAlloc::from_raw(front_node) };
                    *front = next.as_ptr();
                    front_node = next;
                }
            }
        }
    }
}
227
228impl<T> fmt::Debug for Queue<T> {
229    fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
230        write!(
231            fmtr,
232            "Queue {{ front: {:?}, back: {:?}, incin: {:?} }}",
233            self.front, self.back, self.incin
234        )
235    }
236}
237
// SAFETY: sending the queue to another thread transfers ownership of the
// `T`s it contains, so `T: Send` is sufficient.
unsafe impl<T> Send for Queue<T> where T: Send {}

// SAFETY: the shared-reference operations (`push`, `pop`, `extend`) only
// move owned `T`s in and out — no `&T` into the queue's storage is ever
// exposed — so `T: Send` is sufficient here as well.
unsafe impl<T> Sync for Queue<T> where T: Send {}
241
/// An iterator based on [`pop`](Queue::pop) operation of the [`Queue`].
pub struct PopIter<'queue, T>
where
    T: 'queue,
{
    // The queue popped from on each `next` call.
    queue: &'queue Queue<T>,
}
249
250impl<'queue, T> Iterator for PopIter<'queue, T> {
251    type Item = T;
252
253    fn next(&mut self) -> Option<Self::Item> {
254        self.queue.pop()
255    }
256}
257
258impl<'queue, T> fmt::Debug for PopIter<'queue, T> {
259    fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
260        write!(fmtr, "PopIter {{ queue: {:?} }}", self.queue)
261    }
262}
263
// Expands to the definition of `SharedIncin<T>`, the shared incinerator
// handle over `OwnedAlloc<Node<T>>` used by [`Queue`] to defer node
// deallocation (macro defined elsewhere in the crate).
make_shared_incin! {
    { "[`Queue`]" }
    pub SharedIncin<T> of OwnedAlloc<Node<T>>
}
268
269impl<T> fmt::Debug for SharedIncin<T> {
270    fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
271        write!(fmtr, "SharedIncin {{ inner: {:?} }}", self.inner)
272    }
273}
274
// Alignment of at least 2 is enforced; `Queue::new` runs
// `check_null_align::<Node<T>>()` against this type — NOTE(review): the
// precise requirement lives in the `ptr` module; confirm there.
#[repr(align(/* at least */ 2))]
struct Node<T> {
    // The stored element; taken out (at most once) via `Removable::take`
    // when the node is logically popped.
    item: Removable<T>,
    // Next (newer) node in the list, or null if this is the back node.
    next: AtomicPtr<Node<T>>,
}
280
281impl<T> Node<T> {
282    fn new(item: Removable<T>) -> Self {
283        Self {
284            item,
285            next: AtomicPtr::new(null_mut()),
286        }
287    }
288}
289
// Testing the safety of `unsafe` in this module is done with random
// operations via fuzzing; the tests below only cover basic sequential
// semantics plus one stress test for data corruption.
#[cfg(test)]
mod test {
    use super::*;
    use alloc::sync::Arc;
    use alloc::vec::Vec;
    use core::sync::atomic::AtomicUsize;

    // A freshly created queue yields nothing.
    #[test]
    fn on_empty_first_pop_is_none() {
        let queue = Queue::<usize>::new();
        assert!(queue.pop().is_none());
    }

    // After popping everything that was pushed, the queue is empty again.
    #[test]
    fn on_empty_last_pop_is_none() {
        let queue = Queue::new();
        queue.push(3);
        queue.push(1234);
        queue.pop();
        queue.pop();
        assert!(queue.pop().is_none());
    }

    // FIFO ordering: elements come out in push order.
    #[test]
    fn order() {
        let queue = Queue::new();
        queue.push(3);
        queue.push(5);
        queue.push(6);
        assert_eq!(queue.pop(), Some(3));
        assert_eq!(queue.pop(), Some(5));
        assert_eq!(queue.pop(), Some(6));
    }

    // The exclusive-access `Iterator` impl drains in FIFO order too.
    #[test]
    fn queue_iter() {
        let mut queue = Queue::new();
        queue.push(3);
        queue.push(5);
        queue.push(6);
        assert_eq!(queue.next(), Some(3));
        assert_eq!(queue.next(), Some(5));
        assert_eq!(queue.next(), Some(6));
        assert_eq!(queue.next(), None);
    }

    // Stress test: concurrent pushers/poppers; at the end, the number of
    // remaining elements must exactly account for everything pushed minus
    // everything popped (no loss, no duplication).
    #[cfg(feature = "std")]
    #[test]
    fn no_data_corruption() {
        use std::thread;
        const NTHREAD: usize = 20;
        const NITER: usize = 800;
        const NMOD: usize = 55;

        let queue = Arc::new(Queue::new());
        let mut handles = Vec::with_capacity(NTHREAD);
        let removed = Arc::new(AtomicUsize::new(0));

        for i in 0..NTHREAD {
            let removed = removed.clone();
            let queue = queue.clone();
            handles.push(thread::spawn(move || {
                for j in 0..NITER {
                    // Each thread pushes a distinct value.
                    let val = (i * NITER) + j;
                    queue.push(val);
                    // Occasionally pop, counting successful removals.
                    if (val + 1) % NMOD == 0 {
                        if let Some(val) = queue.pop() {
                            removed.fetch_add(1, Relaxed);
                            assert!(val < NITER * NTHREAD);
                        }
                    }
                }
            }));
        }

        for handle in handles {
            handle.join().expect("thread failed");
        }

        // Drain what is left and check the count balances out.
        let expected = NITER * NTHREAD - removed.load(Relaxed);
        let mut res = 0;
        while let Some(val) = queue.pop() {
            assert!(val < NITER * NTHREAD);
            res += 1;
        }

        assert_eq!(res, expected);
    }
}