1use super::signal::AsyncSignalGate;
16use super::signal::AsyncSignalWaker;
17use crate::spsc::Spsc;
18use crate::spsc::UnboundedSpsc;
19use crate::sync::signal::Signal;
20use crate::utils::bits::find_nearest;
21use crate::CachePadded;
22use crate::{PopError, PushError};
23use std::marker::PhantomData;
24use std::mem::{ManuallyDrop, MaybeUninit};
25use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
26use std::sync::Arc;
27use std::thread;
28
29use std::pin::Pin;
30use std::task::{Context, Poll, Waker};
31
32use futures::{sink::Sink, stream::Stream};
33
34use crate::future::waker::{DiatomicWaker, WaitUntil};
35use crate::parking::{Parker, Unparker};
36
37use rand::RngCore;
38
39use std::ptr::{self, NonNull};
40
/// Adapter that exposes an [`Unparker`] through the standard `Wake` trait.
///
/// Waking it (by value or by reference) calls `unpark()` on the wrapped handle.
struct ThreadUnparker {
    /// Handle on which `unpark()` is called whenever the waker fires.
    unparker: Unparker,
}
48
49impl std::task::Wake for ThreadUnparker {
50 fn wake(self: Arc<Self>) {
51 self.unparker.unpark();
52 }
53
54 fn wake_by_ref(self: &Arc<Self>) {
55 self.unparker.unpark();
56 }
57}
58
59pub fn new<T, const P: usize, const NUM_SEGS_P2: usize>() -> Receiver<T, P, NUM_SEGS_P2> {
61 new_with_waker(Arc::new(AsyncSignalWaker::new()))
62}
63
64pub fn new_with_waker<T, const P: usize, const NUM_SEGS_P2: usize>(
84 waker: Arc<AsyncSignalWaker>,
85) -> Receiver<T, P, NUM_SEGS_P2> {
86 let mut queues = Vec::with_capacity(MAX_QUEUES);
88 for _ in 0..MAX_QUEUES {
89 queues.push(AtomicPtr::new(core::ptr::null_mut()));
90 }
91
92 let signals: Arc<[Signal; SIGNAL_WORDS]> =
93 Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
94
95 let inner = Arc::new(Inner {
96 queues: queues.into_boxed_slice(),
97 queue_count: CachePadded::new(AtomicUsize::new(0)),
98 producer_count: CachePadded::new(AtomicUsize::new(0)),
99 max_producer_id: AtomicUsize::new(0),
100 closed: CachePadded::new(AtomicBool::new(false)),
101 summary: waker,
102 signals,
103 });
104
105 Receiver {
106 inner,
107 misses: 0,
108 seed: rand::rng().next_u64(),
109 }
110}
111
112pub fn new_with_sender<T, const P: usize, const NUM_SEGS_P2: usize>()
113-> (Sender<T, P, NUM_SEGS_P2>, Receiver<T, P, NUM_SEGS_P2>) {
114 let waker = Arc::new(AsyncSignalWaker::new());
115 let mut queues = Vec::with_capacity(MAX_QUEUES);
117 for _ in 0..MAX_QUEUES {
118 queues.push(AtomicPtr::new(core::ptr::null_mut()));
119 }
120
121 let signals: Arc<[Signal; SIGNAL_WORDS]> =
122 Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
123
124 let inner = Arc::new(Inner {
125 queues: queues.into_boxed_slice(),
126 queue_count: CachePadded::new(AtomicUsize::new(0)),
127 producer_count: CachePadded::new(AtomicUsize::new(0)),
128 max_producer_id: AtomicUsize::new(0),
129 closed: CachePadded::new(AtomicBool::new(false)),
130 summary: waker,
131 signals,
132 });
133
134 (
135 inner
136 .create_sender()
137 .expect("fatal: mpsc won't allow even 1 sender"),
138 Receiver {
139 inner,
140 misses: 0,
141 seed: rand::rng().next_u64(),
142 },
143 )
144}
145
/// Multiplier of the 48-bit linear congruential generator stepped by `Receiver::next`.
const RND_MULTIPLIER: u64 = 0x5_DEEC_E66D;
/// Additive increment of the same generator.
const RND_ADDEND: u64 = 0xB;
/// Mask that keeps the generator state within its low 48 bits.
const RND_MASK: u64 = (1u64 << 48) - 1;

/// Number of producer-slot entries allocated per channel.
const MAX_QUEUES: usize = 64 * 64;
/// Number of queues owned by each producer.
const QUEUES_PER_PRODUCER: usize = 1;
/// Upper bound on simultaneously registered producers.
const MAX_PRODUCERS: usize = MAX_QUEUES / QUEUES_PER_PRODUCER;
// NOTE(review): despite its name this is derived from QUEUES_PER_PRODUCER (so it is 0
// today), not from MAX_PRODUCERS — confirm the intent before relying on it.
const MAX_PRODUCERS_MASK: usize = QUEUES_PER_PRODUCER - 1;

/// Number of `Signal` words held by a channel.
const SIGNAL_WORDS: usize = 64;

/// Boxed one-shot callback type.
type CloseFn = Box<dyn FnOnce()>;
162
/// Per-producer state shared between one [`Sender`] and the receiver.
struct ProducerSlot<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Queue this producer pushes into and the receiver pops from.
    queue: Spsc<T, P, NUM_SEGS_P2, AsyncSignalGate>,
    /// Waker a sender registers on while waiting for space; the receiver notifies it
    /// after popping from `queue`.
    space_waker: DiatomicWaker,
}
169
/// State shared by every [`Sender`] and the [`Receiver`] of one channel.
struct Inner<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// One entry per possible producer slot; null while the slot is free, otherwise a
    /// pointer obtained from `Arc::into_raw` that owns one strong reference.
    queues: Box<[AtomicPtr<ProducerSlot<T, P, NUM_SEGS_P2>>]>,
    /// Number of slots currently installed in `queues`.
    queue_count: CachePadded<AtomicUsize>,
    /// Number of registered producers; incremented when a sender is created,
    /// decremented when one is dropped and reset to zero by `close`.
    producer_count: CachePadded<AtomicUsize>,
    /// Highest slot index handed out so far.
    max_producer_id: AtomicUsize,
    /// Set once the channel has been closed.
    closed: CachePadded<AtomicBool>,
    /// Waker shared with every queue's gate; the receiver also uses it to select a
    /// signal word.
    summary: Arc<AsyncSignalWaker>,
    /// Signal words; bit `b` of word `w` corresponds to slot `w * 64 + b`.
    signals: Arc<[Signal; SIGNAL_WORDS]>,
}
195
196impl<T, const P: usize, const NUM_SEGS_P2: usize> Inner<T, P, NUM_SEGS_P2> {
197 pub fn is_closed(&self) -> bool {
199 self.closed.load(Ordering::Acquire)
200 }
201
202 pub fn producer_count(&self) -> usize {
204 self.producer_count.load(Ordering::Relaxed)
205 }
206
207 pub fn create_sender(self: &Arc<Self>) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
208 self.create_sender_with_config(0)
209 }
210
211 pub fn create_sender_with_config(
242 self: &Arc<Self>,
243 max_pooled_segments: usize,
244 ) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
245 if self.is_closed() {
246 return Err(PushError::Closed(()));
247 }
248
249 loop {
250 let current = self.producer_count.load(Ordering::Acquire);
251 if current >= MAX_PRODUCERS {
252 return Err(PushError::Full(()));
253 }
254 if self
255 .producer_count
256 .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
257 .is_ok()
258 {
259 break;
260 }
261 }
262
263 let mut assigned_id = None;
264 let mut slot_arc: Option<Arc<ProducerSlot<T, P, NUM_SEGS_P2>>> = None;
265
266 for signal_index in 0..SIGNAL_WORDS {
267 for bit_index in 0..64 {
268 let queue_index = signal_index * 64 + bit_index;
269 if queue_index >= MAX_QUEUES {
270 break;
271 }
272 if !self.queues[queue_index].load(Ordering::Acquire).is_null() {
273 continue;
274 }
275
276 let queue = unsafe {
277 Spsc::<T, P, NUM_SEGS_P2, AsyncSignalGate>::new_unsafe_with_gate_and_config(
278 AsyncSignalGate::new(
279 bit_index as u8,
280 self.signals[signal_index].clone(),
281 Arc::clone(&self.summary),
282 ),
283 max_pooled_segments,
284 )
285 };
286
287 let slot = Arc::new(ProducerSlot {
288 queue,
289 space_waker: DiatomicWaker::new(),
290 });
291
292 let raw = Arc::into_raw(Arc::clone(&slot)) as *mut ProducerSlot<T, P, NUM_SEGS_P2>;
293 match self.queues[queue_index].compare_exchange(
294 ptr::null_mut(),
295 raw,
296 Ordering::Release,
297 Ordering::Acquire,
298 ) {
299 Ok(_) => {
300 self.queue_count.fetch_add(1, Ordering::Relaxed);
301 assigned_id = Some(queue_index);
302 slot_arc = Some(slot);
303 break;
304 }
305 Err(_) => unsafe {
306 Arc::from_raw(raw);
307 },
308 }
309 }
310 if assigned_id.is_some() {
311 break;
312 }
313 }
314
315 let producer_id = match assigned_id {
316 Some(id) => id,
317 None => {
318 self.producer_count.fetch_sub(1, Ordering::Release);
319 return Err(PushError::Full(()));
320 }
321 };
322
323 loop {
327 let max_producer_id = self.max_producer_id.load(Ordering::SeqCst);
328 if producer_id <= max_producer_id {
330 break;
331 }
332 if self.is_closed() {
333 return Err(PushError::Closed(()));
334 }
335 if self
336 .max_producer_id
337 .compare_exchange(
338 max_producer_id,
339 producer_id,
340 Ordering::SeqCst,
341 Ordering::SeqCst,
342 )
343 .is_ok()
344 {
345 break;
346 }
347 }
348
349 let slot_arc = slot_arc.expect("slot arc missing");
355
356 Ok(Sender {
357 inner: Arc::clone(&self),
358 slot: slot_arc,
359 producer_id,
360 })
361 }
362
363 pub fn close(&self) -> bool {
370 let was_open = !self.closed.swap(true, Ordering::AcqRel);
371 if was_open {
372 let permits = self.queue_count.load(Ordering::Relaxed).max(1);
373 self.summary.release(permits);
374 }
375
376 for slot_atomic in self.queues.iter() {
377 let slot_ptr = slot_atomic.swap(ptr::null_mut(), Ordering::AcqRel);
378 if slot_ptr.is_null() {
379 continue;
380 }
381
382 unsafe {
383 (*slot_ptr).queue.close();
384 Arc::from_raw(slot_ptr);
385 }
386
387 self.queue_count.fetch_sub(1, Ordering::Relaxed);
388 }
389
390 self.producer_count.store(0, Ordering::Release);
391 true
392 }
393}
394
/// Producer handle of the channel; each sender owns one slot and pushes into that
/// slot's queue.
pub struct Sender<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Channel state shared with the receiver and the other senders.
    inner: Arc<Inner<T, P, NUM_SEGS_P2>>,
    /// This producer's slot (queue plus space waker).
    slot: Arc<ProducerSlot<T, P, NUM_SEGS_P2>>,
    /// Index of `slot` in the channel's slot table.
    producer_id: usize,
}
427
428impl<T, const P: usize, const NUM_SEGS_P2: usize> Sender<T, P, NUM_SEGS_P2> {
429 pub fn is_closed(&self) -> bool {
431 self.inner.closed.load(Ordering::Acquire)
432 }
433
434 pub fn producer_count(&self) -> usize {
436 self.inner.producer_count.load(Ordering::Relaxed)
437 }
438
439 pub fn producer_id(&self) -> usize {
441 self.producer_id
442 }
443
444 pub fn try_push(&mut self, value: T) -> Result<(), PushError<T>> {
449 self.slot.queue.try_push(value)
450 }
451
452 pub fn push_spin(&mut self, mut value: T) -> Result<(), PushError<T>> {
457 loop {
458 match self.slot.queue.try_push(value) {
459 Ok(()) => return Ok(()),
460 Err(PushError::Full(returned)) => {
461 value = returned;
462 std::hint::spin_loop();
463 }
464 Err(err @ PushError::Closed(_)) => return Err(err),
465 }
466 }
467 }
468
469 pub fn try_push_n(&mut self, values: &[T]) -> Result<usize, PushError<()>> {
471 if self.is_closed() {
472 return Err(PushError::Closed(()));
473 }
474 self.slot.queue.try_push_n(values)
475 }
476
477 pub unsafe fn unsafe_try_push(&self, value: T) -> Result<(), PushError<T>> {
482 self.slot.queue.try_push(value)
483 }
484
485 pub unsafe fn unsafe_try_push_n(&self, values: &[T]) -> Result<usize, PushError<()>> {
487 if self.is_closed() {
488 return Err(PushError::Closed(()));
489 }
490 self.slot.queue.try_push_n(values)
491 }
492
493 pub fn close(&mut self) -> bool {
500 self.inner.close()
501 }
502}
503
504impl<T, const P: usize, const NUM_SEGS_P2: usize> Clone for Sender<T, P, NUM_SEGS_P2> {
505 fn clone(&self) -> Self {
506 self.inner.create_sender().expect("too many senders")
507 }
508}
509
510impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Sender<T, P, NUM_SEGS_P2> {
511 fn drop(&mut self) {
512 unsafe {
513 self.slot.queue.close();
514 }
515
516 std::sync::atomic::fence(Ordering::Release);
521
522 self.inner.producer_count.fetch_sub(1, Ordering::Release);
526 }
527}
528
/// Consumer handle of the channel; pops from the producers' queues.
pub struct Receiver<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Channel state shared with the senders.
    inner: Arc<Inner<T, P, NUM_SEGS_P2>>,
    /// Counter bumped whenever a selection or pop attempt comes back empty-handed.
    misses: u64,
    /// State of the pseudo-random generator used to pick where to look next.
    seed: u64,
}
534
535impl<T, const P: usize, const NUM_SEGS_P2: usize> Receiver<T, P, NUM_SEGS_P2> {
536 pub fn next(&mut self) -> u64 {
537 let old_seed = self.seed;
538 let next_seed = (old_seed
539 .wrapping_mul(RND_MULTIPLIER)
540 .wrapping_add(RND_ADDEND))
541 & RND_MASK;
542 self.seed = next_seed;
543 next_seed >> 16
544 }
545
546 pub(crate) fn inner(&self) -> &Arc<Inner<T, P, NUM_SEGS_P2>> {
548 &self.inner
549 }
550
551 pub fn is_closed(&self) -> bool {
553 self.inner.closed.load(Ordering::Acquire)
554 }
555
556 pub fn close(&self) -> bool {
563 self.inner.close()
564 }
565
566 pub fn producer_count(&self) -> usize {
568 self.inner.producer_count.load(Ordering::Relaxed)
569 }
570
571 pub fn create_sender(&self) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
572 self.create_sender_with_config(0)
573 }
574
575 pub fn create_sender_with_config(
606 &self,
607 max_pooled_segments: usize,
608 ) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
609 self.inner.create_sender_with_config(max_pooled_segments)
610 }
611
612 pub fn try_pop_n(&mut self, batch: &mut [T]) -> usize {
613 let (count, waker_opt) = self.try_pop_n_with_slot(batch);
614 if let Some(waker) = waker_opt {
615 waker.notify();
616 }
617 count
618 }
619
620 pub fn try_pop_with_waker(&mut self) -> Result<(T, &DiatomicWaker), PopError> {
623 let inner = Arc::clone(&self.inner);
625 let mut slot = MaybeUninit::<T>::uninit();
626 let slice = unsafe { std::slice::from_raw_parts_mut(slot.as_mut_ptr(), 1) };
627 let (drained, waker_opt) = self.try_pop_n_with_slot(slice);
628 if drained == 0 {
629 std::sync::atomic::fence(Ordering::Acquire);
633
634 let is_closed = inner.is_closed();
636 let producer_count = inner.producer_count.load(Ordering::Relaxed);
638
639 if is_closed || producer_count == 0 {
641 Err(PopError::Closed)
642 } else {
643 Err(PopError::Empty)
644 }
645 } else {
646 let value = unsafe { slot.assume_init() };
647 let waker = waker_opt.expect("waker missing for drained item");
648 Ok((value, waker))
649 }
650 }
651
652 pub fn try_pop(&mut self) -> Result<T, PopError> {
654 self.try_pop_with_waker().map(|(v, _)| v)
655 }
656
657 fn try_pop_n_with_slot(&mut self, batch: &mut [T]) -> (usize, Option<&DiatomicWaker>) {
658 for _ in 0..64 {
659 match self.acquire() {
660 Some((producer_id, slot_ptr)) => {
661 let slot = unsafe { &*slot_ptr };
662 match slot.queue.try_pop_n(batch) {
663 Ok(size) => {
664 slot.queue.unmark_and_schedule();
665 return (size, Some(&slot.space_waker));
666 }
667 Err(PopError::Closed) => {
668 slot.queue.unmark();
669 self.cleanup_closed_slot(producer_id, slot_ptr);
670 }
671 Err(_) => {
672 slot.queue.unmark();
673 self.misses += 1;
674 }
675 };
676 }
677 None => {
678 }
680 }
681 }
682
683 (0, None)
684 }
685
686 fn cleanup_closed_slot(
687 &self,
688 producer_id: usize,
689 slot_ptr: *mut ProducerSlot<T, P, NUM_SEGS_P2>,
690 ) {
691 let slot_atomic = &self.inner.queues[producer_id];
692 let current = slot_atomic.load(Ordering::Acquire);
693 if current != slot_ptr {
694 return;
695 }
696
697 if slot_atomic
698 .compare_exchange(
699 slot_ptr,
700 ptr::null_mut(),
701 Ordering::AcqRel,
702 Ordering::Acquire,
703 )
704 .is_ok()
705 {
706 self.inner.queue_count.fetch_sub(1, Ordering::Relaxed);
707 unsafe {
708 Arc::from_raw(slot_ptr);
709 }
710 }
711 }
712
713 fn acquire(&mut self) -> Option<(usize, *mut ProducerSlot<T, P, NUM_SEGS_P2>)> {
714 let random = self.next() as usize;
715 let random_word = random % SIGNAL_WORDS;
717 let mut signal_index = self.inner.summary.summary_select(random_word as u64) as usize;
718
719 if signal_index >= SIGNAL_WORDS {
720 signal_index = random_word;
721 }
722
723 let mut signal_bit = self.next() & 63;
724 let signal = &self.inner.signals[signal_index];
725 let signal_value = signal.load(Ordering::Acquire);
726
727 signal_bit = find_nearest(signal_value, signal_bit);
729
730 if signal_bit >= 64 {
732 self.misses += 1;
733 return None;
734 }
735
736 let (bit, expected, acquired) = signal.try_acquire(signal_bit);
738
739 if !acquired {
740 std::hint::spin_loop();
743 return None;
744 }
745
746 let empty = expected == bit;
748
749 if empty {
750 self.inner
751 .summary
752 .try_unmark_if_empty(signal.index(), signal.value());
753 }
754
755 let producer_id = signal_index * 64 + (signal_bit as usize);
757
758 let slot_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
760 if slot_ptr.is_null() {
761 self.misses += 1;
762 if empty {
763 self.inner
764 .summary
765 .try_unmark_if_empty(signal.index(), signal.value());
766 }
767 return None;
768 }
769
770 let slot = unsafe { &*slot_ptr };
772
773 slot.queue.mark();
775
776 Some((producer_id, slot_ptr))
777 }
778}
779
780impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Receiver<T, P, NUM_SEGS_P2> {
781 fn drop(&mut self) {
782 self.inner.close();
783 }
784}
785
786impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Inner<T, P, NUM_SEGS_P2> {
787 fn drop(&mut self) {
788 self.close();
789 }
790}
791
/// Copyable raw pointer to a [`Sender`], tagged with the lifetime `'a`.
///
/// Used to reach the same sender both for its space waker and from inside a retry
/// closure.
struct SenderPtr<'a, T, const P: usize, const NUM_SEGS_P2: usize> {
    /// The sender being pointed at.
    ptr: NonNull<Sender<T, P, NUM_SEGS_P2>>,
    /// Carries the lifetime `'a` without storing a reference.
    _marker: PhantomData<&'a ()>,
}
796
// Implemented by hand rather than derived so that no `T: Copy` / `T: Clone` bound is
// required.
impl<'a, T, const P: usize, const NUM_SEGS_P2: usize> Copy for SenderPtr<'a, T, P, NUM_SEGS_P2> {}

impl<'a, T, const P: usize, const NUM_SEGS_P2: usize> Clone for SenderPtr<'a, T, P, NUM_SEGS_P2> {
    /// Bitwise copy of the pointer.
    fn clone(&self) -> Self {
        *self
    }
}
804
805impl<'a, T: 'a, const P: usize, const NUM_SEGS_P2: usize> SenderPtr<'a, T, P, NUM_SEGS_P2> {
806 #[inline]
807 unsafe fn space_waker(self) -> &'a DiatomicWaker {
808 let sender = self.ptr.as_ptr();
809 let sender_ref: &Sender<T, P, NUM_SEGS_P2> = unsafe { &*sender };
810 &sender_ref.slot.space_waker
811 }
812
813 #[inline]
814 unsafe fn with_mut<R>(self, f: impl FnOnce(&mut Sender<T, P, NUM_SEGS_P2>) -> R) -> R {
815 let sender = self.ptr.as_ptr();
816 let sender_mut: &mut Sender<T, P, NUM_SEGS_P2> = unsafe { &mut *sender };
817 f(sender_mut)
818 }
819}
820
// `SenderPtr` wraps a raw `NonNull`, which is neither `Send` nor `Sync` by itself; these
// impls opt back in for `T: Send`.
// NOTE(review): soundness depends on every user of the pointer keeping access to the
// pointed-to sender exclusive — presumably guaranteed because the pointer is only built
// from a `&mut` borrow of the sender; confirm at each construction site.
unsafe impl<T: Send, const P: usize, const NUM_SEGS_P2: usize> Send
    for SenderPtr<'_, T, P, NUM_SEGS_P2>
{
}
unsafe impl<T: Send, const P: usize, const NUM_SEGS_P2: usize> Sync
    for SenderPtr<'_, T, P, NUM_SEGS_P2>
{
}
829
/// State shared between the waiting (async or blocking) sender and receiver wrappers.
struct AsyncMpscShared<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// Waker slot of the receiver; senders notify it after a successful push.
    receiver_waiter: CachePadded<DiatomicWaker>,
    /// The underlying channel state.
    inner: Arc<Inner<T, P, NUM_SEGS_P2>>,
}
838
839impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncMpscShared<T, P, NUM_SEGS_P2> {
840 fn new(inner: Arc<Inner<T, P, NUM_SEGS_P2>>) -> Self {
841 Self {
842 receiver_waiter: CachePadded::new(DiatomicWaker::new()),
843 inner,
844 }
845 }
846
847 #[inline]
848 fn notify_receiver(&self) {
849 self.receiver_waiter.notify();
850 }
851
852 #[inline]
853 unsafe fn wait_for_items<Pred, R>(&self, predicate: Pred) -> WaitUntil<'_, Pred, R>
854 where
855 Pred: FnMut() -> Option<R>,
856 {
857 unsafe { self.receiver_waiter.wait_until(predicate) }
858 }
859
860 #[inline]
861 unsafe fn register_receiver(&self, waker: &Waker) {
862 unsafe { self.receiver_waiter.register(waker) };
863 }
864}
865
866pub struct AsyncMpscSender<T, const P: usize, const NUM_SEGS_P2: usize> {
867 sender: Sender<T, P, NUM_SEGS_P2>,
868 shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
869}
870
871pub struct AsyncMpscReceiver<T, const P: usize, const NUM_SEGS_P2: usize> {
872 receiver: Receiver<T, P, NUM_SEGS_P2>,
873 shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
874}
875
876impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncMpscSender<T, P, NUM_SEGS_P2> {
877 fn new(
878 sender: Sender<T, P, NUM_SEGS_P2>,
879 shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
880 ) -> Self {
881 Self { sender, shared }
883 }
884
885 #[inline]
887 pub fn try_send(&mut self, value: T) -> Result<(), PushError<T>> {
888 match self.sender.try_push(value) {
889 Ok(()) => {
890 self.shared.notify_receiver();
891 Ok(())
892 }
893 Err(err) => Err(err),
894 }
895 }
896
897 pub async fn send(&mut self, value: T) -> Result<(), PushError<T>> {
898 match self.try_send(value) {
899 Ok(()) => Ok(()),
900 Err(PushError::Full(item)) => {
901 let mut pending = Some(item);
902 let shared = Arc::clone(&self.shared);
903 let sender_ptr = SenderPtr {
904 ptr: NonNull::from(&mut self.sender),
905 _marker: PhantomData,
906 };
907 let waker_ptr = sender_ptr;
908 let wait = unsafe {
909 waker_ptr.space_waker().wait_until(move || {
910 let candidate = pending.take()?;
911 sender_ptr.with_mut(|sender| match sender.try_push(candidate) {
912 Ok(()) => {
913 shared.notify_receiver();
914 Some(Ok(()))
915 }
916 Err(PushError::Full(candidate)) => {
917 pending = Some(candidate);
918 None
919 }
920 Err(PushError::Closed(candidate)) => {
921 Some(Err(PushError::Closed(candidate)))
922 }
923 })
924 })
925 };
926 wait.await
927 }
928 Err(PushError::Closed(item)) => Err(PushError::Closed(item)),
929 }
930 }
931
932 pub async fn send_slice(&mut self, values: Vec<T>) -> Result<Vec<T>, PushError<Vec<T>>> {
933 let mut values = ManuallyDrop::new(values);
934 let data_ptr = values.as_mut_ptr() as usize;
935 let len = values.len();
936 let cap = values.capacity();
937 std::mem::forget(values);
938
939 if len == 0 {
940 let empty = unsafe { Vec::from_raw_parts(data_ptr as *mut T, 0, cap) };
941 return Ok(empty);
942 }
943
944 let slice_from = |sent: usize| unsafe {
945 std::slice::from_raw_parts((data_ptr as *const T).add(sent), len - sent)
946 };
947 let finish_empty = || unsafe { Vec::from_raw_parts(data_ptr as *mut T, 0, cap) };
948 let finish_remaining = |sent: usize| unsafe {
949 let remaining = len - sent;
950 if remaining > 0 {
951 ptr::copy(
952 (data_ptr as *mut T).add(sent),
953 data_ptr as *mut T,
954 remaining,
955 );
956 }
957 Vec::from_raw_parts(data_ptr as *mut T, remaining, cap)
958 };
959
960 let mut sent = 0usize;
961
962 match self.sender.try_push_n(slice_from(sent)) {
963 Ok(written) => {
964 if written > 0 {
965 self.shared.notify_receiver();
966 sent += written;
967 if sent == len {
968 return Ok(finish_empty());
969 }
970 }
971 }
972 Err(PushError::Closed(())) => {
973 let remaining_vec = finish_remaining(sent);
974 return Err(PushError::Closed(remaining_vec));
975 }
976 Err(PushError::Full(())) => {}
977 }
978
979 let shared = Arc::clone(&self.shared);
980 let sender_ptr = SenderPtr {
981 ptr: NonNull::from(&mut self.sender),
982 _marker: PhantomData,
983 };
984 let waker_ptr = sender_ptr;
985 let wait = unsafe {
986 waker_ptr.space_waker().wait_until(move || {
987 if sent == len {
988 return Some(Ok(finish_empty()));
989 }
990 sender_ptr.with_mut(|sender| match sender.try_push_n(slice_from(sent)) {
991 Ok(written) => {
992 if written > 0 {
993 shared.notify_receiver();
994 sent += written;
995 if sent == len {
996 return Some(Ok(finish_empty()));
997 }
998 }
999 None
1000 }
1001 Err(PushError::Full(())) => None,
1002 Err(PushError::Closed(())) => {
1003 let remaining_vec = finish_remaining(sent);
1004 Some(Err(PushError::Closed(remaining_vec)))
1005 }
1006 })
1007 })
1008 };
1009 wait.await
1010 }
1011
1012 pub async fn send_batch<I>(&mut self, iter: I) -> Result<(), PushError<T>>
1013 where
1014 I: IntoIterator<Item = T>,
1015 {
1016 for item in iter {
1017 self.send(item).await?;
1018 }
1019 Ok(())
1020 }
1021
1022 pub fn close(&mut self) -> bool {
1023 self.sender.close()
1024 }
1025}
1026
1027impl<T, const P: usize, const NUM_SEGS_P2: usize> Clone for AsyncMpscSender<T, P, NUM_SEGS_P2> {
1028 fn clone(&self) -> Self {
1029 let sender = self.sender.clone();
1030 Self::new(sender, Arc::clone(&self.shared))
1031 }
1032}
1033
1034impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for AsyncMpscSender<T, P, NUM_SEGS_P2> {
1035 fn drop(&mut self) {
1036 self.shared.notify_receiver();
1037 }
1038}
1039
1040impl<T, const P: usize, const NUM_SEGS_P2: usize> Sink<T> for AsyncMpscSender<T, P, NUM_SEGS_P2> {
1041 type Error = PushError<T>;
1042
1043 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
1044 let this = unsafe { self.get_unchecked_mut() };
1045
1046 if !this.sender.slot.queue.is_full() {
1048 return Poll::Ready(Ok(()));
1049 }
1050
1051 unsafe {
1053 this.sender.slot.space_waker.register(cx.waker());
1054 }
1055
1056 if !this.sender.slot.queue.is_full() {
1058 return Poll::Ready(Ok(()));
1059 }
1060
1061 Poll::Pending
1062 }
1063
1064 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), PushError<T>> {
1065 let this = unsafe { self.get_unchecked_mut() };
1066 this.try_send(item)
1067 }
1068
1069 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
1070 Poll::Ready(Ok(()))
1071 }
1072
1073 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), PushError<T>>> {
1074 let this = unsafe { self.get_unchecked_mut() };
1075 this.close();
1076 Poll::Ready(Ok(()))
1077 }
1078}
1079
1080impl<T, const P: usize, const NUM_SEGS_P2: usize> AsyncMpscReceiver<T, P, NUM_SEGS_P2> {
1081 fn new(
1082 receiver: Receiver<T, P, NUM_SEGS_P2>,
1083 shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
1084 ) -> Self {
1085 Self { receiver, shared }
1086 }
1087
1088 #[inline]
1089 pub fn try_recv(&mut self) -> Result<T, PopError> {
1090 match self.receiver.try_pop_with_waker() {
1091 Ok((value, waker)) => {
1092 waker.notify();
1093 Ok(value)
1094 }
1095 Err(err) => Err(err),
1096 }
1097 }
1098
1099 pub async fn recv(&mut self) -> Result<T, PopError> {
1100 match self.try_recv() {
1101 Ok(value) => Ok(value),
1102 Err(PopError::Empty) | Err(PopError::Timeout) => {
1103 let shared = Arc::clone(&self.shared);
1104 let receiver = &mut self.receiver;
1105 unsafe {
1106 shared
1107 .wait_for_items(|| match receiver.try_pop_with_waker() {
1108 Ok((value, waker)) => {
1109 waker.notify();
1110 Some(Ok(value))
1111 }
1112 Err(PopError::Empty) | Err(PopError::Timeout) => None,
1113 Err(PopError::Closed) => Some(Err(PopError::Closed)),
1114 })
1115 .await
1116 }
1117 }
1118 Err(PopError::Closed) => Err(PopError::Closed),
1119 }
1120 }
1121
1122 pub async fn recv_batch(&mut self, dst: &mut [T]) -> Result<usize, PopError> {
1154 if dst.is_empty() {
1155 return Ok(0);
1156 }
1157
1158 let receiver = &mut self.receiver;
1159 let shared = &self.shared;
1160
1161 let mut filled = receiver.try_pop_n(dst);
1164 if filled > 0 {
1165 return Ok(filled);
1166 }
1167
1168 if receiver.is_closed() || receiver.producer_count() == 0 {
1170 return Err(PopError::Closed);
1171 }
1172
1173 unsafe {
1175 shared
1176 .wait_for_items(|| {
1177 if filled == dst.len() {
1178 return Some(Ok(filled));
1179 }
1180
1181 let count = receiver.try_pop_n(&mut dst[filled..]);
1182 if count == 0 {
1183 if receiver.is_closed() || receiver.producer_count() == 0 {
1185 Some(if filled > 0 {
1186 Ok(filled)
1187 } else {
1188 Err(PopError::Closed)
1189 })
1190 } else {
1191 None
1192 }
1193 } else {
1194 filled += count;
1195 if filled == dst.len() {
1197 Some(Ok(filled))
1198 } else {
1199 None
1200 }
1201 }
1202 })
1203 .await
1204 }
1205 }
1206}
1207
1208impl<T, const P: usize, const NUM_SEGS_P2: usize> Stream for AsyncMpscReceiver<T, P, NUM_SEGS_P2> {
1209 type Item = T;
1210
1211 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1212 let this = unsafe { self.get_unchecked_mut() };
1213 match this.try_recv() {
1214 Ok(value) => Poll::Ready(Some(value)),
1215 Err(PopError::Closed) => Poll::Ready(None),
1216 Err(PopError::Empty) | Err(PopError::Timeout) => {
1217 unsafe {
1218 this.shared.register_receiver(cx.waker());
1219 }
1220 match this.try_recv() {
1221 Ok(value) => Poll::Ready(Some(value)),
1222 Err(PopError::Closed) => Poll::Ready(None),
1223 Err(PopError::Empty) | Err(PopError::Timeout) => Poll::Pending,
1224 }
1225 }
1226 }
1227 }
1228}
1229
1230pub struct BlockingMpscSender<T, const P: usize, const NUM_SEGS_P2: usize> {
1235 sender: Sender<T, P, NUM_SEGS_P2>,
1236 shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
1237 parker: Parker,
1238 parker_waker: Arc<ThreadUnparker>,
1239}
1240
1241impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingMpscSender<T, P, NUM_SEGS_P2> {
1242 fn new(
1243 sender: Sender<T, P, NUM_SEGS_P2>,
1244 shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
1245 ) -> Self {
1246 let parker = Parker::new();
1247 let parker_waker = Arc::new(ThreadUnparker {
1248 unparker: parker.unparker(),
1249 });
1250 Self {
1251 sender,
1252 shared,
1253 parker,
1254 parker_waker,
1255 }
1256 }
1257
1258 #[inline]
1260 unsafe fn register_space_waker(&self, waker: &Waker) {
1261 unsafe { self.sender.slot.space_waker.register(waker) };
1262 }
1263
1264 #[inline]
1265 pub fn try_send(&mut self, value: T) -> Result<(), PushError<T>> {
1266 match self.sender.try_push(value) {
1267 Ok(()) => {
1268 self.shared.notify_receiver();
1269 Ok(())
1270 }
1271 Err(err) => Err(err),
1272 }
1273 }
1274
1275 pub fn send(&mut self, mut value: T) -> Result<(), PushError<T>> {
1277 let sender_ptr = &mut self.sender as *mut Sender<T, P, NUM_SEGS_P2>;
1278
1279 match unsafe { (&mut *sender_ptr).try_push(value) } {
1281 Ok(()) => return Ok(()),
1282 Err(PushError::Closed(item)) => return Err(PushError::Closed(item)),
1283 Err(PushError::Full(item)) => value = item,
1284 }
1285
1286 let waker = Waker::from(Arc::clone(&self.parker_waker));
1288
1289 loop {
1290 unsafe {
1291 self.register_space_waker(&waker);
1292 }
1293
1294 match unsafe { (&mut *sender_ptr).try_push(value) } {
1295 Ok(()) => {
1296 self.shared.notify_receiver();
1297 return Ok(());
1298 }
1299 Err(PushError::Full(item)) => {
1300 value = item;
1301 self.parker.park();
1302 }
1303 Err(PushError::Closed(item)) => {
1304 return Err(PushError::Closed(item));
1305 }
1306 }
1307 }
1308 }
1309
1310 pub fn send_slice(&mut self, values: Vec<T>) -> Result<Vec<T>, PushError<Vec<T>>> {
1312 let mut values = ManuallyDrop::new(values);
1313 let data_ptr = values.as_mut_ptr();
1314 let len = values.len();
1315 let cap = values.capacity();
1316
1317 if len == 0 {
1318 let empty = unsafe { Vec::from_raw_parts(data_ptr, 0, cap) };
1319 return Ok(empty);
1320 }
1321
1322 let slice_from =
1323 |sent: usize| unsafe { std::slice::from_raw_parts(data_ptr.add(sent), len - sent) };
1324 let finish_empty = || unsafe { Vec::from_raw_parts(data_ptr, 0, cap) };
1325 let finish_remaining = |sent: usize| unsafe {
1326 let remaining = len - sent;
1327 if remaining > 0 {
1328 ptr::copy(data_ptr.add(sent), data_ptr, remaining);
1329 }
1330 Vec::from_raw_parts(data_ptr, remaining, cap)
1331 };
1332
1333 let waker = Waker::from(Arc::clone(&self.parker_waker));
1334 let sender_ptr = SenderPtr {
1335 ptr: NonNull::from(&mut self.sender),
1336 _marker: PhantomData,
1337 };
1338 let mut sent = 0usize;
1339
1340 match self.sender.try_push_n(slice_from(sent)) {
1341 Ok(written) => {
1342 if written > 0 {
1343 self.shared.notify_receiver();
1344 sent += written;
1345 if sent == len {
1346 return Ok(finish_empty());
1347 }
1348 }
1349 }
1350 Err(PushError::Closed(())) => {
1351 let remaining_vec = finish_remaining(sent);
1352 return Err(PushError::Closed(remaining_vec));
1353 }
1354 Err(PushError::Full(())) => {}
1355 }
1356
1357 loop {
1358 unsafe {
1359 self.register_space_waker(&waker);
1360 }
1361
1362 if sent == len {
1363 return Ok(finish_empty());
1364 }
1365
1366 match unsafe { sender_ptr.with_mut(|sender| sender.try_push_n(slice_from(sent))) } {
1367 Ok(written) => {
1368 if written > 0 {
1369 self.shared.notify_receiver();
1370 sent += written;
1371 if sent == len {
1372 return Ok(finish_empty());
1373 }
1374 }
1375 self.parker.park();
1376 }
1377 Err(PushError::Full(())) => {
1378 self.parker.park();
1379 }
1380 Err(PushError::Closed(())) => {
1381 let remaining_vec = finish_remaining(sent);
1382 return Err(PushError::Closed(remaining_vec));
1383 }
1384 }
1385 }
1386 }
1387
1388 pub fn close(&mut self) -> bool {
1389 self.sender.close()
1390 }
1391}
1392
1393impl<T, const P: usize, const NUM_SEGS_P2: usize> Clone for BlockingMpscSender<T, P, NUM_SEGS_P2> {
1394 fn clone(&self) -> Self {
1395 let sender = self.sender.clone();
1396 Self::new(sender, Arc::clone(&self.shared))
1397 }
1398}
1399
1400impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for BlockingMpscSender<T, P, NUM_SEGS_P2> {
1401 fn drop(&mut self) {
1402 self.shared.notify_receiver();
1403 }
1404}
1405
/// Blocking consumer handle: a [`Receiver`] that parks the calling thread while the
/// channel is empty.
pub struct BlockingMpscReceiver<T, const P: usize, const NUM_SEGS_P2: usize> {
    /// The wrapped receiver.
    receiver: Receiver<T, P, NUM_SEGS_P2>,
    /// Waker state shared with the senders.
    shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
}
1413
1414impl<T, const P: usize, const NUM_SEGS_P2: usize> BlockingMpscReceiver<T, P, NUM_SEGS_P2> {
1415 fn new(
1416 receiver: Receiver<T, P, NUM_SEGS_P2>,
1417 shared: Arc<AsyncMpscShared<T, P, NUM_SEGS_P2>>,
1418 ) -> Self {
1419 Self { receiver, shared }
1420 }
1421
1422 #[inline]
1423 pub fn try_recv(&mut self) -> Result<T, PopError> {
1424 match self.receiver.try_pop_with_waker() {
1425 Ok((value, waker)) => {
1426 waker.notify();
1427 Ok(value)
1428 }
1429 Err(err) => Err(err),
1430 }
1431 }
1432
1433 pub fn recv(&mut self) -> Result<T, PopError> {
1435 match self.try_recv() {
1437 Ok(value) => return Ok(value),
1438 Err(PopError::Closed) => return Err(PopError::Closed),
1439 Err(PopError::Empty) | Err(PopError::Timeout) => {}
1440 }
1441
1442 let parker = Parker::new();
1444 let unparker = parker.unparker();
1445 let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
1446
1447 loop {
1448 unsafe {
1450 self.shared.register_receiver(&waker);
1451 }
1452
1453 match self.receiver.try_pop_with_waker() {
1455 Ok((value, waker)) => {
1456 waker.notify();
1457 return Ok(value);
1458 }
1459 Err(PopError::Closed) => {
1460 return Err(PopError::Closed);
1461 }
1462 Err(PopError::Empty) | Err(PopError::Timeout) => {
1463 parker.park();
1464 }
1465 }
1466 }
1467 }
1468
1469 pub fn recv_batch(&mut self, dst: &mut [T]) -> Result<usize, PopError> {
1501 if dst.is_empty() {
1502 return Ok(0);
1503 }
1504
1505 let mut filled = 0;
1507 while filled < dst.len() {
1508 match self.receiver.try_pop_with_waker() {
1509 Ok((value, waker)) => {
1510 dst[filled] = value;
1511 filled += 1;
1512 waker.notify();
1513 }
1514 Err(_) => break,
1515 }
1516 }
1517
1518 if filled > 0 {
1519 return Ok(filled);
1520 }
1521
1522 if self.receiver.is_closed() || self.receiver.producer_count() == 0 {
1524 return Err(PopError::Closed);
1525 }
1526
1527 let parker = Parker::new();
1529 let unparker = parker.unparker();
1530 let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
1531
1532 loop {
1533 unsafe {
1535 self.shared.register_receiver(&waker);
1536 }
1537
1538 while filled < dst.len() {
1540 match self.receiver.try_pop_with_waker() {
1541 Ok((value, waker)) => {
1542 dst[filled] = value;
1543 filled += 1;
1544 waker.notify();
1545 }
1546 Err(_) => break,
1547 }
1548 }
1549
1550 if filled > 0 {
1551 if filled == dst.len()
1553 || self.receiver.is_closed()
1554 || self.receiver.producer_count() == 0
1555 {
1556 return Ok(filled);
1557 }
1558 parker.park();
1560 } else if self.receiver.is_closed() || self.receiver.producer_count() == 0 {
1561 return Err(PopError::Closed);
1562 } else {
1563 parker.park();
1565 }
1566 }
1567 }
1568}
1569
1570pub fn async_mpsc<T, const P: usize, const NUM_SEGS_P2: usize>() -> (
1571 AsyncMpscSender<T, P, NUM_SEGS_P2>,
1572 AsyncMpscReceiver<T, P, NUM_SEGS_P2>,
1573) {
1574 let (sender, receiver) = new_with_sender();
1575 async_mpsc_from_parts(sender, receiver)
1576}
1577
1578pub fn async_mpsc_from_parts<T, const P: usize, const NUM_SEGS_P2: usize>(
1579 sender: Sender<T, P, NUM_SEGS_P2>,
1580 receiver: Receiver<T, P, NUM_SEGS_P2>,
1581) -> (
1582 AsyncMpscSender<T, P, NUM_SEGS_P2>,
1583 AsyncMpscReceiver<T, P, NUM_SEGS_P2>,
1584) {
1585 let inner = Arc::clone(receiver.inner());
1586 let shared = Arc::new(AsyncMpscShared::new(inner));
1587 let async_sender = AsyncMpscSender::new(sender, Arc::clone(&shared));
1588 let async_receiver = AsyncMpscReceiver::new(receiver, shared);
1589 (async_sender, async_receiver)
1590}
1591
1592pub fn blocking_mpsc<T, const P: usize, const NUM_SEGS_P2: usize>() -> (
1596 BlockingMpscSender<T, P, NUM_SEGS_P2>,
1597 BlockingMpscReceiver<T, P, NUM_SEGS_P2>,
1598) {
1599 let (sender, receiver) = new_with_sender();
1600 blocking_mpsc_from_parts(sender, receiver)
1601}
1602
1603pub fn blocking_mpsc_from_parts<T, const P: usize, const NUM_SEGS_P2: usize>(
1605 sender: Sender<T, P, NUM_SEGS_P2>,
1606 receiver: Receiver<T, P, NUM_SEGS_P2>,
1607) -> (
1608 BlockingMpscSender<T, P, NUM_SEGS_P2>,
1609 BlockingMpscReceiver<T, P, NUM_SEGS_P2>,
1610) {
1611 let inner = Arc::clone(receiver.inner());
1612 let shared = Arc::new(AsyncMpscShared::new(inner));
1613 let blocking_sender = BlockingMpscSender::new(sender, Arc::clone(&shared));
1614 let blocking_receiver = BlockingMpscReceiver::new(receiver, shared);
1615 (blocking_sender, blocking_receiver)
1616}
1617
1618pub fn blocking_async_mpsc<T, const P: usize, const NUM_SEGS_P2: usize>() -> (
1645 BlockingMpscSender<T, P, NUM_SEGS_P2>,
1646 AsyncMpscReceiver<T, P, NUM_SEGS_P2>,
1647) {
1648 let (sender, receiver) = new_with_sender();
1649 blocking_async_mpsc_from_parts(sender, receiver)
1650}
1651
1652pub fn blocking_async_mpsc_from_parts<T, const P: usize, const NUM_SEGS_P2: usize>(
1654 sender: Sender<T, P, NUM_SEGS_P2>,
1655 receiver: Receiver<T, P, NUM_SEGS_P2>,
1656) -> (
1657 BlockingMpscSender<T, P, NUM_SEGS_P2>,
1658 AsyncMpscReceiver<T, P, NUM_SEGS_P2>,
1659) {
1660 let inner = Arc::clone(receiver.inner());
1661 let shared = Arc::new(AsyncMpscShared::new(inner));
1662 let blocking_sender = BlockingMpscSender::new(sender, Arc::clone(&shared));
1663 let async_receiver = AsyncMpscReceiver::new(receiver, shared);
1664 (blocking_sender, async_receiver)
1665}
1666
1667pub fn async_blocking_mpsc<T, const P: usize, const NUM_SEGS_P2: usize>() -> (
1694 AsyncMpscSender<T, P, NUM_SEGS_P2>,
1695 BlockingMpscReceiver<T, P, NUM_SEGS_P2>,
1696) {
1697 let (sender, receiver) = new_with_sender();
1698 async_blocking_mpsc_from_parts(sender, receiver)
1699}
1700
1701pub fn async_blocking_mpsc_from_parts<T, const P: usize, const NUM_SEGS_P2: usize>(
1703 sender: Sender<T, P, NUM_SEGS_P2>,
1704 receiver: Receiver<T, P, NUM_SEGS_P2>,
1705) -> (
1706 AsyncMpscSender<T, P, NUM_SEGS_P2>,
1707 BlockingMpscReceiver<T, P, NUM_SEGS_P2>,
1708) {
1709 let inner = Arc::clone(receiver.inner());
1710 let shared = Arc::new(AsyncMpscShared::new(inner));
1711 let async_sender = AsyncMpscSender::new(sender, Arc::clone(&shared));
1712 let blocking_receiver = BlockingMpscReceiver::new(receiver, shared);
1713 (async_sender, blocking_receiver)
1714}
1715
1716pub mod unbounded {
1774 use super::*;
1775 use crate::spsc::unbounded::{UnboundedSender as UnboundedSender_, UnboundedReceiver as UnboundedReceiver_};
1776 use crate::spsc::NoOpSignal;
1777
    /// Per-producer state stored in the `UnboundedInner::queues` table.
    struct UnboundedProducerSlot<T> {
        /// Producer half of this slot's SPSC queue; `UnboundedSender::try_push` pushes through it.
        sender: UnboundedSender_<T>,
        /// Consumer half of this slot's SPSC queue; the MPSC receiver pops through it.
        receiver: UnboundedReceiver_<T>,
        /// Waker returned alongside every item popped from this slot.
        space_waker: DiatomicWaker,
    }
1784
    /// State shared by the unbounded receiver and all of its senders.
    struct UnboundedInner<T> {
        /// Slot table indexed by producer id; a null pointer marks a free entry.
        /// Non-null entries are pointers obtained from `Arc::into_raw`.
        queues: Box<[AtomicPtr<UnboundedProducerSlot<T>>]>,
        /// Number of slots currently installed in `queues`.
        queue_count: CachePadded<AtomicUsize>,
        /// Number of live senders; incremented in `create_sender`, decremented
        /// when a sender is dropped and reset to 0 by `close`.
        producer_count: CachePadded<AtomicUsize>,
        /// Highest producer id handed out so far; bounds the receiver's scan.
        max_producer_id: AtomicUsize,
        /// Set once by `close`.
        closed: CachePadded<AtomicBool>,
        /// Waker whose permits are released on the first `close`.
        summary: Arc<AsyncSignalWaker>,
        /// One signal word per group of 64 producer ids; read by `UnboundedReceiver::acquire`.
        signals: Arc<[Signal; SIGNAL_WORDS]>,
    }
1806
1807 impl<T> UnboundedInner<T> {
1808 fn is_closed(&self) -> bool {
1809 self.closed.load(Ordering::Acquire)
1810 }
1811
1812 fn producer_count(&self) -> usize {
1813 self.producer_count.load(Ordering::Relaxed)
1814 }
1815
1816 fn create_sender(self: &Arc<Self>) -> Result<UnboundedSender<T>, PushError<()>> {
1817 if self.is_closed() {
1818 return Err(PushError::Closed(()));
1819 }
1820
1821 loop {
1822 let current = self.producer_count.load(Ordering::Acquire);
1823 if current >= MAX_PRODUCERS {
1824 return Err(PushError::Full(()));
1825 }
1826 if self
1827 .producer_count
1828 .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
1829 .is_ok()
1830 {
1831 break;
1832 }
1833 }
1834
1835 let mut assigned_id = None;
1836 let mut slot_arc: Option<Arc<UnboundedProducerSlot<T>>> = None;
1837
1838 for signal_index in 0..SIGNAL_WORDS {
1839 for bit_index in 0..64 {
1840 let queue_index = signal_index * 64 + bit_index;
1841 if queue_index >= MAX_QUEUES {
1842 break;
1843 }
1844 if !self.queues[queue_index].load(Ordering::Acquire).is_null() {
1845 continue;
1846 }
1847
1848 let (sender, receiver) = UnboundedSpsc::<T, 6, 8, NoOpSignal>::new();
1849 let slot = Arc::new(UnboundedProducerSlot {
1850 sender,
1851 receiver,
1852 space_waker: DiatomicWaker::new(),
1853 });
1854
1855 let raw = Arc::into_raw(Arc::clone(&slot)) as *mut UnboundedProducerSlot<T>;
1856 match self.queues[queue_index].compare_exchange(
1857 ptr::null_mut(),
1858 raw,
1859 Ordering::Release,
1860 Ordering::Acquire,
1861 ) {
1862 Ok(_) => {
1863 self.queue_count.fetch_add(1, Ordering::Relaxed);
1864 assigned_id = Some(queue_index);
1865 slot_arc = Some(slot);
1866 break;
1867 }
1868 Err(_) => unsafe {
1869 Arc::from_raw(raw);
1870 },
1871 }
1872 }
1873 if assigned_id.is_some() {
1874 break;
1875 }
1876 }
1877
1878 let producer_id = match assigned_id {
1879 Some(id) => id,
1880 None => {
1881 self.producer_count.fetch_sub(1, Ordering::Release);
1882 return Err(PushError::Full(()));
1883 }
1884 };
1885
1886 loop {
1890 let max_producer_id = self.max_producer_id.load(Ordering::SeqCst);
1891 if producer_id <= max_producer_id {
1893 break;
1894 }
1895 if self.is_closed() {
1896 return Err(PushError::Closed(()));
1897 }
1898 if self
1899 .max_producer_id
1900 .compare_exchange(
1901 max_producer_id,
1902 producer_id,
1903 Ordering::SeqCst,
1904 Ordering::SeqCst,
1905 )
1906 .is_ok()
1907 {
1908 break;
1909 }
1910 }
1911
1912 let slot_arc = slot_arc.expect("slot arc missing");
1919
1920 Ok(UnboundedSender {
1921 inner: Arc::clone(self),
1922 slot: slot_arc,
1923 producer_id,
1924 })
1925 }
1926
1927 fn close(&self) -> bool {
1928 let was_open = !self.closed.swap(true, Ordering::AcqRel);
1929 if was_open {
1930 let permits = self.queue_count.load(Ordering::Relaxed).max(1);
1931 self.summary.release(permits);
1932 }
1933
1934 for slot_atomic in self.queues.iter() {
1935 let slot_ptr = slot_atomic.swap(ptr::null_mut(), Ordering::AcqRel);
1936 if slot_ptr.is_null() {
1937 continue;
1938 }
1939
1940 unsafe {
1941 (*slot_ptr).sender.close_channel();
1942 Arc::from_raw(slot_ptr);
1943 }
1944
1945 self.queue_count.fetch_sub(1, Ordering::Relaxed);
1946 }
1947
1948 self.producer_count.store(0, Ordering::Release);
1949 true
1950 }
1951 }
1952
1953 impl<T> Drop for UnboundedInner<T> {
1954 fn drop(&mut self) {
1955 self.close();
1956 }
1957 }
1958
1959 pub fn unbounded_new<T>() -> UnboundedReceiver<T> {
1961 unbounded_new_with_waker(Arc::new(AsyncSignalWaker::new()))
1962 }
1963
1964 pub fn unbounded_new_with_waker<T>(
1966 waker: Arc<AsyncSignalWaker>,
1967 ) -> UnboundedReceiver<T> {
1968 let mut queues = Vec::with_capacity(MAX_QUEUES);
1969 for _ in 0..MAX_QUEUES {
1970 queues.push(AtomicPtr::new(core::ptr::null_mut()));
1971 }
1972
1973 let signals: Arc<[Signal; SIGNAL_WORDS]> =
1974 Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
1975
1976 let inner = Arc::new(UnboundedInner {
1977 queues: queues.into_boxed_slice(),
1978 queue_count: CachePadded::new(AtomicUsize::new(0)),
1979 producer_count: CachePadded::new(AtomicUsize::new(0)),
1980 max_producer_id: AtomicUsize::new(0),
1981 closed: CachePadded::new(AtomicBool::new(false)),
1982 summary: waker,
1983 signals,
1984 });
1985
1986 UnboundedReceiver {
1987 inner,
1988 misses: 0,
1989 seed: rand::rng().next_u64(),
1990 }
1991 }
1992
1993 pub fn unbounded_new_with_sender<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
1995 let waker = Arc::new(AsyncSignalWaker::new());
1996 let mut queues = Vec::with_capacity(MAX_QUEUES);
1997 for _ in 0..MAX_QUEUES {
1998 queues.push(AtomicPtr::new(core::ptr::null_mut()));
1999 }
2000
2001 let signals: Arc<[Signal; SIGNAL_WORDS]> =
2002 Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
2003
2004 let inner = Arc::new(UnboundedInner {
2005 queues: queues.into_boxed_slice(),
2006 queue_count: CachePadded::new(AtomicUsize::new(0)),
2007 producer_count: CachePadded::new(AtomicUsize::new(0)),
2008 max_producer_id: AtomicUsize::new(0),
2009 closed: CachePadded::new(AtomicBool::new(false)),
2010 summary: waker,
2011 signals,
2012 });
2013
2014 let receiver = UnboundedReceiver {
2015 inner: inner.clone(),
2016 misses: 0,
2017 seed: rand::rng().next_u64(),
2018 };
2019
2020 let sender = inner
2021 .create_sender()
2022 .expect("fatal: unbounded mpsc won't allow even 1 sender");
2023
2024 (sender, receiver)
2025 }
2026
    /// Producer handle of the unbounded MPSC queue.
    ///
    /// Each handle owns one slot; cloning registers a new producer with its
    /// own slot.
    pub struct UnboundedSender<T> {
        /// State shared with the receiver and the other senders.
        inner: Arc<UnboundedInner<T>>,
        /// This producer's slot; pushes go through `slot.sender`.
        slot: Arc<UnboundedProducerSlot<T>>,
        /// Index of `slot` in the shared slot table.
        producer_id: usize,
    }
2033
2034 impl<T> UnboundedSender<T> {
2035 pub fn is_closed(&self) -> bool {
2036 self.inner.is_closed()
2037 }
2038
2039 pub fn producer_count(&self) -> usize {
2040 self.inner.producer_count()
2041 }
2042
2043 pub fn producer_id(&self) -> usize {
2044 self.producer_id
2045 }
2046
2047 pub fn try_push(&mut self, value: T) -> Result<(), PushError<T>> {
2049 self.slot.sender.try_push(value)
2050 }
2051
2052 pub fn close(&mut self) -> bool {
2054 self.inner.close()
2055 }
2056 }
2057
2058 impl<T> Clone for UnboundedSender<T> {
2059 fn clone(&self) -> Self {
2060 self.inner
2061 .create_sender()
2062 .expect("too many senders")
2063 }
2064 }
2065
2066 impl<T> Drop for UnboundedSender<T> {
2067 fn drop(&mut self) {
2068 self.slot.sender.close_channel();
2072
2073 std::sync::atomic::fence(Ordering::Release);
2078
2079 self.inner.producer_count.fetch_sub(1, Ordering::Release);
2082 }
2083 }
2084
    /// Consumer handle of the unbounded MPSC queue.
    pub struct UnboundedReceiver<T> {
        /// State shared with every sender.
        inner: Arc<UnboundedInner<T>>,
        /// Counter bumped by `acquire` whenever it finds no usable signal bit or slot.
        misses: u64,
        /// State of the pseudo-random generator advanced by `next`.
        seed: u64,
    }
2091
2092 impl<T> UnboundedReceiver<T> {
2093 fn next(&mut self) -> u64 {
2094 let old_seed = self.seed;
2095 let next_seed = (old_seed
2096 .wrapping_mul(RND_MULTIPLIER)
2097 .wrapping_add(RND_ADDEND))
2098 & RND_MASK;
2099 self.seed = next_seed;
2100 next_seed >> 16
2101 }
2102
2103 pub fn is_closed(&self) -> bool {
2104 self.inner.closed.load(Ordering::Acquire)
2105 }
2106
2107 pub fn close(&self) -> bool {
2108 self.inner.close()
2109 }
2110
2111 pub fn producer_count(&self) -> usize {
2112 self.inner.producer_count()
2113 }
2114
2115 pub fn create_sender(&self) -> Result<UnboundedSender<T>, PushError<()>> {
2116 self.inner.create_sender()
2117 }
2118
2119 pub fn try_pop(&mut self) -> Result<T, PopError> {
2121 self.try_pop_with_waker().map(|(v, _)| v)
2122 }
2123
2124 pub fn try_pop_with_waker(&mut self) -> Result<(T, &DiatomicWaker), PopError> {
2126 let max_producer_id = self.inner.max_producer_id.load(Ordering::Acquire);
2128
2129 for producer_id in 0..=max_producer_id {
2131 let slot_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
2132 if slot_ptr.is_null() {
2133 continue;
2134 }
2135
2136 let slot = unsafe { &*slot_ptr };
2139 if let Some(value) = slot.receiver.try_pop() {
2140 return Ok((value, &slot.space_waker));
2141 }
2142 }
2143
2144 std::sync::atomic::fence(Ordering::Acquire);
2149
2150 let is_closed = self.inner.is_closed();
2152 let producer_count = self.inner.producer_count.load(Ordering::Relaxed);
2154
2155 if is_closed || producer_count == 0 {
2156 for producer_id in 0..=max_producer_id {
2159 let slot_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
2160 if slot_ptr.is_null() {
2161 continue;
2162 }
2163
2164 let slot = unsafe { &*slot_ptr };
2166 if let Some(value) = slot.receiver.try_pop() {
2167 return Ok((value, &slot.space_waker));
2168 }
2169 }
2170
2171 Err(PopError::Closed)
2172 } else {
2173 Err(PopError::Empty)
2174 }
2175 }
2176
2177 fn acquire(&mut self) -> Option<*mut UnboundedProducerSlot<T>> {
2178 let random = self.next() as usize;
2179 let random_word = random % SIGNAL_WORDS;
2180 let mut signal_index = self.inner.summary.summary_select(random_word as u64) as usize;
2181
2182 if signal_index >= SIGNAL_WORDS {
2183 signal_index = random_word;
2184 }
2185
2186 let mut signal_bit = self.next() & 63;
2187 let signal = &self.inner.signals[signal_index];
2188 let signal_value = signal.load(Ordering::Acquire);
2189
2190 signal_bit = find_nearest(signal_value, signal_bit);
2191
2192 if signal_bit >= 64 {
2193 self.misses += 1;
2194 return None;
2195 }
2196
2197 let (bit, expected, acquired) = signal.try_acquire(signal_bit);
2198
2199 if !acquired {
2200 std::hint::spin_loop();
2201 return None;
2202 }
2203
2204 let empty = expected == bit;
2205
2206 if empty {
2207 self.inner
2208 .summary
2209 .try_unmark_if_empty(signal.index(), signal.value());
2210 }
2211
2212 let producer_id = signal_index * 64 + (signal_bit as usize);
2213 let slot_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
2214
2215 if slot_ptr.is_null() {
2216 self.misses += 1;
2217 if empty {
2218 self.inner
2219 .summary
2220 .try_unmark_if_empty(signal.index(), signal.value());
2221 }
2222 return None;
2223 }
2224
2225 Some(slot_ptr)
2226 }
2227 }
2228
2229 impl<T> Drop for UnboundedReceiver<T> {
2230 fn drop(&mut self) {
2231 self.inner.close();
2232 }
2233 }
2234
    /// State shared between the sender and receiver wrappers of an unbounded queue.
    struct UnboundedAsyncMpscShared<T> {
        /// Waker slot of the receiver; senders notify it after a push and on drop.
        receiver_waiter: CachePadded<DiatomicWaker>,
        /// Handle on the queue's shared state.
        inner: Arc<UnboundedInner<T>>,
    }
2240
2241 impl<T> UnboundedAsyncMpscShared<T> {
2242 fn new(inner: Arc<UnboundedInner<T>>) -> Self {
2243 Self {
2244 receiver_waiter: CachePadded::new(DiatomicWaker::new()),
2245 inner,
2246 }
2247 }
2248
2249 #[inline]
2250 fn notify_receiver(&self) {
2251 self.receiver_waiter.notify();
2252 }
2253
2254 #[inline]
2255 unsafe fn wait_for_items<Pred, R>(&self, predicate: Pred) -> WaitUntil<'_, Pred, R>
2256 where
2257 Pred: FnMut() -> Option<R>,
2258 {
2259 unsafe { self.receiver_waiter.wait_until(predicate) }
2260 }
2261
2262 #[inline]
2263 unsafe fn register_receiver(&self, waker: &Waker) {
2264 unsafe { self.receiver_waiter.register(waker) };
2265 }
2266 }
2267
2268 pub struct AsyncUnboundedMpscSender<T> {
2270 sender: UnboundedSender<T>,
2271 shared: Arc<UnboundedAsyncMpscShared<T>>,
2272 }
2273
2274 pub struct AsyncUnboundedMpscReceiver<T> {
2276 receiver: UnboundedReceiver<T>,
2277 shared: Arc<UnboundedAsyncMpscShared<T>>,
2278 }
2279
2280 impl<T> AsyncUnboundedMpscSender<T> {
2281 fn new(
2282 sender: UnboundedSender<T>,
2283 shared: Arc<UnboundedAsyncMpscShared<T>>,
2284 ) -> Self {
2285 Self { sender, shared }
2286 }
2287
2288 #[inline]
2290 pub fn try_send(&mut self, value: T) -> Result<(), PushError<T>> {
2291 match self.sender.try_push(value) {
2292 Ok(()) => {
2293 self.shared.notify_receiver();
2294 Ok(())
2295 }
2296 Err(err) => Err(err),
2297 }
2298 }
2299
2300 pub async fn send(&mut self, value: T) -> Result<(), PushError<T>> {
2302 match self.try_send(value) {
2303 Ok(()) => Ok(()),
2304 Err(PushError::Full(item)) => Err(PushError::Full(item)), Err(PushError::Closed(item)) => Err(PushError::Closed(item)),
2306 }
2307 }
2308
2309 pub fn close(&mut self) -> bool {
2310 self.sender.close()
2311 }
2312
2313 pub fn create_blocking_sender(&self) -> BlockingUnboundedMpscSender<T> {
2315 let sender = self.sender.clone();
2316 BlockingUnboundedMpscSender::new(sender, Arc::clone(&self.shared))
2317 }
2318 }
2319
2320 impl<T> Clone for AsyncUnboundedMpscSender<T> {
2321 fn clone(&self) -> Self {
2322 let sender = self.sender.clone();
2323 Self::new(sender, Arc::clone(&self.shared))
2324 }
2325 }
2326
2327 impl<T> Drop for AsyncUnboundedMpscSender<T> {
2328 fn drop(&mut self) {
2329 self.shared.notify_receiver();
2330 }
2331 }
2332
2333 impl<T> AsyncUnboundedMpscReceiver<T> {
2334 fn new(
2335 receiver: UnboundedReceiver<T>,
2336 shared: Arc<UnboundedAsyncMpscShared<T>>,
2337 ) -> Self {
2338 Self { receiver, shared }
2339 }
2340
2341 pub fn producer_count(&self) -> usize {
2343 self.receiver.producer_count()
2344 }
2345
2346 #[inline]
2348 pub fn try_recv(&mut self) -> Result<T, PopError> {
2349 match self.receiver.try_pop_with_waker() {
2350 Ok((value, waker)) => {
2351 waker.notify();
2352 Ok(value)
2353 }
2354 Err(err) => Err(err),
2355 }
2356 }
2357
2358 pub async fn recv(&mut self) -> Result<T, PopError> {
2360 match self.try_recv() {
2361 Ok(value) => Ok(value),
2362 Err(PopError::Empty) | Err(PopError::Timeout) => {
2363 let shared = Arc::clone(&self.shared);
2364 let receiver = &mut self.receiver;
2365 unsafe {
2366 shared
2367 .wait_for_items(|| match receiver.try_pop_with_waker() {
2368 Ok((value, waker)) => {
2369 waker.notify();
2370 Some(Ok(value))
2371 }
2372 Err(PopError::Empty) | Err(PopError::Timeout) => None,
2373 Err(PopError::Closed) => Some(Err(PopError::Closed)),
2374 })
2375 .await
2376 }
2377 }
2378 Err(PopError::Closed) => Err(PopError::Closed),
2379 }
2380 }
2381 }
2382
2383 impl<T> Stream for AsyncUnboundedMpscReceiver<T> {
2384 type Item = T;
2385
2386 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2387 let this = unsafe { self.get_unchecked_mut() };
2388 match this.try_recv() {
2389 Ok(value) => Poll::Ready(Some(value)),
2390 Err(PopError::Closed) => Poll::Ready(None),
2391 Err(PopError::Empty) | Err(PopError::Timeout) => {
2392 unsafe {
2393 this.shared.register_receiver(cx.waker());
2394 }
2395 match this.try_recv() {
2396 Ok(value) => Poll::Ready(Some(value)),
2397 Err(PopError::Closed) => Poll::Ready(None),
2398 Err(PopError::Empty) | Err(PopError::Timeout) => Poll::Pending,
2399 }
2400 }
2401 }
2402 }
2403 }
2404
2405 pub struct BlockingUnboundedMpscSender<T> {
2407 sender: UnboundedSender<T>,
2408 shared: Arc<UnboundedAsyncMpscShared<T>>,
2409 parker: Parker,
2410 parker_waker: Arc<ThreadUnparker>,
2411 }
2412
2413 impl<T> BlockingUnboundedMpscSender<T> {
2414 fn new(
2415 sender: UnboundedSender<T>,
2416 shared: Arc<UnboundedAsyncMpscShared<T>>,
2417 ) -> Self {
2418 let parker = Parker::new();
2419 let parker_waker = Arc::new(ThreadUnparker {
2420 unparker: parker.unparker(),
2421 });
2422 Self {
2423 sender,
2424 shared,
2425 parker,
2426 parker_waker,
2427 }
2428 }
2429
2430 #[inline]
2432 pub fn try_send(&mut self, value: T) -> Result<(), PushError<T>> {
2433 match self.sender.try_push(value) {
2434 Ok(()) => {
2435 self.shared.notify_receiver();
2436 Ok(())
2437 }
2438 Err(err) => Err(err),
2439 }
2440 }
2441
2442 pub fn send(&mut self, value: T) -> Result<(), PushError<T>> {
2444 self.try_send(value)
2446 }
2447
2448 pub fn close(&mut self) -> bool {
2449 self.sender.close()
2450 }
2451
2452 pub fn create_async_sender(&self) -> AsyncUnboundedMpscSender<T> {
2454 let sender = self.sender.clone();
2455 AsyncUnboundedMpscSender::new(sender, Arc::clone(&self.shared))
2456 }
2457 }
2458
2459 impl<T> Clone for BlockingUnboundedMpscSender<T> {
2460 fn clone(&self) -> Self {
2461 let sender = self.sender.clone();
2462 Self::new(sender, Arc::clone(&self.shared))
2463 }
2464 }
2465
2466 impl<T> Drop for BlockingUnboundedMpscSender<T> {
2467 fn drop(&mut self) {
2468 self.shared.notify_receiver();
2469 }
2470 }
2471
    /// Blocking-flavoured receiver of the unbounded MPSC queue.
    pub struct BlockingUnboundedMpscReceiver<T> {
        /// Raw receiver that items are popped from.
        receiver: UnboundedReceiver<T>,
        /// State shared with the senders; `recv` registers its waker on it.
        shared: Arc<UnboundedAsyncMpscShared<T>>,
    }
2477
2478 impl<T> BlockingUnboundedMpscReceiver<T> {
2479 fn new(
2480 receiver: UnboundedReceiver<T>,
2481 shared: Arc<UnboundedAsyncMpscShared<T>>,
2482 ) -> Self {
2483 Self { receiver, shared }
2484 }
2485
2486 pub fn producer_count(&self) -> usize {
2488 self.receiver.producer_count()
2489 }
2490
2491 #[inline]
2493 pub fn try_recv(&mut self) -> Result<T, PopError> {
2494 match self.receiver.try_pop_with_waker() {
2495 Ok((value, waker)) => {
2496 waker.notify();
2497 Ok(value)
2498 }
2499 Err(err) => Err(err),
2500 }
2501 }
2502
2503 pub fn recv(&mut self) -> Result<T, PopError> {
2505 match self.try_recv() {
2506 Ok(value) => return Ok(value),
2507 Err(PopError::Closed) => return Err(PopError::Closed),
2508 Err(PopError::Empty) | Err(PopError::Timeout) => {}
2509 }
2510
2511 let parker = Parker::new();
2512 let unparker = parker.unparker();
2513 let waker = Waker::from(Arc::new(ThreadUnparker { unparker }));
2514
2515 loop {
2516 unsafe {
2517 self.shared.register_receiver(&waker);
2518 }
2519
2520 match self.receiver.try_pop_with_waker() {
2521 Ok((value, waker)) => {
2522 waker.notify();
2523 return Ok(value);
2524 }
2525 Err(PopError::Closed) => {
2526 return Err(PopError::Closed);
2527 }
2528 Err(PopError::Empty) | Err(PopError::Timeout) => {
2529 parker.park();
2530 }
2531 }
2532 }
2533 }
2534 }
2535
2536
2537 pub fn async_unbounded_mpsc<T>() -> (AsyncUnboundedMpscSender<T>, AsyncUnboundedMpscReceiver<T>) {
2539 let (sender, receiver) = unbounded_new_with_sender();
2540 let inner = receiver.inner.clone();
2541 let shared = Arc::new(UnboundedAsyncMpscShared::new(inner));
2542 let async_sender = AsyncUnboundedMpscSender::new(sender, Arc::clone(&shared));
2543 let async_receiver = AsyncUnboundedMpscReceiver::new(receiver, shared);
2544 (async_sender, async_receiver)
2545 }
2546
2547 pub fn blocking_unbounded_mpsc<T>() -> (BlockingUnboundedMpscSender<T>, BlockingUnboundedMpscReceiver<T>) {
2549 let (sender, receiver) = unbounded_new_with_sender();
2550 let inner = receiver.inner.clone();
2551 let shared = Arc::new(UnboundedAsyncMpscShared::new(inner));
2552 let blocking_sender = BlockingUnboundedMpscSender::new(sender, Arc::clone(&shared));
2553 let blocking_receiver = BlockingUnboundedMpscReceiver::new(receiver, shared);
2554 (blocking_sender, blocking_receiver)
2555 }
2556
2557 pub fn blocking_async_unbounded_mpsc<T>() -> (BlockingUnboundedMpscSender<T>, AsyncUnboundedMpscReceiver<T>) {
2559 let (sender, receiver) = unbounded_new_with_sender();
2560 let inner = receiver.inner.clone();
2561 let shared = Arc::new(UnboundedAsyncMpscShared::new(inner));
2562 let blocking_sender = BlockingUnboundedMpscSender::new(sender, Arc::clone(&shared));
2563 let async_receiver = AsyncUnboundedMpscReceiver::new(receiver, shared);
2564 (blocking_sender, async_receiver)
2565 }
2566
2567 pub fn async_blocking_unbounded_mpsc<T>() -> (AsyncUnboundedMpscSender<T>, BlockingUnboundedMpscReceiver<T>) {
2569 let (sender, receiver) = unbounded_new_with_sender();
2570 let inner = receiver.inner.clone();
2571 let shared = Arc::new(UnboundedAsyncMpscShared::new(inner));
2572 let async_sender = AsyncUnboundedMpscSender::new(sender, Arc::clone(&shared));
2573 let blocking_receiver = BlockingUnboundedMpscReceiver::new(receiver, shared);
2574 (async_sender, blocking_receiver)
2575 }
2576}
2577
2578#[cfg(test)]
2579mod tests {
2580 use super::*;
2581 use std::sync::atomic::Ordering;
2582 use std::sync::Arc;
2583 use std::thread;
2584 use std::time::Duration;
2585
2586 #[test]
2587 fn try_pop_drains_and_reports_closed() {
2588 let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
2589
2590 tx.try_push(42).unwrap();
2591 assert_eq!(rx.try_pop().unwrap(), 42);
2592 assert_eq!(rx.try_pop(), Err(PopError::Empty));
2593
2594 assert!(rx.close());
2595 assert_eq!(rx.try_pop(), Err(PopError::Closed));
2596 }
2597
2598 #[test]
2599 fn dropping_local_sender_clears_producer_slot() {
2600 let (tx, rx) = new_with_sender::<u64, 6, 8>();
2601 assert_eq!(tx.producer_count(), 1);
2602
2603 drop(tx);
2604
2605 assert!(rx.close());
2607 assert_eq!(rx.producer_count(), 0);
2608 assert_eq!(rx.inner.queue_count.load(Ordering::SeqCst), 0);
2609 }
2610
2611 #[tokio::test]
2614 async fn async_async_basic_send_recv() {
2615 let (mut sender, mut receiver) = async_mpsc::<u64, 6, 8>();
2616
2617 sender.send(42).await.unwrap();
2618 assert_eq!(receiver.recv().await.unwrap(), 42);
2619 }
2620
2621 #[tokio::test]
2622 async fn async_async_multiple_senders() {
2623 let (sender, mut receiver) = async_mpsc::<u64, 6, 8>();
2624 let mut sender1 = sender.clone();
2625 let mut sender2 = sender.clone();
2626 let mut sender3 = sender.clone();
2627
2628 tokio::spawn(async move {
2629 sender1.send(1).await.unwrap();
2630 });
2631 tokio::spawn(async move {
2632 sender2.send(2).await.unwrap();
2633 });
2634 tokio::spawn(async move {
2635 sender3.send(3).await.unwrap();
2636 });
2637
2638 let mut received = Vec::new();
2639 for _ in 0..3 {
2640 received.push(receiver.recv().await.unwrap());
2641 }
2642 received.sort();
2643 assert_eq!(received, vec![1, 2, 3]);
2644 }
2645
2646 #[tokio::test]
2647 async fn async_async_batch_operations() {
2648 let (mut sender, mut receiver) = async_mpsc::<u64, 6, 8>();
2649
2650 assert!(
2652 sender
2653 .send_slice(vec![1, 2, 3, 4, 5])
2654 .await
2655 .unwrap()
2656 .is_empty()
2657 );
2658
2659 let mut buf = [0u64; 5];
2661 let count = receiver.recv_batch(&mut buf).await.unwrap();
2662 assert_eq!(count, 5);
2663 assert_eq!(buf, [1, 2, 3, 4, 5]);
2664 }
2665
2666 #[tokio::test]
2667 async fn async_async_closed_queue() {
2668 let (mut sender, mut receiver) = async_mpsc::<u64, 6, 8>();
2669
2670 sender.send(42).await.unwrap();
2671 assert_eq!(receiver.recv().await.unwrap(), 42);
2672
2673 drop(sender);
2675
2676 assert_eq!(receiver.recv().await, Err(PopError::Closed));
2678 }
2679
2680 #[test]
2683 fn blocking_blocking_basic_send_recv() {
2684 let (mut sender, mut receiver) = blocking_mpsc::<u64, 6, 8>();
2685
2686 sender.send(42).unwrap();
2687 assert_eq!(receiver.recv().unwrap(), 42);
2688 }
2689
2690 #[test]
2691 fn blocking_blocking_multiple_senders() {
2692 let (sender, receiver) = blocking_mpsc::<u64, 6, 8>();
2693 let receiver = Arc::new(std::sync::Mutex::new(receiver));
2694
2695 let mut handles = Vec::new();
2696 for i in 0..5 {
2697 let mut sender = sender.clone();
2698 let receiver = Arc::clone(&receiver);
2699 handles.push(thread::spawn(move || {
2700 sender.send(i).unwrap();
2701 }));
2702 }
2703
2704 for handle in handles {
2705 handle.join().unwrap();
2706 }
2707
2708 let mut received = Vec::new();
2709 let mut receiver = receiver.lock().unwrap();
2710 for _ in 0..5 {
2711 received.push(receiver.recv().unwrap());
2712 }
2713 received.sort();
2714 assert_eq!(received, vec![0, 1, 2, 3, 4]);
2715 }
2716
2717 #[test]
2718 fn blocking_blocking_batch_operations() {
2719 let (mut sender, mut receiver) = blocking_mpsc::<u64, 6, 8>();
2720
2721 assert!(sender.send_slice(vec![1, 2, 3, 4, 5]).unwrap().is_empty());
2723
2724 let mut buf = [0u64; 5];
2726 let count = receiver.recv_batch(&mut buf).unwrap();
2727 assert_eq!(count, 5);
2728 assert_eq!(buf, [1, 2, 3, 4, 5]);
2729 }
2730
2731 #[test]
2732 fn blocking_blocking_closed_queue() {
2733 let (mut sender, mut receiver) = blocking_mpsc::<u64, 6, 8>();
2734
2735 sender.send(42).unwrap();
2736 assert_eq!(receiver.recv().unwrap(), 42);
2737
2738 drop(sender);
2740
2741 assert_eq!(receiver.recv(), Err(PopError::Closed));
2743 }
2744
2745 #[tokio::test]
2748 async fn blocking_async_basic_send_recv() {
2749 let (mut sender, mut receiver) = blocking_async_mpsc::<u64, 6, 8>();
2750
2751 let handle = thread::spawn(move || {
2753 sender.send(42).unwrap();
2754 });
2755
2756 handle.join().unwrap();
2757 assert_eq!(receiver.recv().await.unwrap(), 42);
2758 }
2759
2760 #[tokio::test]
2761 async fn blocking_async_multiple_blocking_senders() {
2762 let (sender, mut receiver) = blocking_async_mpsc::<u64, 6, 8>();
2763
2764 let mut handles = Vec::new();
2765 for i in 0..5 {
2766 let mut sender = sender.clone();
2767 handles.push(thread::spawn(move || {
2768 sender.send(i).unwrap();
2769 }));
2770 }
2771
2772 for handle in handles {
2773 handle.join().unwrap();
2774 }
2775
2776 let mut received = Vec::new();
2777 for _ in 0..5 {
2778 received.push(receiver.recv().await.unwrap());
2779 }
2780 received.sort();
2781 assert_eq!(received, vec![0, 1, 2, 3, 4]);
2782 }
2783
2784 #[tokio::test]
2785 async fn blocking_async_wakeup_async_receiver() {
2786 let (mut sender, mut receiver) = blocking_async_mpsc::<u64, 6, 8>();
2787
2788 let recv_handle = tokio::spawn(async move { receiver.recv().await.unwrap() });
2790
2791 tokio::time::sleep(Duration::from_millis(10)).await;
2793
2794 thread::spawn(move || {
2796 sender.send(99).unwrap();
2797 });
2798
2799 assert_eq!(recv_handle.await.unwrap(), 99);
2800 }
2801
2802 #[tokio::test]
2803 async fn blocking_async_batch_operations() {
2804 let (mut sender, mut receiver) = blocking_async_mpsc::<u64, 6, 8>();
2805
2806 thread::spawn(move || {
2808 let _ = sender.send_slice(vec![1, 2, 3, 4, 5]).unwrap();
2809 });
2810
2811 let mut buf = [0u64; 5];
2813 let count = receiver.recv_batch(&mut buf).await.unwrap();
2814 assert_eq!(count, 5);
2815 assert_eq!(buf, [1, 2, 3, 4, 5]);
2816 }
2817
2818 #[tokio::test]
2821 async fn async_blocking_basic_send_recv() {
2822 let (mut sender, receiver) = async_blocking_mpsc::<u64, 6, 8>();
2823 let receiver = Arc::new(std::sync::Mutex::new(receiver));
2824
2825 let receiver_clone = Arc::clone(&receiver);
2827 let handle = thread::spawn(move || receiver_clone.lock().unwrap().recv().unwrap());
2828
2829 tokio::time::sleep(Duration::from_millis(10)).await;
2831
2832 sender.send(42).await.unwrap();
2834
2835 assert_eq!(handle.join().unwrap(), 42);
2836 }
2837
2838 #[tokio::test]
2839 async fn async_blocking_multiple_async_senders() {
2840 let (sender, receiver) = async_blocking_mpsc::<u64, 6, 8>();
2841 let receiver = Arc::new(std::sync::Mutex::new(receiver));
2842
2843 for i in 0..5 {
2845 let mut sender = sender.clone();
2846 tokio::spawn(async move {
2847 sender.send(i).await.unwrap();
2848 });
2849 }
2850
2851 tokio::time::sleep(Duration::from_millis(50)).await;
2853
2854 let mut received = Vec::new();
2856 let mut receiver = receiver.lock().unwrap();
2857 for _ in 0..5 {
2858 received.push(receiver.recv().unwrap());
2859 }
2860 received.sort();
2861 assert_eq!(received, vec![0, 1, 2, 3, 4]);
2862 }
2863
2864 #[tokio::test]
2865 async fn async_blocking_wakeup_blocking_receiver() {
2866 let (mut sender, receiver) = async_blocking_mpsc::<u64, 6, 8>();
2867 let receiver = Arc::new(std::sync::Mutex::new(receiver));
2868
2869 let receiver_clone = Arc::clone(&receiver);
2871 let recv_handle = thread::spawn(move || receiver_clone.lock().unwrap().recv().unwrap());
2872
2873 tokio::time::sleep(Duration::from_millis(10)).await;
2875
2876 let send_handle = tokio::spawn(async move {
2878 sender.send(88).await.unwrap();
2879 });
2880
2881 send_handle.await.unwrap();
2883 assert_eq!(recv_handle.join().unwrap(), 88);
2884 }
2885
2886 #[tokio::test]
2887 async fn async_blocking_batch_operations() {
2888 let (mut sender, receiver) = async_blocking_mpsc::<u64, 6, 8>();
2889 let receiver = Arc::new(std::sync::Mutex::new(receiver));
2890
2891 tokio::spawn(async move {
2893 assert!(
2894 sender
2895 .send_slice(vec![1, 2, 3, 4, 5])
2896 .await
2897 .unwrap()
2898 .is_empty()
2899 );
2900 });
2901
2902 tokio::time::sleep(Duration::from_millis(10)).await;
2904
2905 let receiver_clone = Arc::clone(&receiver);
2907 let handle = thread::spawn(move || {
2908 let mut buf = [0u64; 5];
2909 let count = receiver_clone.lock().unwrap().recv_batch(&mut buf).unwrap();
2910 (count, buf)
2911 });
2912
2913 let (count, buf) = handle.join().unwrap();
2914 assert_eq!(count, 5);
2915 assert_eq!(buf, [1, 2, 3, 4, 5]);
2916 }
2917
2918 #[tokio::test]
2927 async fn async_async_try_send_full() {
2928 let (mut sender, _receiver) = async_mpsc::<u64, 6, 8>();
2929
2930 assert!(sender.try_send(1).is_ok());
2933 }
2934
/// Checks that `try_send` succeeds on a freshly created blocking channel.
///
/// Despite the `_full` suffix, only one successful `try_send` is asserted.
#[test]
fn blocking_blocking_try_send_full() {
    // The receiver half is kept bound for the duration of the test.
    let (mut tx, _rx) = blocking_mpsc::<u64, 6, 8>();

    let outcome = tx.try_send(1);
    assert!(outcome.is_ok());
}
2941
2942 #[tokio::test]
2943 async fn async_async_closed_sender() {
2944 let (mut sender, mut receiver) = async_mpsc::<u64, 6, 8>();
2945
2946 sender.send(1).await.unwrap();
2948 assert_eq!(receiver.recv().await.unwrap(), 1);
2949
2950 drop(receiver);
2952
2953 assert_eq!(sender.send(42).await, Err(PushError::Closed(42)));
2955 }
2956
/// After the receiver is dropped, a blocking `send` must fail with
/// `PushError::Closed` carrying the rejected value.
#[test]
fn blocking_blocking_closed_sender() {
    let (mut tx, mut rx) = blocking_mpsc::<u64, 6, 8>();

    // Round-trip one value first to show the channel works while both ends live.
    tx.send(1).unwrap();
    let first = rx.recv().unwrap();
    assert_eq!(first, 1);

    drop(rx);

    let rejected = tx.send(42);
    assert_eq!(rejected, Err(PushError::Closed(42)));
}
2971
/// `try_pop` on the unbounded queue yields the pushed value, then `Empty`,
/// and reports `Closed` once the receiver has closed the queue.
#[test]
fn unbounded_try_pop_drains_and_reports_closed() {
    use crate::sync::mpsc::unbounded;
    let (mut producer, mut consumer) = unbounded::unbounded_new_with_sender::<u64>();

    producer.try_push(42).unwrap();

    let popped = consumer.try_pop().unwrap();
    assert_eq!(popped, 42);
    // Nothing else was pushed, so the next pop reports an empty queue.
    assert_eq!(consumer.try_pop(), Err(PopError::Empty));

    let closed = consumer.close();
    assert!(closed);
    assert_eq!(consumer.try_pop(), Err(PopError::Closed));
}
2986
/// Dropping the only sender and then closing the receiver must leave the
/// receiver reporting zero producers.
#[test]
fn unbounded_dropping_local_sender_clears_producer_slot() {
    use crate::sync::mpsc::unbounded;
    let (producer, consumer) = unbounded::unbounded_new_with_sender::<u64>();

    // The sender created together with the receiver counts as one producer.
    let initial = producer.producer_count();
    assert_eq!(initial, 1);

    drop(producer);

    let closed = consumer.close();
    assert!(closed);

    let remaining = consumer.producer_count();
    assert_eq!(remaining, 0);
}
2999
3000 #[tokio::test]
3003 async fn unbounded_async_async_basic_send_recv() {
3004 use crate::sync::mpsc::unbounded;
3005 let (mut sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3006
3007 sender.send(42).await.unwrap();
3008 assert_eq!(receiver.recv().await.unwrap(), 42);
3009 }
3010
3011 #[tokio::test]
3012 async fn unbounded_async_async_multiple_senders() {
3013 use crate::sync::mpsc::unbounded;
3014 let (sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3015 let mut sender1 = sender.clone();
3016 let mut sender2 = sender.clone();
3017 let mut sender3 = sender.clone();
3018
3019 tokio::spawn(async move {
3020 sender1.send(1).await.unwrap();
3021 });
3022 tokio::spawn(async move {
3023 sender2.send(2).await.unwrap();
3024 });
3025 tokio::spawn(async move {
3026 sender3.send(3).await.unwrap();
3027 });
3028
3029 let mut received = Vec::new();
3030 for _ in 0..3 {
3031 received.push(receiver.recv().await.unwrap());
3032 }
3033 received.sort();
3034 assert_eq!(received, vec![1, 2, 3]);
3035 }
3036
3037 #[tokio::test]
3038 async fn unbounded_async_async_closed_queue() {
3039 use crate::sync::mpsc::unbounded;
3040 let (mut sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3041
3042 sender.send(42).await.unwrap();
3043 assert_eq!(receiver.recv().await.unwrap(), 42);
3044
3045 drop(sender);
3047
3048 assert_eq!(receiver.recv().await, Err(PopError::Closed));
3050 }
3051
3052 #[tokio::test]
3053 async fn unbounded_async_async_try_send_full() {
3054 use crate::sync::mpsc::unbounded;
3055 let (mut sender, _receiver) = unbounded::async_unbounded_mpsc::<u64>();
3056
3057 assert!(sender.try_send(1).is_ok());
3059 assert!(sender.try_send(2).is_ok());
3060 assert!(sender.try_send(3).is_ok());
3061 }
3062
3063 #[tokio::test]
3064 async fn unbounded_async_async_closed_sender() {
3065 use crate::sync::mpsc::unbounded;
3066 let (mut sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3067
3068 sender.send(1).await.unwrap();
3070 assert_eq!(receiver.recv().await.unwrap(), 1);
3071
3072 drop(receiver);
3074
3075 assert_eq!(sender.send(42).await, Err(PushError::Closed(42)));
3077 }
3078
/// Unbounded blocking/blocking channel: a value sent with `send` is returned
/// by `recv` on the same thread.
#[test]
fn unbounded_blocking_blocking_basic_send_recv() {
    use crate::sync::mpsc::unbounded;
    let (mut tx, mut rx) = unbounded::blocking_unbounded_mpsc::<u64>();

    tx.send(42).unwrap();

    let delivered = rx.recv().unwrap();
    assert_eq!(delivered, 42);
}
3089
/// Five sender clones each send one value from their own thread; after all
/// threads are joined the receiver must yield the values 0 through 4.
///
/// The receiver is only ever used on the test thread, so it is held directly
/// rather than behind an `Arc<Mutex<_>>`, and no per-thread receiver clone is
/// created.
#[test]
fn unbounded_blocking_blocking_multiple_senders() {
    use crate::sync::mpsc::unbounded;
    let (sender, mut receiver) = unbounded::blocking_unbounded_mpsc::<u64>();

    let mut handles = Vec::with_capacity(5);
    for i in 0..5 {
        let mut sender = sender.clone();
        handles.push(thread::spawn(move || {
            sender.send(i).unwrap();
        }));
    }

    // Every send has completed once all producer threads are joined.
    for handle in handles {
        handle.join().unwrap();
    }

    let mut received = Vec::with_capacity(5);
    for _ in 0..5 {
        received.push(receiver.recv().unwrap());
    }
    // Arrival order across threads is not asserted; compare the sorted values.
    received.sort_unstable();
    assert_eq!(received, vec![0, 1, 2, 3, 4]);
}
3117
/// Once the only sender is dropped and the queue has been drained, a blocking
/// `recv` must report `PopError::Closed`.
#[test]
fn unbounded_blocking_blocking_closed_queue() {
    use crate::sync::mpsc::unbounded;
    let (mut tx, mut rx) = unbounded::blocking_unbounded_mpsc::<u64>();

    tx.send(42).unwrap();
    let delivered = rx.recv().unwrap();
    assert_eq!(delivered, 42);

    drop(tx);

    let after_drop = rx.recv();
    assert_eq!(after_drop, Err(PopError::Closed));
}
3132
/// Checks that `try_send` succeeds on a freshly created unbounded blocking
/// channel. Only a single send is asserted.
#[test]
fn unbounded_blocking_blocking_try_send_full() {
    use crate::sync::mpsc::unbounded;
    // The receiver half is kept bound for the duration of the test.
    let (mut tx, _rx) = unbounded::blocking_unbounded_mpsc::<u64>();

    let outcome = tx.try_send(1);
    assert!(outcome.is_ok());
}
3141
/// After the receiver of the unbounded blocking channel is dropped, `send`
/// must fail with `PushError::Closed` carrying the rejected value.
#[test]
fn unbounded_blocking_blocking_closed_sender() {
    use crate::sync::mpsc::unbounded;
    let (mut tx, mut rx) = unbounded::blocking_unbounded_mpsc::<u64>();

    // Round-trip one value first while both halves are alive.
    tx.send(1).unwrap();
    let first = rx.recv().unwrap();
    assert_eq!(first, 1);

    drop(rx);

    let rejected = tx.send(42);
    assert_eq!(rejected, Err(PushError::Closed(42)));
}
3157
3158 #[tokio::test]
3161 async fn unbounded_blocking_async_basic_send_recv() {
3162 use crate::sync::mpsc::unbounded;
3163 let (mut sender, mut receiver) = unbounded::blocking_async_unbounded_mpsc::<u64>();
3164
3165 let handle = thread::spawn(move || {
3167 sender.send(42).unwrap();
3168 });
3169
3170 handle.join().unwrap();
3171 assert_eq!(receiver.recv().await.unwrap(), 42);
3172 }
3173
3174 #[tokio::test]
3175 async fn unbounded_blocking_async_multiple_blocking_senders() {
3176 use crate::sync::mpsc::unbounded;
3177 let (sender, mut receiver) = unbounded::blocking_async_unbounded_mpsc::<u64>();
3178
3179 let mut handles = Vec::new();
3180 for i in 0..5 {
3181 let mut sender = sender.clone();
3182 handles.push(thread::spawn(move || {
3183 sender.send(i).unwrap();
3184 }));
3185 }
3186
3187 for handle in handles {
3188 handle.join().unwrap();
3189 }
3190
3191 let mut received = Vec::new();
3192 for _ in 0..5 {
3193 received.push(receiver.recv().await.unwrap());
3194 }
3195 received.sort();
3196 assert_eq!(received, vec![0, 1, 2, 3, 4]);
3197 }
3198
3199 #[tokio::test]
3200 async fn unbounded_blocking_async_wakeup_async_receiver() {
3201 use crate::sync::mpsc::unbounded;
3202 let (mut sender, mut receiver) = unbounded::blocking_async_unbounded_mpsc::<u64>();
3203
3204 let recv_handle = tokio::spawn(async move { receiver.recv().await.unwrap() });
3206
3207 tokio::time::sleep(Duration::from_millis(10)).await;
3209
3210 thread::spawn(move || {
3212 sender.send(99).unwrap();
3213 });
3214
3215 assert_eq!(recv_handle.await.unwrap(), 99);
3216 }
3217
3218 #[tokio::test]
3221 async fn unbounded_async_blocking_basic_send_recv() {
3222 use crate::sync::mpsc::unbounded;
3223 let (mut sender, receiver) = unbounded::async_blocking_unbounded_mpsc::<u64>();
3224 let receiver = Arc::new(std::sync::Mutex::new(receiver));
3225
3226 let receiver_clone = Arc::clone(&receiver);
3228 let handle = thread::spawn(move || receiver_clone.lock().unwrap().recv().unwrap());
3229
3230 tokio::time::sleep(Duration::from_millis(10)).await;
3232
3233 sender.send(42).await.unwrap();
3235
3236 assert_eq!(handle.join().unwrap(), 42);
3237 }
3238
3239 #[tokio::test]
3240 async fn unbounded_async_blocking_multiple_async_senders() {
3241 use crate::sync::mpsc::unbounded;
3242 let (sender, receiver) = unbounded::async_blocking_unbounded_mpsc::<u64>();
3243 let receiver = Arc::new(std::sync::Mutex::new(receiver));
3244
3245 for i in 0..5 {
3247 let mut sender = sender.clone();
3248 tokio::spawn(async move {
3249 sender.send(i).await.unwrap();
3250 });
3251 }
3252
3253 tokio::time::sleep(Duration::from_millis(50)).await;
3255
3256 let mut received = Vec::new();
3258 let mut receiver = receiver.lock().unwrap();
3259 for _ in 0..5 {
3260 received.push(receiver.recv().unwrap());
3261 }
3262 received.sort();
3263 assert_eq!(received, vec![0, 1, 2, 3, 4]);
3264 }
3265
3266 #[tokio::test]
3267 async fn unbounded_async_blocking_wakeup_blocking_receiver() {
3268 use crate::sync::mpsc::unbounded;
3269 let (mut sender, receiver) = unbounded::async_blocking_unbounded_mpsc::<u64>();
3270 let receiver = Arc::new(std::sync::Mutex::new(receiver));
3271
3272 let receiver_clone = Arc::clone(&receiver);
3274 let recv_handle = thread::spawn(move || receiver_clone.lock().unwrap().recv().unwrap());
3275
3276 tokio::time::sleep(Duration::from_millis(10)).await;
3278
3279 let send_handle = tokio::spawn(async move {
3281 sender.send(88).await.unwrap();
3282 });
3283
3284 send_handle.await.unwrap();
3286 assert_eq!(recv_handle.join().unwrap(), 88);
3287 }
3288
/// Stress test for the blocking unbounded channel: four producer threads each
/// send 250 distinct values while one consumer thread drains the channel until
/// it reports `Closed`. Exactly the 1000 sent values must be received.
#[test]
fn unbounded_high_volume_stress() {
    use crate::sync::mpsc::unbounded;
    let (sender, receiver) = unbounded::blocking_unbounded_mpsc::<u64>();
    let shared_rx = Arc::new(std::sync::Mutex::new(receiver));

    // Producer `p` sends p * 1000 + i for i in 0..250, so values never collide.
    let producers: Vec<_> = (0..4)
        .map(|producer_id| {
            let mut tx = sender.clone();
            thread::spawn(move || {
                for i in 0..250 {
                    tx.send(producer_id * 1000 + i).unwrap();
                }
            })
        })
        .collect();

    let consumer_rx = Arc::clone(&shared_rx);
    let consumer = thread::spawn(move || {
        let mut drained = Vec::new();
        // The lock is held for the whole drain loop.
        let mut rx = consumer_rx.lock().unwrap();

        loop {
            match rx.recv() {
                Ok(item) => drained.push(item),
                Err(PopError::Closed) => break,
                // Nothing available right now: back off briefly and retry.
                Err(PopError::Empty) | Err(PopError::Timeout) => {
                    thread::sleep(std::time::Duration::from_micros(1));
                }
            }
        }
        drained
    });

    for producer in producers {
        producer.join().unwrap();
    }

    thread::sleep(Duration::from_millis(10));

    // Dropping the original sender releases the last sender handle held by
    // this test, after which the consumer loop is expected to see `Closed`.
    drop(sender);
    let mut received = consumer.join().unwrap();
    received.sort();

    assert_eq!(received.len(), 1000, "Expected 1000 items but got {}", received.len());

    let mut expected: Vec<u64> = (0..4)
        .flat_map(|producer_id| (0..250).map(move |i| producer_id * 1000 + i))
        .collect();
    expected.sort();

    assert_eq!(received, expected);
}
3359
3360 #[tokio::test]
3361 async fn unbounded_async_high_volume_stress() {
3362 use crate::sync::mpsc::unbounded;
3363 let (sender, receiver) = unbounded::async_unbounded_mpsc::<u64>();
3364 let receiver = Arc::new(std::sync::Mutex::new(receiver));
3365
3366 let mut producer_tasks = Vec::new();
3368 for producer_id in 0..4 {
3369 let mut sender = sender.clone();
3370 let task = tokio::spawn(async move {
3371 for i in 0..250 {
3372 let value = producer_id * 1000 + i;
3373 sender.send(value).await.unwrap();
3374 }
3375 });
3376 producer_tasks.push(task);
3377 }
3378
3379 let receiver_clone = Arc::clone(&receiver);
3381 let receiver_task = tokio::spawn(async move {
3382 let mut items: Vec<u64> = Vec::new();
3383
3384 loop {
3385 let result = {
3386 let mut receiver_guard = receiver_clone.lock().unwrap();
3387 receiver_guard.try_recv()
3388 };
3389
3390 match result {
3391 Ok(value) => {
3392 items.push(value);
3393 }
3394 Err(PopError::Closed) => break,
3395 Err(PopError::Empty) | Err(PopError::Timeout) => {
3396 tokio::time::sleep(Duration::from_micros(1)).await;
3398 }
3399 }
3400 }
3401 items
3402 });
3403
3404 for task in producer_tasks {
3406 task.await.unwrap();
3407 }
3408
3409 tokio::time::sleep(Duration::from_millis(10)).await;
3411
3412 drop(sender); let mut recv_items = receiver_task.await.unwrap();
3416
3417 recv_items.sort();
3419 assert_eq!(recv_items.len(), 1000, "Expected 1000 items but got {}", recv_items.len());
3420
3421 let mut expected = Vec::new();
3423 for producer_id in 0..4 {
3424 for i in 0..250 {
3425 expected.push(producer_id * 1000 + i);
3426 }
3427 }
3428 expected.sort();
3429
3430 assert_eq!(recv_items, expected);
3431 }
3432
/// Pushes 10,000 values without popping any, then pops them all back and
/// checks they come out in push order, followed by `Empty`.
#[test]
fn unbounded_can_push_beyond_typical_limits() {
    use crate::sync::mpsc::unbounded;
    let (mut producer, mut consumer) = unbounded::unbounded_new_with_sender::<u64>();

    const NUM_ITEMS: u64 = 10_000;

    // Fill phase: nothing is consumed while these pushes happen.
    for value in 0..NUM_ITEMS {
        producer.try_push(value).unwrap();
    }

    // Drain phase: a single producer's values must come back in order.
    for expected in 0..NUM_ITEMS {
        let popped = consumer.try_pop().unwrap();
        assert_eq!(popped, expected);
    }

    let after_drain = consumer.try_pop();
    assert_eq!(after_drain, Err(PopError::Empty));
}
3453
3454 #[tokio::test]
3455 async fn unbounded_async_can_push_beyond_typical_limits() {
3456 use crate::sync::mpsc::unbounded;
3457 let (mut sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3458
3459 const NUM_ITEMS: u64 = 10_000;
3461 for i in 0..NUM_ITEMS {
3462 sender.send(i).await.unwrap();
3463 }
3464
3465 for i in 0..NUM_ITEMS {
3467 assert_eq!(receiver.recv().await.unwrap(), i);
3468 }
3469
3470 drop(sender);
3472 assert_eq!(receiver.recv().await, Err(PopError::Closed));
3473 }
3474
/// `producer_count` must rise by one for each sender clone and fall by one
/// each time a sender is dropped.
#[test]
fn unbounded_producer_count_tracking() {
    use crate::sync::mpsc::unbounded;
    let (first, consumer) = unbounded::unbounded_new_with_sender::<u64>();
    assert_eq!(first.producer_count(), 1);

    // Each clone registers one more producer.
    let second = first.clone();
    assert_eq!(second.producer_count(), 2);

    let third = first.clone();
    assert_eq!(third.producer_count(), 3);

    // Each drop releases exactly one producer; the count is read through a
    // sender that is still alive.
    drop(first);
    assert_eq!(second.producer_count(), 2);

    drop(second);
    assert_eq!(third.producer_count(), 1);

    drop(third);
    // The return value of `close` is deliberately not checked here.
    consumer.close();
}
3499
3500 #[tokio::test]
3503 async fn unbounded_mixed_async_blocking_senders() {
3504 use crate::sync::mpsc::unbounded;
3505 let (async_sender, mut receiver) = unbounded::async_unbounded_mpsc::<u64>();
3506
3507 let blocking_sender = async_sender.create_blocking_sender();
3509
3510 let mut async_sender_clone = async_sender.clone();
3512 let async_task = tokio::spawn(async move {
3513 for i in 0..100 {
3514 async_sender_clone.send(i).await.unwrap();
3515 }
3516 });
3517
3518 let blocking_sender_clone = blocking_sender.clone();
3520 let blocking_thread = thread::spawn(move || {
3521 let mut sender = blocking_sender_clone;
3522 for i in 100..200 {
3523 sender.send(i).unwrap();
3524 }
3525 });
3526
3527 drop(async_sender);
3529 drop(blocking_sender);
3530
3531 async_task.await.unwrap();
3533 blocking_thread.join().unwrap();
3534
3535 let mut items = Vec::new();
3537 loop {
3538 match receiver.recv().await {
3539 Ok(value) => items.push(value),
3540 Err(PopError::Closed) => break,
3541 Err(_) => continue,
3542 }
3543 }
3544
3545 items.sort();
3547 assert_eq!(items.len(), 200);
3548 assert_eq!(items, (0..200).collect::<Vec<_>>());
3549 }
3550
/// Creates an async sender from a blocking one with `create_async_sender`,
/// drops it unused, and checks that the blocking receiver still gets exactly
/// the 0..100 values sent by a blocking producer thread.
#[test]
fn unbounded_mixed_blocking_recv() {
    use crate::sync::mpsc::unbounded;
    let (blocking_sender, mut rx) = unbounded::blocking_unbounded_mpsc::<u64>();

    let async_sender = blocking_sender.create_async_sender();

    let mut blocking_tx = blocking_sender.clone();
    let producer = thread::spawn(move || {
        for value in 0..100 {
            blocking_tx.send(value).unwrap();
        }
    });

    // Release the original handles; only the producer thread's clone remains,
    // and it is dropped when that thread finishes.
    drop(async_sender);
    drop(blocking_sender);

    producer.join().unwrap();

    // Drain until the channel reports `Closed`; other errors are retried.
    let mut items = Vec::new();
    loop {
        match rx.recv() {
            Ok(value) => items.push(value),
            Err(PopError::Closed) => break,
            Err(PopError::Empty) | Err(PopError::Timeout) => continue,
        }
    }

    items.sort();
    assert_eq!(items.len(), 100);
    let expected = (0..100).collect::<Vec<_>>();
    assert_eq!(items, expected);
}
3590}