Skip to main content

nexus_queue/
mpsc.rs

1//! Multi-producer single-consumer bounded queue.
2//!
3//! A lock-free ring buffer optimized for multiple producer threads sending to
4//! one consumer thread. Uses CAS-based slot claiming with Vyukov-style turn
5//! counters for synchronization.
6//!
7//! # Design
8//!
9//! ```text
10//! ┌─────────────────────────────────────────────────────────────┐
11//! │ Shared (Arc):                                               │
12//! │   tail: CachePadded<AtomicUsize>   ← Producers CAS here     │
13//! │   head: CachePadded<AtomicUsize>   ← Consumer writes        │
14//! │   slots: *mut Slot<T>              ← Per-slot turn counters │
15//! └─────────────────────────────────────────────────────────────┘
16//!
17//! ┌─────────────────────┐     ┌─────────────────────┐
18//! │ Producer (Clone):   │     │ Consumer (!Clone):  │
19//! │   cached_head       │     │   local_head        │
20//! │   shared: Arc       │     │   shared: Arc       │
21//! └─────────────────────┘     └─────────────────────┘
22//! ```
23//!
//! A producer first checks that the slot at the current tail is writable for
//! its lap, then competes via CAS on the tail index to claim it. The winner
//! writes the data, then advances the slot's turn counter to signal readiness.
27//!
28//! The consumer checks the turn counter to know when data is ready, reads it,
29//! then advances the turn for the next producer lap.
30//!
31//! # Turn Counter Protocol
32//!
33//! For slot at index `i` on lap `turn`:
34//! - `turn * 2`: Slot is ready for producer to write
35//! - `turn * 2 + 1`: Slot contains data, ready for consumer
36//!
37//! # Example
38//!
39//! ```
40//! use nexus_queue::mpsc;
41//! use std::thread;
42//!
43//! let (tx, rx) = mpsc::ring_buffer::<u64>(1024);
44//!
45//! let tx2 = tx.clone();
46//! let h1 = thread::spawn(move || {
47//!     for i in 0..100 {
48//!         while tx.push(i).is_err() { std::hint::spin_loop(); }
49//!     }
50//! });
51//! let h2 = thread::spawn(move || {
52//!     for i in 100..200 {
53//!         while tx2.push(i).is_err() { std::hint::spin_loop(); }
54//!     }
55//! });
56//!
57//! let mut received = 0;
58//! while received < 200 {
59//!     if rx.pop().is_some() { received += 1; }
60//! }
61//!
62//! h1.join().unwrap();
63//! h2.join().unwrap();
64//! ```
65
66use std::cell::{Cell, UnsafeCell};
67use std::fmt;
68use std::mem::MaybeUninit;
69use std::sync::Arc;
70use std::sync::atomic::{AtomicUsize, Ordering};
71
72use crossbeam_utils::CachePadded;
73
74use crate::Full;
75
76/// Creates a bounded MPSC ring buffer. Renamed to [`ring_buffer`].
77#[deprecated(since = "1.3.0", note = "renamed to ring_buffer()")]
78#[inline]
79pub fn bounded<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
80    ring_buffer(capacity)
81}
82
83/// Creates a bounded MPSC queue with the given capacity.
84///
85/// Capacity is rounded up to the next power of two.
86///
87/// # Panics
88///
89/// Panics if `capacity` is zero or too large to round to a power of two.
90pub fn ring_buffer<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
91    assert!(capacity > 0, "capacity must be non-zero");
92
93    let capacity = capacity
94        .checked_next_power_of_two()
95        .expect("capacity too large (must be <= usize::MAX / 2)");
96    let mask = capacity - 1;
97
98    // Allocate slots with turn counters initialized to 0 (ready for turn 0 producers)
99    let slots: Vec<Slot<T>> = (0..capacity)
100        .map(|_| Slot {
101            turn: AtomicUsize::new(0),
102            data: UnsafeCell::new(MaybeUninit::uninit()),
103        })
104        .collect();
105    let slots = Box::into_raw(slots.into_boxed_slice()) as *mut Slot<T>;
106
107    let shift = capacity.trailing_zeros();
108
109    let shared = Arc::new(Shared {
110        tail: CachePadded::new(AtomicUsize::new(0)),
111        head: CachePadded::new(AtomicUsize::new(0)),
112        slots,
113        capacity,
114        shift,
115        mask,
116    });
117
118    (
119        Producer {
120            cached_head: Cell::new(0),
121            slots,
122            mask,
123            capacity,
124            shift,
125            shared: Arc::clone(&shared),
126        },
127        Consumer {
128            local_head: Cell::new(0),
129            slots,
130            mask,
131            shift,
132            shared,
133        },
134    )
135}
136
/// One cell of the ring buffer: a payload guarded by a turn stamp.
struct Slot<T> {
    /// Synchronization stamp for this cell.
    /// - even (`turn * 2`): empty, a producer on lap `turn` may write
    /// - odd (`turn * 2 + 1`): holds a value written on lap `turn`
    turn: AtomicUsize,
    /// Payload storage; only initialized while the stamp is odd.
    data: UnsafeCell<MaybeUninit<T>>,
}
146
147/// Shared state between producers and the consumer.
148// repr(C): Guarantees field order for cache line layout.
149#[repr(C)]
150struct Shared<T> {
151    /// Tail index - producers CAS on this to claim slots.
152    tail: CachePadded<AtomicUsize>,
153    /// Head index - consumer publishes progress here.
154    head: CachePadded<AtomicUsize>,
155    /// Pointer to the slot array.
156    slots: *mut Slot<T>,
157    /// Actual capacity (power of two).
158    capacity: usize,
159    /// Shift for fast division by capacity (log2(capacity)).
160    shift: u32,
161    /// Mask for fast modulo (capacity - 1).
162    mask: usize,
163}
164
165// SAFETY: Shared contains atomics and raw pointers. Access is synchronized via
166// the turn counters. T: Send ensures data can move between threads.
167unsafe impl<T: Send> Send for Shared<T> {}
168unsafe impl<T: Send> Sync for Shared<T> {}
169
170impl<T> Drop for Shared<T> {
171    fn drop(&mut self) {
172        let head = self.head.load(Ordering::Relaxed);
173        let tail = self.tail.load(Ordering::Relaxed);
174
175        // Drop any remaining elements in the queue
176        let mut i = head;
177        while i != tail {
178            let slot = unsafe { &*self.slots.add(i & self.mask) };
179            let turn = i >> self.shift;
180
181            // Only drop if the slot was actually written (turn is odd = consumer-ready)
182            if slot.turn.load(Ordering::Relaxed) == turn * 2 + 1 {
183                // SAFETY: Slot contains initialized data at this turn.
184                unsafe { (*slot.data.get()).assume_init_drop() };
185            }
186            i = i.wrapping_add(1);
187        }
188
189        // SAFETY: slots was allocated via Box::into_raw from a Vec.
190        unsafe {
191            let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
192                self.slots,
193                self.capacity,
194            ));
195        }
196    }
197}
198
199/// The producer endpoint of an MPSC queue.
200///
201/// This endpoint can be cloned to create additional producers. Each clone
202/// maintains its own cached state for performance.
203// repr(C): Hot fields at struct base share cache line with struct pointer.
204#[repr(C)]
205pub struct Producer<T> {
206    /// Cached head for fast full-check. Only refreshed when cache indicates full.
207    cached_head: Cell<usize>,
208    /// Cached slots pointer (avoids Arc deref on hot path).
209    slots: *mut Slot<T>,
210    /// Cached mask (avoids Arc deref on hot path).
211    mask: usize,
212    /// Cached capacity (avoids Arc deref on hot path).
213    capacity: usize,
214    /// Cached shift for fast division (log2(capacity)).
215    shift: u32,
216    shared: Arc<Shared<T>>,
217}
218
219impl<T> Clone for Producer<T> {
220    fn clone(&self) -> Self {
221        Producer {
222            // Fresh cache - will be populated on first push
223            cached_head: Cell::new(self.shared.head.load(Ordering::Relaxed)),
224            slots: self.slots,
225            mask: self.mask,
226            capacity: self.capacity,
227            shift: self.shift,
228            shared: Arc::clone(&self.shared),
229        }
230    }
231}
232
233// SAFETY: Producer can be sent to another thread. Each Producer instance is
234// used by one thread (not Sync - use clone() for multiple threads).
235unsafe impl<T: Send> Send for Producer<T> {}
236
237impl<T> Producer<T> {
238    /// Pushes a value into the queue.
239    ///
240    /// Returns `Err(Full(value))` if the queue is full, returning ownership
241    /// of the value to the caller for backpressure handling.
242    ///
243    /// This method spins internally on CAS contention but returns immediately
244    /// when the queue is actually full.
245    #[inline]
246    #[must_use = "push returns Err if full, which should be handled"]
247    pub fn push(&self, value: T) -> Result<(), Full<T>> {
248        let mut spin_count = 0u32;
249
250        loop {
251            let tail = self.shared.tail.load(Ordering::Relaxed);
252
253            // Check against cached head (avoids atomic load most of the time)
254            if tail.wrapping_sub(self.cached_head.get()) >= self.capacity {
255                // Cache miss: refresh from shared head
256                self.cached_head
257                    .set(self.shared.head.load(Ordering::Acquire));
258
259                // Re-check with fresh head - if still full, return error
260                if tail.wrapping_sub(self.cached_head.get()) >= self.capacity {
261                    return Err(Full(value));
262                }
263            }
264
265            // SAFETY: slots pointer is valid for the lifetime of shared.
266            let slot = unsafe { &*self.slots.add(tail & self.mask) };
267            let turn = tail >> self.shift;
268            let expected_stamp = turn * 2;
269
270            // Check if slot is ready BEFORE attempting CAS (Vyukov optimization)
271            let stamp = slot.turn.load(Ordering::Acquire);
272
273            if stamp == expected_stamp {
274                // Slot is ready - try to claim it
275                if self
276                    .shared
277                    .tail
278                    .compare_exchange_weak(
279                        tail,
280                        tail.wrapping_add(1),
281                        Ordering::Relaxed,
282                        Ordering::Relaxed,
283                    )
284                    .is_ok()
285                {
286                    // SAFETY: We own this slot via successful CAS.
287                    unsafe { (*slot.data.get()).write(value) };
288
289                    // Signal ready for consumer: turn * 2 + 1
290                    slot.turn.store(turn * 2 + 1, Ordering::Release);
291
292                    return Ok(());
293                }
294            }
295
296            // CAS failed or slot not ready - exponential backoff
297            // Cap at 6 to avoid excessive spinning (1, 2, 4, 8, 16, 32, 64 iterations)
298            let spins = 1 << spin_count.min(6);
299            for _ in 0..spins {
300                std::hint::spin_loop();
301            }
302            spin_count += 1;
303
304            // Periodically check if the consumer disconnected during spin.
305            // Without this, a producer spins forever if the consumer drops
306            // while we're in the retry loop.
307            if spin_count >= 5 && self.is_disconnected() {
308                return Err(Full(value));
309            }
310        }
311    }
312
313    /// Returns the capacity of the queue.
314    #[inline]
315    pub fn capacity(&self) -> usize {
316        1 << self.shift
317    }
318
319    /// Returns `true` if the consumer has been dropped.
320    ///
321    /// With multiple producers, this returns `true` only when this is the
322    /// last handle (all other producers and the consumer are dropped).
323    #[inline]
324    pub fn is_disconnected(&self) -> bool {
325        Arc::strong_count(&self.shared) == 1
326    }
327}
328
329impl<T> fmt::Debug for Producer<T> {
330    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
331        f.debug_struct("Producer")
332            .field("capacity", &self.capacity())
333            .finish_non_exhaustive()
334    }
335}
336
337/// The consumer endpoint of an MPSC queue.
338///
339/// This endpoint cannot be cloned - only one consumer thread is allowed.
340// repr(C): Hot fields at struct base share cache line with struct pointer.
341#[repr(C)]
342pub struct Consumer<T> {
343    /// Local head index - only this thread reads/writes.
344    local_head: Cell<usize>,
345    /// Cached slots pointer (avoids Arc deref on hot path).
346    slots: *mut Slot<T>,
347    /// Cached mask (avoids Arc deref on hot path).
348    mask: usize,
349    /// Cached shift for fast division (log2(capacity)).
350    shift: u32,
351    shared: Arc<Shared<T>>,
352}
353
354// SAFETY: Consumer can be sent to another thread. It has exclusive read access
355// to slots (via turn protocol) and maintains the head index.
356unsafe impl<T: Send> Send for Consumer<T> {}
357
358impl<T> Consumer<T> {
359    /// Pops a value from the queue.
360    ///
361    /// Returns `None` if the queue is empty.
362    #[inline]
363    pub fn pop(&self) -> Option<T> {
364        let head = self.local_head.get();
365        // SAFETY: slots pointer is valid for the lifetime of shared.
366        let slot = unsafe { &*self.slots.add(head & self.mask) };
367        let turn = head >> self.shift;
368
369        // Check if slot is ready (turn * 2 + 1 means producer has written)
370        if slot.turn.load(Ordering::Acquire) != turn * 2 + 1 {
371            return None;
372        }
373
374        // SAFETY: Turn counter confirms producer has written to this slot.
375        let value = unsafe { (*slot.data.get()).assume_init_read() };
376
377        // Signal slot is free for next lap: (turn + 1) * 2
378        slot.turn.store((turn + 1) * 2, Ordering::Release);
379
380        // Advance head and publish for producers' capacity check
381        let new_head = head.wrapping_add(1);
382        self.local_head.set(new_head);
383        self.shared.head.store(new_head, Ordering::Release);
384
385        Some(value)
386    }
387
388    /// Returns the capacity of the queue.
389    #[inline]
390    pub fn capacity(&self) -> usize {
391        1 << self.shift
392    }
393
394    /// Returns `true` if all producers have been dropped.
395    #[inline]
396    pub fn is_disconnected(&self) -> bool {
397        Arc::strong_count(&self.shared) == 1
398    }
399}
400
401impl<T> fmt::Debug for Consumer<T> {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        f.debug_struct("Consumer")
404            .field("capacity", &self.capacity())
405            .finish_non_exhaustive()
406    }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Pushes `value`, spinning until the queue accepts it.
    fn push_spin(tx: &Producer<u64>, value: u64) {
        while tx.push(value).is_err() {
            std::hint::spin_loop();
        }
    }

    /// Pops exactly `count` values, spinning while the queue is empty.
    fn pop_exactly(rx: &Consumer<u64>, count: usize) -> Vec<u64> {
        let mut out = Vec::with_capacity(count);
        while out.len() < count {
            match rx.pop() {
                Some(value) => out.push(value),
                None => std::hint::spin_loop(),
            }
        }
        out
    }

    // ============================================================================
    // Basic Operations
    // ============================================================================

    #[test]
    fn basic_push_pop() {
        let (tx, rx) = ring_buffer::<u64>(4);

        assert!(tx.push(1).is_ok());
        assert!(tx.push(2).is_ok());
        assert!(tx.push(3).is_ok());

        assert_eq!(rx.pop(), Some(1));
        assert_eq!(rx.pop(), Some(2));
        assert_eq!(rx.pop(), Some(3));
        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn empty_pop_returns_none() {
        let (_tx, rx) = ring_buffer::<u64>(4);
        assert_eq!(rx.pop(), None);
        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn fill_then_drain() {
        let (tx, rx) = ring_buffer::<u64>(4);

        for i in 0..4 {
            assert!(tx.push(i).is_ok());
        }

        for i in 0..4 {
            assert_eq!(rx.pop(), Some(i));
        }

        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn push_returns_error_when_full() {
        let (tx, _rx) = ring_buffer::<u64>(4);

        for i in 1..=4 {
            assert!(tx.push(i).is_ok());
        }

        let err = tx.push(5).unwrap_err();
        assert_eq!(err.into_inner(), 5);
    }

    // ============================================================================
    // Interleaved Operations
    // ============================================================================

    #[test]
    fn interleaved_single_producer() {
        let (tx, rx) = ring_buffer::<u64>(8);

        for i in 0..1000 {
            assert!(tx.push(i).is_ok());
            assert_eq!(rx.pop(), Some(i));
        }
    }

    #[test]
    fn partial_fill_drain_cycles() {
        let (tx, rx) = ring_buffer::<u64>(8);

        for round in 0..100 {
            for i in 0..4 {
                assert!(tx.push(round * 4 + i).is_ok());
            }

            for i in 0..4 {
                assert_eq!(rx.pop(), Some(round * 4 + i));
            }
        }
    }

    // ============================================================================
    // Multiple Producers
    // ============================================================================

    #[test]
    fn two_producers_single_consumer() {
        let (tx, rx) = ring_buffer::<u64>(64);
        let tx2 = tx.clone();

        let h1 = thread::spawn(move || {
            for i in 0..1000 {
                push_spin(&tx, i);
            }
        });

        let h2 = thread::spawn(move || {
            for i in 1000..2000 {
                push_spin(&tx2, i);
            }
        });

        let mut received = pop_exactly(&rx, 2000);

        h1.join().unwrap();
        h2.join().unwrap();

        // All values received (order not guaranteed across producers)
        received.sort_unstable();
        assert_eq!(received, (0..2000).collect::<Vec<_>>());
    }

    #[test]
    fn four_producers_single_consumer() {
        let (tx, rx) = ring_buffer::<u64>(256);

        let handles: Vec<_> = (0..4)
            .map(|p| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..1000 {
                        push_spin(&tx, p * 1000 + i);
                    }
                })
            })
            .collect();

        drop(tx); // Drop original producer

        let mut received = pop_exactly(&rx, 4000);

        for h in handles {
            h.join().unwrap();
        }

        // Producer `p` sends `p * 1000 + i` for `i` in `0..1000`, so together
        // the four producers cover exactly `0..4000`.
        received.sort_unstable();
        assert_eq!(received, (0..4000).collect::<Vec<u64>>());
    }

    // ============================================================================
    // Single Slot
    // ============================================================================

    #[test]
    fn single_slot_bounded() {
        let (tx, rx) = ring_buffer::<u64>(1);

        assert!(tx.push(1).is_ok());
        assert!(tx.push(2).is_err());

        assert_eq!(rx.pop(), Some(1));
        assert!(tx.push(2).is_ok());
    }

    // ============================================================================
    // Disconnection
    // ============================================================================

    #[test]
    fn producer_disconnected() {
        let (tx, rx) = ring_buffer::<u64>(4);

        assert!(!rx.is_disconnected());
        drop(tx);
        assert!(rx.is_disconnected());
    }

    #[test]
    fn consumer_disconnected() {
        let (tx, rx) = ring_buffer::<u64>(4);

        assert!(!tx.is_disconnected());
        drop(rx);
        assert!(tx.is_disconnected());
    }

    #[test]
    fn multiple_producers_one_disconnects() {
        let (tx1, rx) = ring_buffer::<u64>(4);
        let tx2 = tx1.clone();

        assert!(!rx.is_disconnected());
        drop(tx1);
        assert!(!rx.is_disconnected()); // tx2 still alive
        drop(tx2);
        assert!(rx.is_disconnected());
    }

    // ============================================================================
    // Drop Behavior
    // ============================================================================

    #[test]
    fn drop_cleans_up_remaining() {
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        struct DropCounter;
        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);

        let (tx, rx) = ring_buffer::<DropCounter>(4);

        for _ in 0..3 {
            assert!(tx.push(DropCounter).is_ok());
        }

        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);

        drop(tx);
        drop(rx);

        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
    }

    // ============================================================================
    // Special Types
    // ============================================================================

    #[test]
    fn zero_sized_type() {
        let (tx, rx) = ring_buffer::<()>(8);

        assert!(tx.push(()).is_ok());
        assert!(tx.push(()).is_ok());

        assert_eq!(rx.pop(), Some(()));
        assert_eq!(rx.pop(), Some(()));
        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn string_type() {
        let (tx, rx) = ring_buffer::<String>(4);

        assert!(tx.push("hello".to_string()).is_ok());
        assert!(tx.push("world".to_string()).is_ok());

        assert_eq!(rx.pop(), Some("hello".to_string()));
        assert_eq!(rx.pop(), Some("world".to_string()));
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = ring_buffer::<u64>(0);
    }

    #[test]
    fn large_message_type() {
        #[repr(C, align(64))]
        struct LargeMessage {
            data: [u8; 256],
        }

        let (tx, rx) = ring_buffer::<LargeMessage>(8);

        let msg = LargeMessage { data: [42u8; 256] };
        assert!(tx.push(msg).is_ok());

        let received = rx.pop().unwrap();
        assert_eq!(received.data[0], 42);
        assert_eq!(received.data[255], 42);
    }

    #[test]
    fn multiple_laps() {
        let (tx, rx) = ring_buffer::<u64>(4);

        // 10 full laps through 4-slot buffer
        for i in 0..40 {
            assert!(tx.push(i).is_ok());
            assert_eq!(rx.pop(), Some(i));
        }
    }

    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (tx, _rx) = ring_buffer::<u64>(100);
        assert_eq!(tx.capacity(), 128);

        let (tx, _rx) = ring_buffer::<u64>(1000);
        assert_eq!(tx.capacity(), 1024);
    }

    // ============================================================================
    // Stress Tests
    // ============================================================================

    #[test]
    fn stress_single_producer() {
        const COUNT: u64 = 100_000;

        let (tx, rx) = ring_buffer::<u64>(1024);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                push_spin(&tx, i);
            }
        });

        let consumer = thread::spawn(move || {
            let mut sum = 0u64;
            let mut received = 0u64;
            while received < COUNT {
                match rx.pop() {
                    Some(val) => {
                        sum = sum.wrapping_add(val);
                        received += 1;
                    }
                    None => std::hint::spin_loop(),
                }
            }
            sum
        });

        producer.join().unwrap();
        let sum = consumer.join().unwrap();
        assert_eq!(sum, COUNT * (COUNT - 1) / 2);
    }

    #[test]
    fn stress_multiple_producers() {
        const PRODUCERS: u64 = 4;
        const PER_PRODUCER: u64 = 25_000;
        const TOTAL: u64 = PRODUCERS * PER_PRODUCER;

        let (tx, rx) = ring_buffer::<u64>(1024);

        let handles: Vec<_> = (0..PRODUCERS)
            .map(|_| {
                let tx = tx.clone();
                thread::spawn(move || {
                    for i in 0..PER_PRODUCER {
                        push_spin(&tx, i);
                    }
                })
            })
            .collect();

        drop(tx);

        let mut received = 0u64;
        while received < TOTAL {
            if rx.pop().is_some() {
                received += 1;
            } else {
                std::hint::spin_loop();
            }
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(received, TOTAL);
    }
}