Skip to main content

doom_fish_utils/
spsc.rs

1//! Single-producer single-consumer lock-free bounded ring buffer.
2//!
3//! Designed for the `CoreAudio` render-thread → async-consumer producer-consumer
4//! pattern. The producer path never takes a mutex and never allocates after the
5//! ring has been constructed.
6//!
7//! Internally this wrapper uses a pre-allocated bounded queue plus an
8//! [`AtomicWaker`](futures_util::task::AtomicWaker) so the consumer can await
9//! the next item without forcing the producer to block.
10//!
11//! # Example
12//!
13//! ```no_run
14//! use doom_fish_utils::spsc::SpscRing;
15//!
16//! # async fn run() {
17//! let (producer, consumer) = SpscRing::<u32, 256>::new();
18//!
19//! producer.push(1).unwrap();
20//! assert_eq!(consumer.pop_async().await, Some(1));
21//! # }
22//! ```
23
24use std::fmt;
25use std::future::Future;
26use std::marker::PhantomData;
27use std::pin::Pin;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::Arc;
30use std::task::{Context, Poll};
31
32use crossbeam_queue::ArrayQueue;
33use futures_util::task::AtomicWaker;
34
35struct Inner<T> {
36    queue: ArrayQueue<T>,
37    producer_closed: AtomicBool,
38    waker: AtomicWaker,
39}
40
41/// Constructor namespace for a bounded single-producer single-consumer ring.
42///
43/// `N` is the maximum supported capacity. Use [`Self::new`] to allocate a ring
44/// with exactly `N` slots, or [`Self::with_capacity`] to choose a smaller
45/// runtime capacity while keeping the type-level upper bound.
46#[derive(Debug, Default)]
47pub struct SpscRing<T, const N: usize>(PhantomData<T>);
48
49/// Producer half of an [`SpscRing`].
50pub struct SpscProducer<T, const N: usize> {
51    inner: Arc<Inner<T>>,
52}
53
54/// Consumer half of an [`SpscRing`].
55pub struct SpscConsumer<T, const N: usize> {
56    inner: Arc<Inner<T>>,
57}
58
59/// Future returned by [`SpscConsumer::pop_async`].
60#[must_use = "futures do nothing unless awaited or polled"]
61pub struct PopFuture<'a, T, const N: usize> {
62    consumer: &'a SpscConsumer<T, N>,
63}
64
65/// Feature-gated [`futures_core::Stream`] wrapper around an [`SpscConsumer`].
66#[cfg(feature = "futures-stream")]
67#[cfg_attr(docsrs, doc(cfg(feature = "futures-stream")))]
68#[must_use = "streams do nothing unless polled"]
69pub struct SpscConsumerStream<'a, T, const N: usize> {
70    consumer: &'a SpscConsumer<T, N>,
71}
72
73#[allow(clippy::new_ret_no_self)]
74impl<T, const N: usize> SpscRing<T, N> {
75    /// Creates a ring with capacity `N`.
76    ///
77    /// # Panics
78    ///
79    /// Panics if `N` is 0.
80    #[must_use]
81    pub fn new() -> (SpscProducer<T, N>, SpscConsumer<T, N>) {
82        Self::with_capacity(N)
83    }
84
85    /// Creates a ring with a runtime capacity up to the type-level maximum `N`.
86    ///
87    /// # Panics
88    ///
89    /// Panics if `capacity` is 0 or larger than `N`.
90    #[must_use]
91    pub fn with_capacity(capacity: usize) -> (SpscProducer<T, N>, SpscConsumer<T, N>) {
92        assert!(N > 0, "SpscRing capacity must be > 0");
93        assert!(capacity > 0, "SpscRing capacity must be > 0");
94        assert!(
95            capacity <= N,
96            "SpscRing capacity {capacity} exceeds type maximum {N}"
97        );
98
99        let inner = Arc::new(Inner {
100            queue: ArrayQueue::new(capacity),
101            producer_closed: AtomicBool::new(false),
102            waker: AtomicWaker::new(),
103        });
104
105        (
106            SpscProducer {
107                inner: Arc::clone(&inner),
108            },
109            SpscConsumer { inner },
110        )
111    }
112}
113
114impl<T, const N: usize> fmt::Debug for SpscProducer<T, N> {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        f.debug_struct("SpscProducer")
117            .field("buffered", &self.buffered_count())
118            .field("capacity", &self.capacity())
119            .finish_non_exhaustive()
120    }
121}
122
123impl<T, const N: usize> fmt::Debug for SpscConsumer<T, N> {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.debug_struct("SpscConsumer")
126            .field("buffered", &self.buffered_count())
127            .field("capacity", &self.capacity())
128            .field("is_closed", &self.is_closed())
129            .finish_non_exhaustive()
130    }
131}
132
133impl<T, const N: usize> fmt::Debug for PopFuture<'_, T, N> {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        f.debug_struct("PopFuture").finish_non_exhaustive()
136    }
137}
138
139#[cfg(feature = "futures-stream")]
140impl<T, const N: usize> fmt::Debug for SpscConsumerStream<'_, T, N> {
141    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142        f.debug_struct("SpscConsumerStream").finish_non_exhaustive()
143    }
144}
145
146impl<T, const N: usize> SpscProducer<T, N> {
147    /// Attempts to push an item into the ring without blocking.
148    ///
149    /// # Errors
150    ///
151    /// Returns `Err(item)` if the ring is currently full.
152    pub fn push(&self, item: T) -> Result<(), T> {
153        match self.inner.queue.push(item) {
154            Ok(()) => {
155                self.inner.waker.wake();
156                Ok(())
157            }
158            Err(item) => Err(item),
159        }
160    }
161
162    /// Pushes an item into the ring, overwriting the oldest buffered entry if
163    /// necessary.
164    ///
165    /// Returns the displaced oldest item when an overwrite happens.
166    pub fn push_overwrite(&self, item: T) -> Option<T> {
167        let dropped = self.inner.queue.force_push(item);
168        self.inner.waker.wake();
169        dropped
170    }
171
172    /// Returns the current buffered item count.
173    #[must_use]
174    pub fn buffered_count(&self) -> usize {
175        self.inner.queue.len()
176    }
177
178    /// Returns the runtime capacity of the ring.
179    #[must_use]
180    pub fn capacity(&self) -> usize {
181        self.inner.queue.capacity()
182    }
183}
184
185impl<T, const N: usize> Drop for SpscProducer<T, N> {
186    fn drop(&mut self) {
187        self.inner.producer_closed.store(true, Ordering::Release);
188        self.inner.waker.wake();
189    }
190}
191
192impl<T, const N: usize> SpscConsumer<T, N> {
193    /// Attempts to pop the next buffered item without blocking.
194    #[must_use]
195    pub fn pop(&self) -> Option<T> {
196        self.inner.queue.pop()
197    }
198
199    /// Returns a future that resolves to the next buffered item, or `None` once
200    /// the producer has been dropped and the ring is empty.
201    pub const fn pop_async(&self) -> PopFuture<'_, T, N> {
202        PopFuture { consumer: self }
203    }
204
205    /// Returns the current buffered item count.
206    #[must_use]
207    pub fn buffered_count(&self) -> usize {
208        self.inner.queue.len()
209    }
210
211    /// Returns the runtime capacity of the ring.
212    #[must_use]
213    pub fn capacity(&self) -> usize {
214        self.inner.queue.capacity()
215    }
216
217    /// Returns `true` if the producer has been dropped.
218    #[must_use]
219    pub fn is_closed(&self) -> bool {
220        self.inner.producer_closed.load(Ordering::Acquire)
221    }
222
223    #[cfg(feature = "futures-stream")]
224    #[cfg_attr(docsrs, doc(cfg(feature = "futures-stream")))]
225    pub const fn stream(&self) -> SpscConsumerStream<'_, T, N> {
226        SpscConsumerStream { consumer: self }
227    }
228
229    fn poll_pop(&self, cx: &Context<'_>) -> Poll<Option<T>> {
230        if let Some(item) = self.pop() {
231            return Poll::Ready(Some(item));
232        }
233
234        if self.is_closed() {
235            return Poll::Ready(None);
236        }
237
238        self.inner.waker.register(cx.waker());
239
240        if let Some(item) = self.pop() {
241            return Poll::Ready(Some(item));
242        }
243
244        if self.is_closed() {
245            return Poll::Ready(None);
246        }
247
248        Poll::Pending
249    }
250}
251
252impl<T, const N: usize> Future for PopFuture<'_, T, N> {
253    type Output = Option<T>;
254
255    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256        self.consumer.poll_pop(cx)
257    }
258}
259
260#[cfg(feature = "futures-stream")]
261impl<T, const N: usize> futures_core::Stream for SpscConsumerStream<'_, T, N> {
262    type Item = T;
263
264    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
265        self.consumer.poll_pop(cx)
266    }
267}
268
269#[cfg(test)]
270mod tests {
271    
272    
273    use std::thread;
274    use std::time::{Duration, Instant};
275
276    use super::SpscRing;
277
278    #[test]
279    fn preserves_sequence_in_single_thread() {
280        let (producer, consumer) = SpscRing::<u32, 4>::new();
281
282        assert_eq!(producer.push(1), Ok(()));
283        assert_eq!(producer.push(2), Ok(()));
284        assert_eq!(producer.push(3), Ok(()));
285
286        assert_eq!(consumer.pop(), Some(1));
287        assert_eq!(consumer.pop(), Some(2));
288        assert_eq!(consumer.pop(), Some(3));
289        assert_eq!(consumer.pop(), None);
290    }
291
292    #[test]
293    fn overwrite_drops_oldest_item() {
294        let (producer, consumer) = SpscRing::<u32, 2>::new();
295
296        assert_eq!(producer.push_overwrite(10), None);
297        assert_eq!(producer.push_overwrite(20), None);
298        assert_eq!(producer.push_overwrite(30), Some(10));
299
300        assert_eq!(consumer.pop(), Some(20));
301        assert_eq!(consumer.pop(), Some(30));
302        assert_eq!(consumer.pop(), None);
303    }
304
305    #[test]
306    fn producer_calls_return_immediately_when_full() {
307        let (producer, _consumer) = SpscRing::<u64, 1>::new();
308        assert_eq!(producer.push(7), Ok(()));
309
310        let start = Instant::now();
311        let mut expected_drop = Some(7);
312        for value in 0..100_000 {
313            assert_eq!(producer.push(value), Err(value));
314            assert_eq!(producer.push_overwrite(value), expected_drop);
315            expected_drop = Some(value);
316        }
317
318        assert!(
319            start.elapsed() < Duration::from_secs(2),
320            "producer operations took too long while the ring stayed full"
321        );
322    }
323
324    #[test]
325    fn pop_async_drains_then_closes() {
326        let (producer, consumer) = SpscRing::<u32, 8>::new();
327        producer.push(1).unwrap();
328        producer.push(2).unwrap();
329        drop(producer);
330
331        assert_eq!(pollster::block_on(consumer.pop_async()), Some(1));
332        assert_eq!(pollster::block_on(consumer.pop_async()), Some(2));
333        assert_eq!(pollster::block_on(consumer.pop_async()), None);
334    }
335
336    #[test]
337    fn concurrent_producer_consumer_preserve_order() {
338        let (producer, consumer) = SpscRing::<u64, 1024>::new();
339        let producer_thread = thread::spawn(move || {
340            for expected in 0..50_000_u64 {
341                let mut item = expected;
342                loop {
343                    match producer.push(item) {
344                        Ok(()) => break,
345                        Err(returned) => {
346                            item = returned;
347                            std::hint::spin_loop();
348                        }
349                    }
350                }
351            }
352        });
353
354        for expected in 0..50_000_u64 {
355            let actual = pollster::block_on(consumer.pop_async());
356            assert_eq!(actual, Some(expected));
357        }
358        assert_eq!(pollster::block_on(consumer.pop_async()), None);
359
360        producer_thread.join().unwrap();
361    }
362
363    #[cfg(feature = "futures-stream")]
364    #[test]
365    fn stream_wrapper_yields_items() {
366        use futures_core::Stream;
367
368        let (producer, consumer) = SpscRing::<u32, 4>::new();
369        let mut stream = consumer.stream();
370
371        producer.push(11).unwrap();
372        drop(producer);
373
374        let first = pollster::block_on(poll_fn(|cx| Pin::new(&mut stream).poll_next(cx)));
375        let second = pollster::block_on(poll_fn(|cx| Pin::new(&mut stream).poll_next(cx)));
376
377        assert_eq!(first, Some(11));
378        assert_eq!(second, None);
379    }
380}