Skip to main content

subms_mpsc_queue/
lib.rs

//! Vyukov-style multi-producer single-consumer linked queue.
//!
//! Producers enqueue with one atomic `swap` on the `head` pointer (the
//! publication point); the consumer drains by following `next` pointers from
//! its private `tail` cursor. The dangling-tail window is the load-bearing
//! detail: between a producer's swap of `head` and its `prev.next = new` link
//! store, the consumer can see `next == null` while there is actually a
//! publisher in flight. [`MpscQueue::try_pop`] returns
//! [`PopResult::Inconsistent`] in that window so the caller can spin or back
//! off rather than treating it as empty.
//!
//! ```
//! use subms_mpsc_queue::{MpscQueue, PopResult};
//! let mut q: MpscQueue<u32> = MpscQueue::new();
//! q.push(7);
//! q.push(8);
//! assert!(matches!(q.try_pop(), PopResult::Some(7)));
//! assert!(matches!(q.try_pop(), PopResult::Some(8)));
//! ```
19
20use std::cell::UnsafeCell;
21use std::ptr;
22use std::sync::atomic::{AtomicPtr, Ordering};
23
/// Heap-allocated link in the queue's singly linked list.
///
/// The queue's stub is a `Node` whose `value` is `None`; every node created
/// by `push` starts with `Some(value)`.
struct Node<T> {
    /// Payload slot. The consumer moves the value out (leaving `None`) when
    /// it pops the node, after which the node serves as the consumer's
    /// resting position until a successor is linked.
    value: UnsafeCell<Option<T>>,
    /// Pointer to the node enqueued after this one. Null at creation and
    /// until the producer that swapped this node out of `head` stores its
    /// own node here.
    next: AtomicPtr<Node<T>>,
}
28
/// One-shot result of [`MpscQueue::try_pop`].
///
/// Implements `Debug`, `PartialEq` and `Eq` (when `T` does) so results can be
/// logged and compared directly, e.g. with `assert_eq!`.
#[derive(Debug, PartialEq, Eq)]
pub enum PopResult<T> {
    /// A value was dequeued.
    Some(T),
    /// Queue is truly empty: no value is queued and no producer is
    /// mid-publish.
    Empty,
    /// A producer is mid-publish; head was reached but `next` is not yet
    /// linked. Callers should retry (spin or back off).
    Inconsistent,
}
39
/// Multi-producer single-consumer linked queue.
///
/// Cloneable handles are not provided; share via `Arc<MpscQueue<T>>`.
/// Consumer methods (`try_pop`) require `&mut self` to encode single-consumer
/// invariant at the type level.
///
/// NOTE(review): `push` takes `&self` but `try_pop` takes `&mut self`, and an
/// `Arc` that is still shared with producers only hands out `&MpscQueue<T>`.
/// Confirm how the consumer is expected to obtain `&mut` access while
/// producers are live.
pub struct MpscQueue<T> {
    /// Publication point: the most recently pushed node (initially the
    /// stub). Producers atomically `swap` their new node in here.
    head: AtomicPtr<Node<T>>,
    /// Consumer cursor: the stub, or the node whose value was popped most
    /// recently. Within this file it is only accessed from `&mut self`
    /// methods.
    tail: UnsafeCell<*mut Node<T>>,
    /// Sentinel node that `head` and `tail` both start at. Owned by this
    /// field and released when the queue is dropped.
    stub: Box<Node<T>>,
}
50
// SAFETY: the only `&self` method, `push`, touches shared state solely through
// atomic operations on `head` and on a node's `next`; `tail` is read and
// written only by methods taking `&mut self`. Values of type `T` are handed
// from producer threads to the consumer thread, hence the `T: Send` bound.
unsafe impl<T: Send> Sync for MpscQueue<T> {}
// SAFETY: the queue owns its nodes and the `T` values stored in them, so
// moving the queue to another thread moves those values with it; that is
// sound when `T: Send`.
unsafe impl<T: Send> Send for MpscQueue<T> {}
53
54impl<T> MpscQueue<T> {
55    pub fn new() -> Self {
56        let stub = Box::new(Node {
57            value: UnsafeCell::new(None),
58            next: AtomicPtr::new(ptr::null_mut()),
59        });
60        let stub_ptr = stub.as_ref() as *const Node<T> as *mut Node<T>;
61        Self {
62            head: AtomicPtr::new(stub_ptr),
63            tail: UnsafeCell::new(stub_ptr),
64            stub,
65        }
66    }
67
68    /// Multi-producer push. Wait-free for the producer once the node is
69    /// allocated.
70    pub fn push(&self, value: T) {
71        let node = Box::into_raw(Box::new(Node {
72            value: UnsafeCell::new(Some(value)),
73            next: AtomicPtr::new(ptr::null_mut()),
74        }));
75        // swap-head exchanges the publication point atomically. Producers
76        // can race here; each gets a distinct prev.
77        let prev = self.head.swap(node, Ordering::AcqRel);
78        // The dangling-tail window opens here: prev exists, but prev.next is
79        // still null until the next line. Consumer must tolerate it.
80        unsafe { (*prev).next.store(node, Ordering::Release) };
81    }
82
83    /// Consume one entry. Returns [`PopResult::Inconsistent`] if a producer
84    /// is mid-publish; callers should retry.
85    ///
86    /// Single consumer only: requires `&mut self`.
87    pub fn try_pop(&mut self) -> PopResult<T> {
88        // Safety: `tail` is consumer-private (only one consumer at a time).
89        let tail = unsafe { *self.tail.get() };
90        let next = unsafe { (*tail).next.load(Ordering::Acquire) };
91
92        // The stub trick: tail starts at the stub. Once we've drained past
93        // it, swap stub to the new tail so the head's reference is preserved.
94        let stub_ptr = self.stub.as_ref() as *const Node<T> as *mut Node<T>;
95        if tail == stub_ptr {
96            if next.is_null() {
97                // Stub is still the only node: either empty or producer
98                // mid-publish.
99                if self.head.load(Ordering::Acquire) == stub_ptr {
100                    return PopResult::Empty;
101                }
102                return PopResult::Inconsistent;
103            }
104            // Move past the stub.
105            unsafe { *self.tail.get() = next };
106            let value = unsafe { (*next).value.get().replace(None) };
107            return match value {
108                Some(v) => PopResult::Some(v),
109                None => PopResult::Inconsistent,
110            };
111        }
112
113        if !next.is_null() {
114            unsafe { *self.tail.get() = next };
115            // Drop the consumed node now that tail has advanced past it.
116            let consumed = unsafe { Box::from_raw(tail) };
117            drop(consumed);
118            let value = unsafe { (*next).value.get().replace(None) };
119            return match value {
120                Some(v) => PopResult::Some(v),
121                None => PopResult::Inconsistent,
122            };
123        }
124
125        // tail.next is null but tail is not the stub: either truly drained
126        // or a producer is racing the link write.
127        if self.head.load(Ordering::Acquire) == tail {
128            PopResult::Empty
129        } else {
130            PopResult::Inconsistent
131        }
132    }
133}
134
135impl<T> Default for MpscQueue<T> {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141impl<T> Drop for MpscQueue<T> {
142    fn drop(&mut self) {
143        // Drain remaining nodes so their values' Drop impls run.
144        while let PopResult::Some(_) = self.try_pop() {}
145        // The stub is owned by the Box field; nothing to do for it. Any
146        // non-stub nodes were freed by try_pop as it walked past them.
147    }
148}
149
150#[cfg(feature = "harness")]
151pub mod recipe;
152
153// Opt-in feature modules. Each is independent of the base queue and
154// gated by its own Cargo feature; `cargo add subms-mpsc-queue` alone
155// keeps the base zero-dep + std-only shape.
156//
157// See README and the cookbook page for the per-feature p99 numbers
158// and composition guidance.
159#[cfg(any(
160    feature = "mpmc",
161    feature = "bounded",
162    feature = "batch",
163    feature = "metrics",
164    feature = "affinity",
165))]
166pub mod features;
167
168#[cfg(feature = "affinity")]
169pub use features::affinity::{AffinityError, set_affinity};
170#[cfg(feature = "batch")]
171pub use features::batch::BatchMpscQueue;
172#[cfg(feature = "bounded")]
173pub use features::bounded::BoundedMpscQueue;
174#[cfg(feature = "metrics")]
175pub use features::metrics::{MetricsMpscQueue, QueueMetricsSnapshot};
176#[cfg(feature = "mpmc")]
177pub use features::mpmc::MpmcQueue;