// nexus_queue/spmc.rs

1//! Single-producer multi-consumer bounded queue.
2//!
3//! A lock-free ring buffer optimized for one producer thread fanning out to
4//! multiple consumer threads. Uses Vyukov-style turn counters with CAS-based
5//! head claiming for consumers.
6//!
7//! # Design
8//!
9//! ```text
10//! ┌─────────────────────────────────────────────────────────────────┐
11//! │ Shared (Arc):                                                   │
12//! │   head: CachePadded<AtomicUsize>   ← Consumers CAS here         │
13//! │   tail: CachePadded<AtomicUsize>   ← Producer publishes here    │
14//! │   producer_alive: AtomicBool       ← Disconnection detection    │
15//! │   slots: *mut Slot<T>              ← Per-slot turn counters     │
16//! └─────────────────────────────────────────────────────────────────┘
17//!
18//! ┌─────────────────────┐     ┌─────────────────────┐
//! │ Producer (!Clone):  │     │ Consumer (Clone):   │
//! │   local_tail        │     │   shared: Arc       │
//! │   shared: Arc       │     └─────────────────────┘
22//! └─────────────────────┘
23//! ```
24//!
25//! The producer writes directly (no CAS) since it's the sole writer. Consumers
26//! compete via CAS on the head index to claim slots. After claiming, the consumer
27//! reads the data and advances the turn counter for the next producer lap.
28//!
29//! # Turn Counter Protocol
30//!
31//! For slot at index `i` on lap `turn`:
32//! - `turn * 2`: Slot is ready for producer to write
33//! - `turn * 2 + 1`: Slot contains data, ready for consumer
34//!
35//! # Disconnection
36//!
37//! Unlike MPSC where `Arc::strong_count == 1` detects disconnection on both
38//! sides, SPMC consumers hold Arc refs to each other. An `AtomicBool` tracks
39//! whether the producer is alive so consumers can detect disconnection.
40//!
41//! # Example
42//!
43//! ```
44//! use nexus_queue::spmc;
45//! use std::thread;
46//!
47//! let (tx, rx) = spmc::bounded::<u64>(1024);
48//!
49//! let rx2 = rx.clone();
50//! let rx1 = rx;
51//! let h1 = thread::spawn(move || {
52//!     let mut received = Vec::new();
53//!     loop {
54//!         if let Some(v) = rx1.pop() {
55//!             received.push(v);
56//!         } else if rx1.is_disconnected() {
57//!             while let Some(v) = rx1.pop() { received.push(v); }
58//!             break;
59//!         } else {
60//!             std::hint::spin_loop();
61//!         }
62//!     }
63//!     received
64//! });
65//! let h2 = thread::spawn(move || {
66//!     let mut received = Vec::new();
67//!     loop {
68//!         if let Some(v) = rx2.pop() {
69//!             received.push(v);
70//!         } else if rx2.is_disconnected() {
71//!             while let Some(v) = rx2.pop() { received.push(v); }
72//!             break;
73//!         } else {
74//!             std::hint::spin_loop();
75//!         }
76//!     }
77//!     received
78//! });
79//!
80//! for i in 0..200 {
81//!     while tx.push(i).is_err() { std::hint::spin_loop(); }
82//! }
83//! drop(tx);
84//!
85//! let mut all: Vec<_> = h1.join().unwrap();
86//! all.extend(h2.join().unwrap());
87//! all.sort();
88//! assert_eq!(all, (0..200).collect::<Vec<_>>());
89//! ```
90
91use std::cell::{Cell, UnsafeCell};
92use std::fmt;
93use std::mem::MaybeUninit;
94use std::sync::Arc;
95use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
96
97use crossbeam_utils::CachePadded;
98
99use crate::Full;
100
101/// Creates a bounded SPMC queue with the given capacity.
102///
103/// Capacity is rounded up to the next power of two.
104///
105/// # Panics
106///
107/// Panics if `capacity` is zero.
108pub fn bounded<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
109    assert!(capacity > 0, "capacity must be non-zero");
110
111    let capacity = capacity.next_power_of_two();
112    let mask = capacity - 1;
113
114    // Allocate slots with turn counters initialized to 0 (ready for turn 0 producer)
115    let slots: Vec<Slot<T>> = (0..capacity)
116        .map(|_| Slot {
117            turn: AtomicUsize::new(0),
118            data: UnsafeCell::new(MaybeUninit::uninit()),
119        })
120        .collect();
121    let slots = Box::into_raw(slots.into_boxed_slice()) as *mut Slot<T>;
122
123    let shift = capacity.trailing_zeros();
124
125    let shared = Arc::new(Shared {
126        head: CachePadded::new(AtomicUsize::new(0)),
127        tail: CachePadded::new(AtomicUsize::new(0)),
128        producer_alive: AtomicBool::new(true),
129        slots,
130        capacity,
131        shift,
132        mask,
133    });
134
135    (
136        Producer {
137            local_tail: Cell::new(0),
138            slots,
139            mask,
140            shift,
141            shared: Arc::clone(&shared),
142        },
143        Consumer {
144            slots,
145            mask,
146            shift,
147            shared,
148        },
149    )
150}
151
/// A slot in the ring buffer with turn-based synchronization.
struct Slot<T> {
    /// Turn counter for Vyukov-style synchronization.
    /// - `turn * 2`: ready for producer
    /// - `turn * 2 + 1`: ready for consumer
    turn: AtomicUsize,
    /// The data stored in this slot.
    /// Only initialized while `turn` is odd (consumer-ready); otherwise the
    /// bytes are uninitialized or already moved out.
    data: UnsafeCell<MaybeUninit<T>>,
}
161
/// Shared state between the producer and consumers.
// repr(C): Guarantees field order for cache line layout.
#[repr(C)]
struct Shared<T> {
    /// Head index - consumers CAS on this to claim slots.
    head: CachePadded<AtomicUsize>,
    /// Tail index - written by producer on drop for Shared::drop cleanup.
    /// Not read on the hot path; the producer keeps its own `local_tail`.
    tail: CachePadded<AtomicUsize>,
    /// Whether the producer is still alive (for consumer disconnection detection).
    producer_alive: AtomicBool,
    /// Pointer to the slot array. Allocated in `bounded`, freed in `Drop`.
    slots: *mut Slot<T>,
    /// Actual capacity (power of two).
    capacity: usize,
    /// Shift for fast division by capacity (log2(capacity)).
    shift: u32,
    /// Mask for fast modulo (capacity - 1).
    mask: usize,
}
181
// SAFETY: Shared contains atomics and raw pointers. Access is synchronized via
// the turn counters. T: Send ensures data can move between threads. The raw
// `slots` pointer is only dereferenced under a turn-counter handoff and is
// freed exactly once, in `Shared::drop`.
unsafe impl<T: Send> Send for Shared<T> {}
unsafe impl<T: Send> Sync for Shared<T> {}
186
impl<T> Drop for Shared<T> {
    /// Drops any undelivered elements, then frees the slot array.
    ///
    /// Runs only after the producer and every consumer are gone (last Arc
    /// dropped), so Relaxed loads suffice: the Arc refcount's release/acquire
    /// already ordered all prior queue operations before this point.
    fn drop(&mut self) {
        let head = self.head.load(Ordering::Relaxed);
        // `tail` was published by Producer::drop.
        let tail = self.tail.load(Ordering::Relaxed);

        // Drop any remaining elements in the queue
        let mut i = head;
        while i != tail {
            let slot = unsafe { &*self.slots.add(i & self.mask) };
            let turn = i >> self.shift;

            // Only drop if the slot was actually written (turn is odd = consumer-ready)
            if slot.turn.load(Ordering::Relaxed) == turn * 2 + 1 {
                // SAFETY: Slot contains initialized data at this turn.
                unsafe { (*slot.data.get()).assume_init_drop() };
            }
            i = i.wrapping_add(1);
        }

        // SAFETY: slots was allocated via Box::into_raw from a Vec.
        unsafe {
            let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
                self.slots,
                self.capacity,
            ));
        }
    }
}
215
/// The producer endpoint of an SPMC queue.
///
/// This endpoint cannot be cloned - only one producer thread is allowed.
/// The single-writer design eliminates CAS contention on the tail index.
// repr(C): Hot fields at struct base share cache line with struct pointer.
#[repr(C)]
pub struct Producer<T> {
    /// Local tail index - only this thread reads/writes.
    /// Published to `shared.tail` when the producer is dropped.
    local_tail: Cell<usize>,
    /// Cached slots pointer (avoids Arc deref on hot path).
    slots: *mut Slot<T>,
    /// Cached mask (avoids Arc deref on hot path).
    mask: usize,
    /// Cached shift for fast division (log2(capacity)).
    shift: u32,
    /// Keeps the shared state (and slot allocation) alive.
    shared: Arc<Shared<T>>,
}
233
// SAFETY: Producer can be sent to another thread. It has exclusive write access
// to slots (via turn protocol) and maintains the tail index. Producer is
// deliberately not Sync: `local_tail` is a Cell, so the auto traits already
// prevent sharing one Producer across threads.
unsafe impl<T: Send> Send for Producer<T> {}
237
impl<T> Producer<T> {
    /// Pushes a value into the queue.
    ///
    /// Returns `Err(Full(value))` if the queue is full, returning ownership
    /// of the value to the caller for backpressure handling.
    ///
    /// No CAS required - single writer principle.
    #[inline]
    #[must_use = "push returns Err if full, which should be handled"]
    pub fn push(&self, value: T) -> Result<(), Full<T>> {
        let tail = self.local_tail.get();
        // SAFETY: slots pointer is valid for the lifetime of shared.
        let slot = unsafe { &*self.slots.add(tail & self.mask) };
        // Lap number: how many times the ring has wrapped at this index.
        let turn = tail >> self.shift;

        // Check if slot is ready (consumer has freed it).
        // Acquire pairs with the consumer's Release store in `pop`, ensuring
        // the consumer finished reading the previous value before we overwrite.
        if slot.turn.load(Ordering::Acquire) != turn * 2 {
            return Err(Full(value));
        }

        // SAFETY: Turn counter confirms slot is free for this lap.
        unsafe { (*slot.data.get()).write(value) };

        // Signal ready for consumer: turn * 2 + 1
        // Release publishes the data write above to the consumer's Acquire load.
        slot.turn.store(turn * 2 + 1, Ordering::Release);

        self.local_tail.set(tail.wrapping_add(1));

        Ok(())
    }

    /// Returns the capacity of the queue.
    #[inline]
    pub fn capacity(&self) -> usize {
        // Capacity is always a power of two, so 2^shift reconstructs it.
        1 << self.shift
    }

    /// Returns `true` if all consumers have been dropped.
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        // The producer holds one Arc; every Consumer holds another. A strong
        // count of exactly 1 therefore means no Consumer handle remains.
        Arc::strong_count(&self.shared) == 1
    }
}
281
impl<T> Drop for Producer<T> {
    /// Publishes the final tail and marks the producer as disconnected.
    fn drop(&mut self) {
        // Publish final tail for Shared::drop cleanup
        self.shared.tail.store(self.local_tail.get(), Ordering::Relaxed);
        // Release pairs with the Acquire in Consumer::is_disconnected: a
        // consumer that observes `false` also observes every push made before
        // this drop, so the drain-then-break pattern loses no values.
        self.shared.producer_alive.store(false, Ordering::Release);
    }
}
289
290impl<T> fmt::Debug for Producer<T> {
291    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292        f.debug_struct("Producer")
293            .field("capacity", &self.capacity())
294            .finish_non_exhaustive()
295    }
296}
297
/// The consumer endpoint of an SPMC queue.
///
/// This endpoint can be cloned to create additional consumers. Each clone
/// competes via CAS on the shared head index.
// repr(C): Hot fields at struct base share cache line with struct pointer.
#[repr(C)]
pub struct Consumer<T> {
    /// Cached slots pointer (avoids Arc deref on hot path).
    slots: *mut Slot<T>,
    /// Cached mask (avoids Arc deref on hot path).
    mask: usize,
    /// Cached shift for fast division (log2(capacity)).
    shift: u32,
    /// Keeps the shared state (and slot allocation) alive.
    shared: Arc<Shared<T>>,
}
313
314impl<T> Clone for Consumer<T> {
315    fn clone(&self) -> Self {
316        Consumer {
317            slots: self.slots,
318            mask: self.mask,
319            shift: self.shift,
320            shared: Arc::clone(&self.shared),
321        }
322    }
323}
324
// SAFETY: Consumer can be sent to another thread. Each Consumer instance is
// used by one thread (not Sync - use clone() for multiple threads). Claiming
// a slot via the head CAS gives the claimant exclusive access to that slot's
// data for the current lap.
unsafe impl<T: Send> Send for Consumer<T> {}
328
impl<T> Consumer<T> {
    /// Pops a value from the queue.
    ///
    /// Returns `None` if the queue is empty.
    ///
    /// This method spins internally on CAS contention but returns immediately
    /// when the queue is actually empty.
    #[inline]
    pub fn pop(&self) -> Option<T> {
        let mut spin_count = 0u32;

        loop {
            let head = self.shared.head.load(Ordering::Relaxed);

            // SAFETY: slots pointer is valid for the lifetime of shared.
            let slot = unsafe { &*self.slots.add(head & self.mask) };
            // Lap number for this head index.
            let turn = head >> self.shift;

            // Acquire pairs with the producer's Release store in `push`: a
            // consumer-ready stamp makes the written data visible to us.
            let stamp = slot.turn.load(Ordering::Acquire);

            if stamp == turn * 2 + 1 {
                // Slot has data - try to claim it
                if self
                    .shared
                    .head
                    .compare_exchange_weak(
                        head,
                        head.wrapping_add(1),
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    // SAFETY: We own this slot via successful CAS.
                    let value = unsafe { (*slot.data.get()).assume_init_read() };

                    // Signal slot is free for next lap: (turn + 1) * 2
                    // Release pairs with the producer's Acquire load, ordering
                    // our read above before the producer's next write.
                    slot.turn.store((turn + 1) * 2, Ordering::Release);

                    return Some(value);
                }

                // CAS failed - another consumer claimed it, retry with backoff
                // (exponential, capped at 2^6 = 64 spins per round).
                let spins = 1 << spin_count.min(6);
                for _ in 0..spins {
                    std::hint::spin_loop();
                }
                spin_count += 1;
            } else {
                // Slot not ready - queue is empty
                return None;
            }
        }
    }

    /// Returns the capacity of the queue.
    #[inline]
    pub fn capacity(&self) -> usize {
        // Capacity is always a power of two, so 2^shift reconstructs it.
        1 << self.shift
    }

    /// Returns `true` if the producer has been dropped.
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        // Acquire pairs with the Release store in Producer::drop, so once this
        // returns true every value pushed before the drop is visible to `pop`.
        !self.shared.producer_alive.load(Ordering::Acquire)
    }
}
396
397impl<T> fmt::Debug for Consumer<T> {
398    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
399        f.debug_struct("Consumer")
400            .field("capacity", &self.capacity())
401            .finish_non_exhaustive()
402    }
403}
404
#[cfg(test)]
mod tests {
    use super::*;

    // ============================================================================
    // Basic Operations
    // ============================================================================

    #[test]
    fn basic_push_pop() {
        let (tx, rx) = bounded::<u64>(4);

        assert!(tx.push(1).is_ok());
        assert!(tx.push(2).is_ok());
        assert!(tx.push(3).is_ok());

        assert_eq!(rx.pop(), Some(1));
        assert_eq!(rx.pop(), Some(2));
        assert_eq!(rx.pop(), Some(3));
        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn empty_pop_returns_none() {
        // Producer is bound to `_`, so it is dropped immediately; popping an
        // empty, disconnected queue must still return None rather than panic.
        let (_, rx) = bounded::<u64>(4);
        assert_eq!(rx.pop(), None);
        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn fill_then_drain() {
        let (tx, rx) = bounded::<u64>(4);

        for i in 0..4 {
            assert!(tx.push(i).is_ok());
        }

        for i in 0..4 {
            assert_eq!(rx.pop(), Some(i));
        }

        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn push_returns_error_when_full() {
        let (tx, _rx) = bounded::<u64>(4);

        assert!(tx.push(1).is_ok());
        assert!(tx.push(2).is_ok());
        assert!(tx.push(3).is_ok());
        assert!(tx.push(4).is_ok());

        // Full(value) must hand the rejected value back for backpressure.
        let err = tx.push(5).unwrap_err();
        assert_eq!(err.into_inner(), 5);
    }

    // ============================================================================
    // Interleaved Operations
    // ============================================================================

    #[test]
    fn interleaved_single_consumer() {
        let (tx, rx) = bounded::<u64>(8);

        for i in 0..1000 {
            assert!(tx.push(i).is_ok());
            assert_eq!(rx.pop(), Some(i));
        }
    }

    #[test]
    fn partial_fill_drain_cycles() {
        // Exercises turn-counter laps without ever filling the buffer.
        let (tx, rx) = bounded::<u64>(8);

        for round in 0..100 {
            for i in 0..4 {
                assert!(tx.push(round * 4 + i).is_ok());
            }

            for i in 0..4 {
                assert_eq!(rx.pop(), Some(round * 4 + i));
            }
        }
    }

    // ============================================================================
    // Multiple Consumers
    // ============================================================================

    #[test]
    fn two_consumers_single_producer() {
        use std::thread;

        let (tx, rx) = bounded::<u64>(64);
        let rx2 = rx.clone();

        let rx1 = rx;
        let h1 = thread::spawn(move || {
            let mut received = Vec::new();
            loop {
                if let Some(val) = rx1.pop() {
                    received.push(val);
                } else if rx1.is_disconnected() {
                    // Final drain: disconnect is observed after the producer's
                    // Release store, so remaining values are visible.
                    while let Some(val) = rx1.pop() {
                        received.push(val);
                    }
                    break;
                } else {
                    std::hint::spin_loop();
                }
            }
            received
        });

        let h2 = thread::spawn(move || {
            let mut received = Vec::new();
            loop {
                if let Some(val) = rx2.pop() {
                    received.push(val);
                } else if rx2.is_disconnected() {
                    while let Some(val) = rx2.pop() {
                        received.push(val);
                    }
                    break;
                } else {
                    std::hint::spin_loop();
                }
            }
            received
        });

        for i in 0..2000 {
            while tx.push(i).is_err() {
                std::hint::spin_loop();
            }
        }
        drop(tx);

        let mut received = h1.join().unwrap();
        received.extend(h2.join().unwrap());

        // All values received (order not guaranteed across consumers)
        received.sort_unstable();
        assert_eq!(received, (0..2000).collect::<Vec<_>>());
    }

    #[test]
    fn four_consumers_single_producer() {
        use std::thread;

        let (tx, rx) = bounded::<u64>(256);

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rx = rx.clone();
                thread::spawn(move || {
                    let mut received = Vec::new();
                    loop {
                        if let Some(val) = rx.pop() {
                            received.push(val);
                        } else if rx.is_disconnected() {
                            while let Some(val) = rx.pop() {
                                received.push(val);
                            }
                            break;
                        } else {
                            std::hint::spin_loop();
                        }
                    }
                    received
                })
            })
            .collect();

        drop(rx); // Drop original consumer

        for i in 0..4000u64 {
            while tx.push(i).is_err() {
                std::hint::spin_loop();
            }
        }
        drop(tx);

        let mut received = Vec::new();
        for h in handles {
            received.extend(h.join().unwrap());
        }

        // Every value delivered exactly once across all four consumers.
        received.sort_unstable();
        assert_eq!(received, (0..4000).collect::<Vec<_>>());
    }

    // ============================================================================
    // Single Slot
    // ============================================================================

    #[test]
    fn single_slot_bounded() {
        let (tx, rx) = bounded::<u64>(1);

        assert!(tx.push(1).is_ok());
        assert!(tx.push(2).is_err());

        assert_eq!(rx.pop(), Some(1));
        assert!(tx.push(2).is_ok());
    }

    // ============================================================================
    // Disconnection
    // ============================================================================

    #[test]
    fn consumer_detects_producer_drop() {
        let (tx, rx) = bounded::<u64>(4);

        assert!(!rx.is_disconnected());
        drop(tx);
        assert!(rx.is_disconnected());
    }

    #[test]
    fn producer_detects_all_consumers_drop() {
        let (tx, rx) = bounded::<u64>(4);

        assert!(!tx.is_disconnected());
        drop(rx);
        assert!(tx.is_disconnected());
    }

    #[test]
    fn one_consumer_drops_others_alive() {
        let (tx, rx) = bounded::<u64>(4);
        let rx2 = rx.clone();

        assert!(!tx.is_disconnected());
        drop(rx);
        assert!(!tx.is_disconnected()); // rx2 still alive
        assert!(!rx2.is_disconnected()); // producer still alive
        drop(rx2);
        assert!(tx.is_disconnected());
    }

    // ============================================================================
    // Drop Behavior
    // ============================================================================

    #[test]
    fn drop_cleans_up_remaining() {
        use std::sync::atomic::AtomicUsize;

        // Counts drops of undelivered elements; only this test touches it.
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        struct DropCounter;
        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);

        let (tx, rx) = bounded::<DropCounter>(4);

        let _ = tx.push(DropCounter);
        let _ = tx.push(DropCounter);
        let _ = tx.push(DropCounter);

        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);

        drop(tx);
        drop(rx);

        // Shared::drop must release the three queued elements exactly once.
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
    }

    // ============================================================================
    // Special Types
    // ============================================================================

    #[test]
    fn zero_sized_type() {
        let (tx, rx) = bounded::<()>(8);

        let _ = tx.push(());
        let _ = tx.push(());

        assert_eq!(rx.pop(), Some(()));
        assert_eq!(rx.pop(), Some(()));
        assert_eq!(rx.pop(), None);
    }

    #[test]
    fn string_type() {
        // Heap-owning payloads exercise move-in/move-out through the slots.
        let (tx, rx) = bounded::<String>(4);

        let _ = tx.push("hello".to_string());
        let _ = tx.push("world".to_string());

        assert_eq!(rx.pop(), Some("hello".to_string()));
        assert_eq!(rx.pop(), Some("world".to_string()));
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = bounded::<u64>(0);
    }

    #[test]
    fn large_message_type() {
        #[repr(C, align(64))]
        struct LargeMessage {
            data: [u8; 256],
        }

        let (tx, rx) = bounded::<LargeMessage>(8);

        let msg = LargeMessage { data: [42u8; 256] };
        assert!(tx.push(msg).is_ok());

        let received = rx.pop().unwrap();
        assert_eq!(received.data[0], 42);
        assert_eq!(received.data[255], 42);
    }

    #[test]
    fn multiple_laps() {
        let (tx, rx) = bounded::<u64>(4);

        // 10 full laps through 4-slot buffer
        for i in 0..40 {
            assert!(tx.push(i).is_ok());
            assert_eq!(rx.pop(), Some(i));
        }
    }

    #[test]
    fn capacity_rounds_to_power_of_two() {
        let (tx, _) = bounded::<u64>(100);
        assert_eq!(tx.capacity(), 128);

        let (tx, _) = bounded::<u64>(1000);
        assert_eq!(tx.capacity(), 1024);
    }

    // ============================================================================
    // Stress Tests
    // ============================================================================

    #[test]
    fn stress_single_consumer() {
        use std::thread;

        const COUNT: u64 = 100_000;

        let (tx, rx) = bounded::<u64>(1024);

        let producer = thread::spawn(move || {
            for i in 0..COUNT {
                while tx.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut sum = 0u64;
            let mut received = 0u64;
            while received < COUNT {
                if let Some(val) = rx.pop() {
                    sum = sum.wrapping_add(val);
                    received += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
            sum
        });

        producer.join().unwrap();
        let sum = consumer.join().unwrap();
        // Sum of 0..COUNT checks no value was lost or duplicated.
        assert_eq!(sum, COUNT * (COUNT - 1) / 2);
    }

    #[test]
    fn stress_multiple_consumers() {
        use std::thread;

        const CONSUMERS: usize = 4;
        const TOTAL: u64 = 100_000;

        let (tx, rx) = bounded::<u64>(1024);

        let handles: Vec<_> = (0..CONSUMERS)
            .map(|_| {
                let rx = rx.clone();
                thread::spawn(move || {
                    let mut received = Vec::new();
                    loop {
                        if let Some(val) = rx.pop() {
                            received.push(val);
                        } else if rx.is_disconnected() {
                            while let Some(val) = rx.pop() {
                                received.push(val);
                            }
                            break;
                        } else {
                            std::hint::spin_loop();
                        }
                    }
                    received
                })
            })
            .collect();

        drop(rx);

        let producer = thread::spawn(move || {
            for i in 0..TOTAL {
                while tx.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
        });

        producer.join().unwrap();

        let mut all_received = Vec::new();
        for h in handles {
            all_received.extend(h.join().unwrap());
        }

        // Exactly-once delivery across all consumers.
        all_received.sort_unstable();
        assert_eq!(all_received, (0..TOTAL).collect::<Vec<_>>());
    }
}