1use std::fmt;
25use std::future::Future;
26use std::marker::PhantomData;
27use std::pin::Pin;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::Arc;
30use std::task::{Context, Poll};
31
32use crossbeam_queue::ArrayQueue;
33use futures_util::task::AtomicWaker;
34
35struct Inner<T> {
36 queue: ArrayQueue<T>,
37 producer_closed: AtomicBool,
38 waker: AtomicWaker,
39}
40
/// Zero-sized entry point for building a producer/consumer pair.
///
/// `T` is the item type and `N` is the largest capacity a ring of this type
/// may be created with. The type itself stores no data; see
/// [`SpscRing::new`] and [`SpscRing::with_capacity`].
#[derive(Debug, Default)]
pub struct SpscRing<T, const N: usize>(PhantomData<T>);
48
49pub struct SpscProducer<T, const N: usize> {
51 inner: Arc<Inner<T>>,
52}
53
54pub struct SpscConsumer<T, const N: usize> {
56 inner: Arc<Inner<T>>,
57}
58
59#[must_use = "futures do nothing unless awaited or polled"]
61pub struct PopFuture<'a, T, const N: usize> {
62 consumer: &'a SpscConsumer<T, N>,
63}
64
/// Borrowing `Stream` adapter over a consumer.
///
/// Only available when the `futures-stream` feature is enabled.
#[cfg(feature = "futures-stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-stream")))]
#[must_use = "streams do nothing unless polled"]
pub struct SpscConsumerStream<'a, T, const N: usize> {
    /// Consumer this stream pops from.
    consumer: &'a SpscConsumer<T, N>,
}
72
73#[allow(clippy::new_ret_no_self)]
74impl<T, const N: usize> SpscRing<T, N> {
75 #[must_use]
81 pub fn new() -> (SpscProducer<T, N>, SpscConsumer<T, N>) {
82 Self::with_capacity(N)
83 }
84
85 #[must_use]
91 pub fn with_capacity(capacity: usize) -> (SpscProducer<T, N>, SpscConsumer<T, N>) {
92 assert!(N > 0, "SpscRing capacity must be > 0");
93 assert!(capacity > 0, "SpscRing capacity must be > 0");
94 assert!(
95 capacity <= N,
96 "SpscRing capacity {capacity} exceeds type maximum {N}"
97 );
98
99 let inner = Arc::new(Inner {
100 queue: ArrayQueue::new(capacity),
101 producer_closed: AtomicBool::new(false),
102 waker: AtomicWaker::new(),
103 });
104
105 (
106 SpscProducer {
107 inner: Arc::clone(&inner),
108 },
109 SpscConsumer { inner },
110 )
111 }
112}
113
114impl<T, const N: usize> fmt::Debug for SpscProducer<T, N> {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 f.debug_struct("SpscProducer")
117 .field("buffered", &self.buffered_count())
118 .field("capacity", &self.capacity())
119 .finish_non_exhaustive()
120 }
121}
122
123impl<T, const N: usize> fmt::Debug for SpscConsumer<T, N> {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.debug_struct("SpscConsumer")
126 .field("buffered", &self.buffered_count())
127 .field("capacity", &self.capacity())
128 .field("is_closed", &self.is_closed())
129 .finish_non_exhaustive()
130 }
131}
132
133impl<T, const N: usize> fmt::Debug for PopFuture<'_, T, N> {
134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 f.debug_struct("PopFuture").finish_non_exhaustive()
136 }
137}
138
#[cfg(feature = "futures-stream")]
impl<T, const N: usize> fmt::Debug for SpscConsumerStream<'_, T, N> {
    /// Prints only the type name; the borrowed consumer is elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("SpscConsumerStream");
        out.finish_non_exhaustive()
    }
}
145
146impl<T, const N: usize> SpscProducer<T, N> {
147 pub fn push(&self, item: T) -> Result<(), T> {
153 match self.inner.queue.push(item) {
154 Ok(()) => {
155 self.inner.waker.wake();
156 Ok(())
157 }
158 Err(item) => Err(item),
159 }
160 }
161
162 pub fn push_overwrite(&self, item: T) -> Option<T> {
167 let dropped = self.inner.queue.force_push(item);
168 self.inner.waker.wake();
169 dropped
170 }
171
172 #[must_use]
174 pub fn buffered_count(&self) -> usize {
175 self.inner.queue.len()
176 }
177
178 #[must_use]
180 pub fn capacity(&self) -> usize {
181 self.inner.queue.capacity()
182 }
183}
184
185impl<T, const N: usize> Drop for SpscProducer<T, N> {
186 fn drop(&mut self) {
187 self.inner.producer_closed.store(true, Ordering::Release);
188 self.inner.waker.wake();
189 }
190}
191
192impl<T, const N: usize> SpscConsumer<T, N> {
193 #[must_use]
195 pub fn pop(&self) -> Option<T> {
196 self.inner.queue.pop()
197 }
198
199 pub const fn pop_async(&self) -> PopFuture<'_, T, N> {
202 PopFuture { consumer: self }
203 }
204
205 #[must_use]
207 pub fn buffered_count(&self) -> usize {
208 self.inner.queue.len()
209 }
210
211 #[must_use]
213 pub fn capacity(&self) -> usize {
214 self.inner.queue.capacity()
215 }
216
217 #[must_use]
219 pub fn is_closed(&self) -> bool {
220 self.inner.producer_closed.load(Ordering::Acquire)
221 }
222
223 #[cfg(feature = "futures-stream")]
224 #[cfg_attr(docsrs, doc(cfg(feature = "futures-stream")))]
225 pub const fn stream(&self) -> SpscConsumerStream<'_, T, N> {
226 SpscConsumerStream { consumer: self }
227 }
228
229 fn poll_pop(&self, cx: &Context<'_>) -> Poll<Option<T>> {
230 if let Some(item) = self.pop() {
231 return Poll::Ready(Some(item));
232 }
233
234 if self.is_closed() {
235 return Poll::Ready(None);
236 }
237
238 self.inner.waker.register(cx.waker());
239
240 if let Some(item) = self.pop() {
241 return Poll::Ready(Some(item));
242 }
243
244 if self.is_closed() {
245 return Poll::Ready(None);
246 }
247
248 Poll::Pending
249 }
250}
251
252impl<T, const N: usize> Future for PopFuture<'_, T, N> {
253 type Output = Option<T>;
254
255 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256 self.consumer.poll_pop(cx)
257 }
258}
259
#[cfg(feature = "futures-stream")]
impl<T, const N: usize> futures_core::Stream for SpscConsumerStream<'_, T, N> {
    /// Items are yielded by value as they are popped from the ring.
    type Item = T;

    /// Delegates to the borrowed consumer's `poll_pop`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let consumer: &SpscConsumer<T, N> = self.consumer;
        consumer.poll_pop(cx)
    }
}
268
#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::{Duration, Instant};

    use super::SpscRing;

    /// Items come out in the order they were pushed.
    #[test]
    fn preserves_sequence_in_single_thread() {
        let (producer, consumer) = SpscRing::<u32, 4>::new();

        assert_eq!(producer.push(1), Ok(()));
        assert_eq!(producer.push(2), Ok(()));
        assert_eq!(producer.push(3), Ok(()));

        assert_eq!(consumer.pop(), Some(1));
        assert_eq!(consumer.pop(), Some(2));
        assert_eq!(consumer.pop(), Some(3));
        assert_eq!(consumer.pop(), None);
    }

    /// `push_overwrite` on a full ring evicts and returns the oldest item.
    #[test]
    fn overwrite_drops_oldest_item() {
        let (producer, consumer) = SpscRing::<u32, 2>::new();

        assert_eq!(producer.push_overwrite(10), None);
        assert_eq!(producer.push_overwrite(20), None);
        assert_eq!(producer.push_overwrite(30), Some(10));

        assert_eq!(consumer.pop(), Some(20));
        assert_eq!(consumer.pop(), Some(30));
        assert_eq!(consumer.pop(), None);
    }

    /// Neither push variant blocks while the ring stays full.
    #[test]
    fn producer_calls_return_immediately_when_full() {
        let (producer, _consumer) = SpscRing::<u64, 1>::new();
        assert_eq!(producer.push(7), Ok(()));

        let start = Instant::now();
        let mut expected_drop = Some(7);
        for value in 0..100_000 {
            assert_eq!(producer.push(value), Err(value));
            assert_eq!(producer.push_overwrite(value), expected_drop);
            expected_drop = Some(value);
        }

        assert!(
            start.elapsed() < Duration::from_secs(2),
            "producer operations took too long while the ring stayed full"
        );
    }

    /// Buffered items are still delivered after the producer is dropped,
    /// followed by `None`.
    #[test]
    fn pop_async_drains_then_closes() {
        let (producer, consumer) = SpscRing::<u32, 8>::new();
        producer.push(1).unwrap();
        producer.push(2).unwrap();
        drop(producer);

        assert_eq!(pollster::block_on(consumer.pop_async()), Some(1));
        assert_eq!(pollster::block_on(consumer.pop_async()), Some(2));
        assert_eq!(pollster::block_on(consumer.pop_async()), None);
    }

    /// Ordering holds when the producer runs on another thread.
    #[test]
    fn concurrent_producer_consumer_preserve_order() {
        let (producer, consumer) = SpscRing::<u64, 1024>::new();
        let producer_thread = thread::spawn(move || {
            for expected in 0..50_000_u64 {
                let mut item = expected;
                // Spin until the consumer has made room for this item.
                loop {
                    match producer.push(item) {
                        Ok(()) => break,
                        Err(returned) => {
                            item = returned;
                            std::hint::spin_loop();
                        }
                    }
                }
            }
        });

        for expected in 0..50_000_u64 {
            let actual = pollster::block_on(consumer.pop_async());
            assert_eq!(actual, Some(expected));
        }
        // The producer is dropped when its thread's closure returns.
        assert_eq!(pollster::block_on(consumer.pop_async()), None);

        producer_thread.join().unwrap();
    }

    /// The stream adapter yields buffered items and then ends.
    #[cfg(feature = "futures-stream")]
    #[test]
    fn stream_wrapper_yields_items() {
        // Imported here so builds without the feature have no unused imports.
        use std::future::poll_fn;
        use std::pin::Pin;

        use futures_core::Stream;

        let (producer, consumer) = SpscRing::<u32, 4>::new();
        let mut stream = consumer.stream();

        producer.push(11).unwrap();
        drop(producer);

        let first = pollster::block_on(poll_fn(|cx| Pin::new(&mut stream).poll_next(cx)));
        let second = pollster::block_on(poll_fn(|cx| Pin::new(&mut stream).poll_next(cx)));

        assert_eq!(first, Some(11));
        assert_eq!(second, None);
    }
}