1use std::fmt;
25use std::future::Future;
26use std::marker::PhantomData;
27use std::pin::Pin;
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::Arc;
30use std::task::{Context, Poll};
31
32use crossbeam_queue::ArrayQueue;
33use futures_util::task::AtomicWaker;
34
35struct Inner<T> {
36 queue: ArrayQueue<T>,
37 producer_closed: AtomicBool,
38 waker: AtomicWaker,
39}
40
/// Entry point for building a ring of `T` with a type-level maximum capacity of `N`.
///
/// The type stores no data of its own; its constructors hand back a
/// producer/consumer pair rather than a `SpscRing` value.
#[derive(Debug, Default)]
pub struct SpscRing<T, const N: usize>(PhantomData<T>);
48
49pub struct SpscProducer<T, const N: usize> {
51 inner: Arc<Inner<T>>,
52}
53
54pub struct SpscConsumer<T, const N: usize> {
56 inner: Arc<Inner<T>>,
57}
58
59#[must_use = "futures do nothing unless awaited or polled"]
61pub struct PopFuture<'a, T, const N: usize> {
62 consumer: &'a SpscConsumer<T, N>,
63}
64
/// Borrowing adapter that exposes a consumer through the `futures_core::Stream` trait.
///
/// Only available when the `futures-stream` feature is enabled.
#[cfg(feature = "futures-stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-stream")))]
#[must_use = "streams do nothing unless polled"]
pub struct SpscConsumerStream<'a, T, const N: usize> {
    /// Consumer whose items this stream yields.
    consumer: &'a SpscConsumer<T, N>,
}
72
73#[allow(clippy::new_ret_no_self)]
74impl<T, const N: usize> SpscRing<T, N> {
75 #[must_use]
81 pub fn new() -> (SpscProducer<T, N>, SpscConsumer<T, N>) {
82 Self::with_capacity(N)
83 }
84
85 #[must_use]
91 pub fn with_capacity(capacity: usize) -> (SpscProducer<T, N>, SpscConsumer<T, N>) {
92 assert!(N > 0, "SpscRing capacity must be > 0");
93 assert!(capacity > 0, "SpscRing capacity must be > 0");
94 assert!(
95 capacity <= N,
96 "SpscRing capacity {capacity} exceeds type maximum {N}"
97 );
98
99 let inner = Arc::new(Inner {
100 queue: ArrayQueue::new(capacity),
101 producer_closed: AtomicBool::new(false),
102 waker: AtomicWaker::new(),
103 });
104
105 (
106 SpscProducer {
107 inner: Arc::clone(&inner),
108 },
109 SpscConsumer { inner },
110 )
111 }
112}
113
114impl<T, const N: usize> fmt::Debug for SpscProducer<T, N> {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 f.debug_struct("SpscProducer")
117 .field("buffered", &self.buffered_count())
118 .field("capacity", &self.capacity())
119 .finish_non_exhaustive()
120 }
121}
122
123impl<T, const N: usize> fmt::Debug for SpscConsumer<T, N> {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.debug_struct("SpscConsumer")
126 .field("buffered", &self.buffered_count())
127 .field("capacity", &self.capacity())
128 .field("is_closed", &self.is_closed())
129 .finish_non_exhaustive()
130 }
131}
132
133impl<T, const N: usize> fmt::Debug for PopFuture<'_, T, N> {
134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 f.debug_struct("PopFuture").finish_non_exhaustive()
136 }
137}
138
#[cfg(feature = "futures-stream")]
impl<T, const N: usize> fmt::Debug for SpscConsumerStream<'_, T, N> {
    /// Prints only the type name; the borrowed consumer is elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("SpscConsumerStream");
        out.finish_non_exhaustive()
    }
}
145
146impl<T, const N: usize> SpscProducer<T, N> {
147 pub fn push(&self, item: T) -> Result<(), T> {
153 match self.inner.queue.push(item) {
154 Ok(()) => {
155 self.inner.waker.wake();
156 Ok(())
157 }
158 Err(item) => Err(item),
159 }
160 }
161
162 pub fn push_overwrite(&self, item: T) -> Option<T> {
167 let dropped = self.inner.queue.force_push(item);
168 self.inner.waker.wake();
169 dropped
170 }
171
172 #[must_use]
174 pub fn buffered_count(&self) -> usize {
175 self.inner.queue.len()
176 }
177
178 #[must_use]
180 pub fn capacity(&self) -> usize {
181 self.inner.queue.capacity()
182 }
183}
184
185impl<T, const N: usize> Drop for SpscProducer<T, N> {
186 fn drop(&mut self) {
187 self.inner.producer_closed.store(true, Ordering::Release);
188 self.inner.waker.wake();
189 }
190}
191
192impl<T, const N: usize> SpscConsumer<T, N> {
193 #[must_use]
195 pub fn pop(&self) -> Option<T> {
196 self.inner.queue.pop()
197 }
198
199 pub const fn pop_async(&self) -> PopFuture<'_, T, N> {
202 PopFuture { consumer: self }
203 }
204
205 #[must_use]
207 pub fn buffered_count(&self) -> usize {
208 self.inner.queue.len()
209 }
210
211 #[must_use]
213 pub fn capacity(&self) -> usize {
214 self.inner.queue.capacity()
215 }
216
217 #[must_use]
219 pub fn is_closed(&self) -> bool {
220 self.inner.producer_closed.load(Ordering::Acquire)
221 }
222
223 #[cfg(feature = "futures-stream")]
224 #[cfg_attr(docsrs, doc(cfg(feature = "futures-stream")))]
225 pub const fn stream(&self) -> SpscConsumerStream<'_, T, N> {
226 SpscConsumerStream { consumer: self }
227 }
228
229 fn poll_pop(&self, cx: &Context<'_>) -> Poll<Option<T>> {
230 if let Some(item) = self.pop() {
231 return Poll::Ready(Some(item));
232 }
233
234 if self.is_closed() {
235 return Poll::Ready(None);
236 }
237
238 self.inner.waker.register(cx.waker());
239
240 if let Some(item) = self.pop() {
241 return Poll::Ready(Some(item));
242 }
243
244 if self.is_closed() {
245 return Poll::Ready(None);
246 }
247
248 Poll::Pending
249 }
250}
251
252impl<T, const N: usize> Future for PopFuture<'_, T, N> {
253 type Output = Option<T>;
254
255 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256 self.consumer.poll_pop(cx)
257 }
258}
259
#[cfg(feature = "futures-stream")]
impl<T, const N: usize> futures_core::Stream for SpscConsumerStream<'_, T, N> {
    type Item = T;

    /// Delegates to the borrowed consumer's poll routine.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let consumer: &SpscConsumer<T, N> = self.consumer;
        consumer.poll_pop(cx)
    }
}
268
#[cfg(test)]
mod tests {

    use std::thread;
    use std::time::{Duration, Instant};

    use super::SpscRing;

    /// Items come out in the order they were pushed.
    #[test]
    fn preserves_sequence_in_single_thread() {
        let (producer, consumer) = SpscRing::<u32, 4>::new();

        assert_eq!(producer.push(1), Ok(()));
        assert_eq!(producer.push(2), Ok(()));
        assert_eq!(producer.push(3), Ok(()));

        assert_eq!(consumer.pop(), Some(1));
        assert_eq!(consumer.pop(), Some(2));
        assert_eq!(consumer.pop(), Some(3));
        assert_eq!(consumer.pop(), None);
    }

    /// `push_overwrite` on a full ring evicts and returns the oldest item.
    #[test]
    fn overwrite_drops_oldest_item() {
        let (producer, consumer) = SpscRing::<u32, 2>::new();

        assert_eq!(producer.push_overwrite(10), None);
        assert_eq!(producer.push_overwrite(20), None);
        assert_eq!(producer.push_overwrite(30), Some(10));

        assert_eq!(consumer.pop(), Some(20));
        assert_eq!(consumer.pop(), Some(30));
        assert_eq!(consumer.pop(), None);
    }

    /// Neither push variant blocks while the ring stays full.
    #[test]
    fn producer_calls_return_immediately_when_full() {
        let (producer, _consumer) = SpscRing::<u64, 1>::new();
        assert_eq!(producer.push(7), Ok(()));

        let start = Instant::now();
        let mut expected_drop = Some(7);
        for value in 0..100_000 {
            assert_eq!(producer.push(value), Err(value));
            assert_eq!(producer.push_overwrite(value), expected_drop);
            expected_drop = Some(value);
        }

        assert!(
            start.elapsed() < Duration::from_secs(2),
            "producer operations took too long while the ring stayed full"
        );
    }

    /// Buffered items are still delivered after the producer is dropped,
    /// followed by `None`.
    #[test]
    fn pop_async_drains_then_closes() {
        let (producer, consumer) = SpscRing::<u32, 8>::new();
        producer.push(1).unwrap();
        producer.push(2).unwrap();
        drop(producer);

        assert_eq!(pollster::block_on(consumer.pop_async()), Some(1));
        assert_eq!(pollster::block_on(consumer.pop_async()), Some(2));
        assert_eq!(pollster::block_on(consumer.pop_async()), None);
    }

    /// Order is preserved across threads, and the final item is delivered
    /// even when the producer is dropped right after pushing it.
    #[test]
    fn concurrent_producer_consumer_preserve_order() {
        let (producer, consumer) = SpscRing::<u64, 1024>::new();
        let producer_thread = thread::spawn(move || {
            for expected in 0..50_000_u64 {
                let mut item = expected;
                loop {
                    match producer.push(item) {
                        Ok(()) => break,
                        Err(returned) => {
                            item = returned;
                            std::hint::spin_loop();
                        }
                    }
                }
            }
        });

        for expected in 0..50_000_u64 {
            let actual = pollster::block_on(consumer.pop_async());
            assert_eq!(actual, Some(expected));
        }
        assert_eq!(pollster::block_on(consumer.pop_async()), None);

        producer_thread.join().unwrap();
    }

    /// The stream adapter yields buffered items and then terminates.
    #[cfg(feature = "futures-stream")]
    #[test]
    fn stream_wrapper_yields_items() {
        // `poll_fn` and `Pin` are not imported at module level; bring them
        // in here so the test compiles when the feature is enabled.
        use std::future::poll_fn;
        use std::pin::Pin;

        use futures_core::Stream;

        let (producer, consumer) = SpscRing::<u32, 4>::new();
        let mut stream = consumer.stream();

        producer.push(11).unwrap();
        drop(producer);

        let first = pollster::block_on(poll_fn(|cx| Pin::new(&mut stream).poll_next(cx)));
        let second = pollster::block_on(poll_fn(|cx| Pin::new(&mut stream).poll_next(cx)));

        assert_eq!(first, Some(11));
        assert_eq!(second, None);
    }
}