maniac_runtime/sync/
spsc.rs

1//! Async and Blocking SPSC (Single-Producer, Single-Consumer) queue implementation.
2//!
3//! This module provides both async and blocking adapters over the lock-free SPSC queue.
4//! The async variants implement `futures::Sink` and `futures::Stream` traits for
5//! seamless integration with async/await code. The blocking variants use efficient
6//! thread parking for synchronous operations.
7//!
8//! # Queue Variants
9//!
10//! - **Async**: [`AsyncSpscProducer`] / [`AsyncSpscConsumer`] - For use with async tasks
11//! - **Blocking**: [`BlockingSpscProducer`] / [`BlockingSpscConsumer`] - For use with threads
12//! - **Mixed**: You can mix async and blocking ends on the same queue!
13//!
14//! All variants share the same waker infrastructure, allowing seamless interoperability.
15//! A blocking producer can wake up an async consumer and vice versa.
16//!
17//! # Design Principles
18//!
19//! ## Correctness Guarantees
20//!
21//! The implementation uses the **double-check pattern** to prevent missed wakeups:
22//! 1. Check if operation is possible (space available / items available)
23//! 2. Register waker if not
24//! 3. Double-check after registering (catches races)
25//!
26//! This pattern, combined with `DiatomicWaker`'s acquire/release memory ordering,
27//! guarantees that no items are lost and no wakeups are missed, even in the presence
28//! of concurrent operations between producer and consumer.
29//!
30//! ## Memory Ordering
31//!
32//! The queue operations synchronize via acquire/release semantics:
33//! - Producer writes data (Release) → Consumer reads data (Acquire)
34//! - Consumer updates tail (Release) → Producer checks space (Acquire)
35//! - Notifications use `DiatomicWaker` with proper ordering
36//!
37//! ## Zero-Copy Design
38//!
39//! Items waiting to be sent are stored in the Future's stack frame, not in the
40//! `AsyncSpscProducer` struct. This eliminates the need for `T: Unpin` and keeps
41//! the implementation simple and efficient.
42//!
43//! ## Performance Characteristics
44//!
45//! - **Fast path**: ~5-15ns for non-blocking operations
46//! - **Notification overhead**: ~1-2ns when no waker registered
47//! - **Zero allocation**: All state lives on stack or in shared queue
48//! - **Cache-friendly**: Wakers are cache-padded to prevent false sharing
49//!
50//! # Examples
51//!
52//! ## Pure Async
53//!
54//! ```ignore
55//! use futures::{SinkExt, StreamExt};
56//!
57//! let (mut producer, mut consumer) = new_async_spsc(signal);
58//!
59//! // Producer task
60//! maniac::spawn(async move {
61//!     producer.send(42).await.unwrap();
62//!     producer.send(43).await.unwrap();
63//! });
64//!
65//! // Consumer task
66//! maniac::spawn(async move {
67//!     while let Some(item) = consumer.next().await {
68//!         println!("Got: {}", item);
69//!     }
70//! });
71//! ```
72//!
73//! ## Pure Blocking
74//!
75//! ```ignore
76//! let (producer, consumer) = new_blocking_spsc(signal);
77//!
78//! // Producer thread
79//! std::thread::spawn(move || {
80//!     producer.send(42).unwrap();
81//!     producer.send(43).unwrap();
82//! });
83//!
84//! // Consumer thread
85//! std::thread::spawn(move || {
86//!     while let Ok(item) = consumer.recv() {
87//!         println!("Got: {}", item);
88//!     }
89//! });
90//! ```
91//!
92//! ## Mixed: Blocking Producer + Async Consumer
93//!
94//! ```ignore
95//! let (producer, mut consumer) = new_blocking_async_spsc(signal);
96//!
97//! // Producer thread (blocking)
98//! std::thread::spawn(move || {
99//!     producer.send(42).unwrap();  // Parks thread if full
100//! });
101//!
102//! // Consumer task (async)
103//! maniac::spawn(async move {
104//!     let item = consumer.recv().await.unwrap();  // Wakes up blocking thread
105//! });
106//! ```
107//!
108//! ## Mixed: Async Producer + Blocking Consumer
109//!
110//! ```ignore
111//! let (mut producer, consumer) = new_async_blocking_spsc(signal);
112//!
113//! // Producer task (async)
114//! maniac::spawn(async move {
115//!     producer.send(42).await.unwrap();  // Wakes up blocking thread
116//! });
117//!
118//! // Consumer thread (blocking)
119//! std::thread::spawn(move || {
120//!     let item = consumer.recv().unwrap();  // Parks thread if empty
121//! });
122//! ```
123
124use std::sync::Arc;
125
126use crate::utils::CachePadded;
127
128use super::signal::AsyncSignalGate;
129
130use crate::{PopError, PushError};
131
132use std::pin::Pin;
133use std::task::{Context, Poll, Waker};
134
135use futures::{sink::Sink, stream::Stream};
136
137use crate::future::waker::DiatomicWaker;
138use crate::parking::{Parker, Unparker};
139use std::task::Wake;
140
141/// A waker implementation that unparks a thread.
142///
143/// Used to integrate blocking operations with the async waker infrastructure,
144/// allowing async and blocking operations to work together seamlessly.
struct ThreadUnparker {
    // Handle that wakes the thread currently blocked in `Parker::park`.
    unparker: Unparker,
}
148
149impl Wake for ThreadUnparker {
150    fn wake(self: Arc<Self>) {
151        self.unparker.unpark();
152    }
153
154    fn wake_by_ref(self: &Arc<Self>) {
155        self.unparker.unpark();
156    }
157}
158
159/// Shared wake infrastructure for the async adapters.
160///
161/// Maintains separate wakers for the producer (waiting for space) and consumer
162/// (waiting for items). The wakers are cache-padded to prevent false sharing
163/// between producer and consumer threads.
164///
165/// # Memory Ordering
166///
167/// The `DiatomicWaker` provides acquire/release semantics that synchronize with
168/// the underlying queue's memory ordering:
169/// - Producer: write data (Release) → notify_items → Consumer: read data (Acquire)
170/// - Consumer: read data → notify_space → Producer: check space (Acquire)
171///
172/// # Correctness
173///
174/// The double-check pattern (check → register → check) prevents missed wakeups:
175/// - If state changes before register, the second check catches it
176/// - If state changes after register, the waker gets notified
177/// - If state changes during register, `DiatomicWaker`'s state machine guarantees
178///   either the second check sees the change or the notifier wakes the waker
struct AsyncSpscShared {
    // Registered by the consumer; notified by the producer after enqueuing items.
    item_waiter: CachePadded<DiatomicWaker>,
    // Registered by the producer; notified by the consumer after freeing space.
    space_waiter: CachePadded<DiatomicWaker>,
}
183
impl AsyncSpscShared {
    /// Creates fresh waker state with no producer or consumer registered.
    fn new() -> Self {
        Self {
            item_waiter: CachePadded::new(DiatomicWaker::new()),
            space_waiter: CachePadded::new(DiatomicWaker::new()),
        }
    }

    /// Wakes the consumer-side waiter (items became available).
    /// Cheap no-op when no waker is registered.
    #[inline]
    fn notify_items(&self) {
        self.item_waiter.notify();
    }

    /// Wakes the producer-side waiter (space became available).
    /// Cheap no-op when no waker is registered.
    #[inline]
    fn notify_space(&self) {
        self.space_waiter.notify();
    }

    /// Suspends until `predicate` returns `Some`, re-running it on each wakeup.
    ///
    /// # Safety
    ///
    /// Only the single consumer may call this (or [`Self::register_items`]):
    /// `DiatomicWaker` registration must never happen concurrently, per the
    /// invariants documented at the call sites.
    #[inline]
    unsafe fn wait_for_items<P, T>(&self, predicate: P) -> crate::future::waker::WaitUntil<'_, P, T>
    where
        P: FnMut() -> Option<T>,
    {
        unsafe { self.item_waiter.wait_until(predicate) }
    }

    /// Suspends until `predicate` returns `Some`, re-running it on each wakeup.
    ///
    /// # Safety
    ///
    /// Only the single producer may call this (or [`Self::register_space`]):
    /// `DiatomicWaker` registration must never happen concurrently, per the
    /// invariants documented at the call sites.
    #[inline]
    unsafe fn wait_for_space<P, T>(&self, predicate: P) -> crate::future::waker::WaitUntil<'_, P, T>
    where
        P: FnMut() -> Option<T>,
    {
        unsafe { self.space_waiter.wait_until(predicate) }
    }

    /// Registers the consumer's waker for item notifications.
    ///
    /// # Safety
    ///
    /// Must not race with `wait_for_items` or another `register_items` call;
    /// guaranteed by the single-consumer discipline of the adapters.
    #[inline]
    unsafe fn register_items(&self, waker: &Waker) {
        unsafe { self.item_waiter.register(waker) };
    }

    /// Registers the producer's waker for space notifications.
    ///
    /// # Safety
    ///
    /// Must not race with `wait_for_space` or another `register_space` call;
    /// guaranteed by the single-producer discipline of the adapters.
    #[inline]
    unsafe fn register_space(&self, waker: &Waker) {
        unsafe { self.space_waiter.register(waker) };
    }
}
228
229/// Asynchronous producer façade for [`SegSpsc`].
pub struct AsyncSpscProducer<T, const P: usize, const NUM_SEGS_P2: usize> {
    // Push half of the underlying lock-free SPSC queue.
    sender: crate::spsc::Sender<T, P, NUM_SEGS_P2, AsyncSignalGate>,
    // Waker state shared with the consumer end (async or blocking).
    shared: Arc<AsyncSpscShared>,
}
234
impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncSpscProducer<T, P, NUM_SEGS_P2> {
    /// Internal constructor; pairs the queue's push half with the shared waker state.
    fn new(
        sender: crate::spsc::Sender<T, P, NUM_SEGS_P2, AsyncSignalGate>,
        shared: Arc<AsyncSpscShared>,
    ) -> Self {
        Self { sender, shared }
    }

    /// Capacity of the underlying queue.
    #[inline]
    pub fn capacity(&self) -> usize {
        crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::capacity()
    }

    /// Fast-path send without suspension.
    ///
    /// Attempts to send an item immediately without blocking. Always notifies
    /// waiting consumers on success to prevent missed wakeups.
    ///
    /// On failure the item is handed back inside the `PushError`.
    ///
    /// # Performance
    ///
    /// - Success path: ~5-15ns (queue write + notify check)
    /// - Notify overhead: ~1-2ns when no consumer waiting
    #[inline]
    pub fn try_send(&self, value: T) -> Result<(), PushError<T>> {
        match self.sender.try_push(value) {
            Ok(()) => {
                // Always notify consumers after successful write.
                // This is cheap (~1-2ns) when no waker is registered.
                self.shared.notify_items();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    /// Asynchronously sends a single item.
    ///
    /// Tries to send immediately; if the queue is full, suspends until space
    /// becomes available. The item is held in the Future's stack frame while
    /// waiting, avoiding the need for a `pending` field in the struct.
    ///
    /// # Correctness
    ///
    /// The item is never dropped or lost:
    /// - On success: item is in the queue
    /// - On `Full`: item is stored in `pending` (Future stack frame)
    /// - On `Closed`: item is returned in the error
    ///
    /// The predicate is called on each wakeup to retry sending. The `wait_for_space`
    /// future uses the double-check pattern internally to prevent missed wakeups.
    ///
    /// # Safety
    ///
    /// The `wait_for_space` call is safe because:
    /// - `AsyncSpscProducer` is `!Clone` and `!Sync` (single-threaded access)
    /// - SPSC guarantees only one producer thread
    /// - Therefore, no concurrent calls to `register` or `wait_until` on `space_waiter`
    pub async fn send(&mut self, value: T) -> Result<(), PushError<T>> {
        match self.try_send(value) {
            Ok(()) => Ok(()),
            Err(PushError::Full(item)) => {
                // Store item in Future's stack frame (not in struct).
                // This avoids needing T: Unpin and keeps the struct simple.
                let mut pending = Some(item);
                let sender = &self.sender;
                let shared = &self.shared;
                unsafe {
                    shared
                        .wait_for_space(|| {
                            // Try to send on each wakeup.
                            // `pending` is only None after a success, and the
                            // predicate is not re-run once it has returned Some,
                            // so `?` here can never lose an item.
                            let candidate = pending.take()?;
                            match sender.try_push(candidate) {
                                Ok(()) => {
                                    // Success! Notify waiting consumers.
                                    shared.notify_items();
                                    Some(Ok(()))
                                }
                                Err(PushError::Full(candidate)) => {
                                    // Still full, restore item and keep waiting.
                                    pending = Some(candidate);
                                    None
                                }
                                Err(PushError::Closed(candidate)) => {
                                    // Channel closed, return error with item.
                                    Some(Err(PushError::Closed(candidate)))
                                }
                            }
                        })
                        .await
                }
            }
            Err(PushError::Closed(item)) => Err(PushError::Closed(item)),
        }
    }

    /// Sends an entire Vec, awaiting at most once if the queue fills.
    ///
    /// Makes progress whenever space is available, writing as many items as possible
    /// in each attempt. Items are moved out of the Vec using move semantics.
    /// Notifies consumers after each batch write (not just at the end),
    /// allowing the consumer to start processing while the producer is still sending.
    ///
    /// On return, the Vec will be empty if all items were sent, or contain only
    /// the items that were not sent (if the channel closed).
    ///
    /// # Efficiency
    ///
    /// - Amortizes notification overhead across batch (single notify per write batch)
    /// - Allows progressive consumption (consumer doesn't wait for entire batch)
    /// - Move semantics (no Clone/Copy required)
    pub async fn send_batch(&mut self, values: &mut Vec<T>) -> Result<(), PushError<()>> {
        if values.is_empty() {
            return Ok(());
        }

        let sender = &self.sender;
        let shared = &self.shared;

        // Eager first attempt: push as much as fits before suspending at all.
        match sender.try_push_n(values) {
            Ok(written) => {
                if written > 0 {
                    shared.notify_items();
                    if values.is_empty() {
                        return Ok(());
                    }
                }
                // written == 0 or partial write: fall through to the wait loop.
            }
            Err(PushError::Closed(())) => return Err(PushError::Closed(())),
            Err(PushError::Full(())) => {}
        }

        unsafe {
            shared
                .wait_for_space(|| {
                    if values.is_empty() {
                        return Some(Ok(()));
                    }

                    match sender.try_push_n(values) {
                        Ok(written) => {
                            if written > 0 {
                                shared.notify_items();
                                if values.is_empty() {
                                    return Some(Ok(()));
                                }
                            }
                            // Not finished yet; keep waiting for more space.
                            None
                        }
                        Err(PushError::Full(())) => None,
                        Err(PushError::Closed(())) => Some(Err(PushError::Closed(()))),
                    }
                })
                .await
        }
    }

    /// Sends every item from the iterator, awaiting as required.
    ///
    /// Items are sent one at a time via [`Self::send`], so the consumer is
    /// notified per item rather than per batch (contrast with `send_batch`).
    /// Stops at the first error, returning the rejected item inside it.
    pub async fn send_iter<I>(&mut self, iter: I) -> Result<(), PushError<T>>
    where
        I: IntoIterator<Item = T>,
    {
        for item in iter {
            self.send(item).await?;
        }
        Ok(())
    }

    /// Closes the queue and wakes any waiters.
    ///
    /// Both wakers are notified so a waiting consumer sees `Closed` and a
    /// waiting producer (on the blocking end) stops parking.
    pub fn close(&mut self) {
        self.sender.close_channel();
        self.shared.notify_items();
        self.shared.notify_space();
    }
}
410
impl<T, const P: usize, const NUM_SEGS_P2: usize> Sink<T> for AsyncSpscProducer<T, P, NUM_SEGS_P2> {
    // Errors hand the rejected item back to the caller.
    type Error = PushError<T>;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
        // Safety: AsyncSpscProducer is not self-referential, so get_unchecked_mut is safe.
        // The Pin guarantee gives us exclusive mutable access.
        let this = unsafe { self.get_unchecked_mut() };

        // Fast path: check if there's space available
        if !this.sender.is_full() {
            return Poll::Ready(Ok(()));
        }

        // No space available. Register waker to be notified when space frees up.
        //
        // Safety: This is safe because:
        // - AsyncSpscProducer is !Clone and !Sync (single-threaded access)
        // - SPSC guarantees only one producer thread
        // - Therefore no concurrent calls to register_space or wait_for_space
        unsafe {
            this.shared.register_space(cx.waker());
        }

        // Double-check after registering to prevent missed wakeups.
        //
        // Race scenarios:
        // 1. Consumer frees space BEFORE register: double-check catches it → Ready
        // 2. Consumer frees space AFTER register: waker gets notified → will poll again
        // 3. Consumer frees space DURING register: DiatomicWaker state machine guarantees
        //    either we see the change here, or the consumer sees our waker → no missed wakeup
        if !this.sender.is_full() {
            return Poll::Ready(Ok(()));
        }

        Poll::Pending
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), PushError<T>> {
        // Safety: Same as poll_ready - AsyncSpscProducer is not self-referential.
        let this = unsafe { self.get_unchecked_mut() };

        // For SPSC with single producer, if poll_ready returned Ready, the queue
        // cannot become full before start_send (no other producers to race with).
        // However, the channel might be closed, so we still handle errors properly.
        this.try_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
        // SPSC queue has no buffering at the Sink level (items go directly to queue),
        // so flush is always immediately complete.
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
        // Safety: Same as poll_ready - AsyncSpscProducer is not self-referential.
        let this = unsafe { self.get_unchecked_mut() };
        // close() notifies both wakers, so any waiting peer observes the closure.
        this.close();
        Poll::Ready(Ok(()))
    }
}
471
472/// Asynchronous consumer façade for [`SegSpsc`].
pub struct AsyncSpscConsumer<T, const P: usize, const NUM_SEGS_P2: usize> {
    // Pop half of the underlying lock-free SPSC queue.
    receiver: crate::spsc::Receiver<T, P, NUM_SEGS_P2, AsyncSignalGate>,
    // Waker state shared with the producer end (async or blocking).
    shared: Arc<AsyncSpscShared>,
}
477
impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncSpscConsumer<T, P, NUM_SEGS_P2> {
    /// Internal constructor; pairs the queue's pop half with the shared waker state.
    fn new(
        receiver: crate::spsc::Receiver<T, P, NUM_SEGS_P2, AsyncSignalGate>,
        shared: Arc<AsyncSpscShared>,
    ) -> Self {
        Self { receiver, shared }
    }

    /// Capacity of the underlying queue.
    #[inline]
    pub fn capacity(&self) -> usize {
        crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::capacity()
    }

    /// Attempts to receive without awaiting.
    ///
    /// Always notifies waiting producers on success to prevent missed wakeups.
    ///
    /// Returns only `Ok`, `Err(Empty)`, or `Err(Closed)` — never `Timeout`.
    ///
    /// # Performance
    ///
    /// - Success path: ~5-15ns (queue read + notify check)
    /// - Notify overhead: ~1-2ns when no producer waiting
    #[inline]
    pub fn try_recv(&self) -> Result<T, PopError> {
        match self.receiver.try_pop() {
            Some(value) => {
                // Always notify producers after successful read.
                // This is cheap (~1-2ns) when no waker is registered.
                self.shared.notify_space();
                Ok(value)
            }
            None if self.receiver.is_closed() => Err(PopError::Closed),
            None => Err(PopError::Empty),
        }
    }

    /// Asynchronously receives a single item.
    ///
    /// Tries to receive immediately; if the queue is empty, suspends until an
    /// item becomes available or the channel is closed.
    ///
    /// # Correctness
    ///
    /// The predicate is called on each wakeup to retry receiving. The `wait_for_items`
    /// future uses the double-check pattern internally to prevent missed wakeups.
    ///
    /// # Safety
    ///
    /// The `wait_for_items` call is safe because:
    /// - `AsyncSpscConsumer` is `!Clone` and `!Sync` (single-threaded access)
    /// - SPSC guarantees only one consumer thread
    /// - Therefore, no concurrent calls to `register` or `wait_until` on `item_waiter`
    pub async fn recv(&mut self) -> Result<T, PopError> {
        // Fast path: try to receive immediately
        // (Timeout is matched for exhaustiveness; try_recv never produces it.)
        match self.try_recv() {
            Ok(value) => return Ok(value),
            Err(PopError::Empty) | Err(PopError::Timeout) => {}
            Err(PopError::Closed) => return Err(PopError::Closed),
        }

        let receiver = &self.receiver;
        let shared = &self.shared;
        unsafe {
            shared
                .wait_for_items(|| match receiver.try_pop() {
                    Some(value) => {
                        // Success! Notify waiting producers.
                        shared.notify_space();
                        Some(Ok(value))
                    }
                    None if receiver.is_closed() => Some(Err(PopError::Closed)),
                    None => None, // Still empty, keep waiting
                })
                .await
        }
    }

    /// Receives up to `dst.len()` items.
    ///
    /// Makes progress whenever items are available, reading as many as possible
    /// in each attempt. Returns when the buffer is full or the channel is closed.
    /// Notifies producers after each batch read to free up space progressively.
    ///
    /// # Returns
    ///
    /// - `Ok(count)`: Number of items read (may be less than `dst.len()`)
    /// - `Err(PopError::Closed)`: Channel closed and no items available
    ///
    /// # Efficiency
    ///
    /// - Amortizes notification overhead across batch (single notify per read batch)
    /// - Allows progressive production (producer can send more while consumer processes)
    pub async fn recv_batch(&mut self, dst: &mut [T]) -> Result<usize, PopError> {
        if dst.is_empty() {
            return Ok(0);
        }

        let receiver = &self.receiver;
        let shared = &self.shared;
        // Eager first attempt before suspending at all.
        let mut filled = match receiver.try_pop_n(dst) {
            Ok(count) => {
                if count > 0 {
                    shared.notify_space();
                }
                count
            }
            Err(PopError::Empty) | Err(PopError::Timeout) => 0,
            Err(PopError::Closed) => return Err(PopError::Closed),
        };

        if filled == dst.len() {
            return Ok(filled);
        }

        unsafe {
            shared
                .wait_for_items(|| {
                    if filled == dst.len() {
                        return Some(Ok(filled));
                    }

                    match receiver.try_pop_n(&mut dst[filled..]) {
                        Ok(0) => {
                            // Nothing read: finish with partial count if closed,
                            // otherwise keep waiting.
                            if receiver.is_closed() {
                                Some(if filled > 0 {
                                    Ok(filled)
                                } else {
                                    Err(PopError::Closed)
                                })
                            } else {
                                None
                            }
                        }
                        Ok(count) => {
                            filled += count;
                            shared.notify_space();
                            if filled == dst.len() {
                                Some(Ok(filled))
                            } else {
                                None
                            }
                        }
                        Err(PopError::Empty) | Err(PopError::Timeout) => {
                            if receiver.is_closed() {
                                Some(if filled > 0 {
                                    Ok(filled)
                                } else {
                                    Err(PopError::Closed)
                                })
                            } else {
                                None
                            }
                        }
                        // Closed mid-batch: report what was read, if anything.
                        Err(PopError::Closed) => Some(if filled > 0 {
                            Ok(filled)
                        } else {
                            Err(PopError::Closed)
                        }),
                    }
                })
                .await
        }
    }
}
642
impl<T, const P: usize, const NUM_SEGS_P2: usize> Stream for AsyncSpscConsumer<T, P, NUM_SEGS_P2> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Safety: AsyncSpscConsumer is not self-referential, so get_unchecked_mut is safe.
        // The Pin guarantee gives us exclusive mutable access.
        let this = unsafe { self.get_unchecked_mut() };

        // Fast path: check if there's an item available.
        // (Timeout arms exist for exhaustiveness; try_recv never produces it.)
        match this.try_recv() {
            Ok(value) => Poll::Ready(Some(value)),
            Err(PopError::Closed) => Poll::Ready(None),
            Err(PopError::Empty) | Err(PopError::Timeout) => {
                // No items available. Register waker to be notified when items arrive.
                //
                // Safety: This is safe because:
                // - AsyncSpscConsumer is !Clone and !Sync (single-threaded access)
                // - SPSC guarantees only one consumer thread
                // - Therefore no concurrent calls to register_items or wait_for_items
                unsafe {
                    this.shared.register_items(cx.waker());
                }

                // Double-check after registering to prevent missed wakeups.
                //
                // Race scenarios:
                // 1. Producer sends item BEFORE register: double-check catches it → Ready
                // 2. Producer sends item AFTER register: waker gets notified → will poll again
                // 3. Producer sends item DURING register: DiatomicWaker state machine guarantees
                //    either we see the item here, or the producer sees our waker → no missed wakeup
                match this.try_recv() {
                    Ok(value) => Poll::Ready(Some(value)),
                    Err(PopError::Closed) => Poll::Ready(None),
                    Err(PopError::Empty) | Err(PopError::Timeout) => Poll::Pending,
                }
            }
        }
    }
}
682
683/// Blocking producer for SPSC queue.
684///
685/// This type provides blocking send operations that park the thread until space
686/// is available. It shares the same waker infrastructure as `AsyncSpscProducer`,
687/// allowing blocking and async operations to interoperate seamlessly.
688///
689/// # Interoperability
690///
691/// A `BlockingSpscProducer` can wake up an `AsyncSpscConsumer` and vice versa.
692/// This allows mixing blocking threads with async tasks in the same queue.
693///
694/// # Example
695///
696/// ```ignore
697/// // Create mixed queue: blocking producer, async consumer
698/// let (blocking_producer, async_consumer) = new_blocking_async_spsc(signal);
699///
700/// // Producer thread (blocking)
701/// std::thread::spawn(move || {
702///     blocking_producer.send(42).unwrap();
703/// });
704///
705/// // Consumer task (async)
706/// maniac::spawn(async move {
707///     let item = async_consumer.recv().await.unwrap();
708/// });
709/// ```
pub struct BlockingSpscProducer<T, const P: usize, const NUM_SEGS_P2: usize> {
    // Push half of the underlying lock-free SPSC queue.
    sender: crate::spsc::Sender<T, P, NUM_SEGS_P2, AsyncSignalGate>,
    // Waker state shared with the consumer end (async or blocking).
    shared: Arc<AsyncSpscShared>,
}
714
impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingSpscProducer<T, P, NUM_SEGS_P2> {
    /// Internal constructor; pairs the queue's push half with the shared waker state.
    fn new(
        sender: crate::spsc::Sender<T, P, NUM_SEGS_P2, AsyncSignalGate>,
        shared: Arc<AsyncSpscShared>,
    ) -> Self {
        Self { sender, shared }
    }

    /// Capacity of the underlying queue.
    #[inline]
    pub fn capacity(&self) -> usize {
        crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::capacity()
    }

    /// Fast-path send without blocking.
    ///
    /// Returns immediately with success or error. Does not block the thread.
    /// On failure the item is handed back inside the `PushError`.
    #[inline]
    pub fn try_send(&self, value: T) -> Result<(), PushError<T>> {
        match self.sender.try_push(value) {
            Ok(()) => {
                // Wake a consumer (async or blocking) that may be waiting for items.
                self.shared.notify_items();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    /// Blocking send that parks the thread until space is available.
    ///
    /// Uses efficient thread parking (no busy-waiting). The thread will be
    /// unparked when the consumer (async or blocking) frees up space.
    ///
    /// # Correctness
    ///
    /// Uses the double-check pattern to prevent missed wakeups:
    /// 1. Try to send
    /// 2. Register waker if full
    /// 3. Double-check after registering (catches races)
    /// 4. Park if still full
    ///
    /// # Performance
    ///
    /// - Fast path (space available): ~5-15ns
    /// - Blocking path: Efficient thread parking (no spinning)
    pub fn send(&self, mut value: T) -> Result<(), PushError<T>> {
        // Fast path: try immediate send
        match self.try_send(value) {
            Ok(()) => return Ok(()),
            Err(PushError::Closed(item)) => return Err(PushError::Closed(item)),
            Err(PushError::Full(item)) => value = item,
        }

        // Slow path: need to block. The parker/waker pair is created once and
        // reused across loop iterations.
        let parker = Parker::new();
        let unparker = parker.unparker();
        let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));

        loop {
            // Register our waker BEFORE the retry: if the consumer frees space
            // after this point, it will unpark us and park() returns promptly.
            unsafe {
                self.shared.register_space(&waker);
            }

            // Double-check after registering (prevent missed wakeup)
            match self.sender.try_push(value) {
                Ok(()) => {
                    self.shared.notify_items();
                    return Ok(());
                }
                Err(PushError::Full(item)) => {
                    value = item;
                    // Still full, park until woken
                    parker.park();
                    // Loop again after wakeup
                }
                Err(PushError::Closed(item)) => {
                    return Err(PushError::Closed(item));
                }
            }
        }
    }

    /// Blocking send of a batch of items.
    ///
    /// (Despite the `_slice` name, this takes a `Vec` so unsent items can be
    /// left behind in place.)
    ///
    /// Makes progress whenever space is available. Items are moved out of the Vec
    /// using move semantics. More efficient than calling `send()` in a loop due to
    /// bulk operations.
    ///
    /// On return, the Vec will be empty if all items were sent, or contain only
    /// the items that were not sent (if the channel closed).
    pub fn send_slice(&self, values: &mut Vec<T>) -> Result<(), PushError<()>> {
        if values.is_empty() {
            return Ok(());
        }

        // Try immediate send of as much as possible
        match self.sender.try_push_n(values) {
            Ok(written) => {
                if written > 0 {
                    self.shared.notify_items();
                    if values.is_empty() {
                        return Ok(());
                    }
                }
                // Partial (or zero) write: fall through to the blocking loop.
            }
            Err(PushError::Closed(())) => return Err(PushError::Closed(())),
            Err(PushError::Full(())) => {}
        }

        // Slow path: need to block. The parker/waker pair is created once and
        // reused across loop iterations.
        let parker = Parker::new();
        let unparker = parker.unparker();
        let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));

        loop {
            // Register our waker BEFORE the retry: if the consumer frees space
            // after this point, it will unpark us and park() returns promptly.
            unsafe {
                self.shared.register_space(&waker);
            }

            // Double-check and try to make progress
            if values.is_empty() {
                return Ok(());
            }

            match self.sender.try_push_n(values) {
                Ok(written) => {
                    if written > 0 {
                        self.shared.notify_items();
                        if values.is_empty() {
                            return Ok(());
                        }
                    }
                    // Wrote what we could (possibly nothing) but not done yet:
                    // park until the consumer frees more space, then retry.
                    parker.park();
                }
                Err(PushError::Full(())) => {
                    // No progress, park until woken
                    parker.park();
                }
                Err(PushError::Closed(())) => {
                    return Err(PushError::Closed(()));
                }
            }
        }
    }

    /// Closes the queue and wakes any waiters.
    ///
    /// Both wakers are notified so a waiting consumer sees `Closed` and a
    /// parked blocking peer stops waiting.
    pub fn close(&mut self) {
        self.sender.close_channel();
        self.shared.notify_items();
        self.shared.notify_space();
    }
}
870
871/// Blocking consumer for SPSC queue.
872///
873/// This type provides blocking receive operations that park the thread until
874/// items are available. It shares the same waker infrastructure as `AsyncSpscConsumer`,
875/// allowing blocking and async operations to interoperate seamlessly.
876///
877/// # Interoperability
878///
879/// A `BlockingSpscConsumer` can wake up an `AsyncSpscProducer` and vice versa.
880/// This allows mixing blocking threads with async tasks in the same queue.
881///
882/// # Example
883///
884/// ```ignore
885/// // Create mixed queue: async producer, blocking consumer
886/// let (async_producer, blocking_consumer) = new_async_blocking_spsc(signal);
887///
888/// // Producer task (async)
889/// maniac::spawn(async move {
890///     async_producer.send(42).await.unwrap();
891/// });
892///
893/// // Consumer thread (blocking)
894/// std::thread::spawn(move || {
895///     let item = blocking_consumer.recv().unwrap();
896/// });
897/// ```
pub struct BlockingSpscConsumer<T, const P: usize, const NUM_SEGS_P2: usize> {
    // Receiving half of the underlying lock-free SPSC queue.
    receiver: crate::spsc::Receiver<T, P, NUM_SEGS_P2, AsyncSignalGate>,
    // Waker/notification state shared with the producer half (async or blocking).
    shared: Arc<AsyncSpscShared>,
}
902
903impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingSpscConsumer<T, P, NUM_SEGS_P2> {
    /// Builds a consumer from the raw queue receiver and the notification
    /// state shared with the matching producer.
    fn new(
        receiver: crate::spsc::Receiver<T, P, NUM_SEGS_P2, AsyncSignalGate>,
        shared: Arc<AsyncSpscShared>,
    ) -> Self {
        Self { receiver, shared }
    }

    /// Capacity of the underlying queue.
    ///
    /// Determined entirely by the `P` and `NUM_SEGS_P2` const parameters; the
    /// associated function consults no runtime state.
    #[inline]
    pub fn capacity(&self) -> usize {
        crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::capacity()
    }
916
917    /// Fast-path receive without blocking.
918    ///
919    /// Returns immediately with success or error. Does not block the thread.
920    #[inline]
921    pub fn try_recv(&self) -> Result<T, PopError> {
922        match self.receiver.try_pop() {
923            Some(value) => {
924                self.shared.notify_space();
925                Ok(value)
926            }
927            None if self.receiver.is_closed() => Err(PopError::Closed),
928            None => Err(PopError::Empty),
929        }
930    }
931
932    /// Blocking receive that parks the thread until an item is available.
933    ///
934    /// Uses efficient thread parking (no busy-waiting). The thread will be
935    /// unparked when the producer (async or blocking) sends an item.
936    ///
937    /// # Correctness
938    ///
939    /// Uses the double-check pattern to prevent missed wakeups:
940    /// 1. Try to receive
941    /// 2. Register waker if empty
942    /// 3. Double-check after registering (catches races)
943    /// 4. Park if still empty
944    ///
945    /// # Performance
946    ///
947    /// - Fast path (item available): ~5-15ns
948    /// - Blocking path: Efficient thread parking (no spinning)
949    pub fn recv(&self) -> Result<T, PopError> {
950        // Fast path: try immediate receive
951        match self.try_recv() {
952            Ok(value) => return Ok(value),
953            Err(PopError::Closed) => return Err(PopError::Closed),
954            Err(PopError::Empty) | Err(PopError::Timeout) => {}
955        }
956
957        // Slow path: need to block
958        let parker = Parker::new();
959        let unparker = parker.unparker();
960        let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
961
962        loop {
963            // Register our waker
964            unsafe {
965                self.shared.register_items(&waker);
966            }
967
968            // Double-check after registering (prevent missed wakeup)
969            match self.receiver.try_pop() {
970                Some(value) => {
971                    self.shared.notify_space();
972                    return Ok(value);
973                }
974                None if self.receiver.is_closed() => {
975                    return Err(PopError::Closed);
976                }
977                None => {
978                    // Still empty, park until woken
979                    parker.park();
980                    // Loop again after wakeup
981                }
982            }
983        }
984    }
985
    /// Blocking receive of multiple items.
    ///
    /// If any items are immediately available, returns them right away (a
    /// partial fill is possible on this fast path). Otherwise the thread parks
    /// and, once woken, keeps receiving until `dst` is completely filled or
    /// the channel closes; a partial count is returned if the channel closes
    /// with some items already received.
    ///
    /// NOTE(review): the fast path returns on the first partial batch while
    /// the slow path keeps blocking until `dst` is full — confirm this
    /// asymmetry is intentional.
    pub fn recv_batch(&self, dst: &mut [T]) -> Result<usize, PopError> {
        if dst.is_empty() {
            return Ok(0);
        }

        // Fast path: return whatever is already queued, without parking setup.
        let mut filled = match self.receiver.try_pop_n(dst) {
            Ok(count) => {
                if count > 0 {
                    self.shared.notify_space();
                    return Ok(count);
                }
                0
            }
            Err(PopError::Empty) | Err(PopError::Timeout) => 0,
            Err(PopError::Closed) => return Err(PopError::Closed),
        };

        // Slow path: need to block
        let parker = Parker::new();
        let unparker = parker.unparker();
        let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));

        loop {
            // Register BEFORE re-checking so a racing `notify_items` cannot be
            // missed (double-check pattern).
            unsafe {
                self.shared.register_items(&waker);
            }

            // Double-check and try to make progress
            match self.receiver.try_pop_n(&mut dst[filled..]) {
                Ok(0) => {
                    if self.receiver.is_closed() {
                        return if filled > 0 {
                            Ok(filled)
                        } else {
                            Err(PopError::Closed)
                        };
                    }
                    // No items, park until woken
                    parker.park();
                }
                Ok(count) => {
                    filled += count;
                    self.shared.notify_space();
                    if filled == dst.len() || self.receiver.is_closed() {
                        return Ok(filled);
                    }
                    // Got some but not all, park and try again
                    parker.park();
                }
                Err(PopError::Empty) | Err(PopError::Timeout) => {
                    if self.receiver.is_closed() {
                        return if filled > 0 {
                            Ok(filled)
                        } else {
                            Err(PopError::Closed)
                        };
                    }
                    parker.park();
                }
                Err(PopError::Closed) => {
                    return if filled > 0 {
                        Ok(filled)
                    } else {
                        Err(PopError::Closed)
                    };
                }
            }
        }
    }
1060}
1061
/// Creates a default async segmented SPSC queue.
///
/// Equivalent to [`new_async_with_config`] with an effectively unlimited
/// segment-pooling target (`usize::MAX`).
pub fn new_async_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
    signal: AsyncSignalGate,
) -> (
    AsyncSpscProducer<T, P, NUM_SEGS_P2>,
    AsyncSpscConsumer<T, P, NUM_SEGS_P2>,
) {
    new_async_with_config::<T, P, NUM_SEGS_P2>(signal, usize::MAX)
}
1071
1072/// Creates an async segmented SPSC queue with a custom pooling target.
1073pub fn new_async_with_config<T, const P: usize, const NUM_SEGS_P2: usize>(
1074    signal: AsyncSignalGate,
1075    max_pooled_segments: usize,
1076) -> (
1077    AsyncSpscProducer<T, P, NUM_SEGS_P2>,
1078    AsyncSpscConsumer<T, P, NUM_SEGS_P2>,
1079) {
1080    let shared = Arc::new(AsyncSpscShared::new());
1081    let (sender, receiver) =
1082        crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::new_with_gate_and_config(
1083            signal,
1084            max_pooled_segments,
1085        );
1086    (
1087        AsyncSpscProducer::new(sender, Arc::clone(&shared)),
1088        AsyncSpscConsumer::new(receiver, shared),
1089    )
1090}
1091
/// Creates a default blocking segmented SPSC queue.
///
/// Both producer and consumer use blocking operations that park the thread.
/// Equivalent to [`new_blocking_with_config`] with an effectively unlimited
/// segment-pooling target (`usize::MAX`).
///
/// # Example
///
/// ```ignore
/// let (producer, consumer) = new_blocking_spsc(signal);
///
/// // Producer thread
/// std::thread::spawn(move || {
///     producer.send(42).unwrap();
/// });
///
/// // Consumer thread
/// std::thread::spawn(move || {
///     let item = consumer.recv().unwrap();
/// });
/// ```
pub fn new_blocking_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
    signal: AsyncSignalGate,
) -> (
    BlockingSpscProducer<T, P, NUM_SEGS_P2>,
    BlockingSpscConsumer<T, P, NUM_SEGS_P2>,
) {
    new_blocking_with_config::<T, P, NUM_SEGS_P2>(signal, usize::MAX)
}
1119
1120/// Creates a blocking segmented SPSC queue with a custom pooling target.
1121pub fn new_blocking_with_config<T, const P: usize, const NUM_SEGS_P2: usize>(
1122    signal: AsyncSignalGate,
1123    max_pooled_segments: usize,
1124) -> (
1125    BlockingSpscProducer<T, P, NUM_SEGS_P2>,
1126    BlockingSpscConsumer<T, P, NUM_SEGS_P2>,
1127) {
1128    let shared = Arc::new(AsyncSpscShared::new());
1129    let (sender, receiver) =
1130        crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::new_with_gate_and_config(
1131            signal,
1132            max_pooled_segments,
1133        );
1134    (
1135        BlockingSpscProducer::new(sender, Arc::clone(&shared)),
1136        BlockingSpscConsumer::new(receiver, shared),
1137    )
1138}
1139
/// Creates a mixed SPSC queue with blocking producer and async consumer.
///
/// The blocking producer and async consumer share the same waker infrastructure,
/// so they can wake each other efficiently. This is useful when you have a
/// blocking thread that needs to send data to an async task.
/// Equivalent to [`new_blocking_async_with_config`] with an effectively
/// unlimited segment-pooling target (`usize::MAX`).
///
/// # Example
///
/// ```ignore
/// let (producer, consumer) = new_blocking_async_spsc(signal);
///
/// // Producer thread (blocking)
/// std::thread::spawn(move || {
///     producer.send(42).unwrap();
///     producer.send(43).unwrap();
/// });
///
/// // Consumer task (async)
/// maniac::spawn(async move {
///     while let Some(item) = consumer.next().await {
///         println!("Got: {}", item);
///     }
/// });
/// ```
pub fn new_blocking_async_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
    signal: AsyncSignalGate,
) -> (
    BlockingSpscProducer<T, P, NUM_SEGS_P2>,
    AsyncSpscConsumer<T, P, NUM_SEGS_P2>,
) {
    new_blocking_async_with_config::<T, P, NUM_SEGS_P2>(signal, usize::MAX)
}
1172
1173/// Creates a mixed SPSC queue with blocking producer and async consumer, with custom pooling.
1174pub fn new_blocking_async_with_config<T, const P: usize, const NUM_SEGS_P2: usize>(
1175    signal: AsyncSignalGate,
1176    max_pooled_segments: usize,
1177) -> (
1178    BlockingSpscProducer<T, P, NUM_SEGS_P2>,
1179    AsyncSpscConsumer<T, P, NUM_SEGS_P2>,
1180) {
1181    let shared = Arc::new(AsyncSpscShared::new());
1182    let (sender, receiver) =
1183        crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::new_with_gate_and_config(
1184            signal,
1185            max_pooled_segments,
1186        );
1187    (
1188        BlockingSpscProducer::new(sender, Arc::clone(&shared)),
1189        AsyncSpscConsumer::new(receiver, shared),
1190    )
1191}
1192
/// Creates a mixed SPSC queue with async producer and blocking consumer.
///
/// The async producer and blocking consumer share the same waker infrastructure,
/// so they can wake each other efficiently. This is useful when you have an
/// async task that needs to send data to a blocking thread.
/// Equivalent to [`new_async_blocking_with_config`] with an effectively
/// unlimited segment-pooling target (`usize::MAX`).
///
/// # Example
///
/// ```ignore
/// let (producer, consumer) = new_async_blocking_spsc(signal);
///
/// // Producer task (async)
/// maniac::spawn(async move {
///     producer.send(42).await.unwrap();
///     producer.send(43).await.unwrap();
/// });
///
/// // Consumer thread (blocking)
/// std::thread::spawn(move || {
///     while let Ok(item) = consumer.recv() {
///         println!("Got: {}", item);
///     }
/// });
/// ```
pub fn new_async_blocking_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
    signal: AsyncSignalGate,
) -> (
    AsyncSpscProducer<T, P, NUM_SEGS_P2>,
    BlockingSpscConsumer<T, P, NUM_SEGS_P2>,
) {
    new_async_blocking_with_config::<T, P, NUM_SEGS_P2>(signal, usize::MAX)
}
1225
1226/// Creates a mixed SPSC queue with async producer and blocking consumer, with custom pooling.
1227pub fn new_async_blocking_with_config<T, const P: usize, const NUM_SEGS_P2: usize>(
1228    signal: AsyncSignalGate,
1229    max_pooled_segments: usize,
1230) -> (
1231    AsyncSpscProducer<T, P, NUM_SEGS_P2>,
1232    BlockingSpscConsumer<T, P, NUM_SEGS_P2>,
1233) {
1234    let shared = Arc::new(AsyncSpscShared::new());
1235    let (sender, receiver) =
1236        crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::new_with_gate_and_config(
1237            signal,
1238            max_pooled_segments,
1239        );
1240    (
1241        AsyncSpscProducer::new(sender, Arc::clone(&shared)),
1242        BlockingSpscConsumer::new(receiver, shared),
1243    )
1244}
1245
1246// ══════════════════════════════════════════════════════════════════════════════
1247// Unbounded SPSC Variants
1248// ══════════════════════════════════════════════════════════════════════════════
1249
/// Asynchronous producer for unbounded SPSC queue.
///
/// This type provides async send operations for an unbounded queue that automatically
/// expands by creating new segments as needed. Never blocks on full queue since the
/// queue grows dynamically.
pub struct AsyncUnboundedSpscProducer<T, const P: usize, const NUM_SEGS_P2: usize> {
    // Sending half of the unbounded lock-free SPSC queue.
    sender: crate::spsc::UnboundedSender<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
    // Waker/notification state shared with the consumer half.
    shared: Arc<AsyncSpscShared>,
}
1259
1260impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncUnboundedSpscProducer<T, P, NUM_SEGS_P2> {
    /// Wraps the raw unbounded sender together with the shared waker state.
    fn new(
        sender: crate::spsc::UnboundedSender<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
        shared: Arc<AsyncSpscShared>,
    ) -> Self {
        Self { sender, shared }
    }
1267
1268    /// Fast-path send without suspension.
1269    ///
1270    /// For unbounded queues, this always succeeds unless the channel is closed.
1271    #[inline]
1272    pub fn try_send(&self, value: T) -> Result<(), PushError<T>> {
1273        match self.sender.try_push(value) {
1274            Ok(()) => {
1275                self.shared.notify_items();
1276                Ok(())
1277            }
1278            Err(err) => Err(err),
1279        }
1280    }
1281
    /// Asynchronously sends a single item.
    ///
    /// For unbounded queues, this typically completes immediately unless the channel
    /// is closed. The async interface is provided for API consistency.
    pub async fn send(&mut self, value: T) -> Result<(), PushError<T>> {
        // Delegates to the non-suspending path; the future resolves on first poll.
        self.try_send(value)
    }
1289
1290    /// Sends an entire Vec, moving items out using bulk operations.
1291    ///
1292    /// On return, the Vec will be empty if all items were sent, or contain only
1293    /// the items that were not sent (if the channel closed).
1294    pub async fn send_batch(&mut self, values: &mut Vec<T>) -> Result<(), PushError<()>> {
1295        if values.is_empty() {
1296            return Ok(());
1297        }
1298
1299        match self.sender.try_push_n(values) {
1300            Ok(_) => {
1301                self.shared.notify_items();
1302                Ok(())
1303            }
1304            Err(err) => Err(err),
1305        }
1306    }
1307
    /// Sends every item from the iterator.
    ///
    /// Stops at the first error (e.g. a closed channel) and returns it; items
    /// not yet drawn from the iterator are dropped with it.
    pub async fn send_iter<I>(&mut self, iter: I) -> Result<(), PushError<T>>
    where
        I: IntoIterator<Item = T>,
    {
        for item in iter {
            self.send(item).await?;
        }
        Ok(())
    }

    /// Closes the queue and wakes any waiters.
    pub fn close(&mut self) {
        // Close first so that woken parties observe the closed state.
        self.sender.close_channel();
        self.shared.notify_items();
        self.shared.notify_space();
    }

    /// Returns the number of nodes in the unbounded queue.
    pub fn node_count(&self) -> usize {
        self.sender.node_count()
    }

    /// Returns the total capacity across all nodes.
    pub fn total_capacity(&self) -> usize {
        self.sender.total_capacity()
    }
1335}
1336
impl<T, const P: usize, const NUM_SEGS_P2: usize> Sink<T> for AsyncUnboundedSpscProducer<T, P, NUM_SEGS_P2> {
    type Error = PushError<T>;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
        // Unbounded queue is always ready (unless closed)
        // A closed channel is reported from `start_send`/`try_send` instead.
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), PushError<T>> {
        // SAFETY: NOTE(review) — the pinned reference is only used to call
        // `try_send`; nothing is moved out of it. This relies on the producer
        // having no pinning-sensitive state — confirm it is effectively Unpin.
        let this = unsafe { self.get_unchecked_mut() };
        this.try_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
        // Nothing is buffered in this type; pushes are published immediately.
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
        // SAFETY: NOTE(review) — see `start_send`; the reference is only used
        // to call `close` and is never moved.
        let this = unsafe { self.get_unchecked_mut() };
        this.close();
        Poll::Ready(Ok(()))
    }
}

/// Asynchronous consumer for unbounded SPSC queue.
pub struct AsyncUnboundedSpscConsumer<T, const P: usize, const NUM_SEGS_P2: usize> {
    // Receiving half of the unbounded lock-free SPSC queue.
    receiver: crate::spsc::UnboundedReceiver<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
    // Waker/notification state shared with the producer half.
    shared: Arc<AsyncSpscShared>,
}
1366
1367impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncUnboundedSpscConsumer<T, P, NUM_SEGS_P2> {
    /// Wraps the raw unbounded receiver together with the shared waker state.
    fn new(
        receiver: crate::spsc::UnboundedReceiver<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
        shared: Arc<AsyncSpscShared>,
    ) -> Self {
        Self { receiver, shared }
    }
1374
1375    /// Attempts to receive without awaiting.
1376    #[inline]
1377    pub fn try_recv(&self) -> Result<T, PopError> {
1378        match self.receiver.try_pop() {
1379            Some(value) => {
1380                self.shared.notify_space();
1381                Ok(value)
1382            }
1383            None if self.receiver.is_closed() => Err(PopError::Closed),
1384            None => Err(PopError::Empty),
1385        }
1386    }
1387
    /// Asynchronously receives a single item.
    ///
    /// The fast path pops without suspending; otherwise the future suspends on
    /// the shared waker state until an item arrives or the channel closes.
    pub async fn recv(&mut self) -> Result<T, PopError> {
        // Fast path: item already queued, or channel already closed.
        match self.try_recv() {
            Ok(value) => return Ok(value),
            Err(PopError::Empty) | Err(PopError::Timeout) => {}
            Err(PopError::Closed) => return Err(PopError::Closed),
        }

        let receiver = &self.receiver;
        let shared = &self.shared;
        // SAFETY: NOTE(review) — `wait_for_items` is `unsafe`; presumably only
        // a single party may wait on the items side at a time, which holds
        // here because this is the single consumer. Confirm against its docs.
        unsafe {
            shared
                .wait_for_items(|| match receiver.try_pop() {
                    Some(value) => {
                        // Wake a producer that may be waiting for space.
                        shared.notify_space();
                        Some(Ok(value))
                    }
                    None if receiver.is_closed() => Some(Err(PopError::Closed)),
                    // `None` keeps the future pending until the next wakeup.
                    None => None,
                })
                .await
        }
    }

    /// Receives up to `dst.len()` items.
    ///
    /// Suspends until `dst` is completely filled or the channel closes; if
    /// the channel closes with some items already received, that partial
    /// count is returned instead of an error.
    ///
    /// NOTE(review): unlike the blocking `recv_batch`, this variant does not
    /// return early on a partial fast-path fill — confirm the divergence is
    /// intentional. The `T: Clone` bound presumably comes from
    /// `UnboundedReceiver::try_pop_n`; verify.
    pub async fn recv_batch(&mut self, dst: &mut [T]) -> Result<usize, PopError>
    where
        T: Clone,
    {
        if dst.is_empty() {
            return Ok(0);
        }

        let receiver = &self.receiver;
        let shared = &self.shared;
        // Fast path: take whatever is already queued.
        let mut filled = match receiver.try_pop_n(dst) {
            Ok(count) => {
                if count > 0 {
                    shared.notify_space();
                }
                count
            }
            Err(PopError::Empty) | Err(PopError::Timeout) => 0,
            Err(PopError::Closed) => return Err(PopError::Closed),
        };

        if filled == dst.len() {
            return Ok(filled);
        }

        // SAFETY: NOTE(review) — see `recv`; single-consumer invariant assumed.
        unsafe {
            shared
                .wait_for_items(|| {
                    // `filled` persists across polls of this future.
                    if filled == dst.len() {
                        return Some(Ok(filled));
                    }

                    match receiver.try_pop_n(&mut dst[filled..]) {
                        Ok(0) => {
                            if receiver.is_closed() {
                                Some(if filled > 0 {
                                    Ok(filled)
                                } else {
                                    Err(PopError::Closed)
                                })
                            } else {
                                None
                            }
                        }
                        Ok(count) => {
                            filled += count;
                            shared.notify_space();
                            if filled == dst.len() {
                                Some(Ok(filled))
                            } else {
                                None
                            }
                        }
                        Err(PopError::Empty) | Err(PopError::Timeout) => {
                            if receiver.is_closed() {
                                Some(if filled > 0 {
                                    Ok(filled)
                                } else {
                                    Err(PopError::Closed)
                                })
                            } else {
                                None
                            }
                        }
                        Err(PopError::Closed) => Some(if filled > 0 {
                            Ok(filled)
                        } else {
                            Err(PopError::Closed)
                        }),
                    }
                })
                .await
        }
    }

    /// Returns the number of nodes in the unbounded queue.
    pub fn node_count(&self) -> usize {
        self.receiver.node_count()
    }

    /// Returns the total capacity across all nodes.
    pub fn total_capacity(&self) -> usize {
        self.receiver.total_capacity()
    }
1497}
1498
impl<T, const P: usize, const NUM_SEGS_P2: usize> Stream for AsyncUnboundedSpscConsumer<T, P, NUM_SEGS_P2> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // SAFETY: NOTE(review) — the pinned reference is only used to call
        // inherent methods; nothing is moved out of it. Relies on the consumer
        // having no pinning-sensitive state — confirm it is effectively Unpin.
        let this = unsafe { self.get_unchecked_mut() };

        match this.try_recv() {
            Ok(value) => Poll::Ready(Some(value)),
            // A closed, drained channel terminates the stream.
            Err(PopError::Closed) => Poll::Ready(None),
            Err(PopError::Empty) | Err(PopError::Timeout) => {
                // Register BEFORE re-checking so a wakeup racing with the
                // check below cannot be missed (double-check pattern).
                unsafe {
                    this.shared.register_items(cx.waker());
                }

                match this.try_recv() {
                    Ok(value) => Poll::Ready(Some(value)),
                    Err(PopError::Closed) => Poll::Ready(None),
                    Err(PopError::Empty) | Err(PopError::Timeout) => Poll::Pending,
                }
            }
        }
    }
}

/// Blocking producer for unbounded SPSC queue.
pub struct BlockingUnboundedSpscProducer<T, const P: usize, const NUM_SEGS_P2: usize> {
    // Sending half of the unbounded lock-free SPSC queue.
    sender: crate::spsc::UnboundedSender<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
    // Waker/notification state shared with the consumer half.
    shared: Arc<AsyncSpscShared>,
}
1528
1529impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingUnboundedSpscProducer<T, P, NUM_SEGS_P2> {
    /// Wraps the raw unbounded sender together with the shared waker state.
    fn new(
        sender: crate::spsc::UnboundedSender<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
        shared: Arc<AsyncSpscShared>,
    ) -> Self {
        Self { sender, shared }
    }
1536
1537    /// Fast-path send without blocking.
1538    ///
1539    /// For unbounded queues, this always succeeds unless the channel is closed.
1540    #[inline]
1541    pub fn try_send(&self, value: T) -> Result<(), PushError<T>> {
1542        match self.sender.try_push(value) {
1543            Ok(()) => {
1544                self.shared.notify_items();
1545                Ok(())
1546            }
1547            Err(err) => Err(err),
1548        }
1549    }
1550
    /// Sends a single item.
    ///
    /// For unbounded queues, this never blocks since the queue grows dynamically.
    pub fn send(&self, value: T) -> Result<(), PushError<T>> {
        // Unbounded: the fast path is the only path.
        self.try_send(value)
    }
1557
1558    /// Sends a Vec of items using bulk operations.
1559    ///
1560    /// On return, the Vec will be empty if all items were sent, or contain only
1561    /// the items that were not sent (if the channel closed).
1562    pub fn send_slice(&self, values: &mut Vec<T>) -> Result<(), PushError<()>> {
1563        if values.is_empty() {
1564            return Ok(());
1565        }
1566
1567        match self.sender.try_push_n(values) {
1568            Ok(_) => {
1569                self.shared.notify_items();
1570                Ok(())
1571            }
1572            Err(err) => Err(err),
1573        }
1574    }
1575
    /// Closes the queue and wakes any waiters.
    ///
    /// Closing happens before the notifications so woken parties observe the
    /// closed state.
    pub fn close(&mut self) {
        self.sender.close_channel();
        self.shared.notify_items();
        self.shared.notify_space();
    }

    /// Returns the number of nodes in the unbounded queue.
    pub fn node_count(&self) -> usize {
        self.sender.node_count()
    }

    /// Returns the total capacity across all nodes.
    pub fn total_capacity(&self) -> usize {
        self.sender.total_capacity()
    }
1592}
1593
/// Blocking consumer for unbounded SPSC queue.
pub struct BlockingUnboundedSpscConsumer<T, const P: usize, const NUM_SEGS_P2: usize> {
    // Receiving half of the unbounded lock-free SPSC queue.
    receiver: crate::spsc::UnboundedReceiver<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
    // Waker/notification state shared with the producer half.
    shared: Arc<AsyncSpscShared>,
}
1599
1600impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingUnboundedSpscConsumer<T, P, NUM_SEGS_P2> {
    /// Wraps the raw unbounded receiver together with the shared waker state.
    fn new(
        receiver: crate::spsc::UnboundedReceiver<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
        shared: Arc<AsyncSpscShared>,
    ) -> Self {
        Self { receiver, shared }
    }
1607
1608    /// Fast-path receive without blocking.
1609    #[inline]
1610    pub fn try_recv(&self) -> Result<T, PopError> {
1611        match self.receiver.try_pop() {
1612            Some(value) => {
1613                self.shared.notify_space();
1614                Ok(value)
1615            }
1616            None if self.receiver.is_closed() => Err(PopError::Closed),
1617            None => Err(PopError::Empty),
1618        }
1619    }
1620
1621    /// Blocking receive that parks the thread until an item is available.
1622    pub fn recv(&self) -> Result<T, PopError> {
1623        match self.try_recv() {
1624            Ok(value) => return Ok(value),
1625            Err(PopError::Closed) => return Err(PopError::Closed),
1626            Err(PopError::Empty) | Err(PopError::Timeout) => {}
1627        }
1628
1629        let parker = Parker::new();
1630        let unparker = parker.unparker();
1631        let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
1632
1633        loop {
1634            unsafe {
1635                self.shared.register_items(&waker);
1636            }
1637
1638            match self.receiver.try_pop() {
1639                Some(value) => {
1640                    self.shared.notify_space();
1641                    return Ok(value);
1642                }
1643                None if self.receiver.is_closed() => {
1644                    return Err(PopError::Closed);
1645                }
1646                None => {
1647                    parker.park();
1648                }
1649            }
1650        }
1651    }
1652
    /// Blocking receive of multiple items.
    ///
    /// Copies items from the queue into `dst`, parking the calling thread while
    /// the queue is empty, and returns the number of items written.
    ///
    /// # Errors
    ///
    /// Returns [`PopError::Closed`] when the producer side is closed and no
    /// items could be delivered. If the queue closes after some items were
    /// already copied, the partial count is returned as `Ok`.
    ///
    /// NOTE(review): the fast path below returns as soon as *any* items are
    /// available, but once the call has parked, the loop keeps waiting until
    /// `dst` is completely filled (or the queue closes) — confirm this mixed
    /// "eager first, then fill" contract is intentional.
    pub fn recv_batch(&self, dst: &mut [T]) -> Result<usize, PopError>
    where
        T: Clone,
    {
        // An empty destination is trivially satisfied — no parker needed.
        if dst.is_empty() {
            return Ok(0);
        }

        // Fast path: try once before paying for a Parker/Waker allocation.
        let mut filled = match self.receiver.try_pop_n(dst) {
            Ok(count) => {
                if count > 0 {
                    // We freed space in the queue: wake a waiting producer.
                    self.shared.notify_space();
                    return Ok(count);
                }
                0
            }
            // A timeout from the raw queue is treated like "nothing yet".
            Err(PopError::Empty) | Err(PopError::Timeout) => 0,
            Err(PopError::Closed) => return Err(PopError::Closed),
        };

        // Slow path: build a waker that unparks this thread, so the producer
        // (async or blocking) can wake us through the shared waker slot.
        let parker = Parker::new();
        let unparker = parker.unparker();
        let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));

        loop {
            // Double-check pattern: register the waker *before* re-checking the
            // queue, so a push racing with this check still triggers a wakeup.
            // SAFETY: presumably relies on the single-consumer invariant for
            // register_items' exclusivity contract — see AsyncSpscShared.
            unsafe {
                self.shared.register_items(&waker);
            }

            match self.receiver.try_pop_n(&mut dst[filled..]) {
                Ok(0) => {
                    // Nothing available. If closed, report what we already
                    // have (or Closed if nothing was copied); otherwise park.
                    if self.receiver.is_closed() {
                        return if filled > 0 {
                            Ok(filled)
                        } else {
                            Err(PopError::Closed)
                        };
                    }
                    parker.park();
                }
                Ok(count) => {
                    filled += count;
                    // Space was freed: notify a potentially waiting producer.
                    self.shared.notify_space();
                    if filled == dst.len() || self.receiver.is_closed() {
                        return Ok(filled);
                    }
                    // Partial fill: keep waiting for the remainder.
                    parker.park();
                }
                Err(PopError::Empty) | Err(PopError::Timeout) => {
                    if self.receiver.is_closed() {
                        return if filled > 0 {
                            Ok(filled)
                        } else {
                            Err(PopError::Closed)
                        };
                    }
                    parker.park();
                }
                Err(PopError::Closed) => {
                    // Closed mid-stream: surface any items already copied.
                    return if filled > 0 {
                        Ok(filled)
                    } else {
                        Err(PopError::Closed)
                    };
                }
            }
        }
    }
1722
    /// Returns the number of nodes in the unbounded queue.
    ///
    /// Delegates directly to the underlying receiver's bookkeeping; useful for
    /// observing how far the queue has grown.
    pub fn node_count(&self) -> usize {
        self.receiver.node_count()
    }
1727
    /// Returns the total capacity across all nodes.
    ///
    /// Delegates directly to the underlying receiver's bookkeeping.
    pub fn total_capacity(&self) -> usize {
        self.receiver.total_capacity()
    }
1732}
1733
1734/// Creates a default async unbounded SPSC queue.
1735///
1736/// The queue automatically grows by creating new segments as needed, so it never
1737/// blocks on full. This is ideal for scenarios where you want to avoid backpressure.
1738///
1739/// # Example
1740///
1741/// ```ignore
1742/// let (mut producer, mut consumer) = new_async_unbounded_spsc(signal);
1743///
1744/// // Producer never blocks on full
1745/// producer.send(42).await.unwrap();
1746/// producer.send(43).await.unwrap();
1747///
1748/// // Consumer receives items
1749/// assert_eq!(consumer.recv().await.unwrap(), 42);
1750/// ```
1751pub fn new_async_unbounded_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1752    signal: AsyncSignalGate,
1753) -> (
1754    AsyncUnboundedSpscProducer<T, P, NUM_SEGS_P2>,
1755    AsyncUnboundedSpscConsumer<T, P, NUM_SEGS_P2>,
1756) {
1757    let shared = Arc::new(AsyncSpscShared::new());
1758    let signal_arc = Arc::new(signal);
1759    let (sender, receiver) =
1760        crate::spsc::UnboundedSpsc::<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>::new_with_signal(signal_arc);
1761    (
1762        AsyncUnboundedSpscProducer::new(sender, Arc::clone(&shared)),
1763        AsyncUnboundedSpscConsumer::new(receiver, shared),
1764    )
1765}
1766
1767/// Creates a default blocking unbounded SPSC queue.
1768///
1769/// The queue automatically grows by creating new segments as needed, so the producer
1770/// never blocks on full. The consumer blocks when empty.
1771///
1772/// # Example
1773///
1774/// ```ignore
1775/// let (producer, consumer) = new_blocking_unbounded_spsc(signal);
1776///
1777/// // Producer thread
1778/// std::thread::spawn(move || {
1779///     producer.send(42).unwrap();  // Never blocks on full
1780/// });
1781///
1782/// // Consumer thread
1783/// std::thread::spawn(move || {
1784///     let item = consumer.recv().unwrap();  // Blocks until item available
1785/// });
1786/// ```
1787pub fn new_blocking_unbounded_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1788    signal: AsyncSignalGate,
1789) -> (
1790    BlockingUnboundedSpscProducer<T, P, NUM_SEGS_P2>,
1791    BlockingUnboundedSpscConsumer<T, P, NUM_SEGS_P2>,
1792) {
1793    let shared = Arc::new(AsyncSpscShared::new());
1794    let signal_arc = Arc::new(signal);
1795    let (sender, receiver) =
1796        crate::spsc::UnboundedSpsc::<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>::new_with_signal(signal_arc);
1797    (
1798        BlockingUnboundedSpscProducer::new(sender, Arc::clone(&shared)),
1799        BlockingUnboundedSpscConsumer::new(receiver, shared),
1800    )
1801}
1802
1803/// Creates a mixed unbounded SPSC queue with blocking producer and async consumer.
1804///
1805/// # Example
1806///
1807/// ```ignore
1808/// let (producer, consumer) = new_blocking_async_unbounded_spsc(signal);
1809///
1810/// // Producer thread (blocking)
1811/// std::thread::spawn(move || {
1812///     producer.send(42).unwrap();  // Never blocks on full
1813/// });
1814///
1815/// // Consumer task (async)
1816/// maniac::spawn(async move {
1817///     let item = consumer.recv().await.unwrap();
1818/// });
1819/// ```
1820pub fn new_blocking_async_unbounded_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1821    signal: AsyncSignalGate,
1822) -> (
1823    BlockingUnboundedSpscProducer<T, P, NUM_SEGS_P2>,
1824    AsyncUnboundedSpscConsumer<T, P, NUM_SEGS_P2>,
1825) {
1826    let shared = Arc::new(AsyncSpscShared::new());
1827    let signal_arc = Arc::new(signal);
1828    let (sender, receiver) =
1829        crate::spsc::UnboundedSpsc::<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>::new_with_signal(signal_arc);
1830    (
1831        BlockingUnboundedSpscProducer::new(sender, Arc::clone(&shared)),
1832        AsyncUnboundedSpscConsumer::new(receiver, shared),
1833    )
1834}
1835
1836/// Creates a mixed unbounded SPSC queue with async producer and blocking consumer.
1837///
1838/// # Example
1839///
1840/// ```ignore
1841/// let (producer, consumer) = new_async_blocking_unbounded_spsc(signal);
1842///
1843/// // Producer task (async)
1844/// maniac::spawn(async move {
1845///     producer.send(42).await.unwrap();  // Never blocks on full
1846/// });
1847///
1848/// // Consumer thread (blocking)
1849/// std::thread::spawn(move || {
1850///     let item = consumer.recv().unwrap();
1851/// });
1852/// ```
1853pub fn new_async_blocking_unbounded_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1854    signal: AsyncSignalGate,
1855) -> (
1856    AsyncUnboundedSpscProducer<T, P, NUM_SEGS_P2>,
1857    BlockingUnboundedSpscConsumer<T, P, NUM_SEGS_P2>,
1858) {
1859    let shared = Arc::new(AsyncSpscShared::new());
1860    let signal_arc = Arc::new(signal);
1861    let (sender, receiver) =
1862        crate::spsc::UnboundedSpsc::<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>::new_with_signal(signal_arc);
1863    (
1864        AsyncUnboundedSpscProducer::new(sender, Arc::clone(&shared)),
1865        BlockingUnboundedSpscConsumer::new(receiver, shared),
1866    )
1867}