Skip to main content

doom_fish_utils/
spsc.rs

1//! Single-producer single-consumer lock-free bounded ring buffer.
2//!
3//! Designed for the `CoreAudio` render-thread → async-consumer producer-consumer
4//! pattern. The producer path never takes a mutex and never allocates after the
5//! ring has been constructed.
6//!
7//! Internally this wrapper uses a pre-allocated bounded queue plus an
8//! [`AtomicWaker`](futures_util::task::AtomicWaker) so the consumer can await
9//! the next item without forcing the producer to block.
10//!
11//! # Example
12//!
13//! ```no_run
14//! use doom_fish_utils::spsc::SpscRing;
15//!
16//! # async fn run() {
17//! let (producer, consumer) = SpscRing::<u32, 256>::new();
18//!
19//! producer.push(1).unwrap();
20//! assert_eq!(consumer.pop_async().await, Some(1));
21//! # }
22//! ```
23
24use std::fmt;
25use std::future::Future;
26use std::marker::PhantomData;
27use std::pin::Pin;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::Arc;
30use std::task::{Context, Poll};
31
32use crossbeam_queue::ArrayQueue;
33use futures_util::task::AtomicWaker;
34
35struct Inner<T> {
36    queue: ArrayQueue<T>,
37    producer_closed: AtomicBool,
38    waker: AtomicWaker,
39}
40
41/// Constructor namespace for a bounded single-producer single-consumer ring.
42///
43/// `N` is the maximum supported capacity. Use [`Self::new`] to allocate a ring
44/// with exactly `N` slots, or [`Self::with_capacity`] to choose a smaller
45/// runtime capacity while keeping the type-level upper bound.
46#[derive(Debug, Default)]
47pub struct SpscRing<T, const N: usize>(PhantomData<T>);
48
49/// Producer half of an [`SpscRing`].
50pub struct SpscProducer<T, const N: usize> {
51    inner: Arc<Inner<T>>,
52}
53
54/// Consumer half of an [`SpscRing`].
55pub struct SpscConsumer<T, const N: usize> {
56    inner: Arc<Inner<T>>,
57}
58
59/// Future returned by [`SpscConsumer::pop_async`].
60#[must_use = "futures do nothing unless awaited or polled"]
61pub struct PopFuture<'a, T, const N: usize> {
62    consumer: &'a SpscConsumer<T, N>,
63}
64
65/// Feature-gated [`futures_core::Stream`] wrapper around an [`SpscConsumer`].
66#[cfg(feature = "futures-stream")]
67#[cfg_attr(docsrs, doc(cfg(feature = "futures-stream")))]
68#[must_use = "streams do nothing unless polled"]
69pub struct SpscConsumerStream<'a, T, const N: usize> {
70    consumer: &'a SpscConsumer<T, N>,
71}
72
73#[allow(clippy::new_ret_no_self)]
74impl<T, const N: usize> SpscRing<T, N> {
75    /// Creates a ring with capacity `N`.
76    ///
77    /// # Panics
78    ///
79    /// Panics if `N` is 0.
80    #[must_use]
81    pub fn new() -> (SpscProducer<T, N>, SpscConsumer<T, N>) {
82        Self::with_capacity(N)
83    }
84
85    /// Creates a ring with a runtime capacity up to the type-level maximum `N`.
86    ///
87    /// # Panics
88    ///
89    /// Panics if `capacity` is 0 or larger than `N`.
90    #[must_use]
91    pub fn with_capacity(capacity: usize) -> (SpscProducer<T, N>, SpscConsumer<T, N>) {
92        assert!(N > 0, "SpscRing capacity must be > 0");
93        assert!(capacity > 0, "SpscRing capacity must be > 0");
94        assert!(
95            capacity <= N,
96            "SpscRing capacity {capacity} exceeds type maximum {N}"
97        );
98
99        let inner = Arc::new(Inner {
100            queue: ArrayQueue::new(capacity),
101            producer_closed: AtomicBool::new(false),
102            waker: AtomicWaker::new(),
103        });
104
105        (
106            SpscProducer {
107                inner: Arc::clone(&inner),
108            },
109            SpscConsumer { inner },
110        )
111    }
112}
113
114impl<T, const N: usize> fmt::Debug for SpscProducer<T, N> {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        f.debug_struct("SpscProducer")
117            .field("buffered", &self.buffered_count())
118            .field("capacity", &self.capacity())
119            .finish_non_exhaustive()
120    }
121}
122
123impl<T, const N: usize> fmt::Debug for SpscConsumer<T, N> {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.debug_struct("SpscConsumer")
126            .field("buffered", &self.buffered_count())
127            .field("capacity", &self.capacity())
128            .field("is_closed", &self.is_closed())
129            .finish_non_exhaustive()
130    }
131}
132
133impl<T, const N: usize> fmt::Debug for PopFuture<'_, T, N> {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        f.debug_struct("PopFuture").finish_non_exhaustive()
136    }
137}
138
139#[cfg(feature = "futures-stream")]
140impl<T, const N: usize> fmt::Debug for SpscConsumerStream<'_, T, N> {
141    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142        f.debug_struct("SpscConsumerStream").finish_non_exhaustive()
143    }
144}
145
146impl<T, const N: usize> SpscProducer<T, N> {
147    /// Attempts to push an item into the ring without blocking.
148    ///
149    /// # Errors
150    ///
151    /// Returns `Err(item)` if the ring is currently full.
152    pub fn push(&self, item: T) -> Result<(), T> {
153        match self.inner.queue.push(item) {
154            Ok(()) => {
155                self.inner.waker.wake();
156                Ok(())
157            }
158            Err(item) => Err(item),
159        }
160    }
161
162    /// Pushes an item into the ring, overwriting the oldest buffered entry if
163    /// necessary.
164    ///
165    /// Returns the displaced oldest item when an overwrite happens.
166    pub fn push_overwrite(&self, item: T) -> Option<T> {
167        let dropped = self.inner.queue.force_push(item);
168        self.inner.waker.wake();
169        dropped
170    }
171
172    /// Returns the current buffered item count.
173    #[must_use]
174    pub fn buffered_count(&self) -> usize {
175        self.inner.queue.len()
176    }
177
178    /// Returns the runtime capacity of the ring.
179    #[must_use]
180    pub fn capacity(&self) -> usize {
181        self.inner.queue.capacity()
182    }
183}
184
185impl<T, const N: usize> Drop for SpscProducer<T, N> {
186    fn drop(&mut self) {
187        self.inner.producer_closed.store(true, Ordering::Release);
188        self.inner.waker.wake();
189    }
190}
191
192impl<T, const N: usize> SpscConsumer<T, N> {
193    /// Attempts to pop the next buffered item without blocking.
194    #[must_use]
195    pub fn pop(&self) -> Option<T> {
196        self.inner.queue.pop()
197    }
198
199    /// Returns a future that resolves to the next buffered item, or `None` once
200    /// the producer has been dropped and the ring is empty.
201    pub const fn pop_async(&self) -> PopFuture<'_, T, N> {
202        PopFuture { consumer: self }
203    }
204
205    /// Returns the current buffered item count.
206    #[must_use]
207    pub fn buffered_count(&self) -> usize {
208        self.inner.queue.len()
209    }
210
211    /// Returns the runtime capacity of the ring.
212    #[must_use]
213    pub fn capacity(&self) -> usize {
214        self.inner.queue.capacity()
215    }
216
217    /// Returns `true` if the producer has been dropped.
218    #[must_use]
219    pub fn is_closed(&self) -> bool {
220        self.inner.producer_closed.load(Ordering::Acquire)
221    }
222
223    #[cfg(feature = "futures-stream")]
224    #[cfg_attr(docsrs, doc(cfg(feature = "futures-stream")))]
225    pub const fn stream(&self) -> SpscConsumerStream<'_, T, N> {
226        SpscConsumerStream { consumer: self }
227    }
228
229    fn poll_pop(&self, cx: &Context<'_>) -> Poll<Option<T>> {
230        if let Some(item) = self.pop() {
231            return Poll::Ready(Some(item));
232        }
233
234        if self.is_closed() {
235            return Poll::Ready(None);
236        }
237
238        self.inner.waker.register(cx.waker());
239
240        if let Some(item) = self.pop() {
241            return Poll::Ready(Some(item));
242        }
243
244        if self.is_closed() {
245            return Poll::Ready(None);
246        }
247
248        Poll::Pending
249    }
250}
251
252impl<T, const N: usize> Future for PopFuture<'_, T, N> {
253    type Output = Option<T>;
254
255    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256        self.consumer.poll_pop(cx)
257    }
258}
259
260#[cfg(feature = "futures-stream")]
261impl<T, const N: usize> futures_core::Stream for SpscConsumerStream<'_, T, N> {
262    type Item = T;
263
264    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
265        self.consumer.poll_pop(cx)
266    }
267}
268
269#[cfg(test)]
270mod tests {
271
272    use std::thread;
273    use std::time::{Duration, Instant};
274
275    use super::SpscRing;
276
277    #[test]
278    fn preserves_sequence_in_single_thread() {
279        let (producer, consumer) = SpscRing::<u32, 4>::new();
280
281        assert_eq!(producer.push(1), Ok(()));
282        assert_eq!(producer.push(2), Ok(()));
283        assert_eq!(producer.push(3), Ok(()));
284
285        assert_eq!(consumer.pop(), Some(1));
286        assert_eq!(consumer.pop(), Some(2));
287        assert_eq!(consumer.pop(), Some(3));
288        assert_eq!(consumer.pop(), None);
289    }
290
291    #[test]
292    fn overwrite_drops_oldest_item() {
293        let (producer, consumer) = SpscRing::<u32, 2>::new();
294
295        assert_eq!(producer.push_overwrite(10), None);
296        assert_eq!(producer.push_overwrite(20), None);
297        assert_eq!(producer.push_overwrite(30), Some(10));
298
299        assert_eq!(consumer.pop(), Some(20));
300        assert_eq!(consumer.pop(), Some(30));
301        assert_eq!(consumer.pop(), None);
302    }
303
304    #[test]
305    fn producer_calls_return_immediately_when_full() {
306        let (producer, _consumer) = SpscRing::<u64, 1>::new();
307        assert_eq!(producer.push(7), Ok(()));
308
309        let start = Instant::now();
310        let mut expected_drop = Some(7);
311        for value in 0..100_000 {
312            assert_eq!(producer.push(value), Err(value));
313            assert_eq!(producer.push_overwrite(value), expected_drop);
314            expected_drop = Some(value);
315        }
316
317        assert!(
318            start.elapsed() < Duration::from_secs(2),
319            "producer operations took too long while the ring stayed full"
320        );
321    }
322
323    #[test]
324    fn pop_async_drains_then_closes() {
325        let (producer, consumer) = SpscRing::<u32, 8>::new();
326        producer.push(1).unwrap();
327        producer.push(2).unwrap();
328        drop(producer);
329
330        assert_eq!(pollster::block_on(consumer.pop_async()), Some(1));
331        assert_eq!(pollster::block_on(consumer.pop_async()), Some(2));
332        assert_eq!(pollster::block_on(consumer.pop_async()), None);
333    }
334
335    #[test]
336    fn concurrent_producer_consumer_preserve_order() {
337        let (producer, consumer) = SpscRing::<u64, 1024>::new();
338        let producer_thread = thread::spawn(move || {
339            for expected in 0..50_000_u64 {
340                let mut item = expected;
341                loop {
342                    match producer.push(item) {
343                        Ok(()) => break,
344                        Err(returned) => {
345                            item = returned;
346                            std::hint::spin_loop();
347                        }
348                    }
349                }
350            }
351        });
352
353        for expected in 0..50_000_u64 {
354            let actual = pollster::block_on(consumer.pop_async());
355            assert_eq!(actual, Some(expected));
356        }
357        assert_eq!(pollster::block_on(consumer.pop_async()), None);
358
359        producer_thread.join().unwrap();
360    }
361
362    #[cfg(feature = "futures-stream")]
363    #[test]
364    fn stream_wrapper_yields_items() {
365        use futures_core::Stream;
366
367        let (producer, consumer) = SpscRing::<u32, 4>::new();
368        let mut stream = consumer.stream();
369
370        producer.push(11).unwrap();
371        drop(producer);
372
373        let first = pollster::block_on(poll_fn(|cx| Pin::new(&mut stream).poll_next(cx)));
374        let second = pollster::block_on(poll_fn(|cx| Pin::new(&mut stream).poll_next(cx)));
375
376        assert_eq!(first, Some(11));
377        assert_eq!(second, None);
378    }
379}