Skip to main content

nexus_queue/
spmc.rs

1//! Single-producer multi-consumer bounded queue.
2//!
3//! A lock-free ring buffer optimized for one producer thread fanning out to
4//! multiple consumer threads. Uses Vyukov-style turn counters with CAS-based
5//! head claiming for consumers.
6//!
7//! # Design
8//!
9//! ```text
10//! ┌─────────────────────────────────────────────────────────────────┐
11//! │ Shared (Arc):                                                   │
12//! │   head: CachePadded<AtomicUsize>   ← Consumers CAS here         │
13//! │   tail: CachePadded<AtomicUsize>   ← Final tail, set on drop    │
14//! │   producer_alive: AtomicBool       ← Disconnection detection    │
15//! │   slots: *mut Slot<T>              ← Per-slot turn counters     │
16//! └─────────────────────────────────────────────────────────────────┘
17//!
18//! ┌─────────────────────┐     ┌─────────────────────┐
19//! │ Producer (!Clone):  │     │ Consumer (Clone):    │
20//! │   local_tail        │     │   shared: Arc        │
21//! │   shared: Arc       │     └─────────────────────┘
22//! └─────────────────────┘
23//! ```
24//!
25//! The producer writes directly (no CAS) since it's the sole writer. Consumers
26//! compete via CAS on the head index to claim slots. After claiming, the consumer
27//! reads the data and advances the turn counter for the next producer lap.
28//!
29//! # Turn Counter Protocol
30//!
31//! For slot at index `i` on lap `turn`:
32//! - `turn * 2`: Slot is ready for producer to write
33//! - `turn * 2 + 1`: Slot contains data, ready for consumer
34//!
35//! # Disconnection
36//!
37//! Unlike MPSC, where `Arc::strong_count == 1` detects disconnection on both
38//! sides, every consumer clone holds the `Arc` too, so the count cannot reveal a
39//! dropped producer. An `AtomicBool` tracks whether the producer is still alive.
40//!
41//! # Example
42//!
43//! ```
44//! use nexus_queue::spmc;
45//! use std::thread;
46//!
47//! let (tx, rx) = spmc::ring_buffer::<u64>(1024);
48//!
49//! let rx2 = rx.clone();
50//! let rx1 = rx;
51//! let h1 = thread::spawn(move || {
52//!     let mut received = Vec::new();
53//!     loop {
54//!         if let Some(v) = rx1.pop() {
55//!             received.push(v);
56//!         } else if rx1.is_disconnected() {
57//!             while let Some(v) = rx1.pop() { received.push(v); }
58//!             break;
59//!         } else {
60//!             std::hint::spin_loop();
61//!         }
62//!     }
63//!     received
64//! });
65//! let h2 = thread::spawn(move || {
66//!     let mut received = Vec::new();
67//!     loop {
68//!         if let Some(v) = rx2.pop() {
69//!             received.push(v);
70//!         } else if rx2.is_disconnected() {
71//!             while let Some(v) = rx2.pop() { received.push(v); }
72//!             break;
73//!         } else {
74//!             std::hint::spin_loop();
75//!         }
76//!     }
77//!     received
78//! });
79//!
80//! for i in 0..200 {
81//!     while tx.push(i).is_err() { std::hint::spin_loop(); }
82//! }
83//! drop(tx);
84//!
85//! let mut all: Vec<_> = h1.join().unwrap();
86//! all.extend(h2.join().unwrap());
87//! all.sort();
88//! assert_eq!(all, (0..200).collect::<Vec<_>>());
89//! ```
90
91use std::cell::{Cell, UnsafeCell};
92use std::fmt;
93use std::mem::MaybeUninit;
94use std::sync::Arc;
95use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
96
97use crossbeam_utils::CachePadded;
98
99use crate::Full;
100
101/// Creates a bounded SPMC ring buffer. Renamed to [`ring_buffer`].
102#[deprecated(since = "1.3.0", note = "renamed to ring_buffer()")]
103#[inline]
104pub fn bounded<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
105    ring_buffer(capacity)
106}
107
108/// Creates a bounded SPMC queue with the given capacity.
109///
110/// Capacity is rounded up to the next power of two.
111///
112/// # Panics
113///
114/// Panics if `capacity` is zero or too large to round to a power of two.
115pub fn ring_buffer<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
116    assert!(capacity > 0, "capacity must be non-zero");
117
118    let capacity = capacity
119        .checked_next_power_of_two()
120        .expect("capacity too large (must be <= usize::MAX / 2)");
121    let mask = capacity - 1;
122
123    // Allocate slots with turn counters initialized to 0 (ready for turn 0 producer)
124    let slots: Vec<Slot<T>> = (0..capacity)
125        .map(|_| Slot {
126            turn: AtomicUsize::new(0),
127            data: UnsafeCell::new(MaybeUninit::uninit()),
128        })
129        .collect();
130    let slots = Box::into_raw(slots.into_boxed_slice()) as *mut Slot<T>;
131
132    let shift = capacity.trailing_zeros();
133
134    let shared = Arc::new(Shared {
135        head: CachePadded::new(AtomicUsize::new(0)),
136        tail: CachePadded::new(AtomicUsize::new(0)),
137        producer_alive: AtomicBool::new(true),
138        slots,
139        capacity,
140        shift,
141        mask,
142    });
143
144    (
145        Producer {
146            local_tail: Cell::new(0),
147            slots,
148            mask,
149            shift,
150            shared: Arc::clone(&shared),
151        },
152        Consumer {
153            slots,
154            mask,
155            shift,
156            shared,
157        },
158    )
159}
160
/// A slot in the ring buffer with turn-based synchronization.
///
/// The `turn` stamp hands the slot back and forth: the producer writes `data`
/// only after observing the "ready for producer" stamp, and a consumer reads
/// it only after observing the "ready for consumer" stamp.
struct Slot<T> {
    /// Turn counter for Vyukov-style synchronization.
    /// - `turn * 2`: ready for producer
    /// - `turn * 2 + 1`: ready for consumer
    ///
    /// Starts at 0 (set by `ring_buffer`), i.e. free for the producer on lap 0.
    turn: AtomicUsize,
    /// The data stored in this slot. It holds an initialized value only after
    /// the producer has written it and before a consumer has read it out.
    data: UnsafeCell<MaybeUninit<T>>,
}
170
/// Shared state between the producer and consumers.
///
/// One instance lives inside an `Arc` that the producer and every consumer
/// clone hold. It owns the slot allocation: `Drop` releases any values still
/// queued and then frees the slot array.
// repr(C): Guarantees field order for cache line layout.
#[repr(C)]
struct Shared<T> {
    /// Head index - consumers CAS on this to claim slots.
    head: CachePadded<AtomicUsize>,
    /// Tail index - written by producer on drop for Shared::drop cleanup.
    /// While the producer is alive the live tail is its private `local_tail`;
    /// this field stays at its initial 0 until then.
    tail: CachePadded<AtomicUsize>,
    /// Whether the producer is still alive (for consumer disconnection detection).
    /// Set to `false` by `Producer::drop`.
    producer_alive: AtomicBool,
    /// Pointer to the slot array (`capacity` elements, leaked from a boxed
    /// slice in `ring_buffer` and freed in `Drop`).
    slots: *mut Slot<T>,
    /// Actual capacity (power of two).
    capacity: usize,
    /// Shift for fast division by capacity (log2(capacity)).
    shift: u32,
    /// Mask for fast modulo (capacity - 1).
    mask: usize,
}
190
// SAFETY: Shared contains atomics and raw pointers. Access is synchronized via
// the turn counters. T: Send ensures data can move between threads.
// The impls are written by hand because the raw `slots` pointer removes the
// automatic `Send`/`Sync` impls, and `Shared` must be both to sit in the `Arc`
// handed to the producer and to every consumer.
unsafe impl<T: Send> Send for Shared<T> {}
unsafe impl<T: Send> Sync for Shared<T> {}
195
196impl<T> Drop for Shared<T> {
197    fn drop(&mut self) {
198        let head = self.head.load(Ordering::Relaxed);
199        let tail = self.tail.load(Ordering::Relaxed);
200
201        // Drop any remaining elements in the queue
202        let mut i = head;
203        while i != tail {
204            let slot = unsafe { &*self.slots.add(i & self.mask) };
205            let turn = i >> self.shift;
206
207            // Only drop if the slot was actually written (turn is odd = consumer-ready)
208            if slot.turn.load(Ordering::Relaxed) == turn * 2 + 1 {
209                // SAFETY: Slot contains initialized data at this turn.
210                unsafe { (*slot.data.get()).assume_init_drop() };
211            }
212            i = i.wrapping_add(1);
213        }
214
215        // SAFETY: slots was allocated via Box::into_raw from a Vec.
216        unsafe {
217            let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
218                self.slots,
219                self.capacity,
220            ));
221        }
222    }
223}
224
/// The producer endpoint of an SPMC queue.
///
/// This endpoint cannot be cloned - only one producer thread is allowed.
/// The single-writer design eliminates CAS contention on the tail index.
///
/// The tail position is kept in a plain `Cell` on this handle; it is copied
/// into the shared state only when the producer is dropped.
// repr(C): Hot fields at struct base share cache line with struct pointer.
#[repr(C)]
pub struct Producer<T> {
    /// Local tail index - only this thread reads/writes.
    local_tail: Cell<usize>,
    /// Cached slots pointer (avoids Arc deref on hot path).
    slots: *mut Slot<T>,
    /// Cached mask (avoids Arc deref on hot path).
    mask: usize,
    /// Cached shift for fast division (log2(capacity)).
    shift: u32,
    /// Shared state; keeps the slot array alive and carries the final tail and
    /// the `producer_alive` flag to the consumers.
    shared: Arc<Shared<T>>,
}
242
// SAFETY: Producer can be sent to another thread. It has exclusive write access
// to slots (via turn protocol) and maintains the tail index.
// The manual impl is required because the raw `slots` pointer suppresses the
// automatic `Send`. No `Sync` impl is provided: `local_tail` is a `Cell`.
unsafe impl<T: Send> Send for Producer<T> {}
246
247impl<T> Producer<T> {
248    /// Pushes a value into the queue.
249    ///
250    /// Returns `Err(Full(value))` if the queue is full, returning ownership
251    /// of the value to the caller for backpressure handling.
252    ///
253    /// No CAS required - single writer principle.
254    #[inline]
255    #[must_use = "push returns Err if full, which should be handled"]
256    pub fn push(&self, value: T) -> Result<(), Full<T>> {
257        let tail = self.local_tail.get();
258        // SAFETY: slots pointer is valid for the lifetime of shared.
259        let slot = unsafe { &*self.slots.add(tail & self.mask) };
260        let turn = tail >> self.shift;
261
262        // Check if slot is ready (consumer has freed it).
263        if slot.turn.load(Ordering::Acquire) != turn * 2 {
264            return Err(Full(value));
265        }
266
267        // SAFETY: Turn counter confirms slot is free for this lap.
268        unsafe { (*slot.data.get()).write(value) };
269
270        // Signal ready for consumer: turn * 2 + 1
271        slot.turn.store(turn * 2 + 1, Ordering::Release);
272
273        self.local_tail.set(tail.wrapping_add(1));
274
275        Ok(())
276    }
277
278    /// Returns the capacity of the queue.
279    #[inline]
280    pub fn capacity(&self) -> usize {
281        1 << self.shift
282    }
283
284    /// Returns `true` if all consumers have been dropped.
285    #[inline]
286    pub fn is_disconnected(&self) -> bool {
287        Arc::strong_count(&self.shared) == 1
288    }
289}
290
291impl<T> Drop for Producer<T> {
292    fn drop(&mut self) {
293        // Publish final tail for Shared::drop cleanup
294        self.shared
295            .tail
296            .store(self.local_tail.get(), Ordering::Relaxed);
297        self.shared.producer_alive.store(false, Ordering::Release);
298    }
299}
300
301impl<T> fmt::Debug for Producer<T> {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        f.debug_struct("Producer")
304            .field("capacity", &self.capacity())
305            .finish_non_exhaustive()
306    }
307}
308
/// The consumer endpoint of an SPMC queue.
///
/// This endpoint can be cloned to create additional consumers. Each clone
/// competes via CAS on the shared head index.
///
/// A consumer keeps no position of its own: the head lives in the shared
/// state, so every clone sees the same queue.
// repr(C): Hot fields at struct base share cache line with struct pointer.
#[repr(C)]
pub struct Consumer<T> {
    /// Cached slots pointer (avoids Arc deref on hot path).
    slots: *mut Slot<T>,
    /// Cached mask (avoids Arc deref on hot path).
    mask: usize,
    /// Cached shift for fast division (log2(capacity)).
    shift: u32,
    /// Shared state; holds the head index the consumers CAS on, the
    /// `producer_alive` flag, and keeps the slot array alive.
    shared: Arc<Shared<T>>,
}
324
325impl<T> Clone for Consumer<T> {
326    fn clone(&self) -> Self {
327        Consumer {
328            slots: self.slots,
329            mask: self.mask,
330            shift: self.shift,
331            shared: Arc::clone(&self.shared),
332        }
333    }
334}
335
// SAFETY: Consumer can be sent to another thread. Each Consumer instance is
// used by one thread (not Sync - use clone() for multiple threads).
// The manual impl is required because the raw `slots` pointer suppresses the
// automatic `Send`; that same pointer is what keeps the type `!Sync`.
unsafe impl<T: Send> Send for Consumer<T> {}
339
340impl<T> Consumer<T> {
341    /// Pops a value from the queue.
342    ///
343    /// Returns `None` if the queue is empty.
344    ///
345    /// This method spins internally on CAS contention but returns immediately
346    /// when the queue is actually empty.
347    #[inline]
348    pub fn pop(&self) -> Option<T> {
349        let mut spin_count = 0u32;
350
351        loop {
352            let head = self.shared.head.load(Ordering::Relaxed);
353
354            // SAFETY: slots pointer is valid for the lifetime of shared.
355            let slot = unsafe { &*self.slots.add(head & self.mask) };
356            let turn = head >> self.shift;
357
358            let stamp = slot.turn.load(Ordering::Acquire);
359
360            if stamp == turn * 2 + 1 {
361                // Slot has data - try to claim it
362                if self
363                    .shared
364                    .head
365                    .compare_exchange_weak(
366                        head,
367                        head.wrapping_add(1),
368                        Ordering::Relaxed,
369                        Ordering::Relaxed,
370                    )
371                    .is_ok()
372                {
373                    // SAFETY: We own this slot via successful CAS.
374                    let value = unsafe { (*slot.data.get()).assume_init_read() };
375
376                    // Signal slot is free for next lap: (turn + 1) * 2
377                    slot.turn.store((turn + 1) * 2, Ordering::Release);
378
379                    return Some(value);
380                }
381
382                // CAS failed - another consumer claimed it, retry with backoff
383                let spins = 1 << spin_count.min(6);
384                for _ in 0..spins {
385                    std::hint::spin_loop();
386                }
387                spin_count += 1;
388            } else {
389                // Slot not ready - queue is empty
390                return None;
391            }
392        }
393    }
394
395    /// Returns the capacity of the queue.
396    #[inline]
397    pub fn capacity(&self) -> usize {
398        1 << self.shift
399    }
400
401    /// Returns `true` if the producer has been dropped.
402    #[inline]
403    pub fn is_disconnected(&self) -> bool {
404        !self.shared.producer_alive.load(Ordering::Acquire)
405    }
406}
407
408impl<T> fmt::Debug for Consumer<T> {
409    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410        f.debug_struct("Consumer")
411            .field("capacity", &self.capacity())
412            .finish_non_exhaustive()
413    }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Pops from `rx` until the producer has been dropped and the queue is
    /// drained, returning every value this consumer received.
    ///
    /// Shared by all multi-consumer tests so the shutdown protocol (pop, check
    /// for disconnection, drain once more) is written in exactly one place.
    fn drain_until_disconnected(rx: &Consumer<u64>) -> Vec<u64> {
        let mut received = Vec::new();
        loop {
            if let Some(val) = rx.pop() {
                received.push(val);
            } else if rx.is_disconnected() {
                // The producer may have pushed between the failed pop and the
                // disconnection check, so drain once more before leaving.
                while let Some(val) = rx.pop() {
                    received.push(val);
                }
                break;
            } else {
                std::hint::spin_loop();
            }
        }
        received
    }

    // ============================================================================
    // Basic Operations
    // ============================================================================

    #[test]
    fn basic_push_pop() {
        let (tx, rx) = ring_buffer::<u64>(4);

        assert!(tx.push(1).is_ok());
        assert!(tx.push(2).is_ok());
        assert!(tx.push(3).is_ok());

        assert_eq!(rx.pop(), Some(1));
        assert_eq!(rx.pop(), Some(2));
        assert_eq!(rx.pop(), Some(3));
        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn empty_pop_returns_none() {
        let (_, rx) = ring_buffer::<u64>(4);
        assert_eq!(rx.pop(), None);
        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn fill_then_drain() {
        let (tx, rx) = ring_buffer::<u64>(4);

        for i in 0..4 {
            assert!(tx.push(i).is_ok());
        }

        for i in 0..4 {
            assert_eq!(rx.pop(), Some(i));
        }

        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn push_returns_error_when_full() {
        let (tx, _rx) = ring_buffer::<u64>(4);

        assert!(tx.push(1).is_ok());
        assert!(tx.push(2).is_ok());
        assert!(tx.push(3).is_ok());
        assert!(tx.push(4).is_ok());

        // The rejected value must be handed back to the caller.
        let err = tx.push(5).unwrap_err();
        assert_eq!(err.into_inner(), 5);
    }

    // ============================================================================
    // Interleaved Operations
    // ============================================================================

    #[test]
    fn interleaved_single_consumer() {
        let (tx, rx) = ring_buffer::<u64>(8);

        for i in 0..1000 {
            assert!(tx.push(i).is_ok());
            assert_eq!(rx.pop(), Some(i));
        }
    }

    #[test]
    fn partial_fill_drain_cycles() {
        let (tx, rx) = ring_buffer::<u64>(8);

        for round in 0..100 {
            for i in 0..4 {
                assert!(tx.push(round * 4 + i).is_ok());
            }

            for i in 0..4 {
                assert_eq!(rx.pop(), Some(round * 4 + i));
            }
        }
    }

    // ============================================================================
    // Multiple Consumers
    // ============================================================================

    #[test]
    fn two_consumers_single_producer() {
        let (tx, rx) = ring_buffer::<u64>(64);
        let rx2 = rx.clone();
        let rx1 = rx;

        let h1 = thread::spawn(move || drain_until_disconnected(&rx1));
        let h2 = thread::spawn(move || drain_until_disconnected(&rx2));

        for i in 0..2000 {
            while tx.push(i).is_err() {
                std::hint::spin_loop();
            }
        }
        drop(tx);

        let mut received = h1.join().unwrap();
        received.extend(h2.join().unwrap());

        // All values received (order not guaranteed across consumers)
        received.sort_unstable();
        assert_eq!(received, (0..2000).collect::<Vec<_>>());
    }

    #[test]
    fn four_consumers_single_producer() {
        let (tx, rx) = ring_buffer::<u64>(256);

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rx = rx.clone();
                thread::spawn(move || drain_until_disconnected(&rx))
            })
            .collect();

        drop(rx); // Drop original consumer

        for i in 0..4000u64 {
            while tx.push(i).is_err() {
                std::hint::spin_loop();
            }
        }
        drop(tx);

        let mut received = Vec::new();
        for h in handles {
            received.extend(h.join().unwrap());
        }

        received.sort_unstable();
        assert_eq!(received, (0..4000).collect::<Vec<_>>());
    }

    // ============================================================================
    // Single Slot
    // ============================================================================

    #[test]
    fn single_slot_bounded() {
        let (tx, rx) = ring_buffer::<u64>(1);

        assert!(tx.push(1).is_ok());
        assert!(tx.push(2).is_err());

        assert_eq!(rx.pop(), Some(1));
        assert!(tx.push(2).is_ok());
    }

    // ============================================================================
    // Disconnection
    // ============================================================================

    #[test]
    fn consumer_detects_producer_drop() {
        let (tx, rx) = ring_buffer::<u64>(4);

        assert!(!rx.is_disconnected());
        drop(tx);
        assert!(rx.is_disconnected());
    }

    #[test]
    fn producer_detects_all_consumers_drop() {
        let (tx, rx) = ring_buffer::<u64>(4);

        assert!(!tx.is_disconnected());
        drop(rx);
        assert!(tx.is_disconnected());
    }

    #[test]
    fn one_consumer_drops_others_alive() {
        let (tx, rx) = ring_buffer::<u64>(4);
        let rx2 = rx.clone();

        assert!(!tx.is_disconnected());
        drop(rx);
        assert!(!tx.is_disconnected()); // rx2 still alive
        assert!(!rx2.is_disconnected()); // producer still alive
        drop(rx2);
        assert!(tx.is_disconnected());
    }

    // ============================================================================
    // Drop Behavior
    // ============================================================================

    #[test]
    fn drop_cleans_up_remaining() {
        use std::sync::atomic::AtomicUsize;

        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        struct DropCounter;
        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);

        let (tx, rx) = ring_buffer::<DropCounter>(4);

        let _ = tx.push(DropCounter);
        let _ = tx.push(DropCounter);
        let _ = tx.push(DropCounter);

        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);

        // Dropping both endpoints must run the destructor of every value that
        // was pushed but never popped.
        drop(tx);
        drop(rx);

        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
    }

    // ============================================================================
    // Special Types
    // ============================================================================

    #[test]
    fn zero_sized_type() {
        let (tx, rx) = ring_buffer::<()>(8);

        let _ = tx.push(());
        let _ = tx.push(());

        assert_eq!(rx.pop(), Some(()));
        assert_eq!(rx.pop(), Some(()));
        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn string_type() {
        let (tx, rx) = ring_buffer::<String>(4);

        let _ = tx.push("hello".to_string());
        let _ = tx.push("world".to_string());

        assert_eq!(rx.pop(), Some("hello".to_string()));
        assert_eq!(rx.pop(), Some("world".to_string()));
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = ring_buffer::<u64>(0);
    }

    #[test]
    fn large_message_type() {
        #[repr(C, align(64))]
        struct LargeMessage {
            data: [u8; 256],
        }

        let (tx, rx) = ring_buffer::<LargeMessage>(8);

        let msg = LargeMessage { data: [42u8; 256] };
        assert!(tx.push(msg).is_ok());

        let received = rx.pop().unwrap();
        assert_eq!(received.data[0], 42);
        assert_eq!(received.data[255], 42);
    }

    #[test]
    fn multiple_laps() {
        let (tx, rx) = ring_buffer::<u64>(4);

        // 10 full laps through 4-slot buffer
        for i in 0..40 {
            assert!(tx.push(i).is_ok());
            assert_eq!(rx.pop(), Some(i));
        }
    }

    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (tx, _) = ring_buffer::<u64>(100);
        assert_eq!(tx.capacity(), 128);

        let (tx, _) = ring_buffer::<u64>(1000);
        assert_eq!(tx.capacity(), 1024);
    }

    // ============================================================================
    // Stress Tests
    // ============================================================================

    #[test]
    fn stress_single_consumer() {
        const COUNT: u64 = 100_000;

        let (tx, rx) = ring_buffer::<u64>(1024);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                while tx.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut sum = 0u64;
            let mut received = 0u64;
            while received < COUNT {
                if let Some(val) = rx.pop() {
                    sum = sum.wrapping_add(val);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            sum
        });

        producer.join().unwrap();
        let sum = consumer.join().unwrap();
        assert_eq!(sum, COUNT * (COUNT - 1) / 2);
    }

    #[test]
    fn stress_multiple_consumers() {
        const CONSUMERS: usize = 4;
        const TOTAL: u64 = 100_000;

        let (tx, rx) = ring_buffer::<u64>(1024);

        let handles: Vec<_> = (0..CONSUMERS)
            .map(|_| {
                let rx = rx.clone();
                thread::spawn(move || drain_until_disconnected(&rx))
            })
            .collect();

        drop(rx);

        // The producer is dropped when its thread finishes, which is what lets
        // the consumers observe the disconnection and stop.
        let producer = thread::spawn(move || {
            for i in 0..TOTAL {
                while tx.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        producer.join().unwrap();

        let mut all_received = Vec::new();
        for h in handles {
            all_received.extend(h.join().unwrap());
        }

        all_received.sort_unstable();
        assert_eq!(all_received, (0..TOTAL).collect::<Vec<_>>());
    }
}