llq/
lib.rs

//! A wait-free single-producer single-consumer linked-list queue with
//! individually reusable nodes.
//!
//! Queue operations ([`Producer::push()`] and [`Consumer::pop()`]) do not
//! block or allocate memory. Individual [`Node`]s are allocated and managed
//! separately, and can be reused on multiple queues.
//!
//! # Examples
//!
//! Using a queue to send values between threads:
//!
//! ```rust
//! use llq::{Node, Queue};
//!
//! let (mut producer, mut consumer) = Queue::<usize>::new().split();
//!
//! producer.push(Node::new(0));
//! producer.push(Node::new(1));
//! producer.push(Node::new(2));
//!
//! std::thread::spawn(move || {
//!     assert_eq!(*consumer.pop().unwrap(), 0);
//!     assert_eq!(*consumer.pop().unwrap(), 1);
//!     assert_eq!(*consumer.pop().unwrap(), 2);
//!     assert!(consumer.pop().is_none());
//! }).join().unwrap();
//!
//! ```
//!
//! Reusing a node between multiple queues:
//!
//! ```rust
//! use llq::{Node, Queue};
//!
//! let (mut producer1, mut consumer1) = Queue::<usize>::new().split();
//! let (mut producer2, mut consumer2) = Queue::<usize>::new().split();
//!
//! let node = Node::new(3);
//! producer1.push(node);
//! let node = consumer1.pop().unwrap();
//! producer2.push(node);
//! let node = consumer2.pop().unwrap();
//!
//! assert_eq!(*node, 3);
//! ```
//!
//! [`Producer::push()`]: crate::Producer::push
//! [`Consumer::pop()`]: crate::Consumer::pop
//! [`Node`]: crate::Node
50
51#![no_std]
52
53extern crate alloc;
54
55use alloc::boxed::Box;
56use alloc::sync::Arc;
57use core::cell::Cell;
58use core::marker::PhantomData;
59use core::mem;
60use core::mem::MaybeUninit;
61use core::ops::{Deref, DerefMut};
62use core::ptr;
63use core::ptr::NonNull;
64use core::sync::atomic::{AtomicPtr, Ordering};
65
/// An individual node which may be pushed onto and popped from a [`Queue`].
///
/// A `Node<T>` uniquely owns one heap allocation that stores a value of type
/// `T` next to the link pointer used while the node sits in a queue. The node
/// dereferences to the stored value.
///
/// [`Queue`]: crate::Queue
pub struct Node<T> {
    /// Pointer to the heap allocation owned by this node.
    inner: NonNull<NodeInner<T>>,
    /// Marks that the node logically owns a `T`.
    phantom: PhantomData<T>,
}

// SAFETY: a `Node<T>` is the unique owner of its allocation and only hands out
// access to the stored `T` through `&self` / `&mut self`, so it can be sent or
// shared across threads exactly when `T` can.
unsafe impl<T: Send> Send for Node<T> {}
unsafe impl<T: Sync> Sync for Node<T> {}

/// Heap storage behind a [`Node`] and behind every entry linked into a queue.
struct NodeInner<T> {
    /// Next entry in the queue; null while the node is not linked in.
    next: AtomicPtr<NodeInner<T>>,
    /// The payload. Always initialized while the allocation is owned by a
    /// `Node`; a queue's placeholder entry is the one case where it is not.
    data: MaybeUninit<T>,
}

impl<T> Node<T> {
    /// Allocates a new node containing the given value.
    pub fn new(data: T) -> Node<T> {
        let inner = Box::new(NodeInner {
            next: AtomicPtr::new(ptr::null_mut()),
            data: MaybeUninit::new(data),
        });

        // Leaking the box transfers ownership of the allocation to the node;
        // it is reclaimed with `Box::from_raw` in `into_inner` or `drop`.
        Node { inner: NonNull::from(Box::leak(inner)), phantom: PhantomData }
    }

    /// Deallocates a `Node` and returns the inner value.
    pub fn into_inner(this: Node<T>) -> T {
        // Disarm `Drop for Node` up front so the payload can never be dropped
        // twice, whatever happens below.
        let this = mem::ManuallyDrop::new(this);

        // SAFETY: `this.inner` came from a leaked `Box<NodeInner<T>>` and the
        // node is its sole owner. Moving the value out of the box frees the
        // allocation, and the payload of a live node is always initialized.
        unsafe {
            let inner = *Box::from_raw(this.inner.as_ptr());
            inner.data.assume_init()
        }
    }
}

impl<T> Deref for Node<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: the node owns a valid allocation whose payload is
        // initialized, and the returned borrow is tied to `&self`.
        unsafe { &*self.inner.as_ref().data.as_ptr() }
    }
}

impl<T> DerefMut for Node<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // SAFETY: as in `deref`; `&mut self` guarantees exclusive access.
        unsafe { &mut *self.inner.as_mut().data.as_mut_ptr() }
    }
}

impl<T> Drop for Node<T> {
    fn drop(&mut self) {
        // SAFETY: the node is the sole owner of an allocation that came from
        // a leaked `Box<NodeInner<T>>`, and its payload is initialized.
        unsafe {
            // Reclaim the box *before* running the payload's destructor: if
            // that destructor panics, unwinding still frees the allocation
            // instead of leaking it.
            let mut inner = Box::from_raw(self.inner.as_ptr());
            ptr::drop_in_place(inner.data.as_mut_ptr());
        }
    }
}
129
130/// A wait-free SPSC linked-list queue.
131pub struct Queue<T> {
132    head: Cell<*mut NodeInner<T>>,
133    phantom: PhantomData<T>,
134}
135
136unsafe impl<T: Send> Send for Queue<T> {}
137
138impl<T> Queue<T> {
139    /// Creates a new queue.
140    pub fn new() -> Queue<T> {
141        let node = Box::into_raw(Box::new(NodeInner {
142            next: AtomicPtr::new(ptr::null_mut()),
143            data: MaybeUninit::uninit(),
144        }));
145
146        Queue { head: Cell::new(node), phantom: PhantomData }
147    }
148
149    /// Splits a queue into its producer and consumer halves.
150    pub fn split(self) -> (Producer<T>, Consumer<T>) {
151        let queue = Arc::new(self);
152
153        let producer = Producer { queue: queue.clone(), tail: queue.head.get() };
154        let consumer = Consumer { queue };
155
156        (producer, consumer)
157    }
158}
159
160impl<T> Drop for Queue<T> {
161    fn drop(&mut self) {
162        unsafe {
163            let head = self.head.get();
164            let mut current = (*head).next.load(Ordering::Relaxed);
165
166            drop(Box::from_raw(head));
167
168            while !current.is_null() {
169                let next = (*current).next.load(Ordering::Relaxed);
170                ptr::drop_in_place((*current).data.as_mut_ptr());
171                drop(Box::from_raw(current));
172                current = next;
173            }
174        }
175    }
176}
177
178/// The consumer half of a [`Queue`].
179///
180/// [`Queue`]: crate::Queue
181pub struct Consumer<T> {
182    queue: Arc<Queue<T>>,
183}
184
185unsafe impl<T: Send> Send for Consumer<T> {}
186
187impl<T> Consumer<T> {
188    /// Attempts to remove and return an element from the queue. Returns `None`
189    /// if the queue is empty.
190    pub fn pop(&mut self) -> Option<Node<T>> {
191        unsafe {
192            let head = self.queue.head.get();
193            let next = (*head).next.load(Ordering::Acquire);
194
195            if !next.is_null() {
196                ptr::copy_nonoverlapping((*next).data.as_ptr(), (*head).data.as_mut_ptr(), 1);
197                (*head).next.store(ptr::null_mut(), Ordering::Relaxed);
198
199                self.queue.head.set(next);
200
201                return Some(Node { inner: NonNull::new_unchecked(head), phantom: PhantomData });
202            }
203
204            None
205        }
206    }
207}
208
209/// The producer half of a [`Queue`].
210///
211/// [`Queue`]: crate::Queue
212pub struct Producer<T> {
213    #[allow(unused)]
214    queue: Arc<Queue<T>>,
215    tail: *mut NodeInner<T>,
216}
217
218unsafe impl<T: Send> Send for Producer<T> {}
219
220impl<T> Producer<T> {
221    /// Adds an element to the queue.
222    pub fn push(&mut self, node: Node<T>) {
223        unsafe {
224            let node_ptr = node.inner.as_ptr();
225            mem::forget(node);
226
227            let tail = &*self.tail;
228            tail.next.store(node_ptr, Ordering::Release);
229
230            self.tail = node_ptr;
231        }
232    }
233}
234
#[cfg(test)]
mod tests {
    use super::*;

    extern crate std;

    /// Payload that bumps a borrowed counter every time it is dropped.
    struct DropCounter<'a>(&'a Cell<usize>);

    impl<'a> Drop for DropCounter<'a> {
        fn drop(&mut self) {
            self.0.set(self.0.get() + 1);
        }
    }

    /// One thread pushes 10000 `false` nodes followed by a `true` sentinel
    /// while another pops concurrently; every element must arrive, in order.
    #[test]
    fn multithreaded() {
        let (mut producer, mut consumer) = Queue::new().split();

        let thread1 = std::thread::spawn(move || {
            for _ in 0..10000 {
                producer.push(Node::new(false));
            }

            producer.push(Node::new(true));
        });

        let thread2 = std::thread::spawn(move || {
            let mut counter = 0;

            loop {
                match consumer.pop() {
                    Some(node) if *node => break,
                    Some(_) => counter += 1,
                    // Queue momentarily empty: keep polling.
                    None => {}
                }
            }

            assert_eq!(counter, 10000);
        });

        thread1.join().unwrap();
        thread2.join().unwrap();
    }

    /// Nodes popped from one queue can be pushed onto another.
    #[test]
    fn multiple_queues() {
        let (mut producer1, mut consumer1) = Queue::new().split();
        let (mut producer2, mut consumer2) = Queue::new().split();

        for _ in 0..10000 {
            producer1.push(Node::new(()));
        }

        let mut counter = 0;
        while let Some(node) = consumer1.pop() {
            producer2.push(node);
            counter += 1;
        }
        assert_eq!(counter, 10000);

        let mut counter = 0;
        while consumer2.pop().is_some() {
            counter += 1;
        }
        assert_eq!(counter, 10000);
    }

    /// Dropping a popped node drops its payload exactly once.
    #[test]
    fn drop_occurs() {
        // Declared before the queue so it outlives every queued payload.
        let counter = Cell::new(0);

        let (mut producer, mut consumer) = Queue::new().split();

        for _ in 0..10000 {
            producer.push(Node::new(DropCounter(&counter)));
        }

        while consumer.pop().is_some() {}

        assert_eq!(counter.get(), 10000);
    }

    /// Elements still queued when both halves go away are dropped by the
    /// queue itself, and already-popped elements are not dropped again.
    #[test]
    fn queue_drop_releases_pending_nodes() {
        let counter = Cell::new(0);

        {
            let (mut producer, mut consumer) = Queue::new().split();

            for _ in 0..100 {
                producer.push(Node::new(DropCounter(&counter)));
            }

            for _ in 0..10 {
                assert!(consumer.pop().is_some());
            }
            assert_eq!(counter.get(), 10);
        }

        assert_eq!(counter.get(), 100);
    }

    /// `Node::into_inner` returns the payload of a node that went through a
    /// queue.
    #[test]
    fn into_inner_after_pop() {
        let (mut producer, mut consumer) = Queue::new().split();

        producer.push(Node::new(7usize));

        let node = consumer.pop().unwrap();
        assert_eq!(Node::into_inner(node), 7);
        assert!(consumer.pop().is_none());
    }
}
316        assert_eq!(counter.get(), 10000);
317    }
318}