1use super::signal::{SIGNAL_MASK, Signal, SignalGate};
2use super::waker::{STATUS_SUMMARY_WORDS, WorkerWaker};
3use crate::utils::bits::find_nearest;
4use crate::utils::CachePadded;
5use crate::{PopError, PushError};
6use std::cell::UnsafeCell;
7use std::mem::MaybeUninit;
8use std::ptr;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
11use std::thread;
12
13use crate::spsc::{UnboundedSpsc, UnboundedSender};
14use rand::RngCore;
15
16pub fn new<T, const P: usize, const NUM_SEGS_P2: usize>() -> Receiver<T, P, NUM_SEGS_P2> {
18 new_with_waker(Arc::new(WorkerWaker::new()))
19}
20
21pub fn new_with_waker<T, const P: usize, const NUM_SEGS_P2: usize>(
41 waker: Arc<WorkerWaker>,
42) -> Receiver<T, P, NUM_SEGS_P2> {
43 let mut queues = Vec::with_capacity(MAX_QUEUES);
45 for _ in 0..MAX_QUEUES {
46 queues.push(AtomicPtr::new(core::ptr::null_mut()));
47 }
48
49 let signals: Arc<[Signal; SIGNAL_WORDS]> =
50 Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
51
52 let inner = Arc::new(Inner {
53 queues: queues.into_boxed_slice(),
54 queue_count: CachePadded::new(AtomicUsize::new(0)),
55 producer_count: CachePadded::new(AtomicUsize::new(0)),
56 max_producer_id: AtomicUsize::new(0),
57 closed: CachePadded::new(AtomicBool::new(false)),
58 waker,
59 signals,
60 });
61
62 Receiver {
63 inner,
64 misses: 0,
65 seed: rand::rng().next_u64(),
66 }
67}
68
69pub fn new_with_sender<T, const P: usize, const NUM_SEGS_P2: usize>()
70-> (Sender<T, P, NUM_SEGS_P2>, Receiver<T, P, NUM_SEGS_P2>) {
71 let waker = Arc::new(WorkerWaker::new());
72 let mut queues = Vec::with_capacity(MAX_QUEUES);
74 for _ in 0..MAX_QUEUES {
75 queues.push(AtomicPtr::new(core::ptr::null_mut()));
76 }
77
78 let signals: Arc<[Signal; SIGNAL_WORDS]> =
79 Arc::new(std::array::from_fn(|i| Signal::with_index(i as u64)));
80
81 let inner = Arc::new(Inner {
82 queues: queues.into_boxed_slice(),
83 queue_count: CachePadded::new(AtomicUsize::new(0)),
84 producer_count: CachePadded::new(AtomicUsize::new(0)),
85 max_producer_id: AtomicUsize::new(0),
86 closed: CachePadded::new(AtomicBool::new(false)),
87 waker: waker,
88 signals,
89 });
90
91 (
92 inner
93 .create_sender()
94 .expect("fatal: mpsc won't allow even 1 sender"),
95 Receiver {
96 inner,
97 misses: 0,
98 seed: rand::rng().next_u64(),
99 },
100 )
101}
102
103const RND_MULTIPLIER: u64 = 0x5DEECE66D;
104const RND_ADDEND: u64 = 0xB;
105const RND_MASK: u64 = (1 << 48) - 1;
106
107const MAX_QUEUES: usize = STATUS_SUMMARY_WORDS * 64;
109const QUEUES_PER_PRODUCER: usize = 1;
110const MAX_PRODUCERS: usize = MAX_QUEUES / QUEUES_PER_PRODUCER;
111const MAX_PRODUCERS_MASK: usize = QUEUES_PER_PRODUCER - 1;
112
113const SIGNAL_WORDS: usize = STATUS_SUMMARY_WORDS;
115
116type CloseFn = Box<dyn FnOnce()>;
119
120struct Inner<T, const P: usize, const NUM_SEGS_P2: usize> {
122 queues: Box<[AtomicPtr<UnboundedSpsc<T, P, NUM_SEGS_P2, Arc<SignalGate>>>]>,
125 queue_count: CachePadded<AtomicUsize>,
126 producer_count: CachePadded<AtomicUsize>,
128 max_producer_id: AtomicUsize,
129 closed: CachePadded<AtomicBool>,
131 waker: Arc<WorkerWaker>,
132 signals: Arc<[Signal; SIGNAL_WORDS]>,
134}
135
136impl<T, const P: usize, const NUM_SEGS_P2: usize> Inner<T, P, NUM_SEGS_P2> {
137 pub fn is_closed(&self) -> bool {
139 self.closed.load(Ordering::Acquire)
140 }
141
142 pub fn producer_count(&self) -> usize {
144 self.producer_count.load(Ordering::Relaxed)
145 }
146
147 pub fn create_sender(self: &Arc<Self>) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
148 self.create_sender_with_config(0)
149 }
150
151 pub fn create_sender_with_config(
182 self: &Arc<Self>,
183 max_pooled_segments: usize,
184 ) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
185 if self.is_closed() {
186 return Err(PushError::Closed(()));
187 }
188
189 loop {
190 let current = self.producer_count.load(Ordering::Acquire);
191 if current >= MAX_PRODUCERS {
192 return Err(PushError::Full(()));
193 }
194 if self
195 .producer_count
196 .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
197 .is_ok()
198 {
199 break;
200 }
201 }
202
203 let mut assigned_id = None;
204 let mut sender: Option<UnboundedSender<T, P, NUM_SEGS_P2, Arc<SignalGate>>> = None;
205
206 for signal_index in 0..SIGNAL_WORDS {
207 for bit_index in 0..64 {
208 let queue_index = signal_index * 64 + bit_index;
209 if queue_index >= MAX_QUEUES {
210 break;
211 }
212 if !self.queues[queue_index].load(Ordering::Acquire).is_null() {
213 continue;
214 }
215
216 let signal_gate = Arc::new(SignalGate::new(
217 bit_index as u8,
218 self.signals[signal_index].clone(),
219 Arc::clone(&self.waker),
220 ));
221
222 let (tx, _rx) = UnboundedSpsc::<T, P, NUM_SEGS_P2, Arc<SignalGate>>::new_with_signal(signal_gate);
223
224 let unbounded_arc = tx.unbounded_arc();
226 let raw = Arc::into_raw(unbounded_arc) as *mut UnboundedSpsc<T, P, NUM_SEGS_P2, Arc<SignalGate>>;
227
228 match self.queues[queue_index].compare_exchange(
229 ptr::null_mut(),
230 raw,
231 Ordering::Release,
232 Ordering::Acquire,
233 ) {
234 Ok(_) => {
235 self.queue_count.fetch_add(1, Ordering::Relaxed);
236 assigned_id = Some(queue_index);
237 sender = Some(tx);
238 break;
239 }
240 Err(_) => unsafe {
241 Arc::from_raw(raw);
242 },
243 }
244 }
245 if assigned_id.is_some() {
246 break;
247 }
248 }
249
250 let producer_id = match assigned_id {
251 Some(id) => id,
252 None => {
253 self.producer_count.fetch_sub(1, Ordering::Release);
254 return Err(PushError::Full(()));
255 }
256 };
257
258 loop {
259 let max_producer_id = self.max_producer_id.load(Ordering::SeqCst);
260 if producer_id < max_producer_id {
261 break;
262 }
263 if self.is_closed() {
264 return Err(PushError::Closed(()));
265 }
266 if self
267 .max_producer_id
268 .compare_exchange(
269 max_producer_id,
270 producer_id,
271 Ordering::SeqCst,
272 Ordering::SeqCst,
273 )
274 .is_ok()
275 {
276 break;
277 }
278 }
279
280 let sender = sender.expect("sender missing");
281
282 Ok(Sender {
283 inner: Arc::clone(&self),
284 sender,
285 producer_id,
286 })
287 }
288
289 pub fn close(&self) -> bool {
296 let was_open = !self.closed.swap(true, Ordering::AcqRel);
297 if was_open {
298 let permits = self.queue_count.load(Ordering::Relaxed).max(1);
299 self.waker.release(permits);
300 }
301
302 for slot in self.queues.iter() {
303 let queue_ptr = slot.swap(ptr::null_mut(), Ordering::AcqRel);
304 if queue_ptr.is_null() {
305 continue;
306 }
307
308 unsafe {
309 (&*queue_ptr).close();
310 Arc::from_raw(queue_ptr);
311 }
312
313 self.queue_count.fetch_sub(1, Ordering::Relaxed);
314 self.producer_count.fetch_sub(1, Ordering::Relaxed);
315 }
316
317 self.producer_count.load(Ordering::Acquire) == 0
318 }
319}
320
321pub struct Sender<T, const P: usize, const NUM_SEGS_P2: usize> {
349 inner: Arc<Inner<T, P, NUM_SEGS_P2>>,
350 sender: UnboundedSender<T, P, NUM_SEGS_P2, Arc<SignalGate>>,
351 producer_id: usize,
352}
353
354impl<T, const P: usize, const NUM_SEGS_P2: usize> Sender<T, P, NUM_SEGS_P2> {
355 pub fn is_closed(&self) -> bool {
357 self.inner.closed.load(Ordering::Acquire)
358 }
359
360 pub fn producer_count(&self) -> usize {
362 self.inner.producer_count.load(Ordering::Relaxed)
363 }
364
365 pub fn producer_id(&self) -> usize {
367 self.producer_id
368 }
369
370 pub fn try_push(&mut self, value: T) -> Result<(), PushError<T>> {
375 self.sender.try_push(value)
376 }
377
378 pub fn try_push_n(&mut self, values: &mut Vec<T>) -> Result<usize, PushError<()>> {
380 if self.is_closed() {
381 return Err(PushError::Closed(()));
382 }
383 self.sender.try_push_n(values)
384 }
385
386 pub unsafe fn unsafe_try_push(&self, value: T) -> Result<(), PushError<T>> {
390 self.sender.try_push(value)
391 }
392
393 pub unsafe fn unsafe_try_push_n(&self, values: &mut Vec<T>) -> Result<usize, PushError<()>> {
395 if self.is_closed() {
396 return Err(PushError::Closed(()));
397 }
398 self.sender.try_push_n(values)
399 }
400
401 pub fn close(&mut self) -> bool {
408 self.inner.close()
409 }
410}
411
412impl<T, const P: usize, const NUM_SEGS_P2: usize> Clone for Sender<T, P, NUM_SEGS_P2> {
413 fn clone(&self) -> Self {
414 self.inner.create_sender().expect("too many senders")
415 }
416}
417
418impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Sender<T, P, NUM_SEGS_P2> {
419 fn drop(&mut self) {
420 self.sender.close_channel();
421 }
422}
423
424pub struct Receiver<T, const P: usize, const NUM_SEGS_P2: usize> {
425 inner: Arc<Inner<T, P, NUM_SEGS_P2>>,
426 misses: u64,
427 seed: u64,
428}
429
430impl<T, const P: usize, const NUM_SEGS_P2: usize> Receiver<T, P, NUM_SEGS_P2> {
431 pub fn next(&mut self) -> u64 {
432 let old_seed = self.seed;
433 let next_seed = (old_seed
434 .wrapping_mul(RND_MULTIPLIER)
435 .wrapping_add(RND_ADDEND))
436 & RND_MASK;
437 self.seed = next_seed;
438 next_seed >> 16
439 }
440
441 pub fn is_closed(&self) -> bool {
443 self.inner.closed.load(Ordering::Acquire)
444 }
445
446 pub fn close(&self) -> bool {
453 self.inner.close()
454 }
455
456 pub fn producer_count(&self) -> usize {
458 self.inner.producer_count.load(Ordering::Relaxed)
459 }
460
461 pub fn create_sender(&self) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
462 self.create_sender_with_config(0)
463 }
464
465 pub fn create_sender_with_config(
496 &self,
497 max_pooled_segments: usize,
498 ) -> Result<Sender<T, P, NUM_SEGS_P2>, PushError<()>> {
499 self.inner.create_sender_with_config(max_pooled_segments)
500 }
501
502 pub fn try_pop(&mut self) -> Result<T, PopError> {
510 let mut slot = MaybeUninit::<T>::uninit();
511 let slice = unsafe { std::slice::from_raw_parts_mut(slot.as_mut_ptr(), 1) };
512
513 let drained = self.try_pop_n(slice);
514 if drained == 0 {
515 if self.is_closed() && self.inner.producer_count.load(Ordering::Acquire) == 0 {
516 Err(PopError::Closed)
517 } else {
518 Err(PopError::Empty)
519 }
520 } else {
521 Ok(unsafe { slot.assume_init() })
522 }
523 }
524
525 pub fn try_pop_n(&mut self, batch: &mut [T]) -> usize {
653 self.try_pop_n_with_producer(batch).0
654 }
655
656 pub fn try_pop_with_id(&mut self) -> Result<(T, usize), PopError> {
658 let mut slot = MaybeUninit::<T>::uninit();
659 let slice = unsafe { std::slice::from_raw_parts_mut(slot.as_mut_ptr(), 1) };
660 let (drained, producer_id) = self.try_pop_n_with_producer(slice);
661 if drained == 0 {
662 if self.is_closed() && self.inner.producer_count.load(Ordering::Acquire) == 0 {
663 Err(PopError::Closed)
664 } else {
665 Err(PopError::Empty)
666 }
667 } else {
668 debug_assert!(producer_id.is_some());
669 let value = unsafe { slot.assume_init() };
670 Ok((
671 value,
672 producer_id.expect("producer id missing for drained item"),
673 ))
674 }
675 }
676
677 fn try_pop_n_with_producer(&mut self, batch: &mut [T]) -> (usize, Option<usize>) {
678 for _ in 0..64 {
679 match self.acquire() {
680 Some((producer_id, queue)) => {
681 match queue.try_pop_n(batch) {
682 Ok(size) => {
683 queue.unmark_and_schedule();
684 return (size, Some(producer_id));
685 }
686 Err(PopError::Closed) => {
687 queue.unmark();
688 let old_ptr = self.inner.queues[producer_id]
689 .swap(ptr::null_mut(), Ordering::AcqRel);
690
691 if !old_ptr.is_null() {
692 self.inner.producer_count.fetch_sub(1, Ordering::Relaxed);
693 self.inner.queue_count.fetch_sub(1, Ordering::Relaxed);
694
695 unsafe {
696 Arc::from_raw(old_ptr);
697 }
698 }
699 }
700 Err(_) => {
701 queue.unmark();
702 self.misses += 1;
703 }
704 };
705 }
706 None => {
707 }
709 }
710 }
711
712 (0, None)
713 }
714
715 fn acquire(&mut self) -> Option<(usize, crate::spsc::UnboundedReceiver<T, P, NUM_SEGS_P2, Arc<SignalGate>>)> {
716 let random = self.next() as usize;
717 let random_word = random % SIGNAL_WORDS;
719 let mut signal_index = self.inner.waker.summary_select(random_word as u64) as usize;
720
721 if signal_index >= SIGNAL_WORDS {
722 signal_index = random_word;
723 }
724
725 let mut signal_bit = self.next() & 63;
726 let signal = &self.inner.signals[signal_index];
727 let signal_value = signal.load(Ordering::Acquire);
728
729 signal_bit = find_nearest(signal_value, signal_bit);
731
732 if signal_bit >= 64 {
734 self.misses += 1;
735 return None;
736 }
737
738 let (bit, expected, acquired) = signal.try_acquire(signal_bit);
740
741 if !acquired {
742 std::hint::spin_loop();
745 return None;
746 }
747
748 let empty = expected == bit;
750
751 if empty {
752 self.inner
753 .waker
754 .try_unmark_if_empty(signal.index(), signal.value());
755 }
756
757 let producer_id = signal_index * 64 + (signal_bit as usize);
759
760 let queue_ptr = self.inner.queues[producer_id].load(Ordering::Acquire);
762 if queue_ptr.is_null() {
763 self.misses += 1;
764 if empty {
765 self.inner
766 .waker
767 .try_unmark_if_empty(signal.index(), signal.value());
768 }
769 return None;
770 }
771
772 let unbounded_arc = unsafe { Arc::from_raw(queue_ptr) };
774 let receiver = unbounded_arc.create_receiver();
775
776 receiver.mark();
778
779 let _ = Arc::into_raw(unbounded_arc);
781
782 Some((producer_id, receiver))
783 }
784}
785
786impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Receiver<T, P, NUM_SEGS_P2> {
787 fn drop(&mut self) {
788 self.inner.close();
789 }
790}
791
792impl<T, const P: usize, const NUM_SEGS_P2: usize> Drop for Inner<T, P, NUM_SEGS_P2> {
793 fn drop(&mut self) {
794 self.close();
795 }
796}
797
798unsafe impl<T: Send, const P: usize, const NUM_SEGS_P2: usize> Send for Sender<T, P, NUM_SEGS_P2> {}
799unsafe impl<T: Send, const P: usize, const NUM_SEGS_P2: usize> Send
800 for Receiver<T, P, NUM_SEGS_P2>
801{
802}
803
804#[cfg(test)]
805mod tests {
806 use super::*;
807 use std::sync::atomic::Ordering;
808
809 #[test]
810 fn try_pop_drains_and_reports_closed() {
811 let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
812
813 tx.try_push(42).unwrap();
814 assert_eq!(rx.try_pop().unwrap(), 42);
815 assert_eq!(rx.try_pop(), Err(PopError::Empty));
816
817 assert!(rx.close());
818 assert_eq!(rx.try_pop(), Err(PopError::Closed));
819 }
820
821 #[test]
822 fn dropping_local_sender_clears_producer_slot() {
823 let (tx, rx) = new_with_sender::<u64, 6, 8>();
824 assert_eq!(tx.producer_count(), 1);
825
826 drop(tx);
827
828 assert!(rx.close());
830 assert_eq!(rx.producer_count(), 0);
831 assert_eq!(rx.inner.queue_count.load(Ordering::SeqCst), 0);
832 }
833
834 #[test]
835 fn single_producer_multiple_items() {
836 let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
837
838 for i in 0..100 {
840 tx.try_push(i).expect("push should not fail for unbounded queue");
841 }
842
843 for i in 0..100 {
845 assert_eq!(rx.try_pop().unwrap(), i);
846 }
847
848 assert_eq!(rx.try_pop(), Err(PopError::Empty));
850 }
851
852 #[test]
853 fn multiple_producers_single_consumer() {
854 use std::thread;
855
856 let (tx, mut rx) = new_with_sender::<u64, 6, 8>();
857 let mut received = vec![false; 30]; let handles: Vec<_> = (0..3)
861 .map(|producer_id| {
862 let tx = tx.clone();
863 thread::spawn(move || {
864 for i in 0..10 {
865 let value = (producer_id * 10 + i) as u64;
866 tx.clone().try_push(value).expect("push should succeed");
867 }
868 })
869 })
870 .collect();
871
872 for handle in handles {
874 handle.join().unwrap();
875 }
876
877 let mut count = 0;
879 for _ in 0..100 {
880 if let Ok(value) = rx.try_pop() {
881 assert!(value < 30, "received unexpected value: {}", value);
882 received[value as usize] = true;
883 count += 1;
884 } else {
885 break;
886 }
887 }
888
889 assert_eq!(count, 30, "expected 30 items, got {}", count);
891 for (i, &received_item) in received.iter().enumerate() {
892 assert!(received_item, "item {} was not received", i);
893 }
894 }
895
896 #[test]
897 fn unbounded_growth_with_large_batches() {
898 let (mut tx, mut rx) = new_with_sender::<u64, 2, 2>(); let mut items_to_push: Vec<u64> = (0..1000).collect();
902 let pushed = tx.try_push_n(&mut items_to_push).expect("bulk push should succeed");
903 assert_eq!(pushed, 1000, "should push all items");
904 assert!(items_to_push.is_empty(), "Vec should be drained");
905
906 for i in 0..1000 {
908 assert_eq!(rx.try_pop().unwrap(), i as u64);
909 }
910
911 assert_eq!(rx.try_pop(), Err(PopError::Empty));
912 }
913
914 #[test]
915 fn close_stops_receives() {
916 let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
917
918 tx.try_push(42).unwrap();
919 assert_eq!(rx.try_pop().unwrap(), 42);
920
921 rx.close();
923
924 assert_eq!(rx.try_pop(), Err(PopError::Closed));
926
927 assert_eq!(tx.try_push(100), Err(PushError::Closed(100)));
929 }
930
931 #[test]
932 fn multiple_senders_cloning() {
933 let (tx, mut rx) = new_with_sender::<u64, 6, 8>();
934 assert_eq!(tx.producer_count(), 1);
935
936 let mut tx2 = tx.clone();
937 assert_eq!(tx2.producer_count(), 2);
938
939 let mut tx3 = tx.clone();
940 assert_eq!(tx3.producer_count(), 3);
941
942 drop(tx);
944 tx2.try_push(1).unwrap();
945 tx3.try_push(2).unwrap();
946
947 let mut items = Vec::new();
949 for _ in 0..2 {
950 if let Ok(val) = rx.try_pop() {
951 items.push(val);
952 }
953 }
954
955 items.sort();
956 assert_eq!(items, vec![1, 2]);
957 }
958
959 #[test]
960 fn interleaved_push_pop() {
961 let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
962
963 let mut received = Vec::new();
965 for i in 0..10 {
966 tx.try_push(i * 2).unwrap();
967 tx.try_push(i * 2 + 1).unwrap();
968 while let Ok(value) = rx.try_pop() {
970 received.push(value);
971 }
972 }
973
974 while let Ok(value) = rx.try_pop() {
976 received.push(value);
977 }
978
979 assert_eq!(received.len(), 20);
981 received.sort();
982 for i in 0..20 {
983 assert_eq!(received[i], i as u64);
984 }
985
986 assert_eq!(rx.try_pop(), Err(PopError::Empty));
987 }
988
989 #[test]
990 fn batch_push_pop() {
991 let (mut tx, mut rx) = new_with_sender::<u64, 6, 8>();
992
993 let mut items: Vec<u64> = (0..50).collect();
995 let pushed = tx.try_push_n(&mut items).expect("bulk push should succeed");
996 assert_eq!(pushed, 50);
997 assert!(items.is_empty());
998
999 let mut dst = [0u64; 100];
1001 let popped = rx.try_pop_n(&mut dst);
1002 assert_eq!(popped, 50);
1003
1004 for i in 0..50 {
1005 assert_eq!(dst[i], i as u64);
1006 }
1007 }
1008
1009 #[test]
1010 fn concurrent_push_pop() {
1011 use std::thread;
1012 use std::sync::Arc;
1013 use std::sync::{Mutex};
1014 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1015
1016 let (tx, rx) = new_with_sender::<u64, 6, 8>();
1017 let counter = Arc::new(AtomicUsize::new(0));
1018
1019 let rx = Arc::new(Mutex::new(rx));
1020 let rx_clone = Arc::clone(&rx);
1021 let counter_clone = Arc::clone(&counter);
1022
1023 let producer = thread::spawn(move || {
1025 let mut tx = tx;
1026 for i in 0..1000 {
1027 tx.try_push(i).expect("push should succeed");
1028 thread::yield_now();
1029 }
1030 });
1031
1032 let consumer = thread::spawn(move || {
1034 let mut rx = rx_clone.lock().unwrap();
1035 let mut count: usize = 0;
1036 for _ in 0..10000 {
1037 if let Ok(value) = rx.try_pop() {
1038 assert_eq!(value, count as u64);
1039 count += 1;
1040 if count >= 1000 {
1041 break;
1042 }
1043 } else {
1044 thread::yield_now();
1045 }
1046 }
1047 counter_clone.fetch_add(count, AtomicOrdering::Relaxed);
1048 });
1049
1050 producer.join().unwrap();
1051 consumer.join().unwrap();
1052
1053 let final_count = counter.load(AtomicOrdering::Relaxed);
1054 assert_eq!(final_count, 1000, "should have received all 1000 items");
1055 }
1056
1057 #[test]
1058 fn stress_test_unbounded_expansion() {
1059 let (mut tx, mut rx) = new_with_sender::<u64, 2, 2>(); let mut items: Vec<u64> = (0..10000).collect();
1063 let pushed = tx.try_push_n(&mut items).expect("bulk push should succeed");
1064 assert_eq!(pushed, 10000);
1065
1066 for i in 0..10000 {
1068 assert_eq!(rx.try_pop().unwrap(), i as u64, "item {} mismatch", i);
1069 }
1070
1071 assert_eq!(rx.try_pop(), Err(PopError::Empty));
1072 }
1073
1074 #[test]
1075 fn producer_id_uniqueness() {
1076 let (tx1, _rx) = new_with_sender::<u64, 6, 8>();
1077 let tx2 = tx1.clone();
1078 let tx3 = tx1.clone();
1079
1080 assert_ne!(tx1.producer_id(), tx2.producer_id());
1082 assert_ne!(tx2.producer_id(), tx3.producer_id());
1083 assert_ne!(tx1.producer_id(), tx3.producer_id());
1084 }
1085
1086 #[test]
1087 fn receiver_count_tracking() {
1088 let (tx1, mut rx) = new_with_sender::<u64, 6, 8>();
1089 assert_eq!(rx.producer_count(), 1);
1090
1091 let tx2 = tx1.clone();
1092 assert_eq!(rx.producer_count(), 2);
1093
1094 let tx3 = tx2.clone();
1095 assert_eq!(rx.producer_count(), 3);
1096
1097 drop(tx1);
1098 rx.close(); assert_eq!(rx.producer_count(), 0);
1102 }
1103}