maniac_runtime/sync/spsc.rs
1//! Async and Blocking SPSC (Single-Producer, Single-Consumer) queue implementation.
2//!
3//! This module provides both async and blocking adapters over the lock-free SPSC queue.
4//! The async variants implement `futures::Sink` and `futures::Stream` traits for
5//! seamless integration with async/await code. The blocking variants use efficient
6//! thread parking for synchronous operations.
7//!
8//! # Queue Variants
9//!
10//! - **Async**: [`AsyncSpscProducer`] / [`AsyncSpscConsumer`] - For use with async tasks
11//! - **Blocking**: [`BlockingSpscProducer`] / [`BlockingSpscConsumer`] - For use with threads
12//! - **Mixed**: You can mix async and blocking ends on the same queue!
13//!
14//! All variants share the same waker infrastructure, allowing seamless interoperability.
15//! A blocking producer can wake up an async consumer and vice versa.
16//!
17//! # Design Principles
18//!
19//! ## Correctness Guarantees
20//!
21//! The implementation uses the **double-check pattern** to prevent missed wakeups:
22//! 1. Check if operation is possible (space available / items available)
23//! 2. Register waker if not
24//! 3. Double-check after registering (catches races)
25//!
26//! This pattern, combined with `DiatomicWaker`'s acquire/release memory ordering,
27//! guarantees that no items are lost and no wakeups are missed, even in the presence
28//! of concurrent operations between producer and consumer.
29//!
30//! ## Memory Ordering
31//!
32//! The queue operations synchronize via acquire/release semantics:
33//! - Producer writes data (Release) → Consumer reads data (Acquire)
34//! - Consumer updates tail (Release) → Producer checks space (Acquire)
35//! - Notifications use `DiatomicWaker` with proper ordering
36//!
37//! ## Zero-Copy Design
38//!
39//! Items waiting to be sent are stored in the Future's stack frame, not in the
40//! `AsyncSpscProducer` struct. This eliminates the need for `T: Unpin` and keeps
41//! the implementation simple and efficient.
42//!
43//! ## Performance Characteristics
44//!
45//! - **Fast path**: ~5-15ns for non-blocking operations
46//! - **Notification overhead**: ~1-2ns when no waker registered
//! - **Zero allocation (fast path)**: Async state lives in the future or in the shared queue; the blocking slow path allocates one `Arc`-backed waker per blocking call
48//! - **Cache-friendly**: Wakers are cache-padded to prevent false sharing
49//!
50//! # Examples
51//!
52//! ## Pure Async
53//!
54//! ```ignore
55//! use futures::{SinkExt, StreamExt};
56//!
57//! let (mut producer, mut consumer) = new_async_spsc(signal);
58//!
59//! // Producer task
60//! maniac::spawn(async move {
61//! producer.send(42).await.unwrap();
62//! producer.send(43).await.unwrap();
63//! });
64//!
65//! // Consumer task
66//! maniac::spawn(async move {
67//! while let Some(item) = consumer.next().await {
68//! println!("Got: {}", item);
69//! }
70//! });
71//! ```
72//!
73//! ## Pure Blocking
74//!
75//! ```ignore
76//! let (producer, consumer) = new_blocking_spsc(signal);
77//!
78//! // Producer thread
79//! std::thread::spawn(move || {
80//! producer.send(42).unwrap();
81//! producer.send(43).unwrap();
82//! });
83//!
84//! // Consumer thread
85//! std::thread::spawn(move || {
86//! while let Ok(item) = consumer.recv() {
87//! println!("Got: {}", item);
88//! }
89//! });
90//! ```
91//!
92//! ## Mixed: Blocking Producer + Async Consumer
93//!
94//! ```ignore
95//! let (producer, mut consumer) = new_blocking_async_spsc(signal);
96//!
97//! // Producer thread (blocking)
98//! std::thread::spawn(move || {
99//! producer.send(42).unwrap(); // Parks thread if full
100//! });
101//!
102//! // Consumer task (async)
103//! maniac::spawn(async move {
104//! let item = consumer.recv().await.unwrap(); // Wakes up blocking thread
105//! });
106//! ```
107//!
108//! ## Mixed: Async Producer + Blocking Consumer
109//!
110//! ```ignore
111//! let (mut producer, consumer) = new_async_blocking_spsc(signal);
112//!
113//! // Producer task (async)
114//! maniac::spawn(async move {
115//! producer.send(42).await.unwrap(); // Wakes up blocking thread
116//! });
117//!
118//! // Consumer thread (blocking)
119//! std::thread::spawn(move || {
120//! let item = consumer.recv().unwrap(); // Parks thread if empty
121//! });
122//! ```
123
124use std::sync::Arc;
125
126use crate::utils::CachePadded;
127
128use super::signal::AsyncSignalGate;
129
130use crate::{PopError, PushError};
131
132use std::pin::Pin;
133use std::task::{Context, Poll, Waker};
134
135use futures::{sink::Sink, stream::Stream};
136
137use crate::future::waker::DiatomicWaker;
138use crate::parking::{Parker, Unparker};
139use std::task::Wake;
140
141/// A waker implementation that unparks a thread.
142///
143/// Used to integrate blocking operations with the async waker infrastructure,
144/// allowing async and blocking operations to work together seamlessly.
145struct ThreadUnparker {
146 unparker: Unparker,
147}
148
149impl Wake for ThreadUnparker {
150 fn wake(self: Arc<Self>) {
151 self.unparker.unpark();
152 }
153
154 fn wake_by_ref(self: &Arc<Self>) {
155 self.unparker.unpark();
156 }
157}
158
159/// Shared wake infrastructure for the async adapters.
160///
161/// Maintains separate wakers for the producer (waiting for space) and consumer
162/// (waiting for items). The wakers are cache-padded to prevent false sharing
163/// between producer and consumer threads.
164///
165/// # Memory Ordering
166///
167/// The `DiatomicWaker` provides acquire/release semantics that synchronize with
168/// the underlying queue's memory ordering:
169/// - Producer: write data (Release) → notify_items → Consumer: read data (Acquire)
170/// - Consumer: read data → notify_space → Producer: check space (Acquire)
171///
172/// # Correctness
173///
174/// The double-check pattern (check → register → check) prevents missed wakeups:
175/// - If state changes before register, the second check catches it
176/// - If state changes after register, the waker gets notified
177/// - If state changes during register, `DiatomicWaker`'s state machine guarantees
178/// either the second check sees the change or the notifier wakes the waker
179struct AsyncSpscShared {
180 item_waiter: CachePadded<DiatomicWaker>,
181 space_waiter: CachePadded<DiatomicWaker>,
182}
183
184impl AsyncSpscShared {
185 fn new() -> Self {
186 Self {
187 item_waiter: CachePadded::new(DiatomicWaker::new()),
188 space_waiter: CachePadded::new(DiatomicWaker::new()),
189 }
190 }
191
192 #[inline]
193 fn notify_items(&self) {
194 self.item_waiter.notify();
195 }
196
197 #[inline]
198 fn notify_space(&self) {
199 self.space_waiter.notify();
200 }
201
202 #[inline]
203 unsafe fn wait_for_items<P, T>(&self, predicate: P) -> crate::future::waker::WaitUntil<'_, P, T>
204 where
205 P: FnMut() -> Option<T>,
206 {
207 unsafe { self.item_waiter.wait_until(predicate) }
208 }
209
210 #[inline]
211 unsafe fn wait_for_space<P, T>(&self, predicate: P) -> crate::future::waker::WaitUntil<'_, P, T>
212 where
213 P: FnMut() -> Option<T>,
214 {
215 unsafe { self.space_waiter.wait_until(predicate) }
216 }
217
218 #[inline]
219 unsafe fn register_items(&self, waker: &Waker) {
220 unsafe { self.item_waiter.register(waker) };
221 }
222
223 #[inline]
224 unsafe fn register_space(&self, waker: &Waker) {
225 unsafe { self.space_waiter.register(waker) };
226 }
227}
228
229/// Asynchronous producer façade for [`SegSpsc`].
230pub struct AsyncSpscProducer<T, const P: usize, const NUM_SEGS_P2: usize> {
231 sender: crate::spsc::Sender<T, P, NUM_SEGS_P2, AsyncSignalGate>,
232 shared: Arc<AsyncSpscShared>,
233}
234
235impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncSpscProducer<T, P, NUM_SEGS_P2> {
236 fn new(
237 sender: crate::spsc::Sender<T, P, NUM_SEGS_P2, AsyncSignalGate>,
238 shared: Arc<AsyncSpscShared>,
239 ) -> Self {
240 Self { sender, shared }
241 }
242
243 /// Capacity of the underlying queue.
244 #[inline]
245 pub fn capacity(&self) -> usize {
246 crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::capacity()
247 }
248
249 /// Fast-path send without suspension.
250 ///
251 /// Attempts to send an item immediately without blocking. Always notifies
252 /// waiting consumers on success to prevent missed wakeups.
253 ///
254 /// # Performance
255 ///
256 /// - Success path: ~5-15ns (queue write + notify check)
257 /// - Notify overhead: ~1-2ns when no consumer waiting
258 #[inline]
259 pub fn try_send(&self, value: T) -> Result<(), PushError<T>> {
260 match self.sender.try_push(value) {
261 Ok(()) => {
262 // Always notify consumers after successful write.
263 // This is cheap (~1-2ns) when no waker is registered.
264 self.shared.notify_items();
265 Ok(())
266 }
267 Err(err) => Err(err),
268 }
269 }
270
271 /// Asynchronously sends a single item.
272 ///
273 /// Tries to send immediately; if the queue is full, suspends until space
274 /// becomes available. The item is held in the Future's stack frame while
275 /// waiting, avoiding the need for a `pending` field in the struct.
276 ///
277 /// # Correctness
278 ///
279 /// The item is never dropped or lost:
280 /// - On success: item is in the queue
281 /// - On `Full`: item is stored in `pending` (Future stack frame)
282 /// - On `Closed`: item is returned in the error
283 ///
284 /// The predicate is called on each wakeup to retry sending. The `wait_for_space`
285 /// future uses the double-check pattern internally to prevent missed wakeups.
286 ///
287 /// # Safety
288 ///
289 /// The `wait_for_space` call is safe because:
290 /// - `AsyncSpscProducer` is `!Clone` and `!Sync` (single-threaded access)
291 /// - SPSC guarantees only one producer thread
292 /// - Therefore, no concurrent calls to `register` or `wait_until` on `space_waiter`
293 pub async fn send(&mut self, value: T) -> Result<(), PushError<T>> {
294 match self.try_send(value) {
295 Ok(()) => Ok(()),
296 Err(PushError::Full(item)) => {
297 // Store item in Future's stack frame (not in struct).
298 // This avoids needing T: Unpin and keeps the struct simple.
299 let mut pending = Some(item);
300 let sender = &self.sender;
301 let shared = &self.shared;
302 unsafe {
303 shared
304 .wait_for_space(|| {
305 // Try to send on each wakeup.
306 let candidate = pending.take()?;
307 match sender.try_push(candidate) {
308 Ok(()) => {
309 // Success! Notify waiting consumers.
310 shared.notify_items();
311 Some(Ok(()))
312 }
313 Err(PushError::Full(candidate)) => {
314 // Still full, restore item and keep waiting.
315 pending = Some(candidate);
316 None
317 }
318 Err(PushError::Closed(candidate)) => {
319 // Channel closed, return error with item.
320 Some(Err(PushError::Closed(candidate)))
321 }
322 }
323 })
324 .await
325 }
326 }
327 Err(PushError::Closed(item)) => Err(PushError::Closed(item)),
328 }
329 }
330
331 /// Sends an entire Vec, awaiting at most once if the queue fills.
332 ///
333 /// Makes progress whenever space is available, writing as many items as possible
334 /// in each attempt. Items are moved out of the Vec using move semantics.
335 /// Notifies consumers after each batch write (not just at the end),
336 /// allowing the consumer to start processing while the producer is still sending.
337 ///
338 /// On return, the Vec will be empty if all items were sent, or contain only
339 /// the items that were not sent (if the channel closed).
340 ///
341 /// # Efficiency
342 ///
343 /// - Amortizes notification overhead across batch (single notify per write batch)
344 /// - Allows progressive consumption (consumer doesn't wait for entire batch)
345 /// - Move semantics (no Clone/Copy required)
346 pub async fn send_batch(&mut self, values: &mut Vec<T>) -> Result<(), PushError<()>> {
347 if values.is_empty() {
348 return Ok(());
349 }
350
351 let sender = &self.sender;
352 let shared = &self.shared;
353
354 match sender.try_push_n(values) {
355 Ok(written) => {
356 if written > 0 {
357 shared.notify_items();
358 if values.is_empty() {
359 return Ok(());
360 }
361 }
362 }
363 Err(PushError::Closed(())) => return Err(PushError::Closed(())),
364 Err(PushError::Full(())) => {}
365 }
366
367 unsafe {
368 shared
369 .wait_for_space(|| {
370 if values.is_empty() {
371 return Some(Ok(()));
372 }
373
374 match sender.try_push_n(values) {
375 Ok(written) => {
376 if written > 0 {
377 shared.notify_items();
378 if values.is_empty() {
379 return Some(Ok(()));
380 }
381 }
382 None
383 }
384 Err(PushError::Full(())) => None,
385 Err(PushError::Closed(())) => Some(Err(PushError::Closed(()))),
386 }
387 })
388 .await
389 }
390 }
391
392 /// Sends every item from the iterator, awaiting as required.
393 pub async fn send_iter<I>(&mut self, iter: I) -> Result<(), PushError<T>>
394 where
395 I: IntoIterator<Item = T>,
396 {
397 for item in iter {
398 self.send(item).await?;
399 }
400 Ok(())
401 }
402
403 /// Closes the queue and wakes any waiters.
404 pub fn close(&mut self) {
405 self.sender.close_channel();
406 self.shared.notify_items();
407 self.shared.notify_space();
408 }
409}
410
411impl<T, const P: usize, const NUM_SEGS_P2: usize> Sink<T> for AsyncSpscProducer<T, P, NUM_SEGS_P2> {
412 type Error = PushError<T>;
413
414 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
415 // Safety: AsyncSpscProducer is not self-referential, so get_unchecked_mut is safe.
416 // The Pin guarantee gives us exclusive mutable access.
417 let this = unsafe { self.get_unchecked_mut() };
418
419 // Fast path: check if there's space available
420 if !this.sender.is_full() {
421 return Poll::Ready(Ok(()));
422 }
423
424 // No space available. Register waker to be notified when space frees up.
425 //
426 // Safety: This is safe because:
427 // - AsyncSpscProducer is !Clone and !Sync (single-threaded access)
428 // - SPSC guarantees only one producer thread
429 // - Therefore no concurrent calls to register_space or wait_for_space
430 unsafe {
431 this.shared.register_space(cx.waker());
432 }
433
434 // Double-check after registering to prevent missed wakeups.
435 //
436 // Race scenarios:
437 // 1. Consumer frees space BEFORE register: double-check catches it → Ready
438 // 2. Consumer frees space AFTER register: waker gets notified → will poll again
439 // 3. Consumer frees space DURING register: DiatomicWaker state machine guarantees
440 // either we see the change here, or the consumer sees our waker → no missed wakeup
441 if !this.sender.is_full() {
442 return Poll::Ready(Ok(()));
443 }
444
445 Poll::Pending
446 }
447
448 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), PushError<T>> {
449 // Safety: Same as poll_ready - AsyncSpscProducer is not self-referential.
450 let this = unsafe { self.get_unchecked_mut() };
451
452 // For SPSC with single producer, if poll_ready returned Ready, the queue
453 // cannot become full before start_send (no other producers to race with).
454 // However, the channel might be closed, so we still handle errors properly.
455 this.try_send(item)
456 }
457
458 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
459 // SPSC queue has no buffering at the Sink level (items go directly to queue),
460 // so flush is always immediately complete.
461 Poll::Ready(Ok(()))
462 }
463
464 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
465 // Safety: Same as poll_ready - AsyncSpscProducer is not self-referential.
466 let this = unsafe { self.get_unchecked_mut() };
467 this.close();
468 Poll::Ready(Ok(()))
469 }
470}
471
472/// Asynchronous consumer façade for [`SegSpsc`].
473pub struct AsyncSpscConsumer<T, const P: usize, const NUM_SEGS_P2: usize> {
474 receiver: crate::spsc::Receiver<T, P, NUM_SEGS_P2, AsyncSignalGate>,
475 shared: Arc<AsyncSpscShared>,
476}
477
478impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncSpscConsumer<T, P, NUM_SEGS_P2> {
479 fn new(
480 receiver: crate::spsc::Receiver<T, P, NUM_SEGS_P2, AsyncSignalGate>,
481 shared: Arc<AsyncSpscShared>,
482 ) -> Self {
483 Self { receiver, shared }
484 }
485
486 /// Capacity of the underlying queue.
487 #[inline]
488 pub fn capacity(&self) -> usize {
489 crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::capacity()
490 }
491
492 /// Attempts to receive without awaiting.
493 ///
494 /// Always notifies waiting producers on success to prevent missed wakeups.
495 ///
496 /// # Performance
497 ///
498 /// - Success path: ~5-15ns (queue read + notify check)
499 /// - Notify overhead: ~1-2ns when no producer waiting
500 #[inline]
501 pub fn try_recv(&self) -> Result<T, PopError> {
502 match self.receiver.try_pop() {
503 Some(value) => {
504 // Always notify producers after successful read.
505 // This is cheap (~1-2ns) when no waker is registered.
506 self.shared.notify_space();
507 Ok(value)
508 }
509 None if self.receiver.is_closed() => Err(PopError::Closed),
510 None => Err(PopError::Empty),
511 }
512 }
513
514 /// Asynchronously receives a single item.
515 ///
516 /// Tries to receive immediately; if the queue is empty, suspends until an
517 /// item becomes available or the channel is closed.
518 ///
519 /// # Correctness
520 ///
521 /// The predicate is called on each wakeup to retry receiving. The `wait_for_items`
522 /// future uses the double-check pattern internally to prevent missed wakeups.
523 ///
524 /// # Safety
525 ///
526 /// The `wait_for_items` call is safe because:
527 /// - `AsyncSpscConsumer` is `!Clone` and `!Sync` (single-threaded access)
528 /// - SPSC guarantees only one consumer thread
529 /// - Therefore, no concurrent calls to `register` or `wait_until` on `item_waiter`
530 pub async fn recv(&mut self) -> Result<T, PopError> {
531 // Fast path: try to receive immediately
532 match self.try_recv() {
533 Ok(value) => return Ok(value),
534 Err(PopError::Empty) | Err(PopError::Timeout) => {}
535 Err(PopError::Closed) => return Err(PopError::Closed),
536 }
537
538 let receiver = &self.receiver;
539 let shared = &self.shared;
540 unsafe {
541 shared
542 .wait_for_items(|| match receiver.try_pop() {
543 Some(value) => {
544 // Success! Notify waiting producers.
545 shared.notify_space();
546 Some(Ok(value))
547 }
548 None if receiver.is_closed() => Some(Err(PopError::Closed)),
549 None => None, // Still empty, keep waiting
550 })
551 .await
552 }
553 }
554
555 /// Receives up to `dst.len()` items.
556 ///
557 /// Makes progress whenever items are available, reading as many as possible
558 /// in each attempt. Returns when the buffer is full or the channel is closed.
559 /// Notifies producers after each batch read to free up space progressively.
560 ///
561 /// # Returns
562 ///
563 /// - `Ok(count)`: Number of items read (may be less than `dst.len()`)
564 /// - `Err(PopError::Closed)`: Channel closed and no items available
565 ///
566 /// # Efficiency
567 ///
568 /// - Amortizes notification overhead across batch (single notify per read batch)
569 /// - Allows progressive production (producer can send more while consumer processes)
570 pub async fn recv_batch(&mut self, dst: &mut [T]) -> Result<usize, PopError> {
571 if dst.is_empty() {
572 return Ok(0);
573 }
574
575 let receiver = &self.receiver;
576 let shared = &self.shared;
577 let mut filled = match receiver.try_pop_n(dst) {
578 Ok(count) => {
579 if count > 0 {
580 shared.notify_space();
581 }
582 count
583 }
584 Err(PopError::Empty) | Err(PopError::Timeout) => 0,
585 Err(PopError::Closed) => return Err(PopError::Closed),
586 };
587
588 if filled == dst.len() {
589 return Ok(filled);
590 }
591
592 unsafe {
593 shared
594 .wait_for_items(|| {
595 if filled == dst.len() {
596 return Some(Ok(filled));
597 }
598
599 match receiver.try_pop_n(&mut dst[filled..]) {
600 Ok(0) => {
601 if receiver.is_closed() {
602 Some(if filled > 0 {
603 Ok(filled)
604 } else {
605 Err(PopError::Closed)
606 })
607 } else {
608 None
609 }
610 }
611 Ok(count) => {
612 filled += count;
613 shared.notify_space();
614 if filled == dst.len() {
615 Some(Ok(filled))
616 } else {
617 None
618 }
619 }
620 Err(PopError::Empty) | Err(PopError::Timeout) => {
621 if receiver.is_closed() {
622 Some(if filled > 0 {
623 Ok(filled)
624 } else {
625 Err(PopError::Closed)
626 })
627 } else {
628 None
629 }
630 }
631 Err(PopError::Closed) => Some(if filled > 0 {
632 Ok(filled)
633 } else {
634 Err(PopError::Closed)
635 }),
636 }
637 })
638 .await
639 }
640 }
641}
642
643impl<T, const P: usize, const NUM_SEGS_P2: usize> Stream for AsyncSpscConsumer<T, P, NUM_SEGS_P2> {
644 type Item = T;
645
646 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
647 // Safety: AsyncSpscConsumer is not self-referential, so get_unchecked_mut is safe.
648 // The Pin guarantee gives us exclusive mutable access.
649 let this = unsafe { self.get_unchecked_mut() };
650
651 // Fast path: check if there's an item available
652 match this.try_recv() {
653 Ok(value) => Poll::Ready(Some(value)),
654 Err(PopError::Closed) => Poll::Ready(None),
655 Err(PopError::Empty) | Err(PopError::Timeout) => {
656 // No items available. Register waker to be notified when items arrive.
657 //
658 // Safety: This is safe because:
659 // - AsyncSpscConsumer is !Clone and !Sync (single-threaded access)
660 // - SPSC guarantees only one consumer thread
661 // - Therefore no concurrent calls to register_items or wait_for_items
662 unsafe {
663 this.shared.register_items(cx.waker());
664 }
665
666 // Double-check after registering to prevent missed wakeups.
667 //
668 // Race scenarios:
669 // 1. Producer sends item BEFORE register: double-check catches it → Ready
670 // 2. Producer sends item AFTER register: waker gets notified → will poll again
671 // 3. Producer sends item DURING register: DiatomicWaker state machine guarantees
672 // either we see the item here, or the producer sees our waker → no missed wakeup
673 match this.try_recv() {
674 Ok(value) => Poll::Ready(Some(value)),
675 Err(PopError::Closed) => Poll::Ready(None),
676 Err(PopError::Empty) | Err(PopError::Timeout) => Poll::Pending,
677 }
678 }
679 }
680 }
681}
682
683/// Blocking producer for SPSC queue.
684///
685/// This type provides blocking send operations that park the thread until space
686/// is available. It shares the same waker infrastructure as `AsyncSpscProducer`,
687/// allowing blocking and async operations to interoperate seamlessly.
688///
689/// # Interoperability
690///
691/// A `BlockingSpscProducer` can wake up an `AsyncSpscConsumer` and vice versa.
692/// This allows mixing blocking threads with async tasks in the same queue.
693///
694/// # Example
695///
696/// ```ignore
697/// // Create mixed queue: blocking producer, async consumer
698/// let (blocking_producer, async_consumer) = new_blocking_async_spsc(signal);
699///
700/// // Producer thread (blocking)
701/// std::thread::spawn(move || {
702/// blocking_producer.send(42).unwrap();
703/// });
704///
705/// // Consumer task (async)
706/// maniac::spawn(async move {
707/// let item = async_consumer.recv().await.unwrap();
708/// });
709/// ```
710pub struct BlockingSpscProducer<T, const P: usize, const NUM_SEGS_P2: usize> {
711 sender: crate::spsc::Sender<T, P, NUM_SEGS_P2, AsyncSignalGate>,
712 shared: Arc<AsyncSpscShared>,
713}
714
715impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingSpscProducer<T, P, NUM_SEGS_P2> {
716 fn new(
717 sender: crate::spsc::Sender<T, P, NUM_SEGS_P2, AsyncSignalGate>,
718 shared: Arc<AsyncSpscShared>,
719 ) -> Self {
720 Self { sender, shared }
721 }
722
723 /// Capacity of the underlying queue.
724 #[inline]
725 pub fn capacity(&self) -> usize {
726 crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::capacity()
727 }
728
729 /// Fast-path send without blocking.
730 ///
731 /// Returns immediately with success or error. Does not block the thread.
732 #[inline]
733 pub fn try_send(&self, value: T) -> Result<(), PushError<T>> {
734 match self.sender.try_push(value) {
735 Ok(()) => {
736 self.shared.notify_items();
737 Ok(())
738 }
739 Err(err) => Err(err),
740 }
741 }
742
743 /// Blocking send that parks the thread until space is available.
744 ///
745 /// Uses efficient thread parking (no busy-waiting). The thread will be
746 /// unparked when the consumer (async or blocking) frees up space.
747 ///
748 /// # Correctness
749 ///
750 /// Uses the double-check pattern to prevent missed wakeups:
751 /// 1. Try to send
752 /// 2. Register waker if full
753 /// 3. Double-check after registering (catches races)
754 /// 4. Park if still full
755 ///
756 /// # Performance
757 ///
758 /// - Fast path (space available): ~5-15ns
759 /// - Blocking path: Efficient thread parking (no spinning)
760 pub fn send(&self, mut value: T) -> Result<(), PushError<T>> {
761 // Fast path: try immediate send
762 match self.try_send(value) {
763 Ok(()) => return Ok(()),
764 Err(PushError::Closed(item)) => return Err(PushError::Closed(item)),
765 Err(PushError::Full(item)) => value = item,
766 }
767
768 // Slow path: need to block
769 let parker = Parker::new();
770 let unparker = parker.unparker();
771 let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
772
773 loop {
774 // Register our waker
775 unsafe {
776 self.shared.register_space(&waker);
777 }
778
779 // Double-check after registering (prevent missed wakeup)
780 match self.sender.try_push(value) {
781 Ok(()) => {
782 self.shared.notify_items();
783 return Ok(());
784 }
785 Err(PushError::Full(item)) => {
786 value = item;
787 // Still full, park until woken
788 parker.park();
789 // Loop again after wakeup
790 }
791 Err(PushError::Closed(item)) => {
792 return Err(PushError::Closed(item));
793 }
794 }
795 }
796 }
797
798 /// Blocking send of a Vec.
799 ///
800 /// Makes progress whenever space is available. Items are moved out of the Vec
801 /// using move semantics. More efficient than calling `send()` in a loop due to
802 /// bulk operations.
803 ///
804 /// On return, the Vec will be empty if all items were sent, or contain only
805 /// the items that were not sent (if the channel closed).
806 pub fn send_slice(&self, values: &mut Vec<T>) -> Result<(), PushError<()>> {
807 if values.is_empty() {
808 return Ok(());
809 }
810
811 // Try immediate send of as much as possible
812 match self.sender.try_push_n(values) {
813 Ok(written) => {
814 if written > 0 {
815 self.shared.notify_items();
816 if values.is_empty() {
817 return Ok(());
818 }
819 }
820 }
821 Err(PushError::Closed(())) => return Err(PushError::Closed(())),
822 Err(PushError::Full(())) => {}
823 }
824
825 // Slow path: need to block
826 let parker = Parker::new();
827 let unparker = parker.unparker();
828 let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
829
830 loop {
831 // Register our waker
832 unsafe {
833 self.shared.register_space(&waker);
834 }
835
836 // Double-check and try to make progress
837 if values.is_empty() {
838 return Ok(());
839 }
840
841 match self.sender.try_push_n(values) {
842 Ok(written) => {
843 if written > 0 {
844 self.shared.notify_items();
845 if values.is_empty() {
846 return Ok(());
847 }
848 }
849 // Made progress but not done, park and try again
850 parker.park();
851 }
852 Err(PushError::Full(())) => {
853 // No progress, park until woken
854 parker.park();
855 }
856 Err(PushError::Closed(())) => {
857 return Err(PushError::Closed(()));
858 }
859 }
860 }
861 }
862
863 /// Closes the queue and wakes any waiters.
864 pub fn close(&mut self) {
865 self.sender.close_channel();
866 self.shared.notify_items();
867 self.shared.notify_space();
868 }
869}
870
871/// Blocking consumer for SPSC queue.
872///
873/// This type provides blocking receive operations that park the thread until
874/// items are available. It shares the same waker infrastructure as `AsyncSpscConsumer`,
875/// allowing blocking and async operations to interoperate seamlessly.
876///
877/// # Interoperability
878///
879/// A `BlockingSpscConsumer` can wake up an `AsyncSpscProducer` and vice versa.
880/// This allows mixing blocking threads with async tasks in the same queue.
881///
882/// # Example
883///
884/// ```ignore
885/// // Create mixed queue: async producer, blocking consumer
886/// let (async_producer, blocking_consumer) = new_async_blocking_spsc(signal);
887///
888/// // Producer task (async)
889/// maniac::spawn(async move {
890/// async_producer.send(42).await.unwrap();
891/// });
892///
893/// // Consumer thread (blocking)
894/// std::thread::spawn(move || {
895/// let item = blocking_consumer.recv().unwrap();
896/// });
897/// ```
898pub struct BlockingSpscConsumer<T, const P: usize, const NUM_SEGS_P2: usize> {
899 receiver: crate::spsc::Receiver<T, P, NUM_SEGS_P2, AsyncSignalGate>,
900 shared: Arc<AsyncSpscShared>,
901}
902
903impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingSpscConsumer<T, P, NUM_SEGS_P2> {
904 fn new(
905 receiver: crate::spsc::Receiver<T, P, NUM_SEGS_P2, AsyncSignalGate>,
906 shared: Arc<AsyncSpscShared>,
907 ) -> Self {
908 Self { receiver, shared }
909 }
910
911 /// Capacity of the underlying queue.
912 #[inline]
913 pub fn capacity(&self) -> usize {
914 crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::capacity()
915 }
916
917 /// Fast-path receive without blocking.
918 ///
919 /// Returns immediately with success or error. Does not block the thread.
920 #[inline]
921 pub fn try_recv(&self) -> Result<T, PopError> {
922 match self.receiver.try_pop() {
923 Some(value) => {
924 self.shared.notify_space();
925 Ok(value)
926 }
927 None if self.receiver.is_closed() => Err(PopError::Closed),
928 None => Err(PopError::Empty),
929 }
930 }
931
932 /// Blocking receive that parks the thread until an item is available.
933 ///
934 /// Uses efficient thread parking (no busy-waiting). The thread will be
935 /// unparked when the producer (async or blocking) sends an item.
936 ///
937 /// # Correctness
938 ///
939 /// Uses the double-check pattern to prevent missed wakeups:
940 /// 1. Try to receive
941 /// 2. Register waker if empty
942 /// 3. Double-check after registering (catches races)
943 /// 4. Park if still empty
944 ///
945 /// # Performance
946 ///
947 /// - Fast path (item available): ~5-15ns
948 /// - Blocking path: Efficient thread parking (no spinning)
949 pub fn recv(&self) -> Result<T, PopError> {
950 // Fast path: try immediate receive
951 match self.try_recv() {
952 Ok(value) => return Ok(value),
953 Err(PopError::Closed) => return Err(PopError::Closed),
954 Err(PopError::Empty) | Err(PopError::Timeout) => {}
955 }
956
957 // Slow path: need to block
958 let parker = Parker::new();
959 let unparker = parker.unparker();
960 let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
961
962 loop {
963 // Register our waker
964 unsafe {
965 self.shared.register_items(&waker);
966 }
967
968 // Double-check after registering (prevent missed wakeup)
969 match self.receiver.try_pop() {
970 Some(value) => {
971 self.shared.notify_space();
972 return Ok(value);
973 }
974 None if self.receiver.is_closed() => {
975 return Err(PopError::Closed);
976 }
977 None => {
978 // Still empty, park until woken
979 parker.park();
980 // Loop again after wakeup
981 }
982 }
983 }
984 }
985
986 /// Blocking receive of multiple items.
987 ///
988 /// Receives up to `dst.len()` items, blocking until at least one is available.
989 /// Returns the number of items actually received.
990 pub fn recv_batch(&self, dst: &mut [T]) -> Result<usize, PopError> {
991 if dst.is_empty() {
992 return Ok(0);
993 }
994
995 let mut filled = match self.receiver.try_pop_n(dst) {
996 Ok(count) => {
997 if count > 0 {
998 self.shared.notify_space();
999 return Ok(count);
1000 }
1001 0
1002 }
1003 Err(PopError::Empty) | Err(PopError::Timeout) => 0,
1004 Err(PopError::Closed) => return Err(PopError::Closed),
1005 };
1006
1007 // Slow path: need to block
1008 let parker = Parker::new();
1009 let unparker = parker.unparker();
1010 let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
1011
1012 loop {
1013 // Register our waker
1014 unsafe {
1015 self.shared.register_items(&waker);
1016 }
1017
1018 // Double-check and try to make progress
1019 match self.receiver.try_pop_n(&mut dst[filled..]) {
1020 Ok(0) => {
1021 if self.receiver.is_closed() {
1022 return if filled > 0 {
1023 Ok(filled)
1024 } else {
1025 Err(PopError::Closed)
1026 };
1027 }
1028 // No items, park until woken
1029 parker.park();
1030 }
1031 Ok(count) => {
1032 filled += count;
1033 self.shared.notify_space();
1034 if filled == dst.len() || self.receiver.is_closed() {
1035 return Ok(filled);
1036 }
1037 // Got some but not all, park and try again
1038 parker.park();
1039 }
1040 Err(PopError::Empty) | Err(PopError::Timeout) => {
1041 if self.receiver.is_closed() {
1042 return if filled > 0 {
1043 Ok(filled)
1044 } else {
1045 Err(PopError::Closed)
1046 };
1047 }
1048 parker.park();
1049 }
1050 Err(PopError::Closed) => {
1051 return if filled > 0 {
1052 Ok(filled)
1053 } else {
1054 Err(PopError::Closed)
1055 };
1056 }
1057 }
1058 }
1059 }
1060}
1061
1062/// Creates a default async segmented SPSC queue.
1063pub fn new_async_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1064 signal: AsyncSignalGate,
1065) -> (
1066 AsyncSpscProducer<T, P, NUM_SEGS_P2>,
1067 AsyncSpscConsumer<T, P, NUM_SEGS_P2>,
1068) {
1069 new_async_with_config::<T, P, NUM_SEGS_P2>(signal, usize::MAX)
1070}
1071
1072/// Creates an async segmented SPSC queue with a custom pooling target.
1073pub fn new_async_with_config<T, const P: usize, const NUM_SEGS_P2: usize>(
1074 signal: AsyncSignalGate,
1075 max_pooled_segments: usize,
1076) -> (
1077 AsyncSpscProducer<T, P, NUM_SEGS_P2>,
1078 AsyncSpscConsumer<T, P, NUM_SEGS_P2>,
1079) {
1080 let shared = Arc::new(AsyncSpscShared::new());
1081 let (sender, receiver) =
1082 crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::new_with_gate_and_config(
1083 signal,
1084 max_pooled_segments,
1085 );
1086 (
1087 AsyncSpscProducer::new(sender, Arc::clone(&shared)),
1088 AsyncSpscConsumer::new(receiver, shared),
1089 )
1090}
1091
1092/// Creates a default blocking segmented SPSC queue.
1093///
1094/// Both producer and consumer use blocking operations that park the thread.
1095///
1096/// # Example
1097///
1098/// ```ignore
1099/// let (producer, consumer) = new_blocking_spsc(signal);
1100///
1101/// // Producer thread
1102/// std::thread::spawn(move || {
1103/// producer.send(42).unwrap();
1104/// });
1105///
1106/// // Consumer thread
1107/// std::thread::spawn(move || {
1108/// let item = consumer.recv().unwrap();
1109/// });
1110/// ```
1111pub fn new_blocking_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1112 signal: AsyncSignalGate,
1113) -> (
1114 BlockingSpscProducer<T, P, NUM_SEGS_P2>,
1115 BlockingSpscConsumer<T, P, NUM_SEGS_P2>,
1116) {
1117 new_blocking_with_config::<T, P, NUM_SEGS_P2>(signal, usize::MAX)
1118}
1119
1120/// Creates a blocking segmented SPSC queue with a custom pooling target.
1121pub fn new_blocking_with_config<T, const P: usize, const NUM_SEGS_P2: usize>(
1122 signal: AsyncSignalGate,
1123 max_pooled_segments: usize,
1124) -> (
1125 BlockingSpscProducer<T, P, NUM_SEGS_P2>,
1126 BlockingSpscConsumer<T, P, NUM_SEGS_P2>,
1127) {
1128 let shared = Arc::new(AsyncSpscShared::new());
1129 let (sender, receiver) =
1130 crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::new_with_gate_and_config(
1131 signal,
1132 max_pooled_segments,
1133 );
1134 (
1135 BlockingSpscProducer::new(sender, Arc::clone(&shared)),
1136 BlockingSpscConsumer::new(receiver, shared),
1137 )
1138}
1139
1140/// Creates a mixed SPSC queue with blocking producer and async consumer.
1141///
1142/// The blocking producer and async consumer share the same waker infrastructure,
1143/// so they can wake each other efficiently. This is useful when you have a
1144/// blocking thread that needs to send data to an async task.
1145///
1146/// # Example
1147///
1148/// ```ignore
1149/// let (producer, consumer) = new_blocking_async_spsc(signal);
1150///
1151/// // Producer thread (blocking)
1152/// std::thread::spawn(move || {
1153/// producer.send(42).unwrap();
1154/// producer.send(43).unwrap();
1155/// });
1156///
1157/// // Consumer task (async)
1158/// maniac::spawn(async move {
1159/// while let Some(item) = consumer.next().await {
1160/// println!("Got: {}", item);
1161/// }
1162/// });
1163/// ```
1164pub fn new_blocking_async_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1165 signal: AsyncSignalGate,
1166) -> (
1167 BlockingSpscProducer<T, P, NUM_SEGS_P2>,
1168 AsyncSpscConsumer<T, P, NUM_SEGS_P2>,
1169) {
1170 new_blocking_async_with_config::<T, P, NUM_SEGS_P2>(signal, usize::MAX)
1171}
1172
1173/// Creates a mixed SPSC queue with blocking producer and async consumer, with custom pooling.
1174pub fn new_blocking_async_with_config<T, const P: usize, const NUM_SEGS_P2: usize>(
1175 signal: AsyncSignalGate,
1176 max_pooled_segments: usize,
1177) -> (
1178 BlockingSpscProducer<T, P, NUM_SEGS_P2>,
1179 AsyncSpscConsumer<T, P, NUM_SEGS_P2>,
1180) {
1181 let shared = Arc::new(AsyncSpscShared::new());
1182 let (sender, receiver) =
1183 crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::new_with_gate_and_config(
1184 signal,
1185 max_pooled_segments,
1186 );
1187 (
1188 BlockingSpscProducer::new(sender, Arc::clone(&shared)),
1189 AsyncSpscConsumer::new(receiver, shared),
1190 )
1191}
1192
1193/// Creates a mixed SPSC queue with async producer and blocking consumer.
1194///
1195/// The async producer and blocking consumer share the same waker infrastructure,
1196/// so they can wake each other efficiently. This is useful when you have an
1197/// async task that needs to send data to a blocking thread.
1198///
1199/// # Example
1200///
1201/// ```ignore
1202/// let (producer, consumer) = new_async_blocking_spsc(signal);
1203///
1204/// // Producer task (async)
1205/// maniac::spawn(async move {
1206/// producer.send(42).await.unwrap();
1207/// producer.send(43).await.unwrap();
1208/// });
1209///
1210/// // Consumer thread (blocking)
1211/// std::thread::spawn(move || {
1212/// while let Ok(item) = consumer.recv() {
1213/// println!("Got: {}", item);
1214/// }
1215/// });
1216/// ```
1217pub fn new_async_blocking_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1218 signal: AsyncSignalGate,
1219) -> (
1220 AsyncSpscProducer<T, P, NUM_SEGS_P2>,
1221 BlockingSpscConsumer<T, P, NUM_SEGS_P2>,
1222) {
1223 new_async_blocking_with_config::<T, P, NUM_SEGS_P2>(signal, usize::MAX)
1224}
1225
1226/// Creates a mixed SPSC queue with async producer and blocking consumer, with custom pooling.
1227pub fn new_async_blocking_with_config<T, const P: usize, const NUM_SEGS_P2: usize>(
1228 signal: AsyncSignalGate,
1229 max_pooled_segments: usize,
1230) -> (
1231 AsyncSpscProducer<T, P, NUM_SEGS_P2>,
1232 BlockingSpscConsumer<T, P, NUM_SEGS_P2>,
1233) {
1234 let shared = Arc::new(AsyncSpscShared::new());
1235 let (sender, receiver) =
1236 crate::spsc::Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::new_with_gate_and_config(
1237 signal,
1238 max_pooled_segments,
1239 );
1240 (
1241 AsyncSpscProducer::new(sender, Arc::clone(&shared)),
1242 BlockingSpscConsumer::new(receiver, shared),
1243 )
1244}
1245
1246// ══════════════════════════════════════════════════════════════════════════════
1247// Unbounded SPSC Variants
1248// ══════════════════════════════════════════════════════════════════════════════
1249
/// Asynchronous producer for unbounded SPSC queue.
///
/// This type provides async send operations for an unbounded queue that automatically
/// expands by creating new segments as needed. Never blocks on full queue since the
/// queue grows dynamically.
pub struct AsyncUnboundedSpscProducer<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Sending half of the underlying unbounded queue; all pushes and the
    /// close operation go through it.
    sender: crate::spsc::UnboundedSender<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
    /// Notification state shared with the consumer end; signalled after pushes
    /// and on close.
    shared: Arc<AsyncSpscShared>,
}
1259
1260impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncUnboundedSpscProducer<T, P, NUM_SEGS_P2> {
1261 fn new(
1262 sender: crate::spsc::UnboundedSender<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
1263 shared: Arc<AsyncSpscShared>,
1264 ) -> Self {
1265 Self { sender, shared }
1266 }
1267
1268 /// Fast-path send without suspension.
1269 ///
1270 /// For unbounded queues, this always succeeds unless the channel is closed.
1271 #[inline]
1272 pub fn try_send(&self, value: T) -> Result<(), PushError<T>> {
1273 match self.sender.try_push(value) {
1274 Ok(()) => {
1275 self.shared.notify_items();
1276 Ok(())
1277 }
1278 Err(err) => Err(err),
1279 }
1280 }
1281
1282 /// Asynchronously sends a single item.
1283 ///
1284 /// For unbounded queues, this typically completes immediately unless the channel
1285 /// is closed. The async interface is provided for API consistency.
1286 pub async fn send(&mut self, value: T) -> Result<(), PushError<T>> {
1287 self.try_send(value)
1288 }
1289
1290 /// Sends an entire Vec, moving items out using bulk operations.
1291 ///
1292 /// On return, the Vec will be empty if all items were sent, or contain only
1293 /// the items that were not sent (if the channel closed).
1294 pub async fn send_batch(&mut self, values: &mut Vec<T>) -> Result<(), PushError<()>> {
1295 if values.is_empty() {
1296 return Ok(());
1297 }
1298
1299 match self.sender.try_push_n(values) {
1300 Ok(_) => {
1301 self.shared.notify_items();
1302 Ok(())
1303 }
1304 Err(err) => Err(err),
1305 }
1306 }
1307
1308 /// Sends every item from the iterator.
1309 pub async fn send_iter<I>(&mut self, iter: I) -> Result<(), PushError<T>>
1310 where
1311 I: IntoIterator<Item = T>,
1312 {
1313 for item in iter {
1314 self.send(item).await?;
1315 }
1316 Ok(())
1317 }
1318
1319 /// Closes the queue and wakes any waiters.
1320 pub fn close(&mut self) {
1321 self.sender.close_channel();
1322 self.shared.notify_items();
1323 self.shared.notify_space();
1324 }
1325
1326 /// Returns the number of nodes in the unbounded queue.
1327 pub fn node_count(&self) -> usize {
1328 self.sender.node_count()
1329 }
1330
1331 /// Returns the total capacity across all nodes.
1332 pub fn total_capacity(&self) -> usize {
1333 self.sender.total_capacity()
1334 }
1335}
1336
1337impl<T, const P: usize, const NUM_SEGS_P2: usize> Sink<T> for AsyncUnboundedSpscProducer<T, P, NUM_SEGS_P2> {
1338 type Error = PushError<T>;
1339
1340 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
1341 // Unbounded queue is always ready (unless closed)
1342 Poll::Ready(Ok(()))
1343 }
1344
1345 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), PushError<T>> {
1346 let this = unsafe { self.get_unchecked_mut() };
1347 this.try_send(item)
1348 }
1349
1350 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
1351 Poll::Ready(Ok(()))
1352 }
1353
1354 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
1355 let this = unsafe { self.get_unchecked_mut() };
1356 this.close();
1357 Poll::Ready(Ok(()))
1358 }
1359}
1360
/// Asynchronous consumer for unbounded SPSC queue.
pub struct AsyncUnboundedSpscConsumer<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Receiving half of the underlying unbounded queue; all pops and the
    /// closed-state check go through it.
    receiver: crate::spsc::UnboundedReceiver<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
    /// Notification state shared with the producer end; used to wait for items
    /// and to signal freed space after a pop.
    shared: Arc<AsyncSpscShared>,
}
1366
1367impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncUnboundedSpscConsumer<T, P, NUM_SEGS_P2> {
1368 fn new(
1369 receiver: crate::spsc::UnboundedReceiver<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
1370 shared: Arc<AsyncSpscShared>,
1371 ) -> Self {
1372 Self { receiver, shared }
1373 }
1374
1375 /// Attempts to receive without awaiting.
1376 #[inline]
1377 pub fn try_recv(&self) -> Result<T, PopError> {
1378 match self.receiver.try_pop() {
1379 Some(value) => {
1380 self.shared.notify_space();
1381 Ok(value)
1382 }
1383 None if self.receiver.is_closed() => Err(PopError::Closed),
1384 None => Err(PopError::Empty),
1385 }
1386 }
1387
1388 /// Asynchronously receives a single item.
1389 pub async fn recv(&mut self) -> Result<T, PopError> {
1390 match self.try_recv() {
1391 Ok(value) => return Ok(value),
1392 Err(PopError::Empty) | Err(PopError::Timeout) => {}
1393 Err(PopError::Closed) => return Err(PopError::Closed),
1394 }
1395
1396 let receiver = &self.receiver;
1397 let shared = &self.shared;
1398 unsafe {
1399 shared
1400 .wait_for_items(|| match receiver.try_pop() {
1401 Some(value) => {
1402 shared.notify_space();
1403 Some(Ok(value))
1404 }
1405 None if receiver.is_closed() => Some(Err(PopError::Closed)),
1406 None => None,
1407 })
1408 .await
1409 }
1410 }
1411
1412 /// Receives up to `dst.len()` items.
1413 pub async fn recv_batch(&mut self, dst: &mut [T]) -> Result<usize, PopError>
1414 where
1415 T: Clone,
1416 {
1417 if dst.is_empty() {
1418 return Ok(0);
1419 }
1420
1421 let receiver = &self.receiver;
1422 let shared = &self.shared;
1423 let mut filled = match receiver.try_pop_n(dst) {
1424 Ok(count) => {
1425 if count > 0 {
1426 shared.notify_space();
1427 }
1428 count
1429 }
1430 Err(PopError::Empty) | Err(PopError::Timeout) => 0,
1431 Err(PopError::Closed) => return Err(PopError::Closed),
1432 };
1433
1434 if filled == dst.len() {
1435 return Ok(filled);
1436 }
1437
1438 unsafe {
1439 shared
1440 .wait_for_items(|| {
1441 if filled == dst.len() {
1442 return Some(Ok(filled));
1443 }
1444
1445 match receiver.try_pop_n(&mut dst[filled..]) {
1446 Ok(0) => {
1447 if receiver.is_closed() {
1448 Some(if filled > 0 {
1449 Ok(filled)
1450 } else {
1451 Err(PopError::Closed)
1452 })
1453 } else {
1454 None
1455 }
1456 }
1457 Ok(count) => {
1458 filled += count;
1459 shared.notify_space();
1460 if filled == dst.len() {
1461 Some(Ok(filled))
1462 } else {
1463 None
1464 }
1465 }
1466 Err(PopError::Empty) | Err(PopError::Timeout) => {
1467 if receiver.is_closed() {
1468 Some(if filled > 0 {
1469 Ok(filled)
1470 } else {
1471 Err(PopError::Closed)
1472 })
1473 } else {
1474 None
1475 }
1476 }
1477 Err(PopError::Closed) => Some(if filled > 0 {
1478 Ok(filled)
1479 } else {
1480 Err(PopError::Closed)
1481 }),
1482 }
1483 })
1484 .await
1485 }
1486 }
1487
1488 /// Returns the number of nodes in the unbounded queue.
1489 pub fn node_count(&self) -> usize {
1490 self.receiver.node_count()
1491 }
1492
1493 /// Returns the total capacity across all nodes.
1494 pub fn total_capacity(&self) -> usize {
1495 self.receiver.total_capacity()
1496 }
1497}
1498
1499impl<T, const P: usize, const NUM_SEGS_P2: usize> Stream for AsyncUnboundedSpscConsumer<T, P, NUM_SEGS_P2> {
1500 type Item = T;
1501
1502 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1503 let this = unsafe { self.get_unchecked_mut() };
1504
1505 match this.try_recv() {
1506 Ok(value) => Poll::Ready(Some(value)),
1507 Err(PopError::Closed) => Poll::Ready(None),
1508 Err(PopError::Empty) | Err(PopError::Timeout) => {
1509 unsafe {
1510 this.shared.register_items(cx.waker());
1511 }
1512
1513 match this.try_recv() {
1514 Ok(value) => Poll::Ready(Some(value)),
1515 Err(PopError::Closed) => Poll::Ready(None),
1516 Err(PopError::Empty) | Err(PopError::Timeout) => Poll::Pending,
1517 }
1518 }
1519 }
1520 }
1521}
1522
/// Blocking producer for unbounded SPSC queue.
pub struct BlockingUnboundedSpscProducer<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Sending half of the underlying unbounded queue; all pushes and the
    /// close operation go through it.
    sender: crate::spsc::UnboundedSender<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
    /// Notification state shared with the consumer end; signalled after pushes
    /// and on close.
    shared: Arc<AsyncSpscShared>,
}
1528
1529impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingUnboundedSpscProducer<T, P, NUM_SEGS_P2> {
1530 fn new(
1531 sender: crate::spsc::UnboundedSender<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
1532 shared: Arc<AsyncSpscShared>,
1533 ) -> Self {
1534 Self { sender, shared }
1535 }
1536
1537 /// Fast-path send without blocking.
1538 ///
1539 /// For unbounded queues, this always succeeds unless the channel is closed.
1540 #[inline]
1541 pub fn try_send(&self, value: T) -> Result<(), PushError<T>> {
1542 match self.sender.try_push(value) {
1543 Ok(()) => {
1544 self.shared.notify_items();
1545 Ok(())
1546 }
1547 Err(err) => Err(err),
1548 }
1549 }
1550
1551 /// Sends a single item.
1552 ///
1553 /// For unbounded queues, this never blocks since the queue grows dynamically.
1554 pub fn send(&self, value: T) -> Result<(), PushError<T>> {
1555 self.try_send(value)
1556 }
1557
1558 /// Sends a Vec of items using bulk operations.
1559 ///
1560 /// On return, the Vec will be empty if all items were sent, or contain only
1561 /// the items that were not sent (if the channel closed).
1562 pub fn send_slice(&self, values: &mut Vec<T>) -> Result<(), PushError<()>> {
1563 if values.is_empty() {
1564 return Ok(());
1565 }
1566
1567 match self.sender.try_push_n(values) {
1568 Ok(_) => {
1569 self.shared.notify_items();
1570 Ok(())
1571 }
1572 Err(err) => Err(err),
1573 }
1574 }
1575
1576 /// Closes the queue and wakes any waiters.
1577 pub fn close(&mut self) {
1578 self.sender.close_channel();
1579 self.shared.notify_items();
1580 self.shared.notify_space();
1581 }
1582
1583 /// Returns the number of nodes in the unbounded queue.
1584 pub fn node_count(&self) -> usize {
1585 self.sender.node_count()
1586 }
1587
1588 /// Returns the total capacity across all nodes.
1589 pub fn total_capacity(&self) -> usize {
1590 self.sender.total_capacity()
1591 }
1592}
1593
/// Blocking consumer for unbounded SPSC queue.
pub struct BlockingUnboundedSpscConsumer<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Receiving half of the underlying unbounded queue; all pops and the
    /// closed-state check go through it.
    receiver: crate::spsc::UnboundedReceiver<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
    /// Notification state shared with the producer end; used to register the
    /// parked thread's waker and to signal freed space after a pop.
    shared: Arc<AsyncSpscShared>,
}
1599
1600impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingUnboundedSpscConsumer<T, P, NUM_SEGS_P2> {
1601 fn new(
1602 receiver: crate::spsc::UnboundedReceiver<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>,
1603 shared: Arc<AsyncSpscShared>,
1604 ) -> Self {
1605 Self { receiver, shared }
1606 }
1607
1608 /// Fast-path receive without blocking.
1609 #[inline]
1610 pub fn try_recv(&self) -> Result<T, PopError> {
1611 match self.receiver.try_pop() {
1612 Some(value) => {
1613 self.shared.notify_space();
1614 Ok(value)
1615 }
1616 None if self.receiver.is_closed() => Err(PopError::Closed),
1617 None => Err(PopError::Empty),
1618 }
1619 }
1620
1621 /// Blocking receive that parks the thread until an item is available.
1622 pub fn recv(&self) -> Result<T, PopError> {
1623 match self.try_recv() {
1624 Ok(value) => return Ok(value),
1625 Err(PopError::Closed) => return Err(PopError::Closed),
1626 Err(PopError::Empty) | Err(PopError::Timeout) => {}
1627 }
1628
1629 let parker = Parker::new();
1630 let unparker = parker.unparker();
1631 let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
1632
1633 loop {
1634 unsafe {
1635 self.shared.register_items(&waker);
1636 }
1637
1638 match self.receiver.try_pop() {
1639 Some(value) => {
1640 self.shared.notify_space();
1641 return Ok(value);
1642 }
1643 None if self.receiver.is_closed() => {
1644 return Err(PopError::Closed);
1645 }
1646 None => {
1647 parker.park();
1648 }
1649 }
1650 }
1651 }
1652
1653 /// Blocking receive of multiple items.
1654 pub fn recv_batch(&self, dst: &mut [T]) -> Result<usize, PopError>
1655 where
1656 T: Clone,
1657 {
1658 if dst.is_empty() {
1659 return Ok(0);
1660 }
1661
1662 let mut filled = match self.receiver.try_pop_n(dst) {
1663 Ok(count) => {
1664 if count > 0 {
1665 self.shared.notify_space();
1666 return Ok(count);
1667 }
1668 0
1669 }
1670 Err(PopError::Empty) | Err(PopError::Timeout) => 0,
1671 Err(PopError::Closed) => return Err(PopError::Closed),
1672 };
1673
1674 let parker = Parker::new();
1675 let unparker = parker.unparker();
1676 let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
1677
1678 loop {
1679 unsafe {
1680 self.shared.register_items(&waker);
1681 }
1682
1683 match self.receiver.try_pop_n(&mut dst[filled..]) {
1684 Ok(0) => {
1685 if self.receiver.is_closed() {
1686 return if filled > 0 {
1687 Ok(filled)
1688 } else {
1689 Err(PopError::Closed)
1690 };
1691 }
1692 parker.park();
1693 }
1694 Ok(count) => {
1695 filled += count;
1696 self.shared.notify_space();
1697 if filled == dst.len() || self.receiver.is_closed() {
1698 return Ok(filled);
1699 }
1700 parker.park();
1701 }
1702 Err(PopError::Empty) | Err(PopError::Timeout) => {
1703 if self.receiver.is_closed() {
1704 return if filled > 0 {
1705 Ok(filled)
1706 } else {
1707 Err(PopError::Closed)
1708 };
1709 }
1710 parker.park();
1711 }
1712 Err(PopError::Closed) => {
1713 return if filled > 0 {
1714 Ok(filled)
1715 } else {
1716 Err(PopError::Closed)
1717 };
1718 }
1719 }
1720 }
1721 }
1722
1723 /// Returns the number of nodes in the unbounded queue.
1724 pub fn node_count(&self) -> usize {
1725 self.receiver.node_count()
1726 }
1727
1728 /// Returns the total capacity across all nodes.
1729 pub fn total_capacity(&self) -> usize {
1730 self.receiver.total_capacity()
1731 }
1732}
1733
1734/// Creates a default async unbounded SPSC queue.
1735///
1736/// The queue automatically grows by creating new segments as needed, so it never
1737/// blocks on full. This is ideal for scenarios where you want to avoid backpressure.
1738///
1739/// # Example
1740///
1741/// ```ignore
1742/// let (mut producer, mut consumer) = new_async_unbounded_spsc(signal);
1743///
1744/// // Producer never blocks on full
1745/// producer.send(42).await.unwrap();
1746/// producer.send(43).await.unwrap();
1747///
1748/// // Consumer receives items
1749/// assert_eq!(consumer.recv().await.unwrap(), 42);
1750/// ```
1751pub fn new_async_unbounded_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1752 signal: AsyncSignalGate,
1753) -> (
1754 AsyncUnboundedSpscProducer<T, P, NUM_SEGS_P2>,
1755 AsyncUnboundedSpscConsumer<T, P, NUM_SEGS_P2>,
1756) {
1757 let shared = Arc::new(AsyncSpscShared::new());
1758 let signal_arc = Arc::new(signal);
1759 let (sender, receiver) =
1760 crate::spsc::UnboundedSpsc::<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>::new_with_signal(signal_arc);
1761 (
1762 AsyncUnboundedSpscProducer::new(sender, Arc::clone(&shared)),
1763 AsyncUnboundedSpscConsumer::new(receiver, shared),
1764 )
1765}
1766
1767/// Creates a default blocking unbounded SPSC queue.
1768///
1769/// The queue automatically grows by creating new segments as needed, so the producer
1770/// never blocks on full. The consumer blocks when empty.
1771///
1772/// # Example
1773///
1774/// ```ignore
1775/// let (producer, consumer) = new_blocking_unbounded_spsc(signal);
1776///
1777/// // Producer thread
1778/// std::thread::spawn(move || {
1779/// producer.send(42).unwrap(); // Never blocks on full
1780/// });
1781///
1782/// // Consumer thread
1783/// std::thread::spawn(move || {
1784/// let item = consumer.recv().unwrap(); // Blocks until item available
1785/// });
1786/// ```
1787pub fn new_blocking_unbounded_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1788 signal: AsyncSignalGate,
1789) -> (
1790 BlockingUnboundedSpscProducer<T, P, NUM_SEGS_P2>,
1791 BlockingUnboundedSpscConsumer<T, P, NUM_SEGS_P2>,
1792) {
1793 let shared = Arc::new(AsyncSpscShared::new());
1794 let signal_arc = Arc::new(signal);
1795 let (sender, receiver) =
1796 crate::spsc::UnboundedSpsc::<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>::new_with_signal(signal_arc);
1797 (
1798 BlockingUnboundedSpscProducer::new(sender, Arc::clone(&shared)),
1799 BlockingUnboundedSpscConsumer::new(receiver, shared),
1800 )
1801}
1802
1803/// Creates a mixed unbounded SPSC queue with blocking producer and async consumer.
1804///
1805/// # Example
1806///
1807/// ```ignore
1808/// let (producer, consumer) = new_blocking_async_unbounded_spsc(signal);
1809///
1810/// // Producer thread (blocking)
1811/// std::thread::spawn(move || {
1812/// producer.send(42).unwrap(); // Never blocks on full
1813/// });
1814///
1815/// // Consumer task (async)
1816/// maniac::spawn(async move {
1817/// let item = consumer.recv().await.unwrap();
1818/// });
1819/// ```
1820pub fn new_blocking_async_unbounded_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1821 signal: AsyncSignalGate,
1822) -> (
1823 BlockingUnboundedSpscProducer<T, P, NUM_SEGS_P2>,
1824 AsyncUnboundedSpscConsumer<T, P, NUM_SEGS_P2>,
1825) {
1826 let shared = Arc::new(AsyncSpscShared::new());
1827 let signal_arc = Arc::new(signal);
1828 let (sender, receiver) =
1829 crate::spsc::UnboundedSpsc::<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>::new_with_signal(signal_arc);
1830 (
1831 BlockingUnboundedSpscProducer::new(sender, Arc::clone(&shared)),
1832 AsyncUnboundedSpscConsumer::new(receiver, shared),
1833 )
1834}
1835
1836/// Creates a mixed unbounded SPSC queue with async producer and blocking consumer.
1837///
1838/// # Example
1839///
1840/// ```ignore
1841/// let (producer, consumer) = new_async_blocking_unbounded_spsc(signal);
1842///
1843/// // Producer task (async)
1844/// maniac::spawn(async move {
1845/// producer.send(42).await.unwrap(); // Never blocks on full
1846/// });
1847///
1848/// // Consumer thread (blocking)
1849/// std::thread::spawn(move || {
1850/// let item = consumer.recv().unwrap();
1851/// });
1852/// ```
1853pub fn new_async_blocking_unbounded_spsc<T, const P: usize, const NUM_SEGS_P2: usize>(
1854 signal: AsyncSignalGate,
1855) -> (
1856 AsyncUnboundedSpscProducer<T, P, NUM_SEGS_P2>,
1857 BlockingUnboundedSpscConsumer<T, P, NUM_SEGS_P2>,
1858) {
1859 let shared = Arc::new(AsyncSpscShared::new());
1860 let signal_arc = Arc::new(signal);
1861 let (sender, receiver) =
1862 crate::spsc::UnboundedSpsc::<T, P, NUM_SEGS_P2, Arc<AsyncSignalGate>>::new_with_signal(signal_arc);
1863 (
1864 AsyncUnboundedSpscProducer::new(sender, Arc::clone(&shared)),
1865 BlockingUnboundedSpscConsumer::new(receiver, shared),
1866 )
1867}