maniac_runtime/runtime/
mpsc.rs

1use super::signal::{SIGNAL_MASK, Signal, SignalGate};
2use super::waker::{STATUS_SUMMARY_WORDS, WorkerWaker};
3use crate::utils::bits::find_nearest;
4use crate::utils::CachePadded;
5use crate::{PopError, PushError};
6use std::cell::UnsafeCell;
7use std::mem::MaybeUninit;
8use std::ptr;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
11use std::thread;
12
13use crate::spsc::{UnboundedSpsc, UnboundedSender};
14use rand::RngCore;
15
/// Create a new blocking MPSC queue
///
/// Convenience wrapper around [`new_with_waker`] using a default
/// [`WorkerWaker`]. Only the `Receiver` is returned; producers are
/// registered on demand via [`Receiver::create_sender`].
pub fn new<T, const P: usize, const NUM_SEGS_P2: usize>() -> Receiver<T, P, NUM_SEGS_P2> {
    new_with_waker(Arc::new(WorkerWaker::new()))
}
20
21/// Create a new blocking MPSC queue with a custom SignalWaker
22///
23/// This allows integration with external notification systems. The waker's
24/// callback will be invoked when work becomes available in the queue.
25///
26/// # Arguments
27///
28/// * `waker` - Custom SignalWaker (typically with a callback to update worker status)
29///
30/// # Example
31///
32/// ```ignore
33/// let worker_status = Arc::new(AtomicU64::new(0));
34/// let status_clone = Arc::clone(&worker_status);
35/// let waker = Arc::new(SignalWaker::new_with_callback(Some(Box::new(move || {
36///     status_clone.fetch_or(WORKER_BIT_QUEUE, Ordering::Release);
37/// }))));
38/// let receiver = mpsc::new_with_waker(waker);
39/// ```
40pub fn new_with_waker<T, const P: usize, const NUM_SEGS_P2: usize>(
41    waker: Arc<WorkerWaker>,
42) -> Receiver<T, P, NUM_SEGS_P2> {
43    // Create sparse array of AtomicPtr, all initialized to null
44    let mut queues = Vec::with_capacity(MAX_QUEUES);
45    for _ in 0..MAX_QUEUES {
46        queues.push(AtomicPtr::new(core::ptr::null_mut()));
47    }
48
49    let signals: Arc<[Signal; SIGNAL_WORDS]> =
50        Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
51
52    let inner = Arc::new(Inner {
53        queues: queues.into_boxed_slice(),
54        queue_count: CachePadded::new(AtomicUsize::new(0)),
55        producer_count: CachePadded::new(AtomicUsize::new(0)),
56        max_producer_id: AtomicUsize::new(0),
57        closed: CachePadded::new(AtomicBool::new(false)),
58        waker,
59        signals,
60    });
61
62    Receiver {
63        inner,
64        misses: 0,
65        seed: rand::rng().next_u64(),
66    }
67}
68
69pub fn new_with_sender<T, const P: usize, const NUM_SEGS_P2: usize>()
70-> (Sender<T, P, NUM_SEGS_P2>, Receiver<T, P, NUM_SEGS_P2>) {
71    let waker = Arc::new(WorkerWaker::new());
72    // Create sparse array of AtomicPtr, all initialized to null
73    let mut queues = Vec::with_capacity(MAX_QUEUES);
74    for _ in 0..MAX_QUEUES {
75        queues.push(AtomicPtr::new(core::ptr::null_mut()));
76    }
77
78    let signals: Arc<[Signal; SIGNAL_WORDS]> =
79        Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
80
81    let inner = Arc::new(Inner {
82        queues: queues.into_boxed_slice(),
83        queue_count: CachePadded::new(AtomicUsize::new(0)),
84        producer_count: CachePadded::new(AtomicUsize::new(0)),
85        max_producer_id: AtomicUsize::new(0),
86        closed: CachePadded::new(AtomicBool::new(false)),
87        waker: waker,
88        signals,
89    });
90
91    (
92        inner
93            .create_sender()
94            .expect("fatal: mpsc won't allow even 1 sender"),
95        Receiver {
96            inner,
97            misses: 0,
98            seed: rand::rng().next_u64(),
99        },
100    )
101}
102
103const RND_MULTIPLIER: u64 = 0x5DEECE66D;
104const RND_ADDEND: u64 = 0xB;
105const RND_MASK: u64 = (1 << 48) - 1;
106
107/// Maximum number of producers (limited by SignalWaker summary capacity)
108const MAX_QUEUES: usize = STATUS_SUMMARY_WORDS * 64;
109const QUEUES_PER_PRODUCER: usize = 1;
110const MAX_PRODUCERS: usize = MAX_QUEUES / QUEUES_PER_PRODUCER;
111const MAX_PRODUCERS_MASK: usize = QUEUES_PER_PRODUCER - 1;
112
113/// Number of u64 words needed for the signal bitset
114const SIGNAL_WORDS: usize = STATUS_SUMMARY_WORDS;
115
/// Boxed one-shot callback used to close a producer queue.
///
/// NOTE(review): the original comment described a thread-local cache of
/// (producer_id, queue_ptr, close_fn) tuples, but no such cache exists in
/// this file — this alias appears to be a leftover from that design.
/// TODO: confirm it is still referenced elsewhere before removing.
type CloseFn = Box<dyn FnOnce()>;
119
/// The shared state of the blocking MPSC queue
struct Inner<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Sparse array of producer queues - always MAX_QUEUES slots
    /// (the original comment said MAX_PRODUCERS; the vec is built with
    /// MAX_QUEUES). Each slot is an AtomicPtr for thread-safe registration
    /// and access; null marks a free slot. Non-null pointers come from
    /// `Arc::into_raw`, so each occupied slot owns one strong reference.
    queues: Box<[AtomicPtr<UnboundedSpsc<T, P, NUM_SEGS_P2, Arc<SignalGate>>>]>,
    /// Number of occupied slots in `queues`.
    queue_count: CachePadded<AtomicUsize>,
    /// Number of registered producers
    producer_count: CachePadded<AtomicUsize>,
    /// Highest producer id registered so far (raised via CAS in
    /// `create_sender_with_config`).
    max_producer_id: AtomicUsize,
    /// Closed flag
    closed: CachePadded<AtomicBool>,
    /// Waker used to notify the consumer when a queue becomes non-empty.
    waker: Arc<WorkerWaker>,
    /// Signal bitset - one bit per producer indicating which queues have data
    signals: Arc<[Signal; SIGNAL_WORDS]>,
}
135
impl<T, const P: usize, const NUM_SEGS_P2: usize> Inner<T, P, NUM_SEGS_P2> {
    /// Check if the queue is closed
    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Acquire)
    }

    /// Get the number of registered producers
    pub fn producer_count(&self) -> usize {
        self.producer_count.load(Ordering::Relaxed)
    }

    /// Register a new producer with default configuration (no segment pooling).
    pub fn create_sender(self: &Arc<Self>) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
        self.create_sender_with_config(0)
    }

    /// Register a new producer and return a direct `Sender` handle.
    ///
    /// Claims a free slot in the sparse `queues` table (one bit of one signal
    /// word), creates a dedicated unbounded SPSC queue wired to that bit's
    /// `SignalGate`, and publishes the queue pointer with a CAS so concurrent
    /// registrations never clobber each other.
    ///
    /// # Arguments
    ///
    /// * `max_pooled_segments` - currently unused; kept for interface
    ///   stability. NOTE(review): confirm whether segment pooling is still
    ///   planned or this parameter should be dropped.
    ///
    /// # Errors
    ///
    /// Returns `PushError::Closed` if the queue is closed, or
    /// `PushError::Full` when all `MAX_PRODUCERS` slots are occupied.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mpsc = MpscBlocking::<i32, 64>::new();
    ///
    /// // Create a direct producer handle
    /// let producer = mpsc.create_producer_handle().unwrap();
    ///
    /// // Use the handle for high-performance pushes
    /// producer.push(42).unwrap();
    /// producer.push_bulk(&[1, 2, 3]).unwrap();
    /// ```
    pub fn create_sender_with_config(
        self: &Arc<Self>,
        max_pooled_segments: usize,
    ) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
        if self.is_closed() {
            return Err(PushError::Closed(()));
        }

        // Reserve a producer count first (CAS loop) so registration can
        // never exceed MAX_PRODUCERS, even with concurrent callers.
        loop {
            let current = self.producer_count.load(Ordering::Acquire);
            if current >= MAX_PRODUCERS {
                return Err(PushError::Full(()));
            }
            if self
                .producer_count
                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                break;
            }
        }

        let mut assigned_id = None;
        let mut sender: Option<UnboundedSender<T, P, NUM_SEGS_P2, Arc<SignalGate>>> = None;

        // Scan the slot table for a free (null) entry and claim it via CAS.
        for signal_index in 0..SIGNAL_WORDS {
            for bit_index in 0..64 {
                let queue_index = signal_index * 64 + bit_index;
                if queue_index >= MAX_QUEUES {
                    break;
                }
                if !self.queues[queue_index].load(Ordering::Acquire).is_null() {
                    continue;
                }

                // Gate that raises bit `bit_index` of this signal word (and
                // pokes the waker) when the producer queue gains data.
                let signal_gate = Arc::new(SignalGate::new(
                    bit_index as u8,
                    self.signals[signal_index].clone(),
                    Arc::clone(&self.waker),
                ));

                let (tx, _rx) = UnboundedSpsc::<T, P, NUM_SEGS_P2, Arc<SignalGate>>::new_with_signal(signal_gate);

                // Get the Arc pointer to the UnboundedSpsc from the sender.
                // The slot will own this strong reference until it is
                // reclaimed (in `close` or by the consumer drain path).
                let unbounded_arc = tx.unbounded_arc();
                let raw = Arc::into_raw(unbounded_arc) as *mut UnboundedSpsc<T, P, NUM_SEGS_P2, Arc<SignalGate>>;

                match self.queues[queue_index].compare_exchange(
                    ptr::null_mut(),
                    raw,
                    Ordering::Release,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        self.queue_count.fetch_add(1, Ordering::Relaxed);
                        assigned_id = Some(queue_index);
                        sender = Some(tx);
                        break;
                    }
                    // SAFETY: `raw` came from `Arc::into_raw` above and was
                    // never published (the CAS failed), so reconstituting the
                    // Arc here simply drops our strong reference.
                    Err(_) => unsafe {
                        Arc::from_raw(raw);
                    },
                }
            }
            if assigned_id.is_some() {
                break;
            }
        }

        let producer_id = match assigned_id {
            Some(id) => id,
            None => {
                // Lost the race for every slot: roll back the reservation.
                self.producer_count.fetch_sub(1, Ordering::Release);
                return Err(PushError::Full(()));
            }
        };

        // Raise the high-watermark of handed-out producer ids.
        loop {
            let max_producer_id = self.max_producer_id.load(Ordering::SeqCst);
            if producer_id < max_producer_id {
                break;
            }
            if self.is_closed() {
                // NOTE(review): this early return leaks both the
                // producer_count reservation and the claimed queue slot
                // (close() will eventually reclaim the slot, but the counts
                // stay inflated until then) — confirm whether this path
                // should roll back like the `Full` path above.
                return Err(PushError::Closed(()));
            }
            if self
                .max_producer_id
                .compare_exchange(
                    max_producer_id,
                    producer_id,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                )
                .is_ok()
            {
                break;
            }
        }

        let sender = sender.expect("sender missing");

        Ok(Sender {
            inner: Arc::clone(&self),
            sender,
            producer_id,
        })
    }

    /// Close the queue
    ///
    /// After closing, no more items can be pushed. Every registered queue is
    /// detached from its slot, closed, and its slot-owned Arc reference is
    /// dropped. Wakes any waiting consumer.
    ///
    /// NOTE(review): the original doc claimed this blocks until all SPSC
    /// queues are drained; the code detaches queues immediately without
    /// waiting — confirm which behavior is intended.
    ///
    /// Returns `true` when no producers remain registered afterwards.
    pub fn close(&self) -> bool {
        // `swap` ensures exactly one caller observes the open->closed edge.
        let was_open = !self.closed.swap(true, Ordering::AcqRel);
        if was_open {
            // Wake the consumer (at least one permit) so it can observe close.
            let permits = self.queue_count.load(Ordering::Relaxed).max(1);
            self.waker.release(permits);
        }

        for slot in self.queues.iter() {
            let queue_ptr = slot.swap(ptr::null_mut(), Ordering::AcqRel);
            if queue_ptr.is_null() {
                continue;
            }

            // SAFETY: a non-null slot pointer was produced by `Arc::into_raw`
            // at registration, and the swap above gave us exclusive ownership
            // of that reference, so it is valid to dereference and reclaim.
            unsafe {
                (&*queue_ptr).close();
                Arc::from_raw(queue_ptr);
            }

            self.queue_count.fetch_sub(1, Ordering::Relaxed);
            self.producer_count.fetch_sub(1, Ordering::Relaxed);
        }

        self.producer_count.load(Ordering::Acquire) == 0
    }
}
320
/// Producer handle for the blocking MPSC queue.
///
/// Each `Sender` owns a dedicated unbounded SPSC queue registered under
/// `producer_id` in the shared slot table; pushes go through that private
/// queue and never block (the queue is unbounded).
///
/// `Clone` registers a brand-new producer slot — it does not share this
/// sender's queue — and panics when the producer limit is reached.
///
/// (NOTE(review): the previous doc described a "Blocking MPMC Queue" with a
/// `MpscBlocking` example; that text belonged to an older API.)
///
/// # Type Parameters
/// - `T`: The type of elements stored in the queue
/// - `P`: log2 of segment size
/// - `NUM_SEGS_P2`: log2 of number of segments
pub struct Sender<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Shared MPSC state (slot table, signals, waker, closed flag).
    inner: Arc<Inner<T, P, NUM_SEGS_P2>>,
    /// This producer's private SPSC sender.
    sender: UnboundedSender<T, P, NUM_SEGS_P2, Arc<SignalGate>>,
    /// Index of this producer's slot in `Inner::queues`.
    producer_id: usize,
}
353
354impl<T, const P: usize, const NUM_SEGS_P2: usize> Sender<T, P, NUM_SEGS_P2> {
355    /// Check if the queue is closed
356    pub fn is_closed(&self) -> bool {
357        self.inner.closed.load(Ordering::Acquire)
358    }
359
360    /// Get the number of registered producers
361    pub fn producer_count(&self) -> usize {
362        self.inner.producer_count.load(Ordering::Relaxed)
363    }
364
365    /// Get the producer ID for this handle
366    pub fn producer_id(&self) -> usize {
367        self.producer_id
368    }
369
370    /// Push a value onto the queue
371    ///
372    /// This will use the unbounded SPSC queue for this producer.
373    /// Never blocks since the queue is unbounded.
374    pub fn try_push(&mut self, value: T) -> Result<(), PushError<T>> {
375        self.sender.try_push(value)
376    }
377
378    /// Push multiple values in bulk
379    pub fn try_push_n(&mut self, values: &mut Vec<T>) -> Result<usize, PushError<()>> {
380        if self.is_closed() {
381            return Err(PushError::Closed(()));
382        }
383        self.sender.try_push_n(values)
384    }
385
386    /// Push a value onto the queue
387    ///
388    /// This will use the unbounded SPSC queue for this producer.
389    pub unsafe fn unsafe_try_push(&self, value: T) -> Result<(), PushError<T>> {
390        self.sender.try_push(value)
391    }
392
393    /// Push multiple values in bulk
394    pub unsafe fn unsafe_try_push_n(&self, values: &mut Vec<T>) -> Result<usize, PushError<()>> {
395        if self.is_closed() {
396            return Err(PushError::Closed(()));
397        }
398        self.sender.try_push_n(values)
399    }
400
401    /// Close the queue
402    ///
403    /// After closing, no more items can be pushed. This method will block until all
404    /// SPSC queues are empty. Wakes any waiting consumer.
405    ///
406    /// Note: This waits for consumer to drain all items from all producer queues.
407    pub fn close(&mut self) -> bool {
408        self.inner.close()
409    }
410}
411
impl<T, const P: usize, const NUM_SEGS_P2: usize> Clone for Sender<T, P, NUM_SEGS_P2> {
    /// Registers a brand-new producer slot rather than sharing this sender's
    /// queue: the clone gets its own SPSC queue and `producer_id`.
    ///
    /// # Panics
    ///
    /// Panics when `create_sender` fails, i.e. the queue is closed or all
    /// `MAX_PRODUCERS` slots are occupied.
    fn clone(&self) -> Self {
        self.inner.create_sender().expect("too many senders")
    }
}
417
impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Sender<T, P, NUM_SEGS_P2> {
    fn drop(&mut self) {
        // Close only this producer's SPSC channel. The consumer later
        // observes PopError::Closed for this queue, frees the slot, and
        // drops the slot's Arc reference (see try_pop_n_with_producer /
        // Inner::close).
        self.sender.close_channel();
    }
}
423
/// Consumer handle for the blocking MPSC queue.
///
/// There is a single consumer: all popping methods take `&mut self`, and
/// dropping the receiver closes the entire queue (see the `Drop` impl).
pub struct Receiver<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Shared MPSC state.
    inner: Arc<Inner<T, P, NUM_SEGS_P2>>,
    /// Count of unsuccessful acquire/pop attempts (contention statistic).
    misses: u64,
    /// State of the 48-bit LCG used for fair producer selection.
    seed: u64,
}
429
impl<T, const P: usize, const NUM_SEGS_P2: usize> Receiver<T, P, NUM_SEGS_P2> {
    /// Advance the internal 48-bit LCG and return the next pseudo-random
    /// value (same multiplier/addend/mask as `java.util.Random`; the low 16
    /// bits are discarded). Used to pick signal words/bits in `acquire`.
    pub fn next(&mut self) -> u64 {
        let old_seed = self.seed;
        let next_seed = (old_seed
            .wrapping_mul(RND_MULTIPLIER)
            .wrapping_add(RND_ADDEND))
            & RND_MASK;
        self.seed = next_seed;
        next_seed >> 16
    }

    /// Check if the queue is closed
    pub fn is_closed(&self) -> bool {
        self.inner.closed.load(Ordering::Acquire)
    }

    /// Close the queue
    ///
    /// After closing, no more items can be pushed; all registered producer
    /// queues are detached and the consumer is woken (see `Inner::close`).
    ///
    /// Returns `true` when no producers remain registered afterwards.
    pub fn close(&self) -> bool {
        self.inner.close()
    }

    /// Get the number of registered producers
    pub fn producer_count(&self) -> usize {
        self.inner.producer_count.load(Ordering::Relaxed)
    }

    /// Register a new producer with default configuration.
    pub fn create_sender(&self) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
        self.create_sender_with_config(0)
    }

    /// Register a new producer and return a direct `Sender` handle.
    ///
    /// Thin delegation to [`Inner::create_sender_with_config`]; see that
    /// method for registration details.
    ///
    /// # Arguments
    ///
    /// * `max_pooled_segments` - currently unused; forwarded for interface
    ///   stability (NOTE(review): confirm it is still needed).
    ///
    /// # Errors
    ///
    /// Returns `PushError::Closed` if the queue is closed, or
    /// `PushError::Full` when all producer slots are occupied.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mpsc = MpscBlocking::<i32, 64>::new();
    ///
    /// // Create a direct producer handle
    /// let producer = mpsc.create_producer_handle().unwrap();
    ///
    /// // Use the handle for high-performance pushes
    /// producer.push(42).unwrap();
    /// producer.push_bulk(&[1, 2, 3]).unwrap();
    /// ```
    pub fn create_sender_with_config(
        &self,
        max_pooled_segments: usize,
    ) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
        self.inner.create_sender_with_config(max_pooled_segments)
    }

    /// Pop a value from the queue (non-blocking) using supplied Selector
    ///
    /// Returns immediately with `Empty` if no items are available, or
    /// `Closed` once the queue is closed and every producer slot is gone.
    /// Single-consumer use is enforced by the `&mut self` receiver.
    pub fn try_pop(&mut self) -> Result<T, PopError> {
        let mut slot = MaybeUninit::<T>::uninit();
        // SAFETY: 1-element slice over the MaybeUninit slot; try_pop_n only
        // writes into it, and we only assume_init after it reports that one
        // element was actually written.
        let slice = unsafe { std::slice::from_raw_parts_mut(slot.as_mut_ptr(), 1) };

        let drained = self.try_pop_n(slice);
        if drained == 0 {
            if self.is_closed() && self.inner.producer_count.load(Ordering::Acquire) == 0 {
                Err(PopError::Closed)
            } else {
                Err(PopError::Empty)
            }
        } else {
            // SAFETY: drained > 0, so try_pop_n initialized slot[0].
            Ok(unsafe { slot.assume_init() })
        }
    }

    // /// Drain up to max_count items from the queue with a provided selector
    // ///
    // /// This method efficiently drains items from multiple producer queues, automatically
    // /// cleaning up producer queues that are empty and closed.
    // ///
    // /// Returns the total number of items drained across all producer queues.
    // ///
    // /// # Multi-Consumer Safety
    // ///
    // /// This method IS safe to call concurrently from multiple consumer threads.
    // /// The implementation uses atomic signal acquisition and CAS-based draining to coordinate.
    // pub fn consume_in_place<F>(&mut self, mut f: F, max_count: usize) -> usize
    // where
    //     F: FnMut(T),
    // {
    //     let mut total_drained = 0;
    //     let mut remaining = max_count;

    //     // Try to drain from each producer queue using the selector for fairness
    //     // We'll cycle through producers until we've drained max_count items or all queues are empty
    //     let mut consecutive_empty = 0;

    //     for _ in 0..8 {
    //         // If we've checked all producers and found them all empty, we're done
    //         if consecutive_empty >= MAX_QUEUES {
    //             break;
    //         }

    //         // Get signal index from selector for fairness
    //         let mut producer_id = (self.next() as usize) & MAX_QUEUES_MASK;
    //         let signal_index = producer_id / 64;
    //         let mut signal_bit = (producer_id - (signal_index * 64)) as u64;

    //         let signal = &self.inner.signals[signal_index];
    //         let signal_value = signal.load(Ordering::Acquire);

    //         // Any signals?
    //         if signal_value == 0 {
    //             consecutive_empty += 1;
    //             continue;
    //         }

    //         // Find nearest set bit for fairness
    //         signal_bit = find_nearest(signal_value, signal_bit);

    //         if signal_bit >= 64 {
    //             consecutive_empty += 1;
    //             continue;
    //         }

    //         producer_id = signal_index * 64 + (signal_bit as usize);

    //         // Atomically acquire the bit
    //         let (bit, expected, acquired) = signal.try_acquire(signal_bit);

    //         if !acquired {
    //             // Contention - try next
    //             std::hint::spin_loop();
    //             continue;
    //         }

    //         // Is the signal empty?
    //         let empty = expected == bit;

    //         if empty {
    //             self.inner
    //                 .waker
    //                 .try_unmark_if_empty(signal.index(), signal.value());
    //         }

    //         // Load the queue pointer atomically
    //         let queue_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
    //         if queue_ptr.is_null() {
    //             consecutive_empty += 1;
    //             continue;
    //         }

    //         // SAFETY: The pointer is valid and we have exclusive consumer access
    //         let queue = unsafe { &*queue_ptr };

    //         // Drain from this producer's queue using consume_in_place
    //         let drained = queue.consume_in_place(remaining, |chunk| {
    //             for item in chunk {
    //                 f(*item);
    //             }
    //             chunk.len()
    //         });

    //         total_drained += drained;
    //         remaining -= drained;

    //         // Handle cleanup and rescheduling
    //         let is_empty = queue.is_empty();
    //         let can_dispose = self.is_closed() && is_empty;

    //         if can_dispose {
    //             // Queue is empty and closed - clean up the slot
    //             let old_ptr =
    //                 self.inner.queues[producer_id].swap(ptr::null_mut(), Ordering::AcqRel);

    //             if !old_ptr.is_null() {
    //                 // Decrement producer count
    //                 self.inner.producer_count.fetch_sub(1, Ordering::Relaxed);
    //                 self.inner.queue_count.fetch_sub(1, Ordering::Relaxed);

    //                 unsafe {
    //                     Arc::from_raw(old_ptr);
    //                 }
    //             }

    //             consecutive_empty += 1;
    //         } else if drained > 0 && !is_empty {
    //             // Queue still has items - reschedule
    //             let prev = signal.set_with_bit(bit);
    //             if prev == 0 && !empty {
    //                 self.inner.waker.mark_active(signal.index());
    //                 // self.inner.waker.increment();
    //             }
    //         } else if drained == 0 {
    //             // Queue is empty
    //             consecutive_empty += 1;
    //         }
    //     }

    //     total_drained
    // }

    /// Pop up to `batch.len()` items into `batch`, returning how many were
    /// written.
    pub fn try_pop_n(&mut self, batch: &mut [T]) -> usize {
        self.try_pop_n_with_producer(batch).0
    }

    /// Attempts to pop a single value and returns the item along with the producer id.
    pub fn try_pop_with_id(&mut self) -> Result<(T, usize), PopError> {
        let mut slot = MaybeUninit::<T>::uninit();
        // SAFETY: 1-element slice over uninitialized storage; only written by
        // try_pop_n_with_producer, and only read once drained > 0.
        let slice = unsafe { std::slice::from_raw_parts_mut(slot.as_mut_ptr(), 1) };
        let (drained, producer_id) = self.try_pop_n_with_producer(slice);
        if drained == 0 {
            if self.is_closed() && self.inner.producer_count.load(Ordering::Acquire) == 0 {
                Err(PopError::Closed)
            } else {
                Err(PopError::Empty)
            }
        } else {
            debug_assert!(producer_id.is_some());
            // SAFETY: drained > 0, so slot[0] was initialized.
            let value = unsafe { slot.assume_init() };
            Ok((
                value,
                producer_id.expect("producer id missing for drained item"),
            ))
        }
    }

    /// Core drain loop: repeatedly `acquire` a signalled producer queue and
    /// pop a batch from it. Gives up after 64 attempts and reports 0 items.
    fn try_pop_n_with_producer(&mut self, batch: &mut [T]) -> (usize, Option<usize>) {
        for _ in 0..64 {
            match self.acquire() {
                Some((producer_id, queue)) => {
                    match queue.try_pop_n(batch) {
                        Ok(size) => {
                            // Items popped: clear the EXECUTING mark and
                            // re-signal if the queue still holds data.
                            queue.unmark_and_schedule();
                            return (size, Some(producer_id));
                        }
                        Err(PopError::Closed) => {
                            // Producer is gone and its queue drained: free
                            // the slot and drop the slot's Arc reference.
                            queue.unmark();
                            let old_ptr = self.inner.queues[producer_id]
                                .swap(ptr::null_mut(), Ordering::AcqRel);

                            if !old_ptr.is_null() {
                                self.inner.producer_count.fetch_sub(1, Ordering::Relaxed);
                                self.inner.queue_count.fetch_sub(1, Ordering::Relaxed);

                                // SAFETY: the pointer came from Arc::into_raw
                                // at registration; the swap gave us exclusive
                                // ownership of that strong reference.
                                unsafe {
                                    Arc::from_raw(old_ptr);
                                }
                            }
                        }
                        Err(_) => {
                            // Empty this round — release and try another.
                            queue.unmark();
                            self.misses += 1;
                        }
                    };
                }
                None => {
                    // No available producer this round
                }
            }
        }

        (0, None)
    }

    /// Pick a signalled producer queue pseudo-randomly (for fairness),
    /// atomically claim its signal bit, and return an SPSC receiver for it.
    ///
    /// Returns `None` when nothing could be claimed this round: no signal
    /// bits set, lost the CAS race, or the slot was concurrently freed.
    fn acquire(&mut self) -> Option<(usize, crate::spsc::UnboundedReceiver<T, P, NUM_SEGS_P2, Arc<SignalGate>>)> {
        let random = self.next() as usize;
        // Try selecting signal index from summary hint
        let random_word = random % SIGNAL_WORDS;
        let mut signal_index = self.inner.waker.summary_select(random_word as u64) as usize;

        if signal_index >= SIGNAL_WORDS {
            signal_index = random_word;
        }

        let mut signal_bit = self.next() & 63;
        let signal = &self.inner.signals[signal_index];
        let signal_value = signal.load(Ordering::Acquire);

        // Find nearest set bit for fairness
        signal_bit = find_nearest(signal_value, signal_bit);

        // 64 and over is out of bounds (no set bit was found in this word)
        if signal_bit >= 64 {
            self.misses += 1;
            return None;
        }

        // Atomically acquire the bit
        let (bit, expected, acquired) = signal.try_acquire(signal_bit);

        if !acquired {
            // Contention - try next
            // self.contention += 1;
            std::hint::spin_loop();
            return None;
        }

        // Is the signal empty? (our bit was the last one set in this word)
        let empty = expected == bit;

        if empty {
            self.inner
                .waker
                .try_unmark_if_empty(signal.index(), signal.value());
        }

        // Compute producer id
        let producer_id = signal_index * 64 + (signal_bit as usize);

        // Load the queue pointer atomically
        let queue_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
        if queue_ptr.is_null() {
            // Slot was freed concurrently.
            // NOTE(review): when `empty`, try_unmark_if_empty was already
            // called above; this second call looks redundant — confirm.
            self.misses += 1;
            if empty {
                self.inner
                    .waker
                    .try_unmark_if_empty(signal.index(), signal.value());
            }
            return None;
        }

        // SAFETY: The pointer is valid (published via Arc::into_raw and not
        // yet reclaimed) and we have exclusive consumer access
        let unbounded_arc = unsafe { Arc::from_raw(queue_ptr) };
        let receiver = unbounded_arc.create_receiver();

        // Mark as EXECUTING
        receiver.mark();

        // Don't drop the Arc - we're just borrowing it; leak the raw pointer
        // back so the slot keeps owning its strong reference.
        let _ = Arc::into_raw(unbounded_arc);

        Some((producer_id, receiver))
    }
}
785
impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Receiver<T, P, NUM_SEGS_P2> {
    fn drop(&mut self) {
        // Single-consumer design: once the receiver is gone the queue can
        // never be drained again, so dropping it closes the whole queue.
        self.inner.close();
    }
}
791
impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Inner<T, P, NUM_SEGS_P2> {
    fn drop(&mut self) {
        // Last reference gone: close() reclaims every still-registered queue
        // slot (each owns a strong Arc reference taken via Arc::into_raw).
        self.close();
    }
}
797
// SAFETY: a `Sender` holds `Arc`s to shared state that is mutated only
// through atomics, plus its own SPSC sender half; with `T: Send` it can be
// moved to another thread. NOTE(review): relies on `UnboundedSender` holding
// no thread-affine (!Send) state beyond `T` — confirm.
unsafe impl<T: Send, const P: usize, const NUM_SEGS_P2: usize> Send for Sender<T, P, NUM_SEGS_P2> {}
// SAFETY: same reasoning for the single consumer: all shared mutation goes
// through atomics, so the receiver may be moved across threads when
// `T: Send`.
unsafe impl<T: Send, const P: usize, const NUM_SEGS_P2: usize> Send
    for Receiver<T, P, NUM_SEGS_P2>
{
}
803
804#[cfg(test)]
805mod tests {
806    use super::*;
807    use std::sync::atomic::Ordering;
808
809    #[test]
810    fn try_pop_drains_and_reports_closed() {
811        let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
812
813        tx.try_push(42).unwrap();
814        assert_eq!(rx.try_pop().unwrap(), 42);
815        assert_eq!(rx.try_pop(), Err(PopError::Empty));
816
817        assert!(rx.close());
818        assert_eq!(rx.try_pop(), Err(PopError::Closed));
819    }
820
821    #[test]
822    fn dropping_local_sender_clears_producer_slot() {
823        let (tx, rx) = new_with_sender::<u64, 6, 8>();
824        assert_eq!(tx.producer_count(), 1);
825
826        drop(tx);
827
828        // Closing will walk the slots and remove the dropped sender.
829        assert!(rx.close());
830        assert_eq!(rx.producer_count(), 0);
831        assert_eq!(rx.inner.queue_count.load(Ordering::SeqCst), 0);
832    }
833
834    #[test]
835    fn single_producer_multiple_items() {
836        let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
837
838        // Push multiple items
839        for i in 0..100 {
840            tx.try_push(i).expect("push should not fail for unbounded queue");
841        }
842
843        // Pop and verify
844        for i in 0..100 {
845            assert_eq!(rx.try_pop().unwrap(), i);
846        }
847
848        // Queue should be empty
849        assert_eq!(rx.try_pop(), Err(PopError::Empty));
850    }
851
852    #[test]
853    fn multiple_producers_single_consumer() {
854        use std::thread;
855
856        let (tx, mut rx) = new_with_sender::<u64, 6, 8>();
857        let mut received = vec![false; 30]; // Track which items we received
858
859        // Spawn 3 producer threads, each pushing 10 items
860        let handles: Vec<_> = (0..3)
861            .map(|producer_id| {
862                let tx = tx.clone();
863                thread::spawn(move || {
864                    for i in 0..10 {
865                        let value = (producer_id * 10 + i) as u64;
866                        tx.clone().try_push(value).expect("push should succeed");
867                    }
868                })
869            })
870            .collect();
871
872        // Wait for all producers to finish
873        for handle in handles {
874            handle.join().unwrap();
875        }
876
877        // Consume all items and verify they're in range
878        let mut count = 0;
879        for _ in 0..100 {
880            if let Ok(value) = rx.try_pop() {
881                assert!(value < 30, "received unexpected value: {}", value);
882                received[value as usize] = true;
883                count += 1;
884            } else {
885                break;
886            }
887        }
888
889        // Should have received all 30 items
890        assert_eq!(count, 30, "expected 30 items, got {}", count);
891        for (i, &received_item) in received.iter().enumerate() {
892            assert!(received_item, "item {} was not received", i);
893        }
894    }
895
896    #[test]
897    fn unbounded_growth_with_large_batches() {
898        let (mut tx, mut rx) = new_with_sender::<u64, 2, 2>(); // Small segments for growth testing
899
900        // Push many items, causing growth
901        let mut items_to_push: Vec<u64> = (0..1000).collect();
902        let pushed = tx.try_push_n(&mut items_to_push).expect("bulk push should succeed");
903        assert_eq!(pushed, 1000, "should push all items");
904        assert!(items_to_push.is_empty(), "Vec should be drained");
905
906        // Verify all items are received
907        for i in 0..1000 {
908            assert_eq!(rx.try_pop().unwrap(), i as u64);
909        }
910
911        assert_eq!(rx.try_pop(), Err(PopError::Empty));
912    }
913
914    #[test]
915    fn close_stops_receives() {
916        let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
917
918        tx.try_push(42).unwrap();
919        assert_eq!(rx.try_pop().unwrap(), 42);
920
921        // Close the queue
922        rx.close();
923
924        // No more receives should succeed
925        assert_eq!(rx.try_pop(), Err(PopError::Closed));
926
927        // Tries to push should fail
928        assert_eq!(tx.try_push(100), Err(PushError::Closed(100)));
929    }
930
931    #[test]
932    fn multiple_senders_cloning() {
933        let (tx, mut rx) = new_with_sender::<u64, 6, 8>();
934        assert_eq!(tx.producer_count(), 1);
935
936        let mut tx2 = tx.clone();
937        assert_eq!(tx2.producer_count(), 2);
938
939        let mut tx3 = tx.clone();
940        assert_eq!(tx3.producer_count(), 3);
941
942        // All clones can push
943        drop(tx);
944        tx2.try_push(1).unwrap();
945        tx3.try_push(2).unwrap();
946
947        // Collect items (order may vary with MPSC)
948        let mut items = Vec::new();
949        for _ in 0..2 {
950            if let Ok(val) = rx.try_pop() {
951                items.push(val);
952            }
953        }
954        
955        items.sort();
956        assert_eq!(items, vec![1, 2]);
957    }
958
959    #[test]
960    fn interleaved_push_pop() {
961        let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
962
963        // Push and pop interleaved
964        let mut received = Vec::new();
965        for i in 0..10 {
966            tx.try_push(i * 2).unwrap();
967            tx.try_push(i * 2 + 1).unwrap();
968            // Try to pop immediately - may or may not get anything
969            while let Ok(value) = rx.try_pop() {
970                received.push(value);
971            }
972        }
973
974        // Pop remaining items
975        while let Ok(value) = rx.try_pop() {
976            received.push(value);
977        }
978
979        // Verify we got all 20 items
980        assert_eq!(received.len(), 20);
981        received.sort();
982        for i in 0..20 {
983            assert_eq!(received[i], i as u64);
984        }
985
986        assert_eq!(rx.try_pop(), Err(PopError::Empty));
987    }
988
989    #[test]
990    fn batch_push_pop() {
991        let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
992
993        // Push batch
994        let mut items: Vec<u64> = (0..50).collect();
995        let pushed = tx.try_push_n(&mut items).expect("bulk push should succeed");
996        assert_eq!(pushed, 50);
997        assert!(items.is_empty());
998
999        // Pop all items
1000        let mut dst = [0u64; 100];
1001        let popped = rx.try_pop_n(&mut dst);
1002        assert_eq!(popped, 50);
1003
1004        for i in 0..50 {
1005            assert_eq!(dst[i], i as u64);
1006        }
1007    }
1008
1009    #[test]
1010    fn concurrent_push_pop() {
1011        use std::thread;
1012        use std::sync::Arc;
1013        use std::sync::{Mutex};
1014        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1015
1016        let (tx, rx) = new_with_sender::<u64, 6, 8>();
1017        let counter = Arc::new(AtomicUsize::new(0));
1018
1019        let rx = Arc::new(Mutex::new(rx));
1020        let rx_clone = Arc::clone(&rx);
1021        let counter_clone = Arc::clone(&counter);
1022
1023        // Producer thread
1024        let producer = thread::spawn(move || {
1025            let mut tx = tx;
1026            for i in 0..1000 {
1027                tx.try_push(i).expect("push should succeed");
1028                thread::yield_now();
1029            }
1030        });
1031
1032        // Consumer thread
1033        let consumer = thread::spawn(move || {
1034            let mut rx = rx_clone.lock().unwrap();
1035            let mut count: usize = 0;
1036            for _ in 0..10000 {
1037                if let Ok(value) = rx.try_pop() {
1038                    assert_eq!(value, count as u64);
1039                    count += 1;
1040                    if count >= 1000 {
1041                        break;
1042                    }
1043                } else {
1044                    thread::yield_now();
1045                }
1046            }
1047            counter_clone.fetch_add(count, AtomicOrdering::Relaxed);
1048        });
1049
1050        producer.join().unwrap();
1051        consumer.join().unwrap();
1052
1053        let final_count = counter.load(AtomicOrdering::Relaxed);
1054        assert_eq!(final_count, 1000, "should have received all 1000 items");
1055    }
1056
1057    #[test]
1058    fn stress_test_unbounded_expansion() {
1059        let (mut tx, mut rx) = new_with_sender::<u64, 2, 2>(); // Very small segments
1060
1061        // Push 10000 items to force significant expansion
1062        let mut items: Vec<u64> = (0..10000).collect();
1063        let pushed = tx.try_push_n(&mut items).expect("bulk push should succeed");
1064        assert_eq!(pushed, 10000);
1065
1066        // Verify all items in order
1067        for i in 0..10000 {
1068            assert_eq!(rx.try_pop().unwrap(), i as u64, "item {} mismatch", i);
1069        }
1070
1071        assert_eq!(rx.try_pop(), Err(PopError::Empty));
1072    }
1073
1074    #[test]
1075    fn producer_id_uniqueness() {
1076        let (tx1, _rx) = new_with_sender::<u64, 6, 8>();
1077        let tx2 = tx1.clone();
1078        let tx3 = tx1.clone();
1079
1080        // All should have different producer IDs
1081        assert_ne!(tx1.producer_id(), tx2.producer_id());
1082        assert_ne!(tx2.producer_id(), tx3.producer_id());
1083        assert_ne!(tx1.producer_id(), tx3.producer_id());
1084    }
1085
1086    #[test]
1087    fn receiver_count_tracking() {
1088        let (tx1, mut rx) = new_with_sender::<u64, 6, 8>();
1089        assert_eq!(rx.producer_count(), 1);
1090
1091        let tx2 = tx1.clone();
1092        assert_eq!(rx.producer_count(), 2);
1093
1094        let tx3 = tx2.clone();
1095        assert_eq!(rx.producer_count(), 3);
1096
1097        drop(tx1);
1098        rx.close(); // Trigger cleanup
1099        
1100        // After closing, all producers should be cleaned up
1101        assert_eq!(rx.producer_count(), 0);
1102    }
1103}