maniac_runtime/sync/mpsc.rs

1//! Async and Blocking MPSC (Multi-Producer, Single-Consumer) queue implementation.
2//!
3//! This module provides both async and blocking adapters over the lock-free MPSC queue.
4//! Multiple producers can be mixed (some async, some blocking) with either an async or
5//! blocking consumer.
6//!
7//! # Queue Variants
8//!
9//! - **Async**: [`AsyncMpscSender`] / [`AsyncMpscReceiver`] - For use with async tasks
10//! - **Blocking**: [`BlockingMpscSender`] / [`BlockingMpscReceiver`] - For use with threads
//! - **Mixed**: Any combination of async and blocking senders can be used with either an async or a blocking receiver
12//!
13//! All variants share the same waker infrastructure, allowing seamless interoperability.
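//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; assumes an async executor and the
//! const parameters shown):
//!
//! ```ignore
//! // Create an async sender/receiver pair.
//! let (mut tx, mut rx) = async_mpsc::<u32, 8, 2>();
//!
//! // Additional senders can be cloned; each registers its own producer slot.
//! let mut tx2 = tx.clone();
//!
//! tx.send(1).await.unwrap();
//! tx2.send(2).await.unwrap();
//!
//! // The single consumer receives items from all producers.
//! let first = rx.recv().await.unwrap();
//! ```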
14
15use super::signal::AsyncSignalGate;
16use super::signal::AsyncSignalWaker;
17use crate::spsc::Spsc;
18use crate::spsc::UnboundedSpsc;
19use crate::sync::signal::Signal;
20use crate::utils::bits::find_nearest;
21use crate::CachePadded;
22use crate::{PopError, PushError};
23use std::marker::PhantomData;
24use std::mem::{ManuallyDrop, MaybeUninit};
25use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
26use std::sync::Arc;
27use std::thread;
28
29use std::pin::Pin;
30use std::task::{Context, Poll, Waker};
31
32use futures::{sink::Sink, stream::Stream};
33
34use crate::future::waker::{DiatomicWaker, WaitUntil};
35use crate::parking::{Parker, Unparker};
36
37use rand::RngCore;
38
39use std::ptr::{self, NonNull};
40
41/// A waker implementation that unparks a thread.
42///
43/// Used to integrate blocking operations with the async waker infrastructure,
44/// allowing async and blocking operations to work together seamlessly.
45struct ThreadUnparker {
46    unparker: Unparker,
47}
48
49impl std::task::Wake for ThreadUnparker {
50    fn wake(self: Arc<Self>) {
51        self.unparker.unpark();
52    }
53
54    fn wake_by_ref(self: &Arc<Self>) {
55        self.unparker.unpark();
56    }
57}
58
/// Create a new MPSC queue, returning its receiver. Senders are created afterwards via [`Receiver::create_sender`].
60pub fn new<T, const P: usize, const NUM_SEGS_P2: usize>() -> Receiver<T, P, NUM_SEGS_P2> {
61    new_with_waker(Arc::new(AsyncSignalWaker::new()))
62}
63
/// Create a new MPSC queue with a custom [`AsyncSignalWaker`], returning its receiver.
65///
66/// This allows integration with external notification systems. The waker's
67/// callback will be invoked when work becomes available in the queue.
68///
69/// # Arguments
70///
/// * `waker` - Custom `AsyncSignalWaker` (typically with a callback to update worker status)
72///
73/// # Example
74///
75/// ```ignore
76/// let worker_status = Arc::new(AtomicU64::new(0));
77/// let status_clone = Arc::clone(&worker_status);
78/// let waker = Arc::new(SignalWaker::new_with_callback(Some(Box::new(move || {
79///     status_clone.fetch_or(WORKER_BIT_QUEUE, Ordering::Release);
80/// }))));
81/// let receiver = mpsc::new_with_waker(waker);
82/// ```
83pub fn new_with_waker<T, const P: usize, const NUM_SEGS_P2: usize>(
84    waker: Arc<AsyncSignalWaker>,
85) -> Receiver<T, P, NUM_SEGS_P2> {
86    // Create sparse array of AtomicPtr<ProducerSlot>, all initialized to null
87    let mut queues = Vec::with_capacity(MAX_QUEUES);
88    for _ in 0..MAX_QUEUES {
89        queues.push(AtomicPtr::new(core::ptr::null_mut()));
90    }
91
92    let signals: Arc<[Signal; SIGNAL_WORDS]> =
93        Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
94
95    let inner = Arc::new(Inner {
96        queues: queues.into_boxed_slice(),
97        queue_count: CachePadded::new(AtomicUsize::new(0)),
98        producer_count: CachePadded::new(AtomicUsize::new(0)),
99        max_producer_id: AtomicUsize::new(0),
100        closed: CachePadded::new(AtomicBool::new(false)),
101        summary: waker,
102        signals,
103    });
104
105    Receiver {
106        inner,
107        misses: 0,
108        seed: rand::rng().next_u64(),
109    }
110}
111
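/// Create a new MPSC queue together with an initial [`Sender`].
///
/// Equivalent to [`new`] followed by [`Receiver::create_sender`]; further senders can
/// be created from the receiver or by cloning the returned sender.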
112pub fn new_with_sender<T, const P: usize, const NUM_SEGS_P2: usize>()
113-> (Sender<T, P, NUM_SEGS_P2>, Receiver<T, P, NUM_SEGS_P2>) {
114    let waker = Arc::new(AsyncSignalWaker::new());
115    // Create sparse array of AtomicPtr<ProducerSlot>, all initialized to null
116    let mut queues = Vec::with_capacity(MAX_QUEUES);
117    for _ in 0..MAX_QUEUES {
118        queues.push(AtomicPtr::new(core::ptr::null_mut()));
119    }
120
121    let signals: Arc<[Signal; SIGNAL_WORDS]> =
122        Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
123
124    let inner = Arc::new(Inner {
125        queues: queues.into_boxed_slice(),
126        queue_count: CachePadded::new(AtomicUsize::new(0)),
127        producer_count: CachePadded::new(AtomicUsize::new(0)),
128        max_producer_id: AtomicUsize::new(0),
129        closed: CachePadded::new(AtomicBool::new(false)),
130        summary: waker,
131        signals,
132    });
133
134    (
135        inner
136            .create_sender()
137            .expect("fatal: mpsc won't allow even 1 sender"),
138        Receiver {
139            inner,
140            misses: 0,
141            seed: rand::rng().next_u64(),
142        },
143    )
144}
145
146const RND_MULTIPLIER: u64 = 0x5DEECE66D;
147const RND_ADDEND: u64 = 0xB;
148const RND_MASK: u64 = (1 << 48) - 1;
149
/// Maximum number of producer queues (limited by the signal/summary capacity: `SIGNAL_WORDS` * 64)
151const MAX_QUEUES: usize = 64 * 64;
152const QUEUES_PER_PRODUCER: usize = 1;
153const MAX_PRODUCERS: usize = MAX_QUEUES / QUEUES_PER_PRODUCER;
154const MAX_PRODUCERS_MASK: usize = QUEUES_PER_PRODUCER - 1;
155
156/// Number of u64 words needed for the signal bitset
157const SIGNAL_WORDS: usize = 64;
158
/// Callback used to close a producer's queue when its slot is torn down.
type CloseFn = Box<dyn FnOnce()>;
162
163/// A producer queue slot containing both the queue and its associated space waker.
164/// Allocated together for optimal cache locality - the waker is right next to the queue.
165struct ProducerSlot<T, const P: usize, const NUM_SEGS_P2: usize> {
166    queue: Spsc<T, P, NUM_SEGS_P2, AsyncSignalGate>,
167    space_waker: DiatomicWaker,
168}
169
170/// The shared state of the blocking MPSC queue
171///
172/// # Memory Ordering Guarantees
173///
174/// - `queues`: Atomic pointers use Acquire/Release for slot installation and cleanup
175/// - `queue_count`: Relaxed for statistics, not used for synchronization
176/// - `producer_count`: Release on decrement (Sender::drop), Relaxed on read after Acquire fence
177/// - `max_producer_id`: SeqCst for consistent global ordering across all threads
178/// - `closed`: Acquire/Release for happens-before between close and other operations
179/// - Sender::drop uses Release fence + Release store to ensure queued items are visible
180/// - Receiver::try_pop uses Acquire fence before checking producer_count == 0
181struct Inner<T, const P: usize, const NUM_SEGS_P2: usize> {
182    /// Sparse array of producer slot pointers - always MAX_QUEUES size
183    /// Each slot is allocated together with its queue and space waker for cache-friendly access
184    queues: Box<[AtomicPtr<ProducerSlot<T, P, NUM_SEGS_P2>>]>,
185    queue_count: CachePadded<AtomicUsize>,
186    /// Number of registered producers
187    producer_count: CachePadded<AtomicUsize>,
188    max_producer_id: AtomicUsize,
189    /// Closed flag
190    closed: CachePadded<AtomicBool>,
191    summary: Arc<AsyncSignalWaker>,
    /// Signal bitset - one bit per producer indicating which queues have data
193    signals: Arc<[Signal; SIGNAL_WORDS]>,
194}
195
196impl<T, const P: usize, const NUM_SEGS_P2: usize> Inner<T, P, NUM_SEGS_P2> {
197    /// Check if the queue is closed
198    pub fn is_closed(&self) -> bool {
199        self.closed.load(Ordering::Acquire)
200    }
201
202    /// Get the number of registered producers
203    pub fn producer_count(&self) -> usize {
204        self.producer_count.load(Ordering::Relaxed)
205    }
206
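    /// Register a new producer slot and return a [`Sender`] for it.
    ///
    /// Shorthand for `create_sender_with_config` with `max_pooled_segments = 0`.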
207    pub fn create_sender(self: &Arc<Self>) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
208        self.create_sender_with_config(0)
209    }
210
211    /// Create a new producer handle that bypasses all thread-local caching
212    ///
213    /// This creates a direct, high-performance handle to a specific producer queue.
214    /// The handle provides push-only access without any thread-local overhead, making
215    /// it ideal for scenarios where you need maximum performance and want to maintain
216    /// explicit control over producer instances.
217    ///
218    /// Unlike `get_producer_queue()`, this method:
219    /// - Does not register with thread-local storage
220    /// - Does not use caching mechanisms
221    /// - Provides a standalone handle that can be stored and reused
222    /// - Offers maximum push performance
223    ///
224    /// # Returns
225    ///
    /// Returns a [`Sender`] that can be used to push values, or `PushError::Closed`
    /// if the MPSC queue is closed.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let receiver = mpsc::new::<i32, 8, 2>();
    ///
    /// // Create a direct producer handle
    /// let mut producer = receiver.create_sender_with_config(0).unwrap();
    ///
    /// // Use the handle for pushes
    /// producer.try_push(42).unwrap();
    /// producer.try_push_n(&[1, 2, 3]).unwrap();
    /// ```
241    pub fn create_sender_with_config(
242        self: &Arc<Self>,
243        max_pooled_segments: usize,
244    ) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
245        if self.is_closed() {
246            return Err(PushError::Closed(()));
247        }
248
249        loop {
250            let current = self.producer_count.load(Ordering::Acquire);
251            if current >= MAX_PRODUCERS {
252                return Err(PushError::Full(()));
253            }
254            if self
255                .producer_count
256                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
257                .is_ok()
258            {
259                break;
260            }
261        }
262
263        let mut assigned_id = None;
264        let mut slot_arc: Option<Arc<ProducerSlot<T, P, NUM_SEGS_P2>>> = None;
265
266        for signal_index in 0..SIGNAL_WORDS {
267            for bit_index in 0..64 {
268                let queue_index = signal_index * 64 + bit_index;
269                if queue_index >= MAX_QUEUES {
270                    break;
271                }
272                if !self.queues[queue_index].load(Ordering::Acquire).is_null() {
273                    continue;
274                }
275
276                let queue = unsafe {
277                    Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::new_unsafe_with_gate_and_config(
278                        AsyncSignalGate::new(
279                            bit_index as u8,
280                            self.signals[signal_index].clone(),
281                            Arc::clone(&self.summary),
282                        ),
283                        max_pooled_segments,
284                    )
285                };
286
287                let slot = Arc::new(ProducerSlot {
288                    queue,
289                    space_waker: DiatomicWaker::new(),
290                });
291
292                let raw = Arc::into_raw(Arc::clone(&slot)) as *mut ProducerSlot<T, P, NUM_SEGS_P2>;
293                match self.queues[queue_index].compare_exchange(
294                    ptr::null_mut(),
295                    raw,
296                    Ordering::Release,
297                    Ordering::Acquire,
298                ) {
299                    Ok(_) => {
300                        self.queue_count.fetch_add(1, Ordering::Relaxed);
301                        assigned_id = Some(queue_index);
302                        slot_arc = Some(slot);
303                        break;
304                    }
305                    Err(_) => unsafe {
306                        Arc::from_raw(raw);
307                    },
308                }
309            }
310            if assigned_id.is_some() {
311                break;
312            }
313        }
314
315        let producer_id = match assigned_id {
316            Some(id) => id,
317            None => {
318                self.producer_count.fetch_sub(1, Ordering::Release);
319                return Err(PushError::Full(()));
320            }
321        };
322
323        // Update max_producer_id if needed. We use SeqCst to ensure all threads
324        // see a consistent view of the maximum producer ID, which is used by
325        // the receiver to determine which slots to scan.
326        loop {
327            let max_producer_id = self.max_producer_id.load(Ordering::SeqCst);
328            // If our ID is less than or equal to the current max, we're done
329            if producer_id <= max_producer_id {
330                break;
331            }
332            if self.is_closed() {
333                return Err(PushError::Closed(()));
334            }
335            if self
336                .max_producer_id
337                .compare_exchange(
338                    max_producer_id,
339                    producer_id,
340                    Ordering::SeqCst,
341                    Ordering::SeqCst,
342                )
343                .is_ok()
344            {
345                break;
346            }
347        }
348
349        // Arc reference counting: We created Arc::new (refcount=1), then cloned it
350        // and converted the clone to raw via Arc::into_raw(Arc::clone(&slot)).
351        // - Raw pointer stored in queues[producer_id] represents 1 Arc reference
352        // - slot_arc variable holds the other Arc reference
353        // - Total: 2 references, properly balanced for array + Sender ownership
354        let slot_arc = slot_arc.expect("slot arc missing");
355
356        Ok(Sender {
357            inner: Arc::clone(&self),
358            slot: slot_arc,
359            producer_id,
360        })
361    }
362
363    /// Close the queue
364    ///
365    /// After closing, no more items can be pushed. This method will block until all
366    /// SPSC queues are empty. Wakes any waiting consumer.
367    ///
368    /// Note: This waits for consumer to drain all items from all producer queues.
369    pub fn close(&self) -> bool {
370        let was_open = !self.closed.swap(true, Ordering::AcqRel);
371        if was_open {
372            let permits = self.queue_count.load(Ordering::Relaxed).max(1);
373            self.summary.release(permits);
374        }
375
376        for slot_atomic in self.queues.iter() {
377            let slot_ptr = slot_atomic.swap(ptr::null_mut(), Ordering::AcqRel);
378            if slot_ptr.is_null() {
379                continue;
380            }
381
382            unsafe {
383                (*slot_ptr).queue.close();
384                Arc::from_raw(slot_ptr);
385            }
386
387            self.queue_count.fetch_sub(1, Ordering::Relaxed);
388        }
389
390        self.producer_count.store(0, Ordering::Release);
391        true
392    }
393}
394
/// Producer handle for the MPSC queue.
///
/// Each `Sender` owns its own SPSC queue slot registered with the shared state, so
/// pushes never contend with other producers. Cloning a `Sender` registers a new slot.
///
/// # Type Parameters
/// - `T`: The type of elements stored in the queue (must be Copy)
/// - `P`: log2 of segment size (default 8 = 256 items/segment)
/// - `NUM_SEGS_P2`: log2 of number of segments (default 2 = 4 segments, total capacity ~1024)
///
/// # Examples
///
/// ```ignore
/// let mut receiver = mpsc::new::<i32, 8, 2>();
/// let mut sender = receiver.create_sender().unwrap();
///
/// sender.push_spin(42).unwrap();
/// assert_eq!(receiver.try_pop().unwrap(), 42);
/// ```
422pub struct Sender<T, const P: usize, const NUM_SEGS_P2: usize> {
423    inner: Arc<Inner<T, P, NUM_SEGS_P2>>,
424    slot: Arc<ProducerSlot<T, P, NUM_SEGS_P2>>,
425    producer_id: usize,
426}
427
428impl<T, const P: usize, const NUM_SEGS_P2: usize> Sender<T, P, NUM_SEGS_P2> {
429    /// Check if the queue is closed
430    pub fn is_closed(&self) -> bool {
431        self.inner.closed.load(Ordering::Acquire)
432    }
433
434    /// Get the number of registered producers
435    pub fn producer_count(&self) -> usize {
436        self.inner.producer_count.load(Ordering::Relaxed)
437    }
438
439    /// Get the producer ID for this handle
440    pub fn producer_id(&self) -> usize {
441        self.producer_id
442    }
443
444    /// Push a value onto the queue
445    ///
446    /// This will use the thread-local SPSC queue for this producer.
447    /// If successful, notifies any waiting consumer.
448    pub fn try_push(&mut self, value: T) -> Result<(), PushError<T>> {
449        self.slot.queue.try_push(value)
450    }
451
452    /// Push a value onto the queue (spins if full)
453    ///
454    /// This will use the thread-local SPSC queue for this producer.
455    /// If successful, notifies any waiting consumer.
456    pub fn push_spin(&mut self, mut value: T) -> Result<(), PushError<T>> {
457        loop {
458            match self.slot.queue.try_push(value) {
459                Ok(()) => return Ok(()),
460                Err(PushError::Full(returned)) => {
461                    value = returned;
462                    std::hint::spin_loop();
463                }
464                Err(err @ PushError::Closed(_)) => return Err(err),
465            }
466        }
467    }
468
469    /// Push multiple values in bulk
470    pub fn try_push_n(&mut self, values: &[T]) -> Result<usize, PushError<()>> {
471        if self.is_closed() {
472            return Err(PushError::Closed(()));
473        }
474        self.slot.queue.try_push_n(values)
475    }
476
477    /// Push a value onto the queue
478    ///
479    /// This will use the thread-local SPSC queue for this producer.
480    /// If successful, notifies any waiting consumer.
481    pub unsafe fn unsafe_try_push(&self, value: T) -> Result<(), PushError<T>> {
482        self.slot.queue.try_push(value)
483    }
484
485    /// Push multiple values in bulk
486    pub unsafe fn unsafe_try_push_n(&self, values: &[T]) -> Result<usize, PushError<()>> {
487        if self.is_closed() {
488            return Err(PushError::Closed(()));
489        }
490        self.slot.queue.try_push_n(values)
491    }
492
493    /// Close the queue
494    ///
495    /// After closing, no more items can be pushed. This method will block until all
496    /// SPSC queues are empty. Wakes any waiting consumer.
497    ///
498    /// Note: This waits for consumer to drain all items from all producer queues.
499    pub fn close(&mut self) -> bool {
500        self.inner.close()
501    }
502}
503
504impl<T, const P: usize, const NUM_SEGS_P2: usize> Clone for Sender<T, P, NUM_SEGS_P2> {
505    fn clone(&self) -> Self {
506        self.inner.create_sender().expect("too many senders")
507    }
508}
509
510impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Sender<T, P, NUM_SEGS_P2> {
511    fn drop(&mut self) {
512        unsafe {
513            self.slot.queue.close();
514        }
515
516        // Release fence to ensure all writes to the queue are visible before we
517        // decrement producer_count. This synchronizes with the Acquire fence in
518        // Receiver::try_pop_with_waker(), ensuring the receiver sees any items
519        // pushed before this sender was dropped.
520        std::sync::atomic::fence(Ordering::Release);
521
522        // Decrement the active producer count. We use Release ordering to publish
523        // the fence above and all prior writes. We intentionally keep the producer
524        // slot registered until the consumer observes the closed queue and cleans it up.
525        self.inner.producer_count.fetch_sub(1, Ordering::Release);
526    }
527}
528
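/// Consumer handle for the MPSC queue (single consumer).
///
/// The receiver scans producer slots via the signal bitset, starting from a
/// pseudo-randomly chosen word/bit so that consumption is spread fairly across
/// producers. Dropping the receiver closes the queue.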
529pub struct Receiver<T, const P: usize, const NUM_SEGS_P2: usize> {
530    inner: Arc<Inner<T, P, NUM_SEGS_P2>>,
531    misses: u64,
532    seed: u64,
533}
534
535impl<T, const P: usize, const NUM_SEGS_P2: usize> Receiver<T, P, NUM_SEGS_P2> {
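    /// Advance the receiver's private PRNG and return the next pseudo-random value.
    ///
    /// This is a 48-bit linear congruential generator (see `RND_MULTIPLIER`,
    /// `RND_ADDEND`, `RND_MASK`); it randomizes which signal word and bit are
    /// scanned first so consumption is spread fairly across producers.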
536    pub fn next(&mut self) -> u64 {
537        let old_seed = self.seed;
538        let next_seed = (old_seed
539            .wrapping_mul(RND_MULTIPLIER)
540            .wrapping_add(RND_ADDEND))
541            & RND_MASK;
542        self.seed = next_seed;
543        next_seed >> 16
544    }
545
546    /// Get a reference to the inner shared state
547    pub(crate) fn inner(&self) -> &Arc<Inner<T, P, NUM_SEGS_P2>> {
548        &self.inner
549    }
550
551    /// Check if the queue is closed
552    pub fn is_closed(&self) -> bool {
553        self.inner.closed.load(Ordering::Acquire)
554    }
555
556    /// Close the queue
557    ///
558    /// After closing, no more items can be pushed. This method will block until all
559    /// SPSC queues are empty. Wakes any waiting consumer.
560    ///
561    /// Note: This waits for consumer to drain all items from all producer queues.
562    pub fn close(&self) -> bool {
563        self.inner.close()
564    }
565
566    /// Get the number of registered producers
567    pub fn producer_count(&self) -> usize {
568        self.inner.producer_count.load(Ordering::Relaxed)
569    }
570
571    pub fn create_sender(&self) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
572        self.create_sender_with_config(0)
573    }
574
575    /// Create a new producer handle that bypasses all thread-local caching
576    ///
577    /// This creates a direct, high-performance handle to a specific producer queue.
578    /// The handle provides push-only access without any thread-local overhead, making
579    /// it ideal for scenarios where you need maximum performance and want to maintain
580    /// explicit control over producer instances.
581    ///
582    /// Unlike `get_producer_queue()`, this method:
583    /// - Does not register with thread-local storage
584    /// - Does not use caching mechanisms
585    /// - Provides a standalone handle that can be stored and reused
586    /// - Offers maximum push performance
587    ///
588    /// # Returns
589    ///
    /// Returns a [`Sender`] that can be used to push values, or `PushError::Closed`
    /// if the MPSC queue is closed.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let receiver = mpsc::new::<i32, 8, 2>();
    ///
    /// // Create a direct producer handle
    /// let mut producer = receiver.create_sender_with_config(0).unwrap();
    ///
    /// // Use the handle for pushes
    /// producer.try_push(42).unwrap();
    /// producer.try_push_n(&[1, 2, 3]).unwrap();
    /// ```
605    pub fn create_sender_with_config(
606        &self,
607        max_pooled_segments: usize,
608    ) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
609        self.inner.create_sender_with_config(max_pooled_segments)
610    }
611
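    /// Try to pop up to `batch.len()` items without waiting, returning how many were written.
    ///
    /// When items are drained, the owning producer's space waker is notified so a
    /// sender blocked on a full queue can make progress.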
612    pub fn try_pop_n(&mut self, batch: &mut [T]) -> usize {
613        let (count, waker_opt) = self.try_pop_n_with_slot(batch);
614        if let Some(waker) = waker_opt {
615            waker.notify();
616        }
617        count
618    }
619
    /// Try to pop a single value, also returning the producer's space waker so the caller can notify it.
622    pub fn try_pop_with_waker(&mut self) -> Result<(T, &DiatomicWaker), PopError> {
623        // Clone Arc to get a separate reference before the mutable borrow
624        let inner = Arc::clone(&self.inner);
625        let mut slot = MaybeUninit::<T>::uninit();
626        let slice = unsafe { std::slice::from_raw_parts_mut(slot.as_mut_ptr(), 1) };
627        let (drained, waker_opt) = self.try_pop_n_with_slot(slice);
628        if drained == 0 {
629            // Acquire fence BEFORE checking state to ensure we see all writes from producers.
630            // This synchronizes with the Release fence in Sender::drop() and ensures we see
631            // any items pushed before the last producer dropped.
632            std::sync::atomic::fence(Ordering::Acquire);
633            
634            // Re-check state after attempting to pop (producer_count may have changed)
635            let is_closed = inner.is_closed();
636            // Use Relaxed here since the fence above provides the necessary synchronization
637            let producer_count = inner.producer_count.load(Ordering::Relaxed);
638
639            // Queue is closed if explicitly closed OR no producers remain (all senders dropped)
640            if is_closed || producer_count == 0 {
641                Err(PopError::Closed)
642            } else {
643                Err(PopError::Empty)
644            }
645        } else {
646            let value = unsafe { slot.assume_init() };
647            let waker = waker_opt.expect("waker missing for drained item");
648            Ok((value, waker))
649        }
650    }
651
652    /// Try to pop a single value (without returning the waker)
653    pub fn try_pop(&mut self) -> Result<T, PopError> {
654        self.try_pop_with_waker().map(|(v, _)| v)
655    }
656
657    fn try_pop_n_with_slot(&mut self, batch: &mut [T]) -> (usize, Option<&DiatomicWaker>) {
658        for _ in 0..64 {
659            match self.acquire() {
660                Some((producer_id, slot_ptr)) => {
661                    let slot = unsafe { &*slot_ptr };
662                    match slot.queue.try_pop_n(batch) {
663                        Ok(size) => {
664                            slot.queue.unmark_and_schedule();
665                            return (size, Some(&slot.space_waker));
666                        }
667                        Err(PopError::Closed) => {
668                            slot.queue.unmark();
669                            self.cleanup_closed_slot(producer_id, slot_ptr);
670                        }
671                        Err(_) => {
672                            slot.queue.unmark();
673                            self.misses += 1;
674                        }
675                    };
676                }
677                None => {
678                    // No available producer this round
679                }
680            }
681        }
682
683        (0, None)
684    }
685
686    fn cleanup_closed_slot(
687        &self,
688        producer_id: usize,
689        slot_ptr: *mut ProducerSlot<T, P, NUM_SEGS_P2>,
690    ) {
691        let slot_atomic = &self.inner.queues[producer_id];
692        let current = slot_atomic.load(Ordering::Acquire);
693        if current != slot_ptr {
694            return;
695        }
696
697        if slot_atomic
698            .compare_exchange(
699                slot_ptr,
700                ptr::null_mut(),
701                Ordering::AcqRel,
702                Ordering::Acquire,
703            )
704            .is_ok()
705        {
706            self.inner.queue_count.fetch_sub(1, Ordering::Relaxed);
707            unsafe {
708                Arc::from_raw(slot_ptr);
709            }
710        }
711    }
712
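    /// Pick one producer queue that currently has data and mark it for consumption.
    ///
    /// Roughly: choose a signal word (preferring the summary hint), pick a random bit
    /// and round to the nearest set bit for fairness, atomically acquire that bit, then
    /// load the corresponding slot and mark its queue. Returns `None` on contention, an
    /// empty signal word, or a vacated slot.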
713    fn acquire(&mut self) -> Option<(usize, *mut ProducerSlot<T, P, NUM_SEGS_P2>)> {
714        let random = self.next() as usize;
715        // Try selecting signal index from summary hint
716        let random_word = random % SIGNAL_WORDS;
717        let mut signal_index = self.inner.summary.summary_select(random_word as u64) as usize;
718
719        if signal_index >= SIGNAL_WORDS {
720            signal_index = random_word;
721        }
722
723        let mut signal_bit = self.next() & 63;
724        let signal = &self.inner.signals[signal_index];
725        let signal_value = signal.load(Ordering::Acquire);
726
727        // Find nearest set bit for fairness
728        signal_bit = find_nearest(signal_value, signal_bit);
729
730        // 64 and over is out of bounds
731        if signal_bit >= 64 {
732            self.misses += 1;
733            return None;
734        }
735
736        // Atomically acquire the bit
737        let (bit, expected, acquired) = signal.try_acquire(signal_bit);
738
739        if !acquired {
740            // Contention - try next
741            // self.contention += 1;
742            std::hint::spin_loop();
743            return None;
744        }
745
        // Was this the last set bit in the signal word (i.e. the word is now empty)?
        let empty = expected == bit;
748
749        if empty {
750            self.inner
751                .summary
752                .try_unmark_if_empty(signal.index(), signal.value());
753        }
754
755        // Compute producer id
756        let producer_id = signal_index * 64 + (signal_bit as usize);
757
758        // Load the slot pointer atomically
759        let slot_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
760        if slot_ptr.is_null() {
761            self.misses += 1;
762            if empty {
763                self.inner
764                    .summary
765                    .try_unmark_if_empty(signal.index(), signal.value());
766            }
767            return None;
768        }
769
770        // SAFETY: The pointer is valid and we have exclusive consumer access
771        let slot = unsafe { &*slot_ptr };
772
773        // Mark as EXECUTING
774        slot.queue.mark();
775
776        Some((producer_id, slot_ptr))
777    }
778}
779
780impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Receiver<T, P, NUM_SEGS_P2> {
781    fn drop(&mut self) {
782        self.inner.close();
783    }
784}
785
786impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Inner<T, P, NUM_SEGS_P2> {
787    fn drop(&mut self) {
788        self.close();
789    }
790}
791
792struct SenderPtr<'a, T, const P: usize, const NUM_SEGS_P2: usize> {
793    ptr: NonNull<Sender<T, P, NUM_SEGS_P2>>,
794    _marker: PhantomData<&'a ()>,
795}
796
797impl<'a, T, const P: usize, const NUM_SEGS_P2: usize> Copy for SenderPtr<'a, T, P, NUM_SEGS_P2> {}
798
799impl<'a, T, const P: usize, const NUM_SEGS_P2: usize> Clone for SenderPtr<'a, T, P, NUM_SEGS_P2> {
800    fn clone(&self) -> Self {
801        *self
802    }
803}
804
805impl<'a, T: 'a, const P: usize, const NUM_SEGS_P2: usize> SenderPtr<'a, T, P, NUM_SEGS_P2> {
806    #[inline]
807    unsafe fn space_waker(self) -> &'a DiatomicWaker {
808        let sender = self.ptr.as_ptr();
809        let sender_ref: &Sender<T, P, NUM_SEGS_P2> = unsafe { &*sender };
810        &sender_ref.slot.space_waker
811    }
812
813    #[inline]
814    unsafe fn with_mut<R>(self, f: impl FnOnce(&mut Sender<T, P, NUM_SEGS_P2>) -> R) -> R {
815        let sender = self.ptr.as_ptr();
816        let sender_mut: &mut Sender<T, P, NUM_SEGS_P2> = unsafe { &mut *sender };
817        f(sender_mut)
818    }
819}
820
821unsafe impl<T: Send, const P: usize, const NUM_SEGS_P2: usize> Send
822    for SenderPtr<'_, T, P, NUM_SEGS_P2>
823{
824}
825unsafe impl<T: Send, const P: usize, const NUM_SEGS_P2: usize> Sync
826    for SenderPtr<'_, T, P, NUM_SEGS_P2>
827{
828}
829
830/// Shared state for async MPSC coordination.
831///
/// Provides receiver-side waker management. Producer-side space wakers are stored
/// per-producer in each [`ProducerSlot`] for targeted notification (no spurious wakeups).
834struct AsyncMpscShared<T, const P: usize, const NUM_SEGS_P2: usize> {
835    receiver_waiter: CachePadded<DiatomicWaker>,
836    inner: Arc<Inner<T, P, NUM_SEGS_P2>>,
837}
838
839impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncMpscShared<T, P, NUM_SEGS_P2> {
840    fn new(inner: Arc<Inner<T, P, NUM_SEGS_P2>>) -> Self {
841        Self {
842            receiver_waiter: CachePadded::new(DiatomicWaker::new()),
843            inner,
844        }
845    }
846
847    #[inline]
848    fn notify_receiver(&self) {
849        self.receiver_waiter.notify();
850    }
851
852    #[inline]
853    unsafe fn wait_for_items<Pred, R>(&self, predicate: Pred) -> WaitUntil<'_, Pred, R>
854    where
855        Pred: FnMut() -> Option<R>,
856    {
857        unsafe { self.receiver_waiter.wait_until(predicate) }
858    }
859
860    #[inline]
861    unsafe fn register_receiver(&self, waker: &Waker) {
862        unsafe { self.receiver_waiter.register(waker) };
863    }
864}
865
866pub struct AsyncMpscSender<T, const P: usize, const NUM_SEGS_P2: usize> {
867    sender: Sender<T, P, NUM_SEGS_P2>,
868    shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
869}
870
871pub struct AsyncMpscReceiver<T, const P: usize, const NUM_SEGS_P2: usize> {
872    receiver: Receiver<T, P, NUM_SEGS_P2>,
873    shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
874}
875
876impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncMpscSender<T, P, NUM_SEGS_P2> {
877    fn new(
878        sender: Sender<T, P, NUM_SEGS_P2>,
879        shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
880    ) -> Self {
881        // space_waker is already allocated in the ProducerSlot by create_sender
882        Self { sender, shared }
883    }
884
885    /// Attempts to enqueue without blocking.
886    #[inline]
887    pub fn try_send(&mut self, value: T) -> Result<(), PushError<T>> {
888        match self.sender.try_push(value) {
889            Ok(()) => {
890                self.shared.notify_receiver();
891                Ok(())
892            }
893            Err(err) => Err(err),
894        }
895    }
896
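    /// Send a value, waiting asynchronously for space if the producer's queue is full.
    ///
    /// A minimal sketch (illustrative; assumes an async executor):
    ///
    /// ```ignore
    /// let (mut tx, mut rx) = async_mpsc::<u64, 8, 2>();
    /// tx.send(7).await.unwrap();
    /// assert_eq!(rx.recv().await.unwrap(), 7);
    /// ```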
897    pub async fn send(&mut self, value: T) -> Result<(), PushError<T>> {
898        match self.try_send(value) {
899            Ok(()) => Ok(()),
900            Err(PushError::Full(item)) => {
901                let mut pending = Some(item);
902                let shared = Arc::clone(&self.shared);
903                let sender_ptr = SenderPtr {
904                    ptr: NonNull::from(&mut self.sender),
905                    _marker: PhantomData,
906                };
907                let waker_ptr = sender_ptr;
908                let wait = unsafe {
909                    waker_ptr.space_waker().wait_until(move || {
910                        let candidate = pending.take()?;
911                        sender_ptr.with_mut(|sender| match sender.try_push(candidate) {
912                            Ok(()) => {
913                                shared.notify_receiver();
914                                Some(Ok(()))
915                            }
916                            Err(PushError::Full(candidate)) => {
917                                pending = Some(candidate);
918                                None
919                            }
920                            Err(PushError::Closed(candidate)) => {
921                                Some(Err(PushError::Closed(candidate)))
922                            }
923                        })
924                    })
925                };
926                wait.await
927            }
928            Err(PushError::Closed(item)) => Err(PushError::Closed(item)),
929        }
930    }
931
932    pub async fn send_slice(&mut self, values: Vec<T>) -> Result<Vec<T>, PushError<Vec<T>>> {
933        let mut values = ManuallyDrop::new(values);
934        let data_ptr = values.as_mut_ptr() as usize;
935        let len = values.len();
936        let cap = values.capacity();
937        std::mem::forget(values);
938
939        if len == 0 {
940            let empty = unsafe { Vec::from_raw_parts(data_ptr as *mut T, 0, cap) };
941            return Ok(empty);
942        }
943
944        let slice_from = |sent: usize| unsafe {
945            std::slice::from_raw_parts((data_ptr as *const T).add(sent), len - sent)
946        };
947        let finish_empty = || unsafe { Vec::from_raw_parts(data_ptr as *mut T, 0, cap) };
948        let finish_remaining = |sent: usize| unsafe {
949            let remaining = len - sent;
950            if remaining > 0 {
951                ptr::copy(
952                    (data_ptr as *mut T).add(sent),
953                    data_ptr as *mut T,
954                    remaining,
955                );
956            }
957            Vec::from_raw_parts(data_ptr as *mut T, remaining, cap)
958        };
959
960        let mut sent = 0usize;
961
962        match self.sender.try_push_n(slice_from(sent)) {
963            Ok(written) => {
964                if written > 0 {
965                    self.shared.notify_receiver();
966                    sent += written;
967                    if sent == len {
968                        return Ok(finish_empty());
969                    }
970                }
971            }
972            Err(PushError::Closed(())) => {
973                let remaining_vec = finish_remaining(sent);
974                return Err(PushError::Closed(remaining_vec));
975            }
976            Err(PushError::Full(())) => {}
977        }
978
979        let shared = Arc::clone(&self.shared);
980        let sender_ptr = SenderPtr {
981            ptr: NonNull::from(&mut self.sender),
982            _marker: PhantomData,
983        };
984        let waker_ptr = sender_ptr;
985        let wait = unsafe {
986            waker_ptr.space_waker().wait_until(move || {
987                if sent == len {
988                    return Some(Ok(finish_empty()));
989                }
990                sender_ptr.with_mut(|sender| match sender.try_push_n(slice_from(sent)) {
991                    Ok(written) => {
992                        if written > 0 {
993                            shared.notify_receiver();
994                            sent += written;
995                            if sent == len {
996                                return Some(Ok(finish_empty()));
997                            }
998                        }
999                        None
1000                    }
1001                    Err(PushError::Full(())) => None,
1002                    Err(PushError::Closed(())) => {
1003                        let remaining_vec = finish_remaining(sent);
1004                        Some(Err(PushError::Closed(remaining_vec)))
1005                    }
1006                })
1007            })
1008        };
1009        wait.await
1010    }
1011
1012    pub async fn send_batch<I>(&mut self, iter: I) -> Result<(), PushError<T>>
1013    where
1014        I: IntoIterator<Item = T>,
1015    {
1016        for item in iter {
1017            self.send(item).await?;
1018        }
1019        Ok(())
1020    }
1021
1022    pub fn close(&mut self) -> bool {
1023        self.sender.close()
1024    }
1025}
1026
1027impl<T, const P: usize, const NUM_SEGS_P2: usize> Clone for AsyncMpscSender<T, P, NUM_SEGS_P2> {
1028    fn clone(&self) -> Self {
1029        let sender = self.sender.clone();
1030        Self::new(sender, Arc::clone(&self.shared))
1031    }
1032}
1033
1034impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for AsyncMpscSender<T, P, NUM_SEGS_P2> {
1035    fn drop(&mut self) {
1036        self.shared.notify_receiver();
1037    }
1038}
1039
1040impl<T, const P: usize, const NUM_SEGS_P2: usize> Sink<T> for AsyncMpscSender<T, P, NUM_SEGS_P2> {
1041    type Error = PushError<T>;
1042
1043    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
1044        let this = unsafe { self.get_unchecked_mut() };
1045
1046        // Check if there's space available (not full)
1047        if !this.sender.slot.queue.is_full() {
1048            return Poll::Ready(Ok(()));
1049        }
1050
1051        // No space available, register waker and return pending
1052        unsafe {
1053            this.sender.slot.space_waker.register(cx.waker());
1054        }
1055
1056        // Double-check after registering (prevent missed wakeup)
1057        if !this.sender.slot.queue.is_full() {
1058            return Poll::Ready(Ok(()));
1059        }
1060
1061        Poll::Pending
1062    }
1063
1064    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), PushError<T>> {
1065        let this = unsafe { self.get_unchecked_mut() };
1066        this.try_send(item)
1067    }
1068
1069    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
1070        Poll::Ready(Ok(()))
1071    }
1072
1073    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
1074        let this = unsafe { self.get_unchecked_mut() };
1075        this.close();
1076        Poll::Ready(Ok(()))
1077    }
1078}
1079
1080impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncMpscReceiver<T, P, NUM_SEGS_P2> {
1081    fn new(
1082        receiver: Receiver<T, P, NUM_SEGS_P2>,
1083        shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
1084    ) -> Self {
1085        Self { receiver, shared }
1086    }
1087
1088    #[inline]
1089    pub fn try_recv(&mut self) -> Result<T, PopError> {
1090        match self.receiver.try_pop_with_waker() {
1091            Ok((value, waker)) => {
1092                waker.notify();
1093                Ok(value)
1094            }
1095            Err(err) => Err(err),
1096        }
1097    }
1098
1099    pub async fn recv(&mut self) -> Result<T, PopError> {
1100        match self.try_recv() {
1101            Ok(value) => Ok(value),
1102            Err(PopError::Empty) | Err(PopError::Timeout) => {
1103                let shared = Arc::clone(&self.shared);
1104                let receiver = &mut self.receiver;
1105                unsafe {
1106                    shared
1107                        .wait_for_items(|| match receiver.try_pop_with_waker() {
1108                            Ok((value, waker)) => {
1109                                waker.notify();
1110                                Some(Ok(value))
1111                            }
1112                            Err(PopError::Empty) | Err(PopError::Timeout) => None,
1113                            Err(PopError::Closed) => Some(Err(PopError::Closed)),
1114                        })
1115                        .await
1116                }
1117            }
1118            Err(PopError::Closed) => Err(PopError::Closed),
1119        }
1120    }
1121
1122    /// Asynchronously receives multiple items into a destination slice.
1123    ///
    /// This method fills the provided slice with items from the queue, waiting
    /// asynchronously until at least one item is available. It returns as soon as any items are
1126    /// received (partial fill) or when the queue is closed.
1127    ///
1128    /// # Advantages over `recv()` in a loop
1129    ///
1130    /// - **Bulk operations**: More efficient than calling `recv()` repeatedly
1131    /// - **Partial results**: Returns immediately with whatever is available
1132    /// - **Zero allocation**: Slice reference lives in Future stack frame
1133    /// - **Automatic backpressure**: Notifies producers as space becomes available
1134    ///
1135    /// # Returns
1136    ///
1137    /// - `Ok(count)`: Number of items written to `dst` (1..=dst.len())
1138    /// - `Err(PopError::Closed)`: Queue closed with no items available
1139    ///
1140    /// # Example
1141    ///
1142    /// ```ignore
1143    /// let mut buffer = [0u32; 128];
1144    /// match receiver.recv_batch(&mut buffer).await {
1145    ///     Ok(count) => {
1146    ///         // Process buffer[..count]
1147    ///     }
1148    ///     Err(PopError::Closed) => {
1149    ///         // Queue closed
1150    ///     }
1151    /// }
1152    /// ```
1153    pub async fn recv_batch(&mut self, dst: &mut [T]) -> Result<usize, PopError> {
1154        if dst.is_empty() {
1155            return Ok(0);
1156        }
1157
1158        let receiver = &mut self.receiver;
1159        let shared = &self.shared;
1160
1161        // Fast path: try immediate receive
1162        // try_pop_n already notifies the waker internally
1163        let mut filled = receiver.try_pop_n(dst);
1164        if filled > 0 {
1165            return Ok(filled);
1166        }
1167
1168        // Queue is closed if explicitly closed OR no producers remain (all senders dropped)
1169        if receiver.is_closed() || receiver.producer_count() == 0 {
1170            return Err(PopError::Closed);
1171        }
1172
1173        // Slow path: need to block
1174        unsafe {
1175            shared
1176                .wait_for_items(|| {
1177                    if filled == dst.len() {
1178                        return Some(Ok(filled));
1179                    }
1180
1181                    let count = receiver.try_pop_n(&mut dst[filled..]);
1182                    if count == 0 {
1183                        // Queue is closed if explicitly closed OR no producers remain
1184                        if receiver.is_closed() || receiver.producer_count() == 0 {
1185                            Some(if filled > 0 {
1186                                Ok(filled)
1187                            } else {
1188                                Err(PopError::Closed)
1189                            })
1190                        } else {
1191                            None
1192                        }
1193                    } else {
1194                        filled += count;
1195                        // try_pop_n already notifies the waker internally
1196                        if filled == dst.len() {
1197                            Some(Ok(filled))
1198                        } else {
1199                            None
1200                        }
1201                    }
1202                })
1203                .await
1204        }
1205    }
1206}
1207
1208impl<T, const P: usize, const NUM_SEGS_P2: usize> Stream for AsyncMpscReceiver<T, P, NUM_SEGS_P2> {
1209    type Item = T;
1210
1211    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1212        let this = unsafe { self.get_unchecked_mut() };
1213        match this.try_recv() {
1214            Ok(value) => Poll::Ready(Some(value)),
1215            Err(PopError::Closed) => Poll::Ready(None),
1216            Err(PopError::Empty) | Err(PopError::Timeout) => {
1217                unsafe {
1218                    this.shared.register_receiver(cx.waker());
1219                }
1220                match this.try_recv() {
1221                    Ok(value) => Poll::Ready(Some(value)),
1222                    Err(PopError::Closed) => Poll::Ready(None),
1223                    Err(PopError::Empty) | Err(PopError::Timeout) => Poll::Pending,
1224                }
1225            }
1226        }
1227    }
1228}
1229
1230/// Blocking MPSC sender.
1231///
1232/// Multiple blocking senders can be created and used concurrently. They share the same
1233/// waker infrastructure as async senders, allowing perfect interoperability.
1234pub struct BlockingMpscSender<T, const P: usize, const NUM_SEGS_P2: usize> {
1235    sender: Sender<T, P, NUM_SEGS_P2>,
1236    shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
1237    parker: Parker,
1238    parker_waker: Arc<ThreadUnparker>,
1239}
1240
1241impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingMpscSender<T, P, NUM_SEGS_P2> {
1242    fn new(
1243        sender: Sender<T, P, NUM_SEGS_P2>,
1244        shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
1245    ) -> Self {
1246        let parker = Parker::new();
1247        let parker_waker = Arc::new(ThreadUnparker {
1248            unparker: parker.unparker(),
1249        });
1250        Self {
1251            sender,
1252            shared,
1253            parker,
1254            parker_waker,
1255        }
1256    }
1257
1258    /// Register a waker for space availability notifications
1259    #[inline]
1260    unsafe fn register_space_waker(&self, waker: &Waker) {
1261        unsafe { self.sender.slot.space_waker.register(waker) };
1262    }
1263
1264    #[inline]
1265    pub fn try_send(&mut self, value: T) -> Result<(), PushError<T>> {
1266        match self.sender.try_push(value) {
1267            Ok(()) => {
1268                self.shared.notify_receiver();
1269                Ok(())
1270            }
1271            Err(err) => Err(err),
1272        }
1273    }
1274
1275    /// Blocking send that parks the thread until space is available.
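    ///
    /// A minimal sketch (illustrative; assumes `tx` is a `BlockingMpscSender<u32, 8, 2>`
    /// obtained from the blocking constructor):
    ///
    /// ```ignore
    /// tx.send(42).unwrap();
    /// ```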
1276    pub fn send(&mut self, mut value: T) -> Result<(), PushError<T>> {
1277        let sender_ptr = &mut self.sender as *mut Sender<T, P, NUM_SEGS_P2>;
1278
1279        // Fast path: try immediate send
        match unsafe { (&mut *sender_ptr).try_push(value) } {
            Ok(()) => {
                // Wake the consumer so a parked/pending receiver sees the new item.
                self.shared.notify_receiver();
                return Ok(());
            }
            Err(PushError::Closed(item)) => return Err(PushError::Closed(item)),
            Err(PushError::Full(item)) => value = item,
        }
1285
1286        // Slow path: need to block
1287        let waker = Waker::from(Arc::clone(&self.parker_waker));
1288
1289        loop {
1290            unsafe {
1291                self.register_space_waker(&waker);
1292            }
1293
1294            match unsafe { (&mut *sender_ptr).try_push(value) } {
1295                Ok(()) => {
1296                    self.shared.notify_receiver();
1297                    return Ok(());
1298                }
1299                Err(PushError::Full(item)) => {
1300                    value = item;
1301                    self.parker.park();
1302                }
1303                Err(PushError::Closed(item)) => {
1304                    return Err(PushError::Closed(item));
1305                }
1306            }
1307        }
1308    }
1309
1310    /// Sends multiple items from a slice, blocking until all are sent.
1311    pub fn send_slice(&mut self, values: Vec<T>) -> Result<Vec<T>, PushError<Vec<T>>> {
1312        let mut values = ManuallyDrop::new(values);
1313        let data_ptr = values.as_mut_ptr();
1314        let len = values.len();
1315        let cap = values.capacity();
1316
1317        if len == 0 {
1318            let empty = unsafe { Vec::from_raw_parts(data_ptr, 0, cap) };
1319            return Ok(empty);
1320        }
1321
1322        let slice_from =
1323            |sent: usize| unsafe { std::slice::from_raw_parts(data_ptr.add(sent), len - sent) };
1324        let finish_empty = || unsafe { Vec::from_raw_parts(data_ptr, 0, cap) };
1325        let finish_remaining = |sent: usize| unsafe {
1326            let remaining = len - sent;
1327            if remaining > 0 {
1328                ptr::copy(data_ptr.add(sent), data_ptr, remaining);
1329            }
1330            Vec::from_raw_parts(data_ptr, remaining, cap)
1331        };
1332
1333        let waker = Waker::from(Arc::clone(&self.parker_waker));
1334        let sender_ptr = SenderPtr {
1335            ptr: NonNull::from(&mut self.sender),
1336            _marker: PhantomData,
1337        };
1338        let mut sent = 0usize;
1339
1340        match self.sender.try_push_n(slice_from(sent)) {
1341            Ok(written) => {
1342                if written > 0 {
1343                    self.shared.notify_receiver();
1344                    sent += written;
1345                    if sent == len {
1346                        return Ok(finish_empty());
1347                    }
1348                }
1349            }
1350            Err(PushError::Closed(())) => {
1351                let remaining_vec = finish_remaining(sent);
1352                return Err(PushError::Closed(remaining_vec));
1353            }
1354            Err(PushError::Full(())) => {}
1355        }
1356
1357        loop {
1358            unsafe {
1359                self.register_space_waker(&waker);
1360            }
1361
1362            if sent == len {
1363                return Ok(finish_empty());
1364            }
1365
1366            match unsafe { sender_ptr.with_mut(|sender| sender.try_push_n(slice_from(sent))) } {
1367                Ok(written) => {
1368                    if written > 0 {
1369                        self.shared.notify_receiver();
1370                        sent += written;
1371                        if sent == len {
1372                            return Ok(finish_empty());
1373                        }
1374                    }
1375                    self.parker.park();
1376                }
1377                Err(PushError::Full(())) => {
1378                    self.parker.park();
1379                }
1380                Err(PushError::Closed(())) => {
1381                    let remaining_vec = finish_remaining(sent);
1382                    return Err(PushError::Closed(remaining_vec));
1383                }
1384            }
1385        }
1386    }
1387
1388    pub fn close(&mut self) -> bool {
1389        self.sender.close()
1390    }
1391}
1392
1393impl<T, const P: usize, const NUM_SEGS_P2: usize> Clone for BlockingMpscSender<T, P, NUM_SEGS_P2> {
1394    fn clone(&self) -> Self {
1395        let sender = self.sender.clone();
1396        Self::new(sender, Arc::clone(&self.shared))
1397    }
1398}
1399
1400impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for BlockingMpscSender<T, P, NUM_SEGS_P2> {
1401    fn drop(&mut self) {
1402        self.shared.notify_receiver();
1403    }
1404}
1405
1406/// Blocking MPSC receiver.
1407///
1408/// The blocking receiver can work with both async and blocking senders.
1409pub struct BlockingMpscReceiver<T, const P: usize, const NUM_SEGS_P2: usize> {
1410    receiver: Receiver<T, P, NUM_SEGS_P2>,
1411    shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
1412}
1413
1414impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingMpscReceiver<T, P, NUM_SEGS_P2> {
1415    fn new(
1416        receiver: Receiver<T, P, NUM_SEGS_P2>,
1417        shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
1418    ) -> Self {
1419        Self { receiver, shared }
1420    }
1421
1422    #[inline]
1423    pub fn try_recv(&mut self) -> Result<T, PopError> {
1424        match self.receiver.try_pop_with_waker() {
1425            Ok((value, waker)) => {
1426                waker.notify();
1427                Ok(value)
1428            }
1429            Err(err) => Err(err),
1430        }
1431    }
1432
1433    /// Blocking receive that parks the thread until an item is available.
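    ///
    /// A minimal sketch (illustrative; assumes `rx` is a `BlockingMpscReceiver` obtained
    /// from the blocking constructor):
    ///
    /// ```ignore
    /// match rx.recv() {
    ///     Ok(value) => println!("got {value}"),
    ///     Err(PopError::Closed) => println!("queue closed"),
    ///     Err(_) => {} // Empty/Timeout are retried internally by recv()
    /// }
    /// ```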
1434    pub fn recv(&mut self) -> Result<T, PopError> {
1435        // Fast path: try immediate receive
1436        match self.try_recv() {
1437            Ok(value) => return Ok(value),
1438            Err(PopError::Closed) => return Err(PopError::Closed),
1439            Err(PopError::Empty) | Err(PopError::Timeout) => {}
1440        }
1441
1442        // Slow path: need to block
1443        let parker = Parker::new();
1444        let unparker = parker.unparker();
1445        let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
1446
1447        loop {
1448            // Register our waker
1449            unsafe {
1450                self.shared.register_receiver(&waker);
1451            }
1452
1453            // Double-check after registering (prevent missed wakeup)
1454            match self.receiver.try_pop_with_waker() {
1455                Ok((value, waker)) => {
1456                    waker.notify();
1457                    return Ok(value);
1458                }
1459                Err(PopError::Closed) => {
1460                    return Err(PopError::Closed);
1461                }
1462                Err(PopError::Empty) | Err(PopError::Timeout) => {
1463                    parker.park();
1464                }
1465            }
1466        }
1467    }
1468
1469    /// Receives multiple items into a destination slice, blocking until at least one arrives.
1470    ///
1471    /// This method fills the provided slice with items from the queue, blocking the thread
1472    /// until at least one item is available. It returns as soon as any items are received
1473    /// (partial fill) or when the queue is closed.
1474    ///
1475    /// # Advantages over `recv()` in a loop
1476    ///
1477    /// - **Bulk operations**: More efficient than calling `recv()` repeatedly
1478    /// - **Partial results**: Returns immediately with whatever is available
1479    /// - **Reduced overhead**: Fewer waker registrations and context switches
1480    /// - **Automatic backpressure**: Notifies producers as space becomes available
1481    ///
1482    /// # Returns
1483    ///
1484    /// - `Ok(count)`: Number of items written to `dst` (1..=dst.len())
1485    /// - `Err(PopError::Closed)`: Queue closed with no items available
1486    ///
1487    /// # Example
1488    ///
1489    /// ```ignore
1490    /// let mut buffer = [0u32; 128];
1491    /// match receiver.recv_batch(&mut buffer) {
1492    ///     Ok(count) => {
1493    ///         // Process buffer[..count]
1494    ///     }
1495    ///     Err(PopError::Closed) => {
1496    ///         // Queue closed
1497    ///     }
1498    /// }
1499    /// ```
1500    pub fn recv_batch(&mut self, dst: &mut [T]) -> Result<usize, PopError> {
1501        if dst.is_empty() {
1502            return Ok(0);
1503        }
1504
1505        // Fast path: try immediate receive with per-producer notification
1506        let mut filled = 0;
1507        while filled < dst.len() {
1508            match self.receiver.try_pop_with_waker() {
1509                Ok((value, waker)) => {
1510                    dst[filled] = value;
1511                    filled += 1;
1512                    waker.notify();
1513                }
1514                Err(_) => break,
1515            }
1516        }
1517
1518        if filled > 0 {
1519            return Ok(filled);
1520        }
1521
1522        // Queue is closed if explicitly closed OR no producers remain (all senders dropped)
1523        if self.receiver.is_closed() || self.receiver.producer_count() == 0 {
1524            return Err(PopError::Closed);
1525        }
1526
1527        // Slow path: need to block
1528        let parker = Parker::new();
1529        let unparker = parker.unparker();
1530        let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
1531
1532        loop {
1533            // Register our waker
1534            unsafe {
1535                self.shared.register_receiver(&waker);
1536            }
1537
1538            // Double-check and try to make progress
1539            while filled < dst.len() {
1540                match self.receiver.try_pop_with_waker() {
1541                    Ok((value, waker)) => {
1542                        dst[filled] = value;
1543                        filled += 1;
1544                        waker.notify();
1545                    }
1546                    Err(_) => break,
1547                }
1548            }
1549
1550            if filled > 0 {
1551                // Return what we have once the buffer is full, the queue is closed, or no producers remain
1552                if filled == dst.len()
1553                    || self.receiver.is_closed()
1554                    || self.receiver.producer_count() == 0
1555                {
1556                    return Ok(filled);
1557                }
1558                // Got some but not all, park and try again
1559                parker.park();
1560            } else if self.receiver.is_closed() || self.receiver.producer_count() == 0 {
1561                return Err(PopError::Closed);
1562            } else {
1563                // No items, park until woken
1564                parker.park();
1565            }
1566        }
1567    }
1568}
1569
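/// Creates a default async MPSC queue.
///
/// Both senders and receiver use async operations that await instead of parking threads.
///
/// # Example
///
/// ```ignore
/// // A minimal sketch (run inside an async context); the const parameters
/// // mirror those used in the tests below.
/// let (mut sender, mut receiver) = async_mpsc::<u64, 6, 8>();
/// sender.send(42).await.unwrap();
/// assert_eq!(receiver.recv().await.unwrap(), 42);
/// ```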
1570pub fn async_mpsc<T, const P: usize, const NUM_SEGS_P2: usize>() -> (
1571    AsyncMpscSender<T, P, NUM_SEGS_P2>,
1572    AsyncMpscReceiver<T, P, NUM_SEGS_P2>,
1573) {
1574    let (sender, receiver) = new_with_sender();
1575    async_mpsc_from_parts(sender, receiver)
1576}
1577
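/// Creates an async MPSC queue from existing parts.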
1578pub fn async_mpsc_from_parts<T, const P: usize, const NUM_SEGS_P2: usize>(
1579    sender: Sender<T, P, NUM_SEGS_P2>,
1580    receiver: Receiver<T, P, NUM_SEGS_P2>,
1581) -> (
1582    AsyncMpscSender<T, P, NUM_SEGS_P2>,
1583    AsyncMpscReceiver<T, P, NUM_SEGS_P2>,
1584) {
1585    let inner = Arc::clone(receiver.inner());
1586    let shared = Arc::new(AsyncMpscShared::new(inner));
1587    let async_sender = AsyncMpscSender::new(sender, Arc::clone(&shared));
1588    let async_receiver = AsyncMpscReceiver::new(receiver, shared);
1589    (async_sender, async_receiver)
1590}
1591
1592/// Creates a default blocking MPSC queue.
1593///
1594/// Both senders and receiver use blocking operations that park threads.
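///
/// # Example
///
/// ```ignore
/// // A minimal sketch: a blocking producer thread feeding a blocking consumer;
/// // the const parameters mirror those used in the tests below.
/// let (mut sender, mut receiver) = blocking_mpsc::<u64, 6, 8>();
/// std::thread::spawn(move || {
///     sender.send(7).unwrap();
/// });
/// assert_eq!(receiver.recv().unwrap(), 7);
/// ```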
1595pub fn blocking_mpsc<T, const P: usize, const NUM_SEGS_P2: usize>() -> (
1596    BlockingMpscSender<T, P, NUM_SEGS_P2>,
1597    BlockingMpscReceiver<T, P, NUM_SEGS_P2>,
1598) {
1599    let (sender, receiver) = new_with_sender();
1600    blocking_mpsc_from_parts(sender, receiver)
1601}
1602
1603/// Creates a blocking MPSC queue from existing parts.
1604pub fn blocking_mpsc_from_parts<T, const P: usize, const NUM_SEGS_P2: usize>(
1605    sender: Sender<T, P, NUM_SEGS_P2>,
1606    receiver: Receiver<T, P, NUM_SEGS_P2>,
1607) -> (
1608    BlockingMpscSender<T, P, NUM_SEGS_P2>,
1609    BlockingMpscReceiver<T, P, NUM_SEGS_P2>,
1610) {
1611    let inner = Arc::clone(receiver.inner());
1612    let shared = Arc::new(AsyncMpscShared::new(inner));
1613    let blocking_sender = BlockingMpscSender::new(sender, Arc::clone(&shared));
1614    let blocking_receiver = BlockingMpscReceiver::new(receiver, shared);
1615    (blocking_sender, blocking_receiver)
1616}
1617
1618/// Creates a mixed MPSC queue with blocking senders and async receiver.
1619///
1620/// The blocking senders and async receiver share the same waker infrastructure,
1621/// so they can wake each other efficiently. This is useful when you have
1622/// blocking threads that need to send data to an async task.
1623///
1624/// # Example
1625///
1626/// ```ignore
1627/// let (sender, receiver) = blocking_async_mpsc();
1628///
1629/// // Senders (blocking threads)
1630/// for i in 0..4 {
1631///     let sender = sender.clone();
1632///     std::thread::spawn(move || {
1633///         sender.send(i).unwrap();
1634///     });
1635/// }
1636///
1637/// // Receiver (async task)
1638/// tokio::spawn(async move {
1639///     while let Some(item) = receiver.next().await {
1640///         println!("Got: {}", item);
1641///     }
1642/// });
1643/// ```
1644pub fn blocking_async_mpsc<T, const P: usize, const NUM_SEGS_P2: usize>() -> (
1645    BlockingMpscSender<T, P, NUM_SEGS_P2>,
1646    AsyncMpscReceiver<T, P, NUM_SEGS_P2>,
1647) {
1648    let (sender, receiver) = new_with_sender();
1649    blocking_async_mpsc_from_parts(sender, receiver)
1650}
1651
1652/// Creates a mixed MPSC queue with blocking senders and async receiver from existing parts.
1653pub fn blocking_async_mpsc_from_parts<T, const P: usize, const NUM_SEGS_P2: usize>(
1654    sender: Sender<T, P, NUM_SEGS_P2>,
1655    receiver: Receiver<T, P, NUM_SEGS_P2>,
1656) -> (
1657    BlockingMpscSender<T, P, NUM_SEGS_P2>,
1658    AsyncMpscReceiver<T, P, NUM_SEGS_P2>,
1659) {
1660    let inner = Arc::clone(receiver.inner());
1661    let shared = Arc::new(AsyncMpscShared::new(inner));
1662    let blocking_sender = BlockingMpscSender::new(sender, Arc::clone(&shared));
1663    let async_receiver = AsyncMpscReceiver::new(receiver, shared);
1664    (blocking_sender, async_receiver)
1665}
1666
1667/// Creates a mixed MPSC queue with async senders and blocking receiver.
1668///
1669/// The async senders and blocking receiver share the same waker infrastructure,
1670/// so they can wake each other efficiently. This is useful when you have async
1671/// tasks that need to send data to a blocking thread.
1672///
1673/// # Example
1674///
1675/// ```ignore
1676/// let (sender, receiver) = async_blocking_mpsc();
1677///
1678/// // Senders (async tasks)
1679/// for i in 0..4 {
1680///     let sender = sender.clone();
1681///     tokio::spawn(async move {
1682///         sender.send(i).await.unwrap();
1683///     });
1684/// }
1685///
1686/// // Receiver (blocking thread)
1687/// std::thread::spawn(move || {
1688///     while let Ok(item) = receiver.recv() {
1689///         println!("Got: {}", item);
1690///     }
1691/// });
1692/// ```
1693pub fn async_blocking_mpsc<T, const P: usize, const NUM_SEGS_P2: usize>() -> (
1694    AsyncMpscSender<T, P, NUM_SEGS_P2>,
1695    BlockingMpscReceiver<T, P, NUM_SEGS_P2>,
1696) {
1697    let (sender, receiver) = new_with_sender();
1698    async_blocking_mpsc_from_parts(sender, receiver)
1699}
1700
1701/// Creates a mixed MPSC queue with async senders and blocking receiver from existing parts.
1702pub fn async_blocking_mpsc_from_parts<T, const P: usize, const NUM_SEGS_P2: usize>(
1703    sender: Sender<T, P, NUM_SEGS_P2>,
1704    receiver: Receiver<T, P, NUM_SEGS_P2>,
1705) -> (
1706    AsyncMpscSender<T, P, NUM_SEGS_P2>,
1707    BlockingMpscReceiver<T, P, NUM_SEGS_P2>,
1708) {
1709    let inner = Arc::clone(receiver.inner());
1710    let shared = Arc::new(AsyncMpscShared::new(inner));
1711    let async_sender = AsyncMpscSender::new(sender, Arc::clone(&shared));
1712    let blocking_receiver = BlockingMpscReceiver::new(receiver, shared);
1713    (async_sender, blocking_receiver)
1714}
1715
1716/// Unbounded MPSC using UnboundedSpsc internally
1717///
1718/// This variant has no capacity limits - it grows dynamically as needed.
1719/// Each producer still has its own queue, but unlike in the bounded version,
1720/// that queue has no fixed upper limit on the number of items it can hold.
1721///
1722/// # Architecture
1723///
1724/// The unbounded MPSC uses `UnboundedSpsc` queues internally, where each producer
1725/// gets its own dynamically-growing queue. The receiver multiplexes across all
1726/// producer queues using a fair acquisition strategy.
1727///
1728/// ```text
1729/// Producer 1 ──→ UnboundedSpsc ──┐
1730/// Producer 2 ──→ UnboundedSpsc ──┤
1731/// Producer 3 ──→ UnboundedSpsc ──┼→ Receiver (polls all queues)
1732///     ...                        │
1733/// Producer N ──→ UnboundedSpsc ──┘
1734/// ```
1735///
1736/// # Features
1737///
1738/// - **True unbounded capacity**: Each producer queue grows dynamically
1739/// - **Lock-free**: Uses atomic operations for minimal contention
1740/// - **Fair scheduling**: Receiver uses randomized scheduling across producer queues
1741/// - **Interoperable**: Async and blocking senders/receivers can be mixed
1742/// - **Space waker integration**: Notifies producers when space becomes available
1743///
1744/// # Variants
1745///
1746/// - `async_unbounded_mpsc()` - Both senders and receiver are async
1747/// - `blocking_unbounded_mpsc()` - Both senders and receiver are blocking
1748/// - `blocking_async_unbounded_mpsc()` - Blocking senders, async receiver
1749/// - `async_blocking_unbounded_mpsc()` - Async senders, blocking receiver
1750///
1751/// # Example
1752///
1753/// ```ignore
1754/// use maniac_runtime::sync::mpsc::unbounded;
1755///
1756/// // Create async unbounded MPSC
1757/// let (mut sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
1758///
1759/// // Send from async context
1760/// tokio::spawn(async move {
1761///     for i in 0..1000 {
1762///         sender.send(i).await.unwrap();
1763///     }
1764/// });
1765///
1766/// // Receive from async context
1767/// tokio::spawn(async move {
1768///     while let Ok(value) = receiver.recv().await {
1769///         println!("Got: {}", value);
1770///     }
1771/// });
1772/// ```
1773pub mod unbounded {
1774    use super::*;
1775    use crate::spsc::unbounded::{UnboundedSender as UnboundedSender_, UnboundedReceiver as UnboundedReceiver_};
1776    use crate::spsc::NoOpSignal;
1777
1778    /// A producer slot for unbounded MPSC containing an UnboundedSpsc queue
1779    struct UnboundedProducerSlot<T> {
1780        sender: UnboundedSender_<T>,
1781        receiver: UnboundedReceiver_<T>,
1782        space_waker: DiatomicWaker,
1783    }
1784
1785    /// Shared state for unbounded MPSC
1786    ///
1787    /// # Memory Ordering Guarantees
1788    ///
1789    /// - `queues`: Atomic pointers use Acquire/Release for slot installation and cleanup
1790    /// - `queue_count`: Relaxed for statistics, not used for synchronization
1791    /// - `producer_count`: Release on decrement (UnboundedSender::drop), Relaxed on read after Acquire fence
1792    /// - `max_producer_id`: SeqCst for consistent global ordering across all threads
1793    /// - `closed`: Acquire/Release for happens-before between close and other operations
1794    /// - UnboundedSender::drop uses Release fence + Release store to ensure queued items are visible
1795    /// - UnboundedReceiver::try_pop uses Acquire fence before checking producer_count == 0
1796    struct UnboundedInner<T> {
1797        /// Sparse array of producer slot pointers
1798        queues: Box<[AtomicPtr<UnboundedProducerSlot<T>>]>,
1799        queue_count: CachePadded<AtomicUsize>,
1800        producer_count: CachePadded<AtomicUsize>,
1801        max_producer_id: AtomicUsize,
1802        closed: CachePadded<AtomicBool>,
1803        summary: Arc<AsyncSignalWaker>,
1804        signals: Arc<[Signal; SIGNAL_WORDS]>,
1805    }
1806
1807    impl<T> UnboundedInner<T> {
1808        fn is_closed(&self) -> bool {
1809            self.closed.load(Ordering::Acquire)
1810        }
1811
1812        fn producer_count(&self) -> usize {
1813            self.producer_count.load(Ordering::Relaxed)
1814        }
1815
1816        fn create_sender(self: &Arc<Self>) -> Result<UnboundedSender<T>, PushError<()>> {
1817            if self.is_closed() {
1818                return Err(PushError::Closed(()));
1819            }
1820
1821            loop {
1822                let current = self.producer_count.load(Ordering::Acquire);
1823                if current >= MAX_PRODUCERS {
1824                    return Err(PushError::Full(()));
1825                }
1826                if self
1827                    .producer_count
1828                    .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
1829                    .is_ok()
1830                {
1831                    break;
1832                }
1833            }
1834
1835            let mut assigned_id = None;
1836            let mut slot_arc: Option<Arc<UnboundedProducerSlot<T>>> = None;
1837
1838            for signal_index in 0..SIGNAL_WORDS {
1839                for bit_index in 0..64 {
1840                    let queue_index = signal_index * 64 + bit_index;
1841                    if queue_index >= MAX_QUEUES {
1842                        break;
1843                    }
1844                    if !self.queues[queue_index].load(Ordering::Acquire).is_null() {
1845                        continue;
1846                    }
1847
1848                    let (sender, receiver) = UnboundedSpsc::<T, 6, 8, NoOpSignal>::new();
1849                    let slot = Arc::new(UnboundedProducerSlot {
1850                        sender,
1851                        receiver,
1852                        space_waker: DiatomicWaker::new(),
1853                    });
1854
1855                    let raw = Arc::into_raw(Arc::clone(&slot)) as *mut UnboundedProducerSlot<T>;
1856                    match self.queues[queue_index].compare_exchange(
1857                        ptr::null_mut(),
1858                        raw,
1859                        Ordering::Release,
1860                        Ordering::Acquire,
1861                    ) {
1862                        Ok(_) => {
1863                            self.queue_count.fetch_add(1, Ordering::Relaxed);
1864                            assigned_id = Some(queue_index);
1865                            slot_arc = Some(slot);
1866                            break;
1867                        }
1868                        Err(_) => unsafe {
1869                            Arc::from_raw(raw);
1870                        },
1871                    }
1872                }
1873                if assigned_id.is_some() {
1874                    break;
1875                }
1876            }
1877
1878            let producer_id = match assigned_id {
1879                Some(id) => id,
1880                None => {
1881                    self.producer_count.fetch_sub(1, Ordering::Release);
1882                    return Err(PushError::Full(()));
1883                }
1884            };
1885
1886            // Update max_producer_id if needed. We use SeqCst to ensure all threads
1887            // see a consistent view of the maximum producer ID, which is used by
1888            // the receiver to determine which slots to scan.
1889            loop {
1890                let max_producer_id = self.max_producer_id.load(Ordering::SeqCst);
1891                // If our ID is less than or equal to the current max, we're done
1892                if producer_id <= max_producer_id {
1893                    break;
1894                }
1895                if self.is_closed() {
1896                    return Err(PushError::Closed(()));
1897                }
1898                if self
1899                    .max_producer_id
1900                    .compare_exchange(
1901                        max_producer_id,
1902                        producer_id,
1903                        Ordering::SeqCst,
1904                        Ordering::SeqCst,
1905                    )
1906                    .is_ok()
1907                {
1908                    break;
1909                }
1910            }
1911
1912            // Use the slot_arc that was already captured during the CAS operation.
1913            // Arc reference counting: We created an Arc::new (refcount=1), then cloned it
1914            // and converted the clone to raw via Arc::into_raw(Arc::clone(&slot)).
1915            // - Raw pointer stored in array represents 1 Arc reference
1916            // - Original slot_arc variable holds the other Arc reference  
1917            // - Total: 2 references, properly balanced for array + Sender ownership
1918            let slot_arc = slot_arc.expect("slot arc missing");
1919
1920            Ok(UnboundedSender {
1921                inner: Arc::clone(self),
1922                slot: slot_arc,
1923                producer_id,
1924            })
1925        }
1926
1927        fn close(&self) -> bool {
1928            let was_open = !self.closed.swap(true, Ordering::AcqRel);
1929            if was_open {
1930                let permits = self.queue_count.load(Ordering::Relaxed).max(1);
1931                self.summary.release(permits);
1932            }
1933
1934            for slot_atomic in self.queues.iter() {
1935                let slot_ptr = slot_atomic.swap(ptr::null_mut(), Ordering::AcqRel);
1936                if slot_ptr.is_null() {
1937                    continue;
1938                }
1939
1940                unsafe {
1941                    (*slot_ptr).sender.close_channel();
1942                    Arc::from_raw(slot_ptr);
1943                }
1944
1945                self.queue_count.fetch_sub(1, Ordering::Relaxed);
1946            }
1947
1948            self.producer_count.store(0, Ordering::Release);
1949            true
1950        }
1951    }
1952
1953    impl<T> Drop for UnboundedInner<T> {
1954        fn drop(&mut self) {
1955            self.close();
1956        }
1957    }
1958
1959    /// Create a new unbounded MPSC queue
1960    pub fn unbounded_new<T>() -> UnboundedReceiver<T> {
1961        unbounded_new_with_waker(Arc::new(AsyncSignalWaker::new()))
1962    }
1963
1964    /// Create a new unbounded MPSC queue with a custom waker
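    ///
    /// # Example
    ///
    /// ```ignore
    /// // A minimal sketch: supply your own AsyncSignalWaker so an external
    /// // scheduler can observe "work available" notifications.
    /// let waker = Arc::new(AsyncSignalWaker::new());
    /// let receiver = unbounded_new_with_waker::<u64>(waker);
    /// let mut sender = receiver.create_sender().unwrap();
    /// sender.try_push(1).unwrap();
    /// ```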
1965    pub fn unbounded_new_with_waker<T>(
1966        waker: Arc<AsyncSignalWaker>,
1967    ) -> UnboundedReceiver<T> {
1968        let mut queues = Vec::with_capacity(MAX_QUEUES);
1969        for _ in 0..MAX_QUEUES {
1970            queues.push(AtomicPtr::new(core::ptr::null_mut()));
1971        }
1972
1973        let signals: Arc<[Signal; SIGNAL_WORDS]> =
1974            Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
1975
1976        let inner = Arc::new(UnboundedInner {
1977            queues: queues.into_boxed_slice(),
1978            queue_count: CachePadded::new(AtomicUsize::new(0)),
1979            producer_count: CachePadded::new(AtomicUsize::new(0)),
1980            max_producer_id: AtomicUsize::new(0),
1981            closed: CachePadded::new(AtomicBool::new(false)),
1982            summary: waker,
1983            signals,
1984        });
1985
1986        UnboundedReceiver {
1987            inner,
1988            misses: 0,
1989            seed: rand::rng().next_u64(),
1990        }
1991    }
1992
1993    /// Create a new unbounded MPSC queue with a sender and receiver pair
1994    pub fn unbounded_new_with_sender<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
1995        let waker = Arc::new(AsyncSignalWaker::new());
1996        let mut queues = Vec::with_capacity(MAX_QUEUES);
1997        for _ in 0..MAX_QUEUES {
1998            queues.push(AtomicPtr::new(core::ptr::null_mut()));
1999        }
2000
2001        let signals: Arc<[Signal; SIGNAL_WORDS]> =
2002            Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
2003
2004        let inner = Arc::new(UnboundedInner {
2005            queues: queues.into_boxed_slice(),
2006            queue_count: CachePadded::new(AtomicUsize::new(0)),
2007            producer_count: CachePadded::new(AtomicUsize::new(0)),
2008            max_producer_id: AtomicUsize::new(0),
2009            closed: CachePadded::new(AtomicBool::new(false)),
2010            summary: waker,
2011            signals,
2012        });
2013
2014        let receiver = UnboundedReceiver {
2015            inner: inner.clone(),
2016            misses: 0,
2017            seed: rand::rng().next_u64(),
2018        };
2019
2020        let sender = inner
2021            .create_sender()
2022            .expect("fatal: unbounded mpsc won't allow even 1 sender");
2023
2024        (sender, receiver)
2025    }
2026
2027    /// Unbounded MPSC sender
2028    pub struct UnboundedSender<T> {
2029        inner: Arc<UnboundedInner<T>>,
2030        slot: Arc<UnboundedProducerSlot<T>>,
2031        producer_id: usize,
2032    }
2033
2034    impl<T> UnboundedSender<T> {
2035        pub fn is_closed(&self) -> bool {
2036            self.inner.is_closed()
2037        }
2038
2039        pub fn producer_count(&self) -> usize {
2040            self.inner.producer_count()
2041        }
2042
2043        pub fn producer_id(&self) -> usize {
2044            self.producer_id
2045        }
2046
2047        /// Try to push a value onto the queue
2048        pub fn try_push(&mut self, value: T) -> Result<(), PushError<T>> {
2049            self.slot.sender.try_push(value)
2050        }
2051
2052        /// Close the entire queue for all senders and the receiver
2053        pub fn close(&mut self) -> bool {
2054            self.inner.close()
2055        }
2056    }
2057
2058    impl<T> Clone for UnboundedSender<T> {
2059        fn clone(&self) -> Self {
2060            self.inner
2061                .create_sender()
2062                .expect("too many senders")
2063        }
2064    }
2065
2066    impl<T> Drop for UnboundedSender<T> {
2067        fn drop(&mut self) {
2068            // Close the channel to signal no more items will be sent from this sender
2069            // The slot itself remains alive because the receiver holds a reference via
2070            // the raw pointer in the queues array, and the Arc in the array keeps it alive
2071            self.slot.sender.close_channel();
2072            
2073            // Release fence to ensure all writes to the queue (from close_channel and
2074            // any prior pushes) are visible before we decrement producer_count.
2075            // This synchronizes with the Acquire fence in UnboundedReceiver::try_pop_with_waker(),
2076            // ensuring the receiver sees any items pushed before this sender was dropped.
2077            std::sync::atomic::fence(Ordering::Release);
2078            
2079            // Decrement the active producer count using Release ordering to publish
2080            // the fence above and all prior writes.
2081            self.inner.producer_count.fetch_sub(1, Ordering::Release);
2082        }
2083    }
2084
2085    /// Unbounded MPSC receiver
2086    pub struct UnboundedReceiver<T> {
2087        inner: Arc<UnboundedInner<T>>,
2088        misses: u64,
2089        seed: u64,
2090    }
2091
2092    impl<T> UnboundedReceiver<T> {
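        /// Advances the internal linear congruential generator used to
        /// pseudo-randomly select producer queues (see `acquire`).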
2093        fn next(&mut self) -> u64 {
2094            let old_seed = self.seed;
2095            let next_seed = (old_seed
2096                .wrapping_mul(RND_MULTIPLIER)
2097                .wrapping_add(RND_ADDEND))
2098                & RND_MASK;
2099            self.seed = next_seed;
2100            next_seed >> 16
2101        }
2102
2103        pub fn is_closed(&self) -> bool {
2104            self.inner.closed.load(Ordering::Acquire)
2105        }
2106
2107        pub fn close(&self) -> bool {
2108            self.inner.close()
2109        }
2110
2111        pub fn producer_count(&self) -> usize {
2112            self.inner.producer_count()
2113        }
2114
2115        pub fn create_sender(&self) -> Result<UnboundedSender<T>, PushError<()>> {
2116            self.inner.create_sender()
2117        }
2118
2119        /// Try to pop a single value
2120        pub fn try_pop(&mut self) -> Result<T, PopError> {
2121            self.try_pop_with_waker().map(|(v, _)| v)
2122        }
2123
2124        /// Try to pop a single value and return the waker
2125        pub fn try_pop_with_waker(&mut self) -> Result<(T, &DiatomicWaker), PopError> {
2126            // For unbounded, iterate through all registered producer slots
2127            let max_producer_id = self.inner.max_producer_id.load(Ordering::Acquire);
2128            
2129            // Try to pop from any queue
2130            for producer_id in 0..=max_producer_id {
2131                let slot_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
2132                if slot_ptr.is_null() {
2133                    continue;
2134                }
2135
2136                // Safety: The slot pointer is valid for the lifetime of UnboundedInner.
2137                // Slots are never freed until Inner drops, so this is safe.
2138                let slot = unsafe { &*slot_ptr };
2139                if let Some(value) = slot.receiver.try_pop() {
2140                    return Ok((value, &slot.space_waker));
2141                }
2142            }
2143
2144            // No items found in any queue.
2145            // Acquire fence BEFORE checking state to ensure we see all writes from producers.
2146            // This synchronizes with the Release fence in UnboundedSender::drop() and ensures
2147            // we see any items pushed before the last producer dropped.
2148            std::sync::atomic::fence(Ordering::Acquire);
2149            
2150            // Check if the channel is closed or all producers are gone
2151            let is_closed = self.inner.is_closed();
2152            // Use Relaxed here since the fence above provides the necessary synchronization
2153            let producer_count = self.inner.producer_count.load(Ordering::Relaxed);
2154
2155            if is_closed || producer_count == 0 {
2156                // Do one final scan after the fence to catch any items that were pushed
2157                // just before the last producer dropped. The fence ensures we see them.
2158                for producer_id in 0..=max_producer_id {
2159                    let slot_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
2160                    if slot_ptr.is_null() {
2161                        continue;
2162                    }
2163
2164                    // Safety: Same as above - slots are never freed until Inner drops
2165                    let slot = unsafe { &*slot_ptr };
2166                    if let Some(value) = slot.receiver.try_pop() {
2167                        return Ok((value, &slot.space_waker));
2168                    }
2169                }
2170                
2171                Err(PopError::Closed)
2172            } else {
2173                Err(PopError::Empty)
2174            }
2175        }
2176
2177        fn acquire(&mut self) -> Option<*mut UnboundedProducerSlot<T>> {
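            // Fair acquisition: start from a random summary word, let the summary
            // point us at a signal word with pending work, then pick a random bit
            // and settle on the nearest set bit within that word.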
2178            let random = self.next() as usize;
2179            let random_word = random % SIGNAL_WORDS;
2180            let mut signal_index = self.inner.summary.summary_select(random_word as u64) as usize;
2181
2182            if signal_index >= SIGNAL_WORDS {
2183                signal_index = random_word;
2184            }
2185
2186            let mut signal_bit = self.next() & 63;
2187            let signal = &self.inner.signals[signal_index];
2188            let signal_value = signal.load(Ordering::Acquire);
2189
2190            signal_bit = find_nearest(signal_value, signal_bit);
2191
2192            if signal_bit >= 64 {
2193                self.misses += 1;
2194                return None;
2195            }
2196
2197            let (bit, expected, acquired) = signal.try_acquire(signal_bit);
2198
2199            if !acquired {
2200                std::hint::spin_loop();
2201                return None;
2202            }
2203
2204            let empty = expected == bit;
2205
2206            if empty {
2207                self.inner
2208                    .summary
2209                    .try_unmark_if_empty(signal.index(), signal.value());
2210            }
2211
2212            let producer_id = signal_index * 64 + (signal_bit as usize);
2213            let slot_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
2214            
2215            if slot_ptr.is_null() {
2216                self.misses += 1;
2217                if empty {
2218                    self.inner
2219                        .summary
2220                        .try_unmark_if_empty(signal.index(), signal.value());
2221                }
2222                return None;
2223            }
2224
2225            Some(slot_ptr)
2226        }
2227    }
2228
2229    impl<T> Drop for UnboundedReceiver<T> {
2230        fn drop(&mut self) {
2231            self.inner.close();
2232        }
2233    }
2234
2235    /// Shared state for async unbounded MPSC
2236    struct UnboundedAsyncMpscShared<T> {
2237        receiver_waiter: CachePadded<DiatomicWaker>,
2238        inner: Arc<UnboundedInner<T>>,
2239    }
2240
2241    impl<T> UnboundedAsyncMpscShared<T> {
2242        fn new(inner: Arc<UnboundedInner<T>>) -> Self {
2243            Self {
2244                receiver_waiter: CachePadded::new(DiatomicWaker::new()),
2245                inner,
2246            }
2247        }
2248
2249        #[inline]
2250        fn notify_receiver(&self) {
2251            self.receiver_waiter.notify();
2252        }
2253
2254        #[inline]
2255        unsafe fn wait_for_items<Pred, R>(&self, predicate: Pred) -> WaitUntil<'_, Pred, R>
2256        where
2257            Pred: FnMut() -> Option<R>,
2258        {
2259            unsafe { self.receiver_waiter.wait_until(predicate) }
2260        }
2261
2262        #[inline]
2263        unsafe fn register_receiver(&self, waker: &Waker) {
2264            unsafe { self.receiver_waiter.register(waker) };
2265        }
2266    }
2267
2268    /// Async unbounded MPSC sender
2269    pub struct AsyncUnboundedMpscSender<T> {
2270        sender: UnboundedSender<T>,
2271        shared: Arc<UnboundedAsyncMpscShared<T>>,
2272    }
2273
2274    /// Async unbounded MPSC receiver
2275    pub struct AsyncUnboundedMpscReceiver<T> {
2276        receiver: UnboundedReceiver<T>,
2277        shared: Arc<UnboundedAsyncMpscShared<T>>,
2278    }
2279
2280    impl<T> AsyncUnboundedMpscSender<T> {
2281        fn new(
2282            sender: UnboundedSender<T>,
2283            shared: Arc<UnboundedAsyncMpscShared<T>>,
2284        ) -> Self {
2285            Self { sender, shared }
2286        }
2287
2288        /// Try to send without blocking
2289        #[inline]
2290        pub fn try_send(&mut self, value: T) -> Result<(), PushError<T>> {
2291            match self.sender.try_push(value) {
2292                Ok(()) => {
2293                    self.shared.notify_receiver();
2294                    Ok(())
2295                }
2296                Err(err) => Err(err),
2297            }
2298        }
2299
2300        /// Send a value asynchronously
2301        pub async fn send(&mut self, value: T) -> Result<(), PushError<T>> {
2302            match self.try_send(value) {
2303                Ok(()) => Ok(()),
2304                Err(PushError::Full(item)) => Err(PushError::Full(item)), // Unbounded should never report Full; propagate defensively if it does
2305                Err(PushError::Closed(item)) => Err(PushError::Closed(item)),
2306            }
2307        }
2308
2309        pub fn close(&mut self) -> bool {
2310            self.sender.close()
2311        }
2312
2313        /// Create a blocking sender that shares the same queue
2314        pub fn create_blocking_sender(&self) -> BlockingUnboundedMpscSender<T> {
2315            let sender = self.sender.clone();
2316            BlockingUnboundedMpscSender::new(sender, Arc::clone(&self.shared))
2317        }
2318    }
2319
2320    impl<T> Clone for AsyncUnboundedMpscSender<T> {
2321        fn clone(&self) -> Self {
2322            let sender = self.sender.clone();
2323            Self::new(sender, Arc::clone(&self.shared))
2324        }
2325    }
2326
2327    impl<T> Drop for AsyncUnboundedMpscSender<T> {
2328        fn drop(&mut self) {
2329            self.shared.notify_receiver();
2330        }
2331    }
2332
2333    impl<T> AsyncUnboundedMpscReceiver<T> {
2334        fn new(
2335            receiver: UnboundedReceiver<T>,
2336            shared: Arc<UnboundedAsyncMpscShared<T>>,
2337        ) -> Self {
2338            Self { receiver, shared }
2339        }
2340
2341        /// Get the number of registered producers
2342        pub fn producer_count(&self) -> usize {
2343            self.receiver.producer_count()
2344        }
2345
2346        /// Try to receive without blocking
2347        #[inline]
2348        pub fn try_recv(&mut self) -> Result<T, PopError> {
2349            match self.receiver.try_pop_with_waker() {
2350                Ok((value, waker)) => {
2351                    waker.notify();
2352                    Ok(value)
2353                }
2354                Err(err) => Err(err),
2355            }
2356        }
2357
2358        /// Receive a value asynchronously
2359        pub async fn recv(&mut self) -> Result<T, PopError> {
2360            match self.try_recv() {
2361                Ok(value) => Ok(value),
2362                Err(PopError::Empty) | Err(PopError::Timeout) => {
2363                    let shared = Arc::clone(&self.shared);
2364                    let receiver = &mut self.receiver;
2365                    unsafe {
2366                        shared
2367                            .wait_for_items(|| match receiver.try_pop_with_waker() {
2368                                Ok((value, waker)) => {
2369                                    waker.notify();
2370                                    Some(Ok(value))
2371                                }
2372                                Err(PopError::Empty) | Err(PopError::Timeout) => None,
2373                                Err(PopError::Closed) => Some(Err(PopError::Closed)),
2374                            })
2375                            .await
2376                    }
2377                }
2378                Err(PopError::Closed) => Err(PopError::Closed),
2379            }
2380        }
2381    }
2382
2383    impl<T> Stream for AsyncUnboundedMpscReceiver<T> {
2384        type Item = T;
2385
2386        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2387            let this = unsafe { self.get_unchecked_mut() };
2388            match this.try_recv() {
2389                Ok(value) => Poll::Ready(Some(value)),
2390                Err(PopError::Closed) => Poll::Ready(None),
2391                Err(PopError::Empty) | Err(PopError::Timeout) => {
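                    // Register the task's waker, then re-check to avoid a
                    // missed wakeup if an item arrived in between.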
2392                    unsafe {
2393                        this.shared.register_receiver(cx.waker());
2394                    }
2395                    match this.try_recv() {
2396                        Ok(value) => Poll::Ready(Some(value)),
2397                        Err(PopError::Closed) => Poll::Ready(None),
2398                        Err(PopError::Empty) | Err(PopError::Timeout) => Poll::Pending,
2399                    }
2400                }
2401            }
2402        }
2403    }
2404
2405    /// Blocking unbounded MPSC sender
2406    pub struct BlockingUnboundedMpscSender<T> {
2407        sender: UnboundedSender<T>,
2408        shared: Arc<UnboundedAsyncMpscShared<T>>,
2409        parker: Parker,
2410        parker_waker: Arc<ThreadUnparker>,
2411    }
2412
2413    impl<T> BlockingUnboundedMpscSender<T> {
2414        fn new(
2415            sender: UnboundedSender<T>,
2416            shared: Arc<UnboundedAsyncMpscShared<T>>,
2417        ) -> Self {
2418            let parker = Parker::new();
2419            let parker_waker = Arc::new(ThreadUnparker {
2420                unparker: parker.unparker(),
2421            });
2422            Self {
2423                sender,
2424                shared,
2425                parker,
2426                parker_waker,
2427            }
2428        }
2429
2430        /// Try to send without blocking
2431        #[inline]
2432        pub fn try_send(&mut self, value: T) -> Result<(), PushError<T>> {
2433            match self.sender.try_push(value) {
2434                Ok(()) => {
2435                    self.shared.notify_receiver();
2436                    Ok(())
2437                }
2438                Err(err) => Err(err),
2439            }
2440        }
2441
2442        /// Send a value, blocking until space is available if needed
2443        pub fn send(&mut self, value: T) -> Result<(), PushError<T>> {
2444            // For unbounded, we never fill, so this is just try_send
2445            self.try_send(value)
2446        }
2447
2448        pub fn close(&mut self) -> bool {
2449            self.sender.close()
2450        }
2451
2452        /// Create an async sender that shares the same queue
2453        pub fn create_async_sender(&self) -> AsyncUnboundedMpscSender<T> {
2454            let sender = self.sender.clone();
2455            AsyncUnboundedMpscSender::new(sender, Arc::clone(&self.shared))
2456        }
2457    }
2458
2459    impl<T> Clone for BlockingUnboundedMpscSender<T> {
2460        fn clone(&self) -> Self {
2461            let sender = self.sender.clone();
2462            Self::new(sender, Arc::clone(&self.shared))
2463        }
2464    }
2465
2466    impl<T> Drop for BlockingUnboundedMpscSender<T> {
2467        fn drop(&mut self) {
2468            self.shared.notify_receiver();
2469        }
2470    }
2471
2472    /// Blocking unbounded MPSC receiver
2473    pub struct BlockingUnboundedMpscReceiver<T> {
2474        receiver: UnboundedReceiver<T>,
2475        shared: Arc<UnboundedAsyncMpscShared<T>>,
2476    }
2477
2478    impl<T> BlockingUnboundedMpscReceiver<T> {
2479        fn new(
2480            receiver: UnboundedReceiver<T>,
2481            shared: Arc<UnboundedAsyncMpscShared<T>>,
2482        ) -> Self {
2483            Self { receiver, shared }
2484        }
2485
2486        /// Get the number of registered producers
2487        pub fn producer_count(&self) -> usize {
2488            self.receiver.producer_count()
2489        }
2490
2491        /// Try to receive without blocking
2492        #[inline]
2493        pub fn try_recv(&mut self) -> Result<T, PopError> {
2494            match self.receiver.try_pop_with_waker() {
2495                Ok((value, waker)) => {
2496                    waker.notify();
2497                    Ok(value)
2498                }
2499                Err(err) => Err(err),
2500            }
2501        }
2502
2503        /// Blocking receive that parks the thread until an item is available
2504        pub fn recv(&mut self) -> Result<T, PopError> {
2505            match self.try_recv() {
2506                Ok(value) => return Ok(value),
2507                Err(PopError::Closed) => return Err(PopError::Closed),
2508                Err(PopError::Empty) | Err(PopError::Timeout) => {}
2509            }
2510
2511            let parker = Parker::new();
2512            let unparker = parker.unparker();
2513            let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
2514
2515            loop {
2516                unsafe {
2517                    self.shared.register_receiver(&waker);
2518                }
2519
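                // Double-check after registering (prevent missed wakeup)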
2520                match self.receiver.try_pop_with_waker() {
2521                    Ok((value, waker)) => {
2522                        waker.notify();
2523                        return Ok(value);
2524                    }
2525                    Err(PopError::Closed) => {
2526                        return Err(PopError::Closed);
2527                    }
2528                    Err(PopError::Empty) | Err(PopError::Timeout) => {
2529                        parker.park();
2530                    }
2531                }
2532            }
2533        }
2534    }
2535
2537    /// Create an unbounded async MPSC queue
2538    pub fn async_unbounded_mpsc<T>() -> (AsyncUnboundedMpscSender<T>, AsyncUnboundedMpscReceiver<T>) {
2539        let (sender, receiver) = unbounded_new_with_sender();
2540        let inner = receiver.inner.clone();
2541        let shared = Arc::new(UnboundedAsyncMpscShared::new(inner));
2542        let async_sender = AsyncUnboundedMpscSender::new(sender, Arc::clone(&shared));
2543        let async_receiver = AsyncUnboundedMpscReceiver::new(receiver, shared);
2544        (async_sender, async_receiver)
2545    }
2546
2547    /// Create a blocking unbounded MPSC queue
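    ///
    /// # Example
    ///
    /// ```ignore
    /// // A minimal sketch: blocking producer threads feeding a blocking consumer.
    /// let (sender, mut receiver) = blocking_unbounded_mpsc::<u64>();
    /// for i in 0..4 {
    ///     let mut sender = sender.clone();
    ///     std::thread::spawn(move || {
    ///         sender.send(i).unwrap();
    ///     });
    /// }
    /// for _ in 0..4 {
    ///     let _item = receiver.recv().unwrap();
    /// }
    /// ```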
2548    pub fn blocking_unbounded_mpsc<T>() -> (BlockingUnboundedMpscSender<T>, BlockingUnboundedMpscReceiver<T>) {
2549        let (sender, receiver) = unbounded_new_with_sender();
2550        let inner = receiver.inner.clone();
2551        let shared = Arc::new(UnboundedAsyncMpscShared::new(inner));
2552        let blocking_sender = BlockingUnboundedMpscSender::new(sender, Arc::clone(&shared));
2553        let blocking_receiver = BlockingUnboundedMpscReceiver::new(receiver, shared);
2554        (blocking_sender, blocking_receiver)
2555    }
2556
2557    /// Create a mixed unbounded MPSC with blocking senders and async receiver
2558    pub fn blocking_async_unbounded_mpsc<T>() -> (BlockingUnboundedMpscSender<T>, AsyncUnboundedMpscReceiver<T>) {
2559        let (sender, receiver) = unbounded_new_with_sender();
2560        let inner = receiver.inner.clone();
2561        let shared = Arc::new(UnboundedAsyncMpscShared::new(inner));
2562        let blocking_sender = BlockingUnboundedMpscSender::new(sender, Arc::clone(&shared));
2563        let async_receiver = AsyncUnboundedMpscReceiver::new(receiver, shared);
2564        (blocking_sender, async_receiver)
2565    }
2566
2567    /// Create a mixed unbounded MPSC with async senders and blocking receiver
2568    pub fn async_blocking_unbounded_mpsc<T>() -> (AsyncUnboundedMpscSender<T>, BlockingUnboundedMpscReceiver<T>) {
2569        let (sender, receiver) = unbounded_new_with_sender();
2570        let inner = receiver.inner.clone();
2571        let shared = Arc::new(UnboundedAsyncMpscShared::new(inner));
2572        let async_sender = AsyncUnboundedMpscSender::new(sender, Arc::clone(&shared));
2573        let blocking_receiver = BlockingUnboundedMpscReceiver::new(receiver, shared);
2574        (async_sender, blocking_receiver)
2575    }
2576}
2577
2578#[cfg(test)]
2579mod tests {
2580    use super::*;
2581    use std::sync::atomic::Ordering;
2582    use std::sync::Arc;
2583    use std::thread;
2584    use std::time::Duration;
2585
2586    #[test]
2587    fn try_pop_drains_and_reports_closed() {
2588        let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
2589
2590        tx.try_push(42).unwrap();
2591        assert_eq!(rx.try_pop().unwrap(), 42);
2592        assert_eq!(rx.try_pop(), Err(PopError::Empty));
2593
2594        assert!(rx.close());
2595        assert_eq!(rx.try_pop(), Err(PopError::Closed));
2596    }
2597
2598    #[test]
2599    fn dropping_local_sender_clears_producer_slot() {
2600        let (tx, rx) = new_with_sender::<u64, 6, 8>();
2601        assert_eq!(tx.producer_count(), 1);
2602
2603        drop(tx);
2604
2605        // Closing will walk the slots and remove the dropped sender.
2606        assert!(rx.close());
2607        assert_eq!(rx.producer_count(), 0);
2608        assert_eq!(rx.inner.queue_count.load(Ordering::SeqCst), 0);
2609    }
2610
2611    // ===== Async Sender + Async Receiver Tests =====
2612
2613    #[tokio::test]
2614    async fn async_async_basic_send_recv() {
2615        let (mut sender, mut receiver) = async_mpsc::<u64, 6, 8>();
2616
2617        sender.send(42).await.unwrap();
2618        assert_eq!(receiver.recv().await.unwrap(), 42);
2619    }
2620
2621    #[tokio::test]
2622    async fn async_async_multiple_senders() {
2623        let (sender, mut receiver) = async_mpsc::<u64, 6, 8>();
2624        let mut sender1 = sender.clone();
2625        let mut sender2 = sender.clone();
2626        let mut sender3 = sender.clone();
2627
2628        tokio::spawn(async move {
2629            sender1.send(1).await.unwrap();
2630        });
2631        tokio::spawn(async move {
2632            sender2.send(2).await.unwrap();
2633        });
2634        tokio::spawn(async move {
2635            sender3.send(3).await.unwrap();
2636        });
2637
2638        let mut received = Vec::new();
2639        for _ in 0..3 {
2640            received.push(receiver.recv().await.unwrap());
2641        }
2642        received.sort();
2643        assert_eq!(received, vec![1, 2, 3]);
2644    }
2645
2646    #[tokio::test]
2647    async fn async_async_batch_operations() {
2648        let (mut sender, mut receiver) = async_mpsc::<u64, 6, 8>();
2649
2650        // Send batch
2651        assert!(
2652            sender
2653                .send_slice(vec![1, 2, 3, 4, 5])
2654                .await
2655                .unwrap()
2656                .is_empty()
2657        );
2658
2659        // Receive batch
2660        let mut buf = [0u64; 5];
2661        let count = receiver.recv_batch(&mut buf).await.unwrap();
2662        assert_eq!(count, 5);
2663        assert_eq!(buf, [1, 2, 3, 4, 5]);
2664    }
2665
2666    #[tokio::test]
2667    async fn async_async_closed_queue() {
2668        let (mut sender, mut receiver) = async_mpsc::<u64, 6, 8>();
2669
2670        sender.send(42).await.unwrap();
2671        assert_eq!(receiver.recv().await.unwrap(), 42);
2672
2673        // Drop sender - producer_count is decremented immediately in drop()
2674        drop(sender);
2675
2676        // Try to receive - should get Closed error (producer_count == 0 means closed)
2677        assert_eq!(receiver.recv().await, Err(PopError::Closed));
2678    }
2679
2680    // ===== Blocking Sender + Blocking Receiver Tests =====
2681
2682    #[test]
2683    fn blocking_blocking_basic_send_recv() {
2684        let (mut sender, mut receiver) = blocking_mpsc::<u64, 6, 8>();
2685
2686        sender.send(42).unwrap();
2687        assert_eq!(receiver.recv().unwrap(), 42);
2688    }
2689
2690    #[test]
2691    fn blocking_blocking_multiple_senders() {
2692        let (sender, receiver) = blocking_mpsc::<u64, 6, 8>();
2693        let receiver = Arc::new(std::sync::Mutex::new(receiver));
2694
2695        let mut handles = Vec::new();
2696        for i in 0..5 {
2697            let mut sender = sender.clone();
2699            handles.push(thread::spawn(move || {
2700                sender.send(i).unwrap();
2701            }));
2702        }
2703
2704        for handle in handles {
2705            handle.join().unwrap();
2706        }
2707
2708        let mut received = Vec::new();
2709        let mut receiver = receiver.lock().unwrap();
2710        for _ in 0..5 {
2711            received.push(receiver.recv().unwrap());
2712        }
2713        received.sort();
2714        assert_eq!(received, vec![0, 1, 2, 3, 4]);
2715    }
2716
2717    #[test]
2718    fn blocking_blocking_batch_operations() {
2719        let (mut sender, mut receiver) = blocking_mpsc::<u64, 6, 8>();
2720
2721        // Send batch
2722        assert!(sender.send_slice(vec![1, 2, 3, 4, 5]).unwrap().is_empty());
2723
2724        // Receive batch
2725        let mut buf = [0u64; 5];
2726        let count = receiver.recv_batch(&mut buf).unwrap();
2727        assert_eq!(count, 5);
2728        assert_eq!(buf, [1, 2, 3, 4, 5]);
2729    }
2730
2731    #[test]
2732    fn blocking_blocking_closed_queue() {
2733        let (mut sender, mut receiver) = blocking_mpsc::<u64, 6, 8>();
2734
2735        sender.send(42).unwrap();
2736        assert_eq!(receiver.recv().unwrap(), 42);
2737
2738        // Drop sender - producer_count is decremented immediately in drop()
2739        drop(sender);
2740
2741        // Try to receive - should get Closed error (producer_count == 0 means closed)
2742        assert_eq!(receiver.recv(), Err(PopError::Closed));
2743    }
2744
2745    // ===== Blocking Sender + Async Receiver Tests =====
2746
2747    #[tokio::test]
2748    async fn blocking_async_basic_send_recv() {
2749        let (mut sender, mut receiver) = blocking_async_mpsc::<u64, 6, 8>();
2750
2751        // Blocking sender in a thread
2752        let handle = thread::spawn(move || {
2753            sender.send(42).unwrap();
2754        });
2755
2756        handle.join().unwrap();
2757        assert_eq!(receiver.recv().await.unwrap(), 42);
2758    }
2759
2760    #[tokio::test]
2761    async fn blocking_async_multiple_blocking_senders() {
2762        let (sender, mut receiver) = blocking_async_mpsc::<u64, 6, 8>();
2763
2764        let mut handles = Vec::new();
2765        for i in 0..5 {
2766            let mut sender = sender.clone();
2767            handles.push(thread::spawn(move || {
2768                sender.send(i).unwrap();
2769            }));
2770        }
2771
2772        for handle in handles {
2773            handle.join().unwrap();
2774        }
2775
2776        let mut received = Vec::new();
2777        for _ in 0..5 {
2778            received.push(receiver.recv().await.unwrap());
2779        }
2780        received.sort();
2781        assert_eq!(received, vec![0, 1, 2, 3, 4]);
2782    }
2783
2784    #[tokio::test]
2785    async fn blocking_async_wakeup_async_receiver() {
2786        let (mut sender, mut receiver) = blocking_async_mpsc::<u64, 6, 8>();
2787
2788        // Start async receiver waiting
2789        let recv_handle = tokio::spawn(async move { receiver.recv().await.unwrap() });
2790
2791        // Give async task time to register waker
2792        tokio::time::sleep(Duration::from_millis(10)).await;
2793
2794        // Blocking sender wakes async receiver
2795        thread::spawn(move || {
2796            sender.send(99).unwrap();
2797        });
2798
2799        assert_eq!(recv_handle.await.unwrap(), 99);
2800    }
2801
2802    #[tokio::test]
2803    async fn blocking_async_batch_operations() {
2804        let (mut sender, mut receiver) = blocking_async_mpsc::<u64, 6, 8>();
2805
2806        // Blocking sender sends batch
2807        thread::spawn(move || {
2808            let _ = sender.send_slice(vec![1, 2, 3, 4, 5]).unwrap();
2809        });
2810
2811        // Async receiver receives batch
2812        let mut buf = [0u64; 5];
2813        let count = receiver.recv_batch(&mut buf).await.unwrap();
2814        assert_eq!(count, 5);
2815        assert_eq!(buf, [1, 2, 3, 4, 5]);
2816    }
2817
2818    // ===== Async Sender + Blocking Receiver Tests =====
2819
2820    #[tokio::test]
2821    async fn async_blocking_basic_send_recv() {
2822        let (mut sender, receiver) = async_blocking_mpsc::<u64, 6, 8>();
2823        let receiver = Arc::new(std::sync::Mutex::new(receiver));
2824
2825        // Start blocking receiver first (will wait)
2826        let receiver_clone = Arc::clone(&receiver);
2827        let handle = thread::spawn(move || receiver_clone.lock().unwrap().recv().unwrap());
2828
2829        // Give blocking thread time to park
2830        tokio::time::sleep(Duration::from_millis(10)).await;
2831
2832        // Async sender wakes blocking receiver
2833        sender.send(42).await.unwrap();
2834
2835        assert_eq!(handle.join().unwrap(), 42);
2836    }
2837
2838    #[tokio::test]
2839    async fn async_blocking_multiple_async_senders() {
2840        let (sender, receiver) = async_blocking_mpsc::<u64, 6, 8>();
2841        let receiver = Arc::new(std::sync::Mutex::new(receiver));
2842
2843        // Multiple async senders
2844        for i in 0..5 {
2845            let mut sender = sender.clone();
2846            tokio::spawn(async move {
2847                sender.send(i).await.unwrap();
2848            });
2849        }
2850
2851        // Give async tasks time to send
2852        tokio::time::sleep(Duration::from_millis(50)).await;
2853
2854        // Blocking receiver receives all
2855        let mut received = Vec::new();
2856        let mut receiver = receiver.lock().unwrap();
2857        for _ in 0..5 {
2858            received.push(receiver.recv().unwrap());
2859        }
2860        received.sort();
2861        assert_eq!(received, vec![0, 1, 2, 3, 4]);
2862    }
2863
2864    #[tokio::test]
2865    async fn async_blocking_wakeup_blocking_receiver() {
2866        let (mut sender, receiver) = async_blocking_mpsc::<u64, 6, 8>();
2867        let receiver = Arc::new(std::sync::Mutex::new(receiver));
2868
2869        // Start blocking receiver waiting
2870        let receiver_clone = Arc::clone(&receiver);
2871        let recv_handle = thread::spawn(move || receiver_clone.lock().unwrap().recv().unwrap());
2872
2873        // Give blocking thread time to park
2874        tokio::time::sleep(Duration::from_millis(10)).await;
2875
2876        // Async sender wakes blocking receiver
2877        let send_handle = tokio::spawn(async move {
2878            sender.send(88).await.unwrap();
2879        });
2880
2881        // Wait for both to complete
2882        send_handle.await.unwrap();
2883        assert_eq!(recv_handle.join().unwrap(), 88);
2884    }
2885
2886    #[tokio::test]
2887    async fn async_blocking_batch_operations() {
2888        let (mut sender, receiver) = async_blocking_mpsc::<u64, 6, 8>();
2889        let receiver = Arc::new(std::sync::Mutex::new(receiver));
2890
2891        // Async sender sends batch
2892        tokio::spawn(async move {
2893            assert!(
2894                sender
2895                    .send_slice(vec![1, 2, 3, 4, 5])
2896                    .await
2897                    .unwrap()
2898                    .is_empty()
2899            );
2900        });
2901
2902        // Give async task time to send
2903        tokio::time::sleep(Duration::from_millis(10)).await;
2904
2905        // Blocking receiver receives batch
2906        let receiver_clone = Arc::clone(&receiver);
2907        let handle = thread::spawn(move || {
2908            let mut buf = [0u64; 5];
2909            let count = receiver_clone.lock().unwrap().recv_batch(&mut buf).unwrap();
2910            (count, buf)
2911        });
2912
2913        let (count, buf) = handle.join().unwrap();
2914        assert_eq!(count, 5);
2915        assert_eq!(buf, [1, 2, 3, 4, 5]);
2916    }
2917
2918    // ===== Mixed Sender Types Tests =====
2919    // Note: Testing true mixed senders (async + blocking from same queue) requires
2920    // creating senders from the same underlying queue, which is complex with the current API.
2921    // The individual combination tests above demonstrate that async and blocking
2922    // components can interoperate correctly through the shared waker infrastructure.
2923
2924    // ===== Error Handling Tests =====
2925
2926    #[tokio::test]
2927    async fn async_async_try_send_full() {
2928        let (mut sender, _receiver) = async_mpsc::<u64, 6, 8>();
2929
2930        // Fill the queue (this depends on queue capacity)
2931        // For now, just test that try_send works
2932        assert!(sender.try_send(1).is_ok());
2933    }
2934
2935    #[test]
2936    fn blocking_blocking_try_send_full() {
2937        let (mut sender, _receiver) = blocking_mpsc::<u64, 6, 8>();
2938
2939        assert!(sender.try_send(1).is_ok());
2940    }
2941
2942    #[tokio::test]
2943    async fn async_async_closed_sender() {
2944        let (mut sender, mut receiver) = async_mpsc::<u64, 6, 8>();
2945
2946        // Send one item and receive it
2947        sender.send(1).await.unwrap();
2948        assert_eq!(receiver.recv().await.unwrap(), 1);
2949
2950        // Drop receiver to close the queue
2951        drop(receiver);
2952
2953        // Sender should now see closed queue
2954        assert_eq!(sender.send(42).await, Err(PushError::Closed(42)));
2955    }
2956
2957    #[test]
2958    fn blocking_blocking_closed_sender() {
2959        let (mut sender, mut receiver) = blocking_mpsc::<u64, 6, 8>();
2960
2961        // Send one item and receive it
2962        sender.send(1).unwrap();
2963        assert_eq!(receiver.recv().unwrap(), 1);
2964
2965        // Drop receiver to close the queue
2966        drop(receiver);
2967
2968        // Sender should now see closed queue
2969        assert_eq!(sender.send(42), Err(PushError::Closed(42)));
2970    }
2971
2972    // ===== Unbounded MPSC Tests (Mirror of Bounded Tests) =====
2973
2974    #[test]
2975    fn unbounded_try_pop_drains_and_reports_closed() {
2976        use crate::sync::mpsc::unbounded;
2977        let (mut tx, mut rx) = unbounded::unbounded_new_with_sender::<u64>();
2978
2979        tx.try_push(42).unwrap();
2980        assert_eq!(rx.try_pop().unwrap(), 42);
2981        assert_eq!(rx.try_pop(), Err(PopError::Empty));
2982
2983        assert!(rx.close());
2984        assert_eq!(rx.try_pop(), Err(PopError::Closed));
2985    }
2986
2987    #[test]
2988    fn unbounded_dropping_local_sender_clears_producer_slot() {
2989        use crate::sync::mpsc::unbounded;
2990        let (tx, rx) = unbounded::unbounded_new_with_sender::<u64>();
2991        assert_eq!(tx.producer_count(), 1);
2992
2993        drop(tx);
2994
2995        // Closing will walk the slots and remove the dropped sender.
2996        assert!(rx.close());
2997        assert_eq!(rx.producer_count(), 0);
2998    }
2999
3000    // ===== Unbounded Async Sender + Async Receiver Tests =====
3001
3002    #[tokio::test]
3003    async fn unbounded_async_async_basic_send_recv() {
3004        use crate::sync::mpsc::unbounded;
3005        let (mut sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3006
3007        sender.send(42).await.unwrap();
3008        assert_eq!(receiver.recv().await.unwrap(), 42);
3009    }
3010
3011    #[tokio::test]
3012    async fn unbounded_async_async_multiple_senders() {
3013        use crate::sync::mpsc::unbounded;
3014        let (sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3015        let mut sender1 = sender.clone();
3016        let mut sender2 = sender.clone();
3017        let mut sender3 = sender.clone();
3018
3019        tokio::spawn(async move {
3020            sender1.send(1).await.unwrap();
3021        });
3022        tokio::spawn(async move {
3023            sender2.send(2).await.unwrap();
3024        });
3025        tokio::spawn(async move {
3026            sender3.send(3).await.unwrap();
3027        });
3028
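        // Completion order across the spawned tasks is nondeterministic, so collect and sort
        // before comparing against the expected set.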
3029        let mut received = Vec::new();
3030        for _ in 0..3 {
3031            received.push(receiver.recv().await.unwrap());
3032        }
3033        received.sort();
3034        assert_eq!(received, vec![1, 2, 3]);
3035    }
3036
3037    #[tokio::test]
3038    async fn unbounded_async_async_closed_queue() {
3039        use crate::sync::mpsc::unbounded;
3040        let (mut sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3041
3042        sender.send(42).await.unwrap();
3043        assert_eq!(receiver.recv().await.unwrap(), 42);
3044
3045        // Drop sender - producer_count is decremented immediately in drop()
3046        drop(sender);
3047
3048        // Try to receive - should get Closed error (producer_count == 0 means closed)
3049        assert_eq!(receiver.recv().await, Err(PopError::Closed));
3050    }
3051
3052    #[tokio::test]
3053    async fn unbounded_async_async_try_send_full() {
3054        use crate::sync::mpsc::unbounded;
3055        let (mut sender, _receiver) = unbounded::async_unbounded_mpsc::<u64>();
3056
3057        // Unbounded never fills, so this should always succeed
3058        assert!(sender.try_send(1).is_ok());
3059        assert!(sender.try_send(2).is_ok());
3060        assert!(sender.try_send(3).is_ok());
3061    }
3062
3063    #[tokio::test]
3064    async fn unbounded_async_async_closed_sender() {
3065        use crate::sync::mpsc::unbounded;
3066        let (mut sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3067
3068        // Send one item and receive it
3069        sender.send(1).await.unwrap();
3070        assert_eq!(receiver.recv().await.unwrap(), 1);
3071
3072        // Drop receiver to close the queue
3073        drop(receiver);
3074
3075        // Sender should now see closed queue
3076        assert_eq!(sender.send(42).await, Err(PushError::Closed(42)));
3077    }
3078
3079    // ===== Unbounded Blocking Sender + Blocking Receiver Tests =====
3080
3081    #[test]
3082    fn unbounded_blocking_blocking_basic_send_recv() {
3083        use crate::sync::mpsc::unbounded;
3084        let (mut sender, mut receiver) = unbounded::blocking_unbounded_mpsc::<u64>();
3085
3086        sender.send(42).unwrap();
3087        assert_eq!(receiver.recv().unwrap(), 42);
3088    }
3089
3090    #[test]
3091    fn unbounded_blocking_blocking_multiple_senders() {
3092        use crate::sync::mpsc::unbounded;
3093        let (sender, receiver) = unbounded::blocking_unbounded_mpsc::<u64>();
3094        let receiver = Arc::new(std::sync::Mutex::new(receiver));
3095
3096        let mut handles = Vec::new();
3097        for i in 0..5 {
3098            let mut sender = sender.clone();
3100            handles.push(thread::spawn(move || {
3101                sender.send(i).unwrap();
3102            }));
3103        }
3104
3105        for handle in handles {
3106            handle.join().unwrap();
3107        }
3108
3109        let mut received = Vec::new();
3110        let mut receiver = receiver.lock().unwrap();
3111        for _ in 0..5 {
3112            received.push(receiver.recv().unwrap());
3113        }
3114        received.sort();
3115        assert_eq!(received, vec![0, 1, 2, 3, 4]);
3116    }
3117
3118    #[test]
3119    fn unbounded_blocking_blocking_closed_queue() {
3120        use crate::sync::mpsc::unbounded;
3121        let (mut sender, mut receiver) = unbounded::blocking_unbounded_mpsc::<u64>();
3122
3123        sender.send(42).unwrap();
3124        assert_eq!(receiver.recv().unwrap(), 42);
3125
3126        // Drop sender - producer_count is decremented immediately in drop()
3127        drop(sender);
3128
3129        // Try to receive - should get Closed error (producer_count == 0 means closed)
3130        assert_eq!(receiver.recv(), Err(PopError::Closed));
3131    }
3132
3133    #[test]
3134    fn unbounded_blocking_blocking_try_send_full() {
3135        use crate::sync::mpsc::unbounded;
3136        let (mut sender, _receiver) = unbounded::blocking_unbounded_mpsc::<u64>();
3137
3138        // Unbounded never fills, so this should always succeed
3139        assert!(sender.try_send(1).is_ok());
3140    }
3141
3142    #[test]
3143    fn unbounded_blocking_blocking_closed_sender() {
3144        use crate::sync::mpsc::unbounded;
3145        let (mut sender, mut receiver) = unbounded::blocking_unbounded_mpsc::<u64>();
3146
3147        // Send one item and receive it
3148        sender.send(1).unwrap();
3149        assert_eq!(receiver.recv().unwrap(), 1);
3150
3151        // Drop receiver to close the queue
3152        drop(receiver);
3153
3154        // Sender should now see closed queue
3155        assert_eq!(sender.send(42), Err(PushError::Closed(42)));
3156    }
3157
3158    // ===== Unbounded Blocking Sender + Async Receiver Tests =====
3159
3160    #[tokio::test]
3161    async fn unbounded_blocking_async_basic_send_recv() {
3162        use crate::sync::mpsc::unbounded;
3163        let (mut sender, mut receiver) = unbounded::blocking_async_unbounded_mpsc::<u64>();
3164
3165        // Blocking sender in a thread
3166        let handle = thread::spawn(move || {
3167            sender.send(42).unwrap();
3168        });
3169
3170        handle.join().unwrap();
3171        assert_eq!(receiver.recv().await.unwrap(), 42);
3172    }
3173
3174    #[tokio::test]
3175    async fn unbounded_blocking_async_multiple_blocking_senders() {
3176        use crate::sync::mpsc::unbounded;
3177        let (sender, mut receiver) = unbounded::blocking_async_unbounded_mpsc::<u64>();
3178
3179        let mut handles = Vec::new();
3180        for i in 0..5 {
3181            let mut sender = sender.clone();
3182            handles.push(thread::spawn(move || {
3183                sender.send(i).unwrap();
3184            }));
3185        }
3186
3187        for handle in handles {
3188            handle.join().unwrap();
3189        }
3190
3191        let mut received = Vec::new();
3192        for _ in 0..5 {
3193            received.push(receiver.recv().await.unwrap());
3194        }
3195        received.sort();
3196        assert_eq!(received, vec![0, 1, 2, 3, 4]);
3197    }
3198
3199    #[tokio::test]
3200    async fn unbounded_blocking_async_wakeup_async_receiver() {
3201        use crate::sync::mpsc::unbounded;
3202        let (mut sender, mut receiver) = unbounded::blocking_async_unbounded_mpsc::<u64>();
3203
3204        // Start async receiver waiting
3205        let recv_handle = tokio::spawn(async move { receiver.recv().await.unwrap() });
3206
3207        // Give async task time to register waker
3208        tokio::time::sleep(Duration::from_millis(10)).await;
3209
3210        // Blocking sender wakes async receiver
3211        thread::spawn(move || {
3212            sender.send(99).unwrap();
3213        });
3214
3215        assert_eq!(recv_handle.await.unwrap(), 99);
3216    }
3217
3218    // ===== Unbounded Async Sender + Blocking Receiver Tests =====
3219
3220    #[tokio::test]
3221    async fn unbounded_async_blocking_basic_send_recv() {
3222        use crate::sync::mpsc::unbounded;
3223        let (mut sender, receiver) = unbounded::async_blocking_unbounded_mpsc::<u64>();
3224        let receiver = Arc::new(std::sync::Mutex::new(receiver));
3225
3226        // Start blocking receiver first (will wait)
3227        let receiver_clone = Arc::clone(&receiver);
3228        let handle = thread::spawn(move || receiver_clone.lock().unwrap().recv().unwrap());
3229
3230        // Give blocking thread time to park
3231        tokio::time::sleep(Duration::from_millis(10)).await;
3232
3233        // Async sender wakes blocking receiver
3234        sender.send(42).await.unwrap();
3235
3236        assert_eq!(handle.join().unwrap(), 42);
3237    }
3238
3239    #[tokio::test]
3240    async fn unbounded_async_blocking_multiple_async_senders() {
3241        use crate::sync::mpsc::unbounded;
3242        let (sender, receiver) = unbounded::async_blocking_unbounded_mpsc::<u64>();
3243        let receiver = Arc::new(std::sync::Mutex::new(receiver));
3244
3245        // Multiple async senders
3246        for i in 0..5 {
3247            let mut sender = sender.clone();
3248            tokio::spawn(async move {
3249                sender.send(i).await.unwrap();
3250            });
3251        }
3252
3253        // Give async tasks time to send
3254        tokio::time::sleep(Duration::from_millis(50)).await;
3255
3256        // Blocking receiver receives all
3257        let mut received = Vec::new();
3258        let mut receiver = receiver.lock().unwrap();
3259        for _ in 0..5 {
3260            received.push(receiver.recv().unwrap());
3261        }
3262        received.sort();
3263        assert_eq!(received, vec![0, 1, 2, 3, 4]);
3264    }
3265
3266    #[tokio::test]
3267    async fn unbounded_async_blocking_wakeup_blocking_receiver() {
3268        use crate::sync::mpsc::unbounded;
3269        let (mut sender, receiver) = unbounded::async_blocking_unbounded_mpsc::<u64>();
3270        let receiver = Arc::new(std::sync::Mutex::new(receiver));
3271
3272        // Start blocking receiver waiting
3273        let receiver_clone = Arc::clone(&receiver);
3274        let recv_handle = thread::spawn(move || receiver_clone.lock().unwrap().recv().unwrap());
3275
3276        // Give blocking thread time to park
3277        tokio::time::sleep(Duration::from_millis(10)).await;
3278
3279        // Async sender wakes blocking receiver
3280        let send_handle = tokio::spawn(async move {
3281            sender.send(88).await.unwrap();
3282        });
3283
3284        // Wait for both to complete
3285        send_handle.await.unwrap();
3286        assert_eq!(recv_handle.join().unwrap(), 88);
3287    }
3288
3289    // ===== Unbounded High-Volume Stress Tests =====
3290
3291    #[test]
3292    fn unbounded_high_volume_stress() {
3293        use crate::sync::mpsc::unbounded;
3294        let (sender, receiver) = unbounded::blocking_unbounded_mpsc::<u64>();
3295        let receiver = Arc::new(std::sync::Mutex::new(receiver));
3296
3297        // Producer threads
3298        let mut producer_handles = Vec::new();
3299        for producer_id in 0..4 {
3300            let mut sender = sender.clone();
3301            let handle = thread::spawn(move || {
3302                for i in 0..250 {
3303                    let value = producer_id * 1000 + i;
3304                    sender.send(value).unwrap();
3305                }
3306            });
3307            producer_handles.push(handle);
3308        }
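        // 4 producers x 250 items each = 1000 items total; the length assertion below relies on this.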
3309
3310        // Consumer thread
3311        let receiver_clone = Arc::clone(&receiver);
3312        let consumer_handle = thread::spawn(move || {
3313            let mut received = Vec::new();
3314            let mut receiver = receiver_clone.lock().unwrap();
3315            
3316            loop {
3317                match receiver.recv() {
3318                    Ok(value) => {
3319                        received.push(value);
3320                    }
3321                    Err(PopError::Closed) => break,
3322                    Err(PopError::Empty) | Err(PopError::Timeout) => {
3323                        // Keep retrying - the channel will eventually close
3324                        thread::sleep(std::time::Duration::from_micros(1));
3325                    }
3326                }
3327            }
3328            received
3329        });
3330
3331        // Wait for producers to complete
3332        for handle in producer_handles {
3333            handle.join().unwrap();
3334        }
3335        
3336        // Small sleep to ensure all items are settled in queues
3337        thread::sleep(Duration::from_millis(10));
3338        
3339        drop(sender); // Close the channel
3340
3341        // Wait for consumer
3342        let mut received = consumer_handle.join().unwrap();
3343        received.sort();
3344
3345        // Verify all items received
3346        assert_eq!(received.len(), 1000, "Expected 1000 items but got {}", received.len());
3347        
3348        // Build expected values: each producer sends 250 items in its range
3349        let mut expected = Vec::new();
3350        for producer_id in 0..4 {
3351            for i in 0..250 {
3352                expected.push(producer_id * 1000 + i);
3353            }
3354        }
3355        expected.sort();
3356        
3357        assert_eq!(received, expected);
3358    }
3359
3360    #[tokio::test]
3361    async fn unbounded_async_high_volume_stress() {
3362        use crate::sync::mpsc::unbounded;
3363        let (sender, receiver) = unbounded::async_unbounded_mpsc::<u64>();
3364        let receiver = Arc::new(std::sync::Mutex::new(receiver));
3365
3366        // Multiple async producers
3367        let mut producer_tasks = Vec::new();
3368        for producer_id in 0..4 {
3369            let mut sender = sender.clone();
3370            let task = tokio::spawn(async move {
3371                for i in 0..250 {
3372                    let value = producer_id * 1000 + i;
3373                    sender.send(value).await.unwrap();
3374                }
3375            });
3376            producer_tasks.push(task);
3377        }
3378
3379        // Collect all received items using a loop that breaks on close
3380        let receiver_clone = Arc::clone(&receiver);
3381        let receiver_task = tokio::spawn(async move {
3382            let mut items: Vec<u64> = Vec::new();
3383            
3384            loop {
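                // Scope the lock so the std::sync::MutexGuard is dropped before the .await below;
                // holding a std::sync mutex guard across an await point would make this future non-Send.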
3385                let result = {
3386                    let mut receiver_guard = receiver_clone.lock().unwrap();
3387                    receiver_guard.try_recv()
3388                };
3389                
3390                match result {
3391                    Ok(value) => {
3392                        items.push(value);
3393                    }
3394                    Err(PopError::Closed) => break,
3395                    Err(PopError::Empty) | Err(PopError::Timeout) => {
3396                        // Keep retrying - the channel will eventually close
3397                        tokio::time::sleep(Duration::from_micros(1)).await;
3398                    }
3399                }
3400            }
3401            items
3402        });
3403
3404        // Wait for producers to complete
3405        for task in producer_tasks {
3406            task.await.unwrap();
3407        }
3408        
3409        // Small delay to ensure all items are settled
3410        tokio::time::sleep(Duration::from_millis(10)).await;
3411        
3412        drop(sender); // Close the sender
3413
3414        // Wait for receiver task
3415        let mut recv_items = receiver_task.await.unwrap();
3416
3417        // Verify all items received
3418        recv_items.sort();
3419        assert_eq!(recv_items.len(), 1000, "Expected 1000 items but got {}", recv_items.len());
3420        
3421        // Build expected values: each producer sends 250 items in its range
3422        let mut expected = Vec::new();
3423        for producer_id in 0..4 {
3424            for i in 0..250 {
3425                expected.push(producer_id * 1000 + i);
3426            }
3427        }
3428        expected.sort();
3429        
3430        assert_eq!(recv_items, expected);
3431    }
3432
3433    // ===== Unbounded Capacity Tests =====
3434
3435    #[test]
3436    fn unbounded_can_push_beyond_typical_limits() {
3437        use crate::sync::mpsc::unbounded;
3438        let (mut sender, mut receiver) = unbounded::unbounded_new_with_sender::<u64>();
3439
3440        // Push way more items than would fit in a bounded queue
3441        const NUM_ITEMS: u64 = 10_000;
3442        for i in 0..NUM_ITEMS {
3443            sender.try_push(i).unwrap();
3444        }
3445
3446        // All items should be retrievable
3447        for i in 0..NUM_ITEMS {
3448            assert_eq!(receiver.try_pop().unwrap(), i);
3449        }
3450
3451        assert_eq!(receiver.try_pop(), Err(PopError::Empty));
3452    }
3453
3454    #[tokio::test]
3455    async fn unbounded_async_can_push_beyond_typical_limits() {
3456        use crate::sync::mpsc::unbounded;
3457        let (mut sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3458
3459        // Push many items asynchronously
3460        const NUM_ITEMS: u64 = 10_000;
3461        for i in 0..NUM_ITEMS {
3462            sender.send(i).await.unwrap();
3463        }
3464
3465        // All items should be retrievable
3466        for i in 0..NUM_ITEMS {
3467            assert_eq!(receiver.recv().await.unwrap(), i);
3468        }
3469
3470        // Close and verify
3471        drop(sender);
3472        assert_eq!(receiver.recv().await, Err(PopError::Closed));
3473    }
3474
3475    // ===== Unbounded Producer Count Tests =====
3476
3477    #[test]
3478    fn unbounded_producer_count_tracking() {
3479        use crate::sync::mpsc::unbounded;
3480        let (sender, rx) = unbounded::unbounded_new_with_sender::<u64>();
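        // Each clone registers one more producer; each drop deregisters one, as asserted below.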
3481        assert_eq!(sender.producer_count(), 1);
3482
3483        let sender2 = sender.clone();
3484        assert_eq!(sender2.producer_count(), 2);
3485
3486        let sender3 = sender.clone();
3487        assert_eq!(sender3.producer_count(), 3);
3488
3489        drop(sender);
3490        assert_eq!(sender2.producer_count(), 2);
3491
3492        drop(sender2);
3493        assert_eq!(sender3.producer_count(), 1);
3494
3495        drop(sender3);
3496        // After all senders are dropped, closing removes their slots and no producers remain.
3497        rx.close();
        assert_eq!(rx.producer_count(), 0);
3498    }
3499
3500    // ===== Unified MPSC Tests (Mixed Async/Blocking) =====
3501
3502    #[tokio::test]
3503    async fn unbounded_mixed_async_blocking_senders() {
3504        use crate::sync::mpsc::unbounded;
3505        let (async_sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3506
3507        // Create a blocking sender from the async sender
3508        let blocking_sender = async_sender.create_blocking_sender();
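        // Both sender flavors feed the same underlying queue through the shared waker
        // infrastructure, so the single receiver below sees one merged stream of 200 items.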
3509
3510        // Async producer
3511        let mut async_sender_clone = async_sender.clone();
3512        let async_task = tokio::spawn(async move {
3513            for i in 0..100 {
3514                async_sender_clone.send(i).await.unwrap();
3515            }
3516        });
3517
3518        // Blocking producer in a thread
3519        let blocking_sender_clone = blocking_sender.clone();
3520        let blocking_thread = thread::spawn(move || {
3521            let mut sender = blocking_sender_clone;
3522            for i in 100..200 {
3523                sender.send(i).unwrap();
3524            }
3525        });
3526
3527        // Drop original senders
3528        drop(async_sender);
3529        drop(blocking_sender);
3530
3531        // Wait for producers
3532        async_task.await.unwrap();
3533        blocking_thread.join().unwrap();
3534
3535        // Collect all items using async recv
3536        let mut items = Vec::new();
3537        loop {
3538            match receiver.recv().await {
3539                Ok(value) => items.push(value),
3540                Err(PopError::Closed) => break,
3541                Err(_) => continue,
3542            }
3543        }
3544
3545        // Verify
3546        items.sort();
3547        assert_eq!(items.len(), 200);
3548        assert_eq!(items, (0..200).collect::<Vec<_>>());
3549    }
3550
3551    #[test]
3552    fn unbounded_mixed_blocking_recv() {
3553        use crate::sync::mpsc::unbounded;
3554        let (blocking_sender, mut receiver) = unbounded::blocking_unbounded_mpsc::<u64>();
3555
3556        // Create an async sender from the blocking sender; it is never used to send and is
3557        // dropped below along with the blocking sender so the queue can close.
        let async_sender = blocking_sender.create_async_sender();
3558
3559        // Blocking producer in a thread
3560        let blocking_sender_clone = blocking_sender.clone();
3561        let blocking_thread = thread::spawn(move || {
3562            let mut sender = blocking_sender_clone;
3563            for i in 0..100 {
3564                sender.send(i).unwrap();
3565            }
3566        });
3567
3568        // Drop original senders
3569        drop(async_sender);
3570        drop(blocking_sender);
3571
3572        // Wait for producer
3573        blocking_thread.join().unwrap();
3574
3575        // Collect all items using blocking recv
3576        let mut items = Vec::new();
3577        loop {
3578            match receiver.recv() {
3579                Ok(value) => items.push(value),
3580                Err(PopError::Closed) => break,
3581                Err(_) => continue,
3582            }
3583        }
3584
3585        // Verify
3586        items.sort();
3587        assert_eq!(items.len(), 100);
3588        assert_eq!(items, (0..100).collect::<Vec<_>>());
3589    }
3590}