subms_mpsc_queue/lib.rs
1//! Vyukov-style multi-producer single-consumer linked queue.
2//!
//! Producers enqueue with one atomic `swap` on `head` (the publication end);
//! the consumer drains by following `next` pointers from `tail`. The
//! dangling-link window is the load-bearing detail: between the producer's
//! swap of `head` and the `prev.next = new` link, the consumer can see
//! `next == null` while there is actually a publisher in flight.
//! [`MpscQueue::try_pop`] returns [`PopResult::Inconsistent`] in that window
//! so the caller can spin or back off rather than treating it as empty.
10//!
11//! ```
12//! use subms_mpsc_queue::{MpscQueue, PopResult};
13//! let mut q: MpscQueue<u32> = MpscQueue::new();
14//! q.push(7);
15//! q.push(8);
16//! assert!(matches!(q.try_pop(), PopResult::Some(7)));
17//! assert!(matches!(q.try_pop(), PopResult::Some(8)));
18//! ```
19
20use std::cell::UnsafeCell;
21use std::ptr;
22use std::sync::atomic::{AtomicPtr, Ordering};
23
/// One link of the queue's singly linked list.
struct Node<T> {
    /// Payload carried by this link. `None` for the stub node and for a node
    /// whose value has already been taken by the consumer.
    value: UnsafeCell<Option<T>>,
    /// Successor in push order. Null until the producer that appended the
    /// successor has stored the link.
    next: AtomicPtr<Node<T>>,
}
28
/// One-shot result of [`MpscQueue::try_pop`].
///
/// Derives `Debug`, `PartialEq` and `Eq` (each conditional on `T`) so results
/// can be printed and compared directly, e.g. with `assert_eq!`.
#[derive(Debug, PartialEq, Eq)]
pub enum PopResult<T> {
    /// A value was dequeued.
    Some(T),
    /// Queue is truly empty.
    Empty,
    /// A producer is mid-publish: the consumer reached the last linked node,
    /// but a newer node has been published whose `next` link is not yet
    /// stored. Callers should retry (spin or back off).
    Inconsistent,
}
39
40/// Multi-producer single-consumer linked queue.
41///
42/// Cloneable handles are not provided; share via `Arc<MpscQueue<T>>`.
43/// Consumer methods (`try_pop`) require `&mut self` to encode single-consumer
44/// invariant at the type level.
45pub struct MpscQueue<T> {
46 head: AtomicPtr<Node<T>>,
47 tail: UnsafeCell<*mut Node<T>>,
48 stub: Box<Node<T>>,
49}
50
51unsafe impl<T: Send> Sync for MpscQueue<T> {}
52unsafe impl<T: Send> Send for MpscQueue<T> {}
53
54impl<T> MpscQueue<T> {
55 pub fn new() -> Self {
56 let stub = Box::new(Node {
57 value: UnsafeCell::new(None),
58 next: AtomicPtr::new(ptr::null_mut()),
59 });
60 let stub_ptr = stub.as_ref() as *const Node<T> as *mut Node<T>;
61 Self {
62 head: AtomicPtr::new(stub_ptr),
63 tail: UnsafeCell::new(stub_ptr),
64 stub,
65 }
66 }
67
68 /// Multi-producer push. Wait-free for the producer once the node is
69 /// allocated.
70 pub fn push(&self, value: T) {
71 let node = Box::into_raw(Box::new(Node {
72 value: UnsafeCell::new(Some(value)),
73 next: AtomicPtr::new(ptr::null_mut()),
74 }));
75 // swap-head exchanges the publication point atomically. Producers
76 // can race here; each gets a distinct prev.
77 let prev = self.head.swap(node, Ordering::AcqRel);
78 // The dangling-tail window opens here: prev exists, but prev.next is
79 // still null until the next line. Consumer must tolerate it.
80 unsafe { (*prev).next.store(node, Ordering::Release) };
81 }
82
83 /// Consume one entry. Returns [`PopResult::Inconsistent`] if a producer
84 /// is mid-publish; callers should retry.
85 ///
86 /// Single consumer only: requires `&mut self`.
87 pub fn try_pop(&mut self) -> PopResult<T> {
88 // Safety: `tail` is consumer-private (only one consumer at a time).
89 let tail = unsafe { *self.tail.get() };
90 let next = unsafe { (*tail).next.load(Ordering::Acquire) };
91
92 // The stub trick: tail starts at the stub. Once we've drained past
93 // it, swap stub to the new tail so the head's reference is preserved.
94 let stub_ptr = self.stub.as_ref() as *const Node<T> as *mut Node<T>;
95 if tail == stub_ptr {
96 if next.is_null() {
97 // Stub is still the only node: either empty or producer
98 // mid-publish.
99 if self.head.load(Ordering::Acquire) == stub_ptr {
100 return PopResult::Empty;
101 }
102 return PopResult::Inconsistent;
103 }
104 // Move past the stub.
105 unsafe { *self.tail.get() = next };
106 let value = unsafe { (*next).value.get().replace(None) };
107 return match value {
108 Some(v) => PopResult::Some(v),
109 None => PopResult::Inconsistent,
110 };
111 }
112
113 if !next.is_null() {
114 unsafe { *self.tail.get() = next };
115 // Drop the consumed node now that tail has advanced past it.
116 let consumed = unsafe { Box::from_raw(tail) };
117 drop(consumed);
118 let value = unsafe { (*next).value.get().replace(None) };
119 return match value {
120 Some(v) => PopResult::Some(v),
121 None => PopResult::Inconsistent,
122 };
123 }
124
125 // tail.next is null but tail is not the stub: either truly drained
126 // or a producer is racing the link write.
127 if self.head.load(Ordering::Acquire) == tail {
128 PopResult::Empty
129 } else {
130 PopResult::Inconsistent
131 }
132 }
133}
134
135impl<T> Default for MpscQueue<T> {
136 fn default() -> Self {
137 Self::new()
138 }
139}
140
141impl<T> Drop for MpscQueue<T> {
142 fn drop(&mut self) {
143 // Drain remaining nodes so their values' Drop impls run.
144 while let PopResult::Some(_) = self.try_pop() {}
145 // The stub is owned by the Box field; nothing to do for it. Any
146 // non-stub nodes were freed by try_pop as it walked past them.
147 }
148}
149
150#[cfg(feature = "harness")]
151pub mod recipe;