1#![no_std]
2
3extern crate alloc;
4
5#[cfg(test)]
6#[path = "attacks/mod.rs"]
7pub mod attacks;
8
9use alloc::{boxed::Box, sync::Arc};
10use core::{
11 cell::UnsafeCell,
12 fmt,
13 future::Future,
14 hash::{Hash, Hasher},
15 mem::{ManuallyDrop, MaybeUninit},
16 pin::Pin,
17 ptr,
18 sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
19 task::{Context, Poll, Waker},
20};
21
22#[cfg(target_pointer_width = "64")]
25const BLOCK_CAP: usize = 32;
26#[cfg(target_pointer_width = "32")]
27const BLOCK_CAP: usize = 16;
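
// `ready_slots` stores one readiness bit per slot in a single `usize`, so the
// block capacity must never exceed the pointer width. This compile-time guard
// is an added sanity check rather than part of the original interface:
const _: () = assert!(BLOCK_CAP <= usize::BITS as usize);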
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub struct SendError<T>(pub T);
32
33impl<T> fmt::Display for SendError<T> {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 write!(f, "sending on a closed channel")
36 }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub enum TryRecvError {
42 Empty,
44 Disconnected,
46}
47
48impl fmt::Display for TryRecvError {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 match self {
51 TryRecvError::Empty => write!(f, "channel is empty"),
52 TryRecvError::Disconnected => write!(f, "channel is disconnected"),
53 }
54 }
55}
56
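/// Creates an unbounded MPSC channel, returning the `(sender, receiver)`
/// pair. Senders are cheap to clone; the single receiver drains values in the
/// order they were enqueued.
///
/// A minimal usage sketch (`try_recv` needs no executor):
///
/// ```ignore
/// let (tx, mut rx) = unbounded_channel::<u32>();
/// tx.send(7).unwrap();
/// assert_eq!(rx.try_recv().unwrap(), 7);
/// ```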
57pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
59 let block = Block::new(0);
60 let block_ptr = Box::into_raw(Box::new(block));
61
62 let shared = Arc::new(Shared {
63 head: AtomicPtr::new(block_ptr),
64 tail: AtomicPtr::new(block_ptr),
65 rx_waker: AtomicPtr::new(ptr::null_mut()),
66 waker_lock: AtomicBool::new(false),
67 num_senders: AtomicUsize::new(1),
68 num_weak_senders: AtomicUsize::new(0),
69 closed: AtomicBool::new(false),
70 });
71
72 let sender = UnboundedSender {
73 shared: Arc::clone(&shared),
74 };
75 let receiver = UnboundedReceiver {
76 shared,
77 recv_index: 0,
78 };
79
80 (sender, receiver)
81}
82
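// One fixed-size segment of the linked queue. `start_index` is the absolute
// index of the block's first slot, `len` counts slots reserved by senders
// (via `fetch_add`), and `ready_slots` is a bitmask with one bit per slot
// that is set once the slot's value has actually been written.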
83struct Block<T> {
85 next: AtomicPtr<Block<T>>,
87 start_index: usize,
89 values: UnsafeCell<[MaybeUninit<ManuallyDrop<T>>; BLOCK_CAP]>,
91 ready_slots: AtomicUsize,
93 len: AtomicUsize,
95}
96
97impl<T> Block<T> {
98 fn new(start_index: usize) -> Self {
99 Self {
100 next: AtomicPtr::new(ptr::null_mut()),
101 start_index,
102 values: UnsafeCell::new([const { MaybeUninit::uninit() }; BLOCK_CAP]),
103 ready_slots: AtomicUsize::new(0),
104 len: AtomicUsize::new(0),
105 }
106 }
107
108 fn relative_index(&self, index: usize) -> Option<usize> {
110 if index >= self.start_index && index < self.start_index + BLOCK_CAP {
111 Some(index - self.start_index)
112 } else {
113 None
114 }
115 }
116
    fn write(&self, relative_index: usize, value: T) -> Result<(), T> {
        if relative_index >= BLOCK_CAP {
            return Err(value);
        }

        let mask = 1 << relative_index;

        // `send` hands out slot indices via `len.fetch_add`, so each slot is
        // written at most once; this check only guards against misuse.
        if self.ready_slots.load(Ordering::Acquire) & mask != 0 {
            return Err(value);
        }

        // Write the value first, then publish it by setting its ready bit.
        // Publishing before the write would let the receiver observe the bit
        // and read an uninitialized slot.
        unsafe {
            let values = &mut *self.values.get();
            values[relative_index].write(ManuallyDrop::new(value));
        }
        self.ready_slots.fetch_or(mask, Ordering::Release);

        Ok(())
    }
143
144 fn read(&self, relative_index: usize) -> Option<T> {
147 if relative_index >= BLOCK_CAP {
148 return None;
149 }
150
151 let mask = 1 << relative_index;
152
153 let prev_ready = self.ready_slots.fetch_and(!mask, Ordering::AcqRel);
155
156 if prev_ready & mask == 0 {
157 return None;
159 }
160
161 unsafe {
162 let values = &*self.values.get();
163 Some(ManuallyDrop::into_inner(
164 values[relative_index].assume_init_read(),
165 ))
166 }
167 }
168
169 fn is_ready(&self, relative_index: usize) -> bool {
171 if relative_index >= BLOCK_CAP {
172 return false;
173 }
174
175 let mask = 1 << relative_index;
176 self.ready_slots.load(Ordering::Acquire) & mask != 0
177 }
178
179 fn ready_count(&self) -> usize {
181 self.ready_slots.load(Ordering::Acquire).count_ones() as usize
182 }
183}
184
185impl<T> Drop for Block<T> {
186 fn drop(&mut self) {
187 let ready = self.ready_slots.load(Ordering::Relaxed);
188 unsafe {
189 let values = &mut *self.values.get();
190 for i in 0..BLOCK_CAP {
191 if ready & (1 << i) != 0 {
192 ManuallyDrop::drop(values[i].assume_init_mut());
193 }
194 }
195 }
196 }
197}
198
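// State shared by all senders and the receiver. `head` is the block senders
// currently write into and `tail` the block the receiver reads from;
// `rx_waker` holds a boxed `Waker` handed off under the `waker_lock` spin
// lock, and `num_senders` / `num_weak_senders` drive close-on-last-drop.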
199struct Shared<T> {
201 head: AtomicPtr<Block<T>>,
203 tail: AtomicPtr<Block<T>>,
205 rx_waker: AtomicPtr<Waker>,
207 waker_lock: AtomicBool,
209 num_senders: AtomicUsize,
211 num_weak_senders: AtomicUsize,
213 closed: AtomicBool,
215}
216
217impl<T> Shared<T> {
218 fn wake_receiver(&self) {
220 if self.rx_waker.load(Ordering::Acquire).is_null() {
            return;
        }
224
225 while self
227 .waker_lock
228 .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
229 .is_err()
230 {
231 core::hint::spin_loop();
232 }
233
234 let waker_ptr = self.rx_waker.swap(ptr::null_mut(), Ordering::Acquire);
236 if !waker_ptr.is_null() {
237 let waker = unsafe { Box::from_raw(waker_ptr) };
238 self.waker_lock.store(false, Ordering::Release);
240 waker.wake();
241 } else {
242 self.waker_lock.store(false, Ordering::Release);
244 }
245 }
246
247 fn store_waker(&self, waker: Waker) {
249 while self
251 .waker_lock
252 .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
253 .is_err()
254 {
255 core::hint::spin_loop();
256 }
257
258 let new_waker_ptr = Box::into_raw(Box::new(waker));
260 let old_waker_ptr = self.rx_waker.swap(new_waker_ptr, Ordering::Release);
261
262 if !old_waker_ptr.is_null() {
264 unsafe { drop(Box::from_raw(old_waker_ptr)) };
265 }
266
267 self.waker_lock.store(false, Ordering::Release);
269 }
270}
271
272unsafe impl<T: Send> Send for Shared<T> {}
273unsafe impl<T: Send> Sync for Shared<T> {}
274
275impl<T> Drop for Shared<T> {
276 fn drop(&mut self) {
277 let waker_ptr = self.rx_waker.load(Ordering::Relaxed);
279 if !waker_ptr.is_null() {
280 unsafe { drop(Box::from_raw(waker_ptr)) };
281 }
282
283 let mut current = self.tail.load(Ordering::Relaxed);
285 while !current.is_null() {
286 let block = unsafe { Box::from_raw(current) };
287 current = block.next.load(Ordering::Relaxed);
288 }
289 }
290}
291
292pub struct UnboundedSender<T> {
294 shared: Arc<Shared<T>>,
295}
296
297pub struct WeakUnboundedSender<T> {
325 shared: Arc<Shared<T>>,
326}
327
328impl<T> Clone for WeakUnboundedSender<T> {
329 fn clone(&self) -> Self {
330 self.shared.num_weak_senders.fetch_add(1, Ordering::Relaxed);
331 Self {
332 shared: Arc::clone(&self.shared),
333 }
334 }
335}
336
337impl<T> Drop for WeakUnboundedSender<T> {
338 fn drop(&mut self) {
339 self.shared.num_weak_senders.fetch_sub(1, Ordering::AcqRel);
340 }
341}
342
343impl<T> Clone for UnboundedSender<T> {
344 fn clone(&self) -> Self {
345 self.shared.num_senders.fetch_add(1, Ordering::Relaxed);
346 Self {
347 shared: Arc::clone(&self.shared),
348 }
349 }
350}
351
352impl<T> Drop for UnboundedSender<T> {
353 fn drop(&mut self) {
354 let prev_count = self.shared.num_senders.fetch_sub(1, Ordering::AcqRel);
355 if prev_count == 1 {
356 self.shared.closed.store(true, Ordering::Release);
358
359 self.shared.wake_receiver();
361 }
362 }
363}
364
365impl<T> UnboundedSender<T> {
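    /// Sends a value, returning `SendError(value)` if the receiver has been
    /// dropped or the channel has otherwise been closed.
    ///
    /// The send path: reserve a slot index in the current `head` block with
    /// `len.fetch_add`, write the value and publish its ready bit, and when
    /// the block is full either install a freshly allocated block or help
    /// advance `head` to the block another sender already installed.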
366 pub fn send(&self, mut value: T) -> Result<(), SendError<T>> {
370 if self.shared.closed.load(Ordering::Acquire) {
371 return Err(SendError(value));
372 }
373
374 let mut attempts = 0;
375 loop {
376 let head_ptr = self.shared.head.load(Ordering::Acquire);
377 let head = unsafe { &*head_ptr };
378
379 let slot_idx = head.len.fetch_add(1, Ordering::AcqRel);
381
382 if slot_idx < BLOCK_CAP {
383 match head.write(slot_idx, value) {
386 Ok(()) => {
387 self.shared.wake_receiver();
389 return Ok(());
390 }
391 Err(returned_value) => {
392 value = returned_value;
                        head.len.fetch_sub(1, Ordering::AcqRel);
                        continue;
398 }
399 }
400 } else {
401 head.len.store(BLOCK_CAP, Ordering::Release);
403 }
404
405 let next_ptr = head.next.load(Ordering::Acquire);
407 if next_ptr.is_null() {
408 let new_block = Box::into_raw(Box::new(Block::new(head.start_index + BLOCK_CAP)));
410
411 match head.next.compare_exchange_weak(
412 ptr::null_mut(),
413 new_block,
414 Ordering::AcqRel,
415 Ordering::Acquire,
416 ) {
417 Ok(_) => {
418 self.shared.head.store(new_block, Ordering::Release);
420 }
421 Err(_) => {
422 unsafe { drop(Box::from_raw(new_block)) };
424 }
425 }
426 } else {
427 self.shared
429 .head
430 .compare_exchange_weak(head_ptr, next_ptr, Ordering::AcqRel, Ordering::Acquire)
431 .ok();
432 }
433
434 attempts += 1;
435 if attempts > 1000 {
436 core::hint::spin_loop();
438 attempts = 0;
439 }
440 }
441 }
442
443 pub fn id(&self) -> usize {
445 Arc::as_ptr(&self.shared) as usize
446 }
447
448 pub fn is_closed(&self) -> bool {
450 self.shared.closed.load(Ordering::Acquire)
451 }
452
453 pub fn same_channel(&self, other: &Self) -> bool {
455 Arc::ptr_eq(&self.shared, &other.shared)
456 }
457
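    /// Creates a [`WeakUnboundedSender`] that keeps the shared allocation
    /// alive without keeping the channel open.
    ///
    /// A small sketch of the intended round trip:
    ///
    /// ```ignore
    /// let weak = tx.downgrade();
    /// let strong_again = weak.upgrade().expect("channel still open");
    /// strong_again.send(1).unwrap();
    /// ```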
    #[must_use = "downgrade creates a WeakUnboundedSender without destroying the original sender"]
463 pub fn downgrade(&self) -> WeakUnboundedSender<T> {
464 self.shared.num_weak_senders.fetch_add(1, Ordering::Relaxed);
465 WeakUnboundedSender {
466 shared: Arc::clone(&self.shared),
467 }
468 }
469
470 pub fn strong_count(&self) -> usize {
472 self.shared.num_senders.load(Ordering::Acquire)
473 }
474
475 pub fn weak_count(&self) -> usize {
477 self.shared.num_weak_senders.load(Ordering::Acquire)
478 }
479
480 pub async fn closed(&self) {
484 ClosedFuture { sender: self }.await
485 }
486}
487
488impl<T> WeakUnboundedSender<T> {
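    /// Attempts to turn this weak handle back into a strong [`UnboundedSender`].
    ///
    /// Returns `None` once every strong sender has been dropped: the CAS loop
    /// below only bumps `num_senders` while it is still non-zero, so a closed
    /// channel is never reopened.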
489 pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
493 let mut count = self.shared.num_senders.load(Ordering::Acquire);
494
495 loop {
496 if count == 0 {
497 return None;
499 }
500
501 match self.shared.num_senders.compare_exchange_weak(
502 count,
503 count + 1,
504 Ordering::AcqRel,
505 Ordering::Acquire,
506 ) {
507 Ok(_) => {
508 return Some(UnboundedSender {
509 shared: Arc::clone(&self.shared),
510 });
511 }
512 Err(actual) => count = actual,
513 }
514 }
515 }
516
517 pub fn strong_count(&self) -> usize {
519 self.shared.num_senders.load(Ordering::Acquire)
520 }
521
522 pub fn weak_count(&self) -> usize {
524 self.shared.num_weak_senders.load(Ordering::Acquire)
525 }
526}
527
528impl<T> PartialEq for UnboundedSender<T> {
529 fn eq(&self, other: &Self) -> bool {
530 self.id() == other.id()
531 }
532}
533
534impl<T> Eq for UnboundedSender<T> {}
535
536impl<T> PartialOrd for UnboundedSender<T> {
537 fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
538 Some(self.cmp(other))
539 }
540}
541
542impl<T> Ord for UnboundedSender<T> {
543 fn cmp(&self, other: &Self) -> core::cmp::Ordering {
544 self.id().cmp(&other.id())
545 }
546}
547
548impl<T> Hash for UnboundedSender<T> {
549 fn hash<H: Hasher>(&self, state: &mut H) {
550 self.id().hash(state);
551 }
552}
553
554impl<T> PartialEq for WeakUnboundedSender<T> {
555 fn eq(&self, other: &Self) -> bool {
556 Arc::ptr_eq(&self.shared, &other.shared)
557 }
558}
559
560impl<T> Eq for WeakUnboundedSender<T> {}
561
562impl<T> PartialOrd for WeakUnboundedSender<T> {
563 fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
564 Some(self.cmp(other))
565 }
566}
567
568impl<T> Ord for WeakUnboundedSender<T> {
569 fn cmp(&self, other: &Self) -> core::cmp::Ordering {
570 let self_ptr = Arc::as_ptr(&self.shared) as usize;
571 let other_ptr = Arc::as_ptr(&other.shared) as usize;
572 self_ptr.cmp(&other_ptr)
573 }
574}
575
576impl<T> Hash for WeakUnboundedSender<T> {
577 fn hash<H: Hasher>(&self, state: &mut H) {
578 let ptr = Arc::as_ptr(&self.shared) as usize;
579 ptr.hash(state);
580 }
581}
582
583impl<T> fmt::Debug for UnboundedSender<T> {
584 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
585 f.debug_struct("UnboundedSender")
586 .field("id", &self.id())
587 .field("strong_count", &self.strong_count())
588 .field("weak_count", &self.weak_count())
589 .finish()
590 }
591}
592
593impl<T> fmt::Debug for WeakUnboundedSender<T> {
594 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
595 f.debug_struct("WeakUnboundedSender")
596 .field("strong_count", &self.strong_count())
597 .field("weak_count", &self.weak_count())
598 .finish()
599 }
600}
601
602pub struct UnboundedReceiver<T> {
604 shared: Arc<Shared<T>>,
605 recv_index: usize,
607}
608
609impl<T> UnboundedReceiver<T> {
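    /// Receives the next value, waiting until one is available. Resolves to
    /// `None` once all senders have been dropped and the queue is drained.
    ///
    /// A usage sketch, assuming some executor function `block_on` (not part
    /// of this crate) drives the future:
    ///
    /// ```ignore
    /// block_on(async {
    ///     tx.send(1).unwrap();
    ///     assert_eq!(rx.recv().await, Some(1));
    /// });
    /// ```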
610 pub async fn recv(&mut self) -> Option<T> {
612 RecvFuture { receiver: self }.await
613 }
614
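    /// Non-blocking receive. `Empty` means the next value has not been
    /// published yet; `Disconnected` is returned only once every sender has
    /// been dropped and the buffered values have been drained.
    ///
    /// Each call reads the receiver's current slot in the `tail` block if it
    /// is ready, and advances `tail` (freeing the exhausted block) once
    /// `recv_index` has moved past that block.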
615 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
617 loop {
618 let tail_ptr = self.shared.tail.load(Ordering::Acquire);
619 let tail = unsafe { &*tail_ptr };
620
621 if let Some(relative_idx) = tail.relative_index(self.recv_index) {
622 if tail.is_ready(relative_idx) {
624 if let Some(value) = tail.read(relative_idx) {
625 self.recv_index += 1;
626 return Ok(value);
627 }
628 }

                // While `recv_index` is still inside this block, simply wait
                // for its slot to be published: the sender that reserved the
                // slot always completes the write. Advancing (and freeing)
                // the block from here could discard a value that a sender is
                // still writing.
654 } else {
655 let next_ptr = tail.next.load(Ordering::Acquire);
657 if !next_ptr.is_null() {
658 if self
659 .shared
660 .tail
661 .compare_exchange(tail_ptr, next_ptr, Ordering::AcqRel, Ordering::Acquire)
662 .is_ok()
663 {
664 unsafe { drop(Box::from_raw(tail_ptr)) };
665 }
666 continue;
667 }
668 }
669
670 if self.shared.closed.load(Ordering::Acquire)
672 && self.shared.num_senders.load(Ordering::Acquire) == 0
673 {
674 return Err(TryRecvError::Disconnected);
675 }
676
677 return Err(TryRecvError::Empty);
678 }
679 }
680
681 pub fn id(&self) -> usize {
683 Arc::as_ptr(&self.shared) as usize
684 }
685
686 pub fn is_closed(&self) -> bool {
688 self.shared.closed.load(Ordering::Acquire)
689 && self.shared.num_senders.load(Ordering::Acquire) == 0
690 }
691
    pub fn is_empty(&self) -> bool {
        let tail_ptr = self.shared.tail.load(Ordering::Acquire);
        let tail = unsafe { &*tail_ptr };

        // Non-empty exactly when the slot the receiver would consume next has
        // already been published.
        if let Some(relative_idx) = tail.relative_index(self.recv_index) {
            if tail.is_ready(relative_idx) {
                return false;
            }
        }

        true
    }
718
719 pub fn close(&mut self) {
721 self.shared.closed.store(true, Ordering::Release);
722 }
723
724 pub fn sender_strong_count(&self) -> usize {
726 self.shared.num_senders.load(Ordering::Acquire)
727 }
728
729 pub fn sender_weak_count(&self) -> usize {
731 self.shared.num_weak_senders.load(Ordering::Acquire)
732 }
733
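    /// Returns the number of values that are published but not yet received.
    /// This walks every block starting from `tail` and counts set ready bits,
    /// so its cost grows with the amount of buffered data rather than being
    /// constant.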
734 pub fn len(&self) -> usize {
736 let mut count = 0;
738 let mut current = self.shared.tail.load(Ordering::Acquire);
739
740 while !current.is_null() {
741 let block = unsafe { &*current };
742 count += block.ready_count();
743 current = block.next.load(Ordering::Acquire);
744 }
745
746 count
747 }
748
    fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        match self.try_recv() {
            Ok(value) => Poll::Ready(Some(value)),
            Err(TryRecvError::Disconnected) => Poll::Ready(None),
            Err(TryRecvError::Empty) => {
                self.shared.store_waker(cx.waker().clone());
                // Re-check after registering the waker: a value sent between
                // the first `try_recv` and `store_waker` may have fired the
                // old waker (or none), which would otherwise lose the wakeup.
                match self.try_recv() {
                    Ok(value) => Poll::Ready(Some(value)),
                    Err(TryRecvError::Disconnected) => Poll::Ready(None),
                    Err(TryRecvError::Empty) => Poll::Pending,
                }
            }
        }
    }
760}
761
762impl<T> Drop for UnboundedReceiver<T> {
763 fn drop(&mut self) {
764 self.shared.closed.store(true, Ordering::Release);
765 }
766}
767
768struct RecvFuture<'a, T> {
770 receiver: &'a mut UnboundedReceiver<T>,
771}
772
773impl<'a, T> Future for RecvFuture<'a, T> {
774 type Output = Option<T>;
775
776 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
777 self.receiver.poll_recv(cx)
778 }
779}
780
781struct ClosedFuture<'a, T> {
783 sender: &'a UnboundedSender<T>,
784}
785
786impl<'a, T> Future for ClosedFuture<'a, T> {
787 type Output = ();
788
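    // Note: no waker is registered here, so this future only observes closure
    // when the executor polls it again for some other reason.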
789 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
790 if self.sender.is_closed() {
791 Poll::Ready(())
792 } else {
793 Poll::Pending
796 }
797 }
798}
799
800#[cfg(test)]
801mod tests {
802 use super::*;
803 use alloc::{vec, vec::Vec};
804
805 #[test]
806 fn test_basic_send_recv() {
807 let (tx, mut rx) = unbounded_channel::<i32>();
808
809 tx.send(1).unwrap();
810 tx.send(2).unwrap();
811 tx.send(3).unwrap();
812
813 assert_eq!(rx.try_recv().unwrap(), 1);
814 assert_eq!(rx.try_recv().unwrap(), 2);
815 assert_eq!(rx.try_recv().unwrap(), 3);
816 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
817 }
818
819 #[test]
820 fn test_channel_id() {
821 let (tx1, rx1) = unbounded_channel::<i32>();
822 let (tx2, rx2) = unbounded_channel::<i32>();
823
824 assert_eq!(tx1.id(), rx1.id());
825 assert_ne!(tx1.id(), tx2.id());
826 assert_ne!(rx1.id(), rx2.id());
827 }
828
829 #[test]
830 fn test_clone_sender() {
831 let (tx, mut rx) = unbounded_channel::<i32>();
832 let tx2 = tx.clone();
833
834 tx.send(1).unwrap();
835 tx2.send(2).unwrap();
836
837 assert_eq!(rx.try_recv().unwrap(), 1);
838 assert_eq!(rx.try_recv().unwrap(), 2);
839 }
840
841 #[test]
842 fn test_large_number_of_messages() {
843 let (tx, mut rx) = unbounded_channel::<usize>();
844
845 for i in 0..100 {
847 tx.send(i).unwrap();
848 }
849
850 for i in 0..100 {
852 assert_eq!(rx.try_recv().unwrap(), i);
853 }
854 }
855
856 #[test]
857 fn test_drop_sender_closes_channel() {
858 let (tx, mut rx) = unbounded_channel::<i32>();
859
860 tx.send(42).unwrap();
861 drop(tx);
862
863 assert_eq!(rx.try_recv().unwrap(), 42);
864 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
865 }
866
867 #[test]
868 fn test_same_channel() {
869 let (tx1, _rx) = unbounded_channel::<i32>();
870 let tx2 = tx1.clone();
871 let (tx3, _rx2) = unbounded_channel::<i32>();
872
873 assert!(tx1.same_channel(&tx2));
874 assert!(!tx1.same_channel(&tx3));
875 }
876
877 #[test]
880 fn test_stress_many_messages() {
881 let (tx, mut rx) = unbounded_channel::<usize>();
882 const NUM_MESSAGES: usize = 10_000;
883
884 for i in 0..NUM_MESSAGES {
886 tx.send(i).unwrap();
887 }
888
889 for i in 0..NUM_MESSAGES {
891 assert_eq!(rx.try_recv().unwrap(), i);
892 }
893
894 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
895 }
896
897 #[test]
898 fn test_send_recv_interleaved() {
899 let (tx, mut rx) = unbounded_channel::<i32>();
900
901 let mut expected_recv = 0;
903 for i in 0..100 {
904 tx.send(i).unwrap();
905 if i % 2 == 0 {
906 assert_eq!(rx.try_recv().unwrap(), expected_recv);
907 expected_recv += 1;
908 }
909 }
910
911 while let Ok(value) = rx.try_recv() {
913 assert_eq!(value, expected_recv);
914 expected_recv += 1;
915 }
916
        assert_eq!(expected_recv, 100);
    }
919
920 #[test]
921 fn test_drop_receiver_while_sending() {
922 let (tx, rx) = unbounded_channel::<i32>();
923
924 tx.send(1).unwrap();
926 tx.send(2).unwrap();
927
928 drop(rx);
930
931 assert!(matches!(tx.send(3), Err(SendError(3))));
933 assert!(tx.is_closed());
934 }
935
936 #[test]
937 fn test_multiple_sender_drops() {
938 let (tx, mut rx) = unbounded_channel::<i32>();
939 let tx2 = tx.clone();
940 let tx3 = tx.clone();
941
942 tx.send(1).unwrap();
943 tx2.send(2).unwrap();
944 tx3.send(3).unwrap();
945
946 drop(tx);
948 assert!(!rx.is_closed());
949
950 drop(tx2);
951 assert!(!rx.is_closed());
952
953 drop(tx3);
955
956 assert_eq!(rx.try_recv().unwrap(), 1);
958 assert_eq!(rx.try_recv().unwrap(), 2);
959 assert_eq!(rx.try_recv().unwrap(), 3);
960
961 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
963 assert!(rx.is_closed());
964 }
965
966 #[test]
967 fn test_zero_sized_types() {
968 #[derive(Debug, PartialEq)]
969 struct ZeroSized;
970
971 let (tx, mut rx) = unbounded_channel::<ZeroSized>();
972
973 tx.send(ZeroSized).unwrap();
974 tx.send(ZeroSized).unwrap();
975
976 assert_eq!(rx.try_recv().unwrap(), ZeroSized);
977 assert_eq!(rx.try_recv().unwrap(), ZeroSized);
978 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
979 }
980
981 #[test]
982 fn test_large_types() {
983 #[derive(Debug, PartialEq)]
984 struct LargeType([u8; 1024]);
985
986 let (tx, mut rx) = unbounded_channel::<LargeType>();
987 let large_value = LargeType([42; 1024]);
988
989 tx.send(large_value).unwrap();
990 let received = rx.try_recv().unwrap();
991
992 assert_eq!(received.0.len(), 1024);
994 for &byte in &received.0 {
995 assert_eq!(byte, 42, "Large type data corruption detected");
996 }
997
998 let large_value2 = LargeType([123; 1024]);
1000 let large_value3 = LargeType([255; 1024]);
1001
1002 tx.send(large_value2).unwrap();
1003 tx.send(large_value3).unwrap();
1004
1005 let received2 = rx.try_recv().unwrap();
1006 let received3 = rx.try_recv().unwrap();
1007
1008 for &byte in &received2.0 {
1009 assert_eq!(byte, 123, "Second large message corrupted");
1010 }
1011 for &byte in &received3.0 {
1012 assert_eq!(byte, 255, "Third large message corrupted");
1013 }
1014 }
1015
1016 #[test]
1017 fn test_unwind_safety_basic() {
1018 #[derive(Debug)]
1020 struct ConditionalPanic(bool);
1021 impl Drop for ConditionalPanic {
1022 fn drop(&mut self) {
1023 if self.0 {
1026 }
1029 }
1030 }
1031
1032 let (tx, mut rx) = unbounded_channel::<ConditionalPanic>();
1033
        tx.send(ConditionalPanic(false)).unwrap();
        tx.send(ConditionalPanic(true)).unwrap();
        tx.send(ConditionalPanic(false)).unwrap();

        assert_eq!(rx.try_recv().unwrap().0, false);
1041 assert_eq!(rx.try_recv().unwrap().0, true);
1042 assert_eq!(rx.try_recv().unwrap().0, false);
1043
1044 tx.send(ConditionalPanic(false)).unwrap();
1046 assert_eq!(rx.try_recv().unwrap().0, false);
1047
1048 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1050 assert!(!rx.is_closed());
1051 }
1052
1053 #[test]
1054 fn test_block_boundary_conditions() {
1055 let (tx, mut rx) = unbounded_channel::<usize>();
1056
1057 for i in 0..BLOCK_CAP {
1059 tx.send(i).unwrap();
1060 }
1061
1062 tx.send(BLOCK_CAP).unwrap();
1064
1065 for i in 0..=BLOCK_CAP {
1067 assert_eq!(rx.try_recv().unwrap(), i);
1068 }
1069
1070 for i in 0..(BLOCK_CAP * 3) {
1072 tx.send(i).unwrap();
1073 }
1074
1075 for i in 0..(BLOCK_CAP * 3) {
1076 assert_eq!(rx.try_recv().unwrap(), i);
1077 }
1078 }
1079
1080 #[test]
1081 fn test_receiver_state_consistency() {
1082 let (tx, mut rx) = unbounded_channel::<i32>();
1083
1084 assert!(rx.is_empty());
1086 assert!(!rx.is_closed());
1087
1088 tx.send(42).unwrap();
1090 assert!(!rx.is_empty());
1091 assert!(!rx.is_closed());
1092
1093 assert_eq!(rx.try_recv().unwrap(), 42);
1095 assert!(rx.is_empty());
1096 assert!(!rx.is_closed());
1097
1098 drop(tx);
1100 assert!(rx.is_empty());
1101 assert!(rx.is_closed());
1102 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1103 }
1104
1105 #[test]
1106 fn test_manual_close() {
1107 let (tx, mut rx) = unbounded_channel::<i32>();
1108
1109 tx.send(1).unwrap();
1110 tx.send(2).unwrap();
1111
1112 rx.close();
1114
1115 assert_eq!(rx.try_recv().unwrap(), 1);
1117 assert_eq!(rx.try_recv().unwrap(), 2);
1118
1119 assert!(tx.is_closed());
1121 assert!(matches!(tx.send(3), Err(SendError(3))));
1122 }
1123
1124 #[test]
1125 fn test_channel_id_consistency() {
1126 let (tx, rx) = unbounded_channel::<i32>();
1128 assert_eq!(tx.id(), rx.id());
1129
1130 let (tx2, rx2) = unbounded_channel::<i32>();
1132 assert_eq!(tx2.id(), rx2.id());
1133
1134 let tx_clone = tx.clone();
1137 assert_eq!(tx.id(), tx_clone.id());
1138 assert!(tx.same_channel(&tx_clone));
1139 assert!(!tx.same_channel(&tx2));
1140 }
1141
1142 #[test]
1143 fn test_drop_semantics() {
1144 use alloc::rc::Rc;
1145
1146 let drop_count = Rc::new(core::cell::RefCell::new(0));
1147
1148 #[derive(Debug)]
1149 struct DropCounter(Rc<core::cell::RefCell<i32>>);
1150 impl Drop for DropCounter {
1151 fn drop(&mut self) {
1152 *self.0.borrow_mut() += 1;
1153 }
1154 }
1155
1156 let (tx, mut rx) = unbounded_channel::<DropCounter>();
1157
1158 tx.send(DropCounter(drop_count.clone())).unwrap();
1160 tx.send(DropCounter(drop_count.clone())).unwrap();
1161 tx.send(DropCounter(drop_count.clone())).unwrap();
1162
        assert_eq!(*drop_count.borrow(), 0);

        let _value1 = rx.try_recv().unwrap();
        assert_eq!(*drop_count.borrow(), 0);

        drop(_value1);
        assert_eq!(*drop_count.borrow(), 1);

        drop(tx);
        drop(rx);

        assert_eq!(*drop_count.borrow(), 3);
    }
1178
1179 #[test]
1180 fn test_memory_safety_after_close() {
1181 let (tx, mut rx) = unbounded_channel::<Vec<u8>>();
1182
1183 tx.send(vec![1, 2, 3]).unwrap();
1185 tx.send(vec![4, 5, 6]).unwrap();
1186
1187 rx.close();
1189
1190 assert!(matches!(tx.send(vec![7, 8, 9]), Err(_)));
1192
1193 assert_eq!(rx.try_recv().unwrap(), vec![1, 2, 3]);
1195 assert_eq!(rx.try_recv().unwrap(), vec![4, 5, 6]);
1196 }
1197
1198 #[test]
1199 fn test_ordering_guarantees() {
1200 let (tx, mut rx) = unbounded_channel::<usize>();
1201
1202 for i in 0..1000 {
1204 tx.send(i).unwrap();
1205 }
1206
1207 for i in 0..1000 {
1209 assert_eq!(rx.try_recv().unwrap(), i);
1210 }
1211 }
1212
1213 #[test]
1214 fn test_empty_channel_operations() {
1215 let (tx, mut rx) = unbounded_channel::<i32>();
1216
1217 assert!(rx.is_empty());
1219 assert!(!rx.is_closed());
1220 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1221
1222 drop(tx);
1224 assert!(rx.is_empty());
1225 assert!(rx.is_closed());
1226 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1227 }
1228
1229 #[test]
1230 fn test_channel_reuse_after_empty() {
1231 let (tx, mut rx) = unbounded_channel::<i32>();
1232
1233 for round in 0..10 {
1235 for i in 0..10 {
1236 tx.send(round * 10 + i).unwrap();
1237 }
1238
1239 for i in 0..10 {
1240 assert_eq!(rx.try_recv().unwrap(), round * 10 + i);
1241 }
1242
1243 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1244 }
1245 }
1246
1247 #[test]
1248 fn test_mixed_operation_patterns() {
1249 let (tx, mut rx) = unbounded_channel::<usize>();
1250
1251 let mut next_send = 0;
1253 let mut next_recv = 0;
1254
1255 for _ in 0..100 {
1256 let send_count = (next_send % 5) + 1;
1258 for _ in 0..send_count {
1259 tx.send(next_send).unwrap();
1260 next_send += 1;
1261 }
1262
1263 let recv_count = (next_recv % 3) + 1;
1265 for _ in 0..recv_count {
1266 if let Ok(value) = rx.try_recv() {
1267 assert_eq!(value, next_recv);
1268 next_recv += 1;
1269 } else {
1270 break;
1271 }
1272 }
1273 }
1274
1275 while let Ok(value) = rx.try_recv() {
1277 assert_eq!(value, next_recv);
1278 next_recv += 1;
1279 }
1280
1281 assert_eq!(next_send, next_recv);
1282 }
1283
1284 #[test]
1287 fn test_weak_sender_basic() {
1288 let (tx, mut rx) = unbounded_channel::<i32>();
1289
1290 let weak_tx = tx.downgrade();
1292
1293 let upgraded_tx = weak_tx.upgrade().unwrap();
1295
1296 upgraded_tx.send(42).unwrap();
1298 assert_eq!(rx.try_recv().unwrap(), 42);
1299
1300 drop(tx);
1302
1303 upgraded_tx.send(43).unwrap();
1305 assert_eq!(rx.try_recv().unwrap(), 43);
1306
1307 drop(upgraded_tx);
1309
1310 assert!(weak_tx.upgrade().is_none());
1312 }
1313
1314 #[test]
1315 fn test_weak_sender_upgrade_failure() {
1316 let (tx, _rx) = unbounded_channel::<i32>();
1317 let weak_tx = tx.downgrade();
1318
1319 drop(tx);
1321
1322 assert!(weak_tx.upgrade().is_none());
1324 }
1325
1326 #[test]
1327 fn test_weak_sender_counts() {
1328 let (tx, rx) = unbounded_channel::<i32>();
1329
1330 assert_eq!(tx.strong_count(), 1);
1332 assert_eq!(tx.weak_count(), 0);
1333 assert_eq!(rx.sender_strong_count(), 1);
1334 assert_eq!(rx.sender_weak_count(), 0);
1335
1336 let weak_tx = tx.downgrade();
1338 assert_eq!(tx.strong_count(), 1);
1339 assert_eq!(tx.weak_count(), 1);
1340 assert_eq!(weak_tx.strong_count(), 1);
1341 assert_eq!(weak_tx.weak_count(), 1);
1342 assert_eq!(rx.sender_strong_count(), 1);
1343 assert_eq!(rx.sender_weak_count(), 1);
1344
1345 let tx2 = tx.clone();
1347 assert_eq!(tx.strong_count(), 2);
1348 assert_eq!(tx.weak_count(), 1);
1349 assert_eq!(tx2.strong_count(), 2);
1350 assert_eq!(weak_tx.strong_count(), 2);
1351 assert_eq!(weak_tx.weak_count(), 1);
1352 assert_eq!(rx.sender_strong_count(), 2);
1353 assert_eq!(rx.sender_weak_count(), 1);
1354
1355 let weak_tx2 = weak_tx.clone();
1357 assert_eq!(tx.strong_count(), 2);
1358 assert_eq!(tx.weak_count(), 2);
1359 assert_eq!(weak_tx.weak_count(), 2);
1360 assert_eq!(weak_tx2.weak_count(), 2);
1361 assert_eq!(rx.sender_strong_count(), 2);
1362 assert_eq!(rx.sender_weak_count(), 2);
1363
1364 drop(weak_tx);
1366 assert_eq!(tx.weak_count(), 1);
1367 assert_eq!(weak_tx2.weak_count(), 1);
1368 assert_eq!(rx.sender_weak_count(), 1);
1369
1370 drop(tx);
1372 assert_eq!(tx2.strong_count(), 1);
1373 assert_eq!(weak_tx2.strong_count(), 1);
1374 assert_eq!(rx.sender_strong_count(), 1);
1375
1376 drop(tx2);
1378 assert_eq!(weak_tx2.strong_count(), 0);
1379 assert_eq!(weak_tx2.weak_count(), 1);
1380 assert_eq!(rx.sender_strong_count(), 0);
1381 assert_eq!(rx.sender_weak_count(), 1);
1382
1383 assert!(weak_tx2.upgrade().is_none());
1385 }
1386
1387 #[test]
1388 fn test_weak_sender_channel_close() {
1389 let (tx, rx) = unbounded_channel::<i32>();
1390 let weak_tx = tx.downgrade();
1391
1392 drop(tx);
1394
1395 assert!(rx.is_closed());
1397
1398 assert!(weak_tx.upgrade().is_none());
1400 }
1401
1402 #[test]
1403 fn test_sender_ordering_and_equality() {
1404 let (tx1, _rx1) = unbounded_channel::<i32>();
1405 let (tx2, _rx2) = unbounded_channel::<i32>();
1406
1407 let tx1_clone = tx1.clone();
1408 let weak_tx1 = tx1.downgrade();
1409 let weak_tx2 = tx2.downgrade();
1410
1411 assert_eq!(tx1, tx1_clone);
1413
1414 assert_ne!(tx1, tx2);
1416
1417 assert_eq!(weak_tx1, weak_tx1.clone());
1419
1420 assert_ne!(weak_tx1, weak_tx2);
1422
1423 let ordering1 = tx1.cmp(&tx2);
1425 let ordering2 = tx1.cmp(&tx2);
        assert_eq!(ordering1, ordering2);

        use alloc::collections::BTreeSet;
        let mut set = BTreeSet::new();
        set.insert(tx1.clone());
        set.insert(tx1_clone.clone());
        assert_eq!(set.len(), 1);

        set.insert(tx2.clone());
        assert_eq!(set.len(), 2);
    }
1438
1439 #[test]
1440 fn test_weak_sender_multiple_upgrades() {
1441 let (tx, mut rx) = unbounded_channel::<i32>();
1442 let weak_tx = tx.downgrade();
1443
1444 let upgraded1 = weak_tx.upgrade().unwrap();
1446 let upgraded2 = weak_tx.upgrade().unwrap();
1447
1448 upgraded1.send(1).unwrap();
1449 upgraded2.send(2).unwrap();
1450
1451 assert_eq!(rx.try_recv().unwrap(), 1);
1452 assert_eq!(rx.try_recv().unwrap(), 2);
1453
1454 drop(tx);
1456 drop(upgraded1);
1457
1458 let upgraded3 = weak_tx.upgrade().unwrap();
1460 upgraded3.send(3).unwrap();
1461 assert_eq!(rx.try_recv().unwrap(), 3);
1462
1463 drop(upgraded2);
1465 drop(upgraded3);
1466
1467 assert!(weak_tx.upgrade().is_none());
1469 }
1470
1471 #[test]
1472 fn test_sender_hash_collections() {
1473 use alloc::collections::BTreeSet;
1474
1475 let (tx1, _rx1) = unbounded_channel::<i32>();
1476 let (tx2, _rx2) = unbounded_channel::<i32>();
1477 let tx1_clone = tx1.clone();
1478
1479 let mut set = BTreeSet::new();
1481
1482 set.insert(tx1.clone());
1484 assert_eq!(set.len(), 1);
1485
1486 set.insert(tx1_clone);
1488 assert_eq!(set.len(), 1);
1489
1490 set.insert(tx2);
1492 assert_eq!(set.len(), 2);
1493
1494 let weak_tx1 = tx1.downgrade();
1496 let weak_tx1_clone = weak_tx1.clone();
1497
1498 let mut weak_set = BTreeSet::new();
1499 weak_set.insert(weak_tx1);
        weak_set.insert(weak_tx1_clone);
        assert_eq!(weak_set.len(), 1);
1502 }
1503
1504 #[test]
1507 fn test_len_method() {
1508 let (tx, mut rx) = unbounded_channel::<i32>();
1509
1510 assert_eq!(rx.len(), 0);
1512 assert!(rx.is_empty());
1513
1514 tx.send(1).unwrap();
1516 assert_eq!(rx.len(), 1);
1517 assert!(!rx.is_empty());
1518
1519 tx.send(2).unwrap();
1520 tx.send(3).unwrap();
1521 assert_eq!(rx.len(), 3);
1522 assert!(!rx.is_empty());
1523
1524 assert_eq!(rx.try_recv().unwrap(), 1);
1526 assert_eq!(rx.len(), 2);
1527 assert!(!rx.is_empty());
1528
1529 assert_eq!(rx.try_recv().unwrap(), 2);
1530 assert_eq!(rx.len(), 1);
1531 assert!(!rx.is_empty());
1532
1533 assert_eq!(rx.try_recv().unwrap(), 3);
1534 assert_eq!(rx.len(), 0);
1535 assert!(rx.is_empty());
1536
1537 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1539 }
1540
1541 #[test]
1544 fn test_tokio_drop_behavior_compatibility() {
1545 use alloc::sync::Arc;
1546 use core::sync::atomic::{AtomicUsize, Ordering};
1547
1548 #[derive(Debug)]
1550 struct DropCounter {
1551 #[allow(dead_code)]
1552 id: usize,
1553 counter: Arc<AtomicUsize>,
1554 }
1555
1556 impl Drop for DropCounter {
1557 fn drop(&mut self) {
1558 self.counter.fetch_add(1, Ordering::SeqCst);
1559 }
1560 }
1561
        const NUM_MESSAGES: usize = 100;
        let our_drop_counter = Arc::new(AtomicUsize::new(0));

1566 {
1567 let (tx, mut rx) = unbounded_channel::<DropCounter>();
1568
1569 for i in 0..NUM_MESSAGES {
1571 let msg = DropCounter {
1572 id: i,
1573 counter: Arc::clone(&our_drop_counter),
1574 };
1575 tx.send(msg).unwrap();
1576 }
1577
1578 for _ in 0..10 {
1580 let _msg = rx.try_recv().unwrap();
1581 }
1583
1584 drop(rx);
1586 drop(tx);
1587 }
1588
1589 let our_dropped_count = our_drop_counter.load(Ordering::SeqCst);
1590
1591 assert_eq!(
1593 our_dropped_count, NUM_MESSAGES,
1594 "Our implementation should drop all {} messages",
1595 NUM_MESSAGES
1596 );
1597 }
1598
1599 #[test]
1604 fn test_tokio_exact_drop_condition() {
1605 use alloc::sync::Arc;
1606 use alloc::vec::Vec;
1607 use core::sync::atomic::{AtomicUsize, Ordering};
1608
1609 #[derive(Debug)]
1610 struct DropTracker {
1611 #[allow(dead_code)]
1612 id: usize,
1613 counter: Arc<AtomicUsize>,
1614 }
1615
1616 impl Drop for DropTracker {
1617 fn drop(&mut self) {
1618 self.counter.fetch_add(1, Ordering::SeqCst);
1619 }
1620 }
1621
        const NUM_MESSAGES: usize = 50;
        let drop_counter = Arc::new(AtomicUsize::new(0));
1624
1625 let (tx, mut rx) = unbounded_channel::<DropTracker>();
1626 let tx_clone = tx.clone();
1627
1628 for i in 0..NUM_MESSAGES {
1630 let msg = DropTracker {
1631 id: i,
1632 counter: Arc::clone(&drop_counter),
1633 };
1634 tx.send(msg).unwrap();
1635 }
1636
1637 let received_before_drop = 10;
1639 for _ in 0..received_before_drop {
1640 let _msg = rx.try_recv().unwrap();
1641 }
1642
1643 drop(rx);
1645
1646 let mut send_errors = 0;
1648 let mut failed_messages = Vec::new();
1649
1650 for i in NUM_MESSAGES..NUM_MESSAGES + 10 {
1652 let msg = DropTracker {
1653 id: i,
1654 counter: Arc::clone(&drop_counter),
1655 };
1656
1657 match tx_clone.send(msg) {
1658 Ok(_) => {
1659 panic!("Send succeeded after receiver drop - this violates Tokio behavior");
1661 }
1662 Err(send_error) => {
1663 send_errors += 1;
                    failed_messages.push(send_error.0);
                }
1666 }
1667 }
1668
1669 drop(tx);
1671 drop(tx_clone);
1672
1673 drop(failed_messages);
1675
1676 let final_drop_count = drop_counter.load(Ordering::SeqCst);
1677
1678 let expected_total_drops = NUM_MESSAGES + send_errors;
1683 assert_eq!(
1684 final_drop_count, expected_total_drops,
1685 "Expected {} drops (original {} + failed sends {}), got {}",
1686 expected_total_drops, NUM_MESSAGES, send_errors, final_drop_count
1687 );
1688
1689 assert!(
1691 send_errors > 0,
1692 "Send attempts after receiver drop should fail"
1693 );
1694 }
1695
1696 #[test]
1697 fn test_tokio_api_compatibility() {
1698 let (tx, mut rx) = unbounded_channel::<i32>();
1699
1700 assert!(!tx.is_closed());
1702 assert!(!rx.is_closed());
1703 assert!(rx.is_empty());
1704 assert_eq!(rx.len(), 0);
1705
1706 assert_eq!(tx.strong_count(), 1);
1708 assert_eq!(tx.weak_count(), 0);
1709 assert_eq!(rx.sender_strong_count(), 1);
1710 assert_eq!(rx.sender_weak_count(), 0);
1711
1712 assert!(tx.same_channel(&tx));
1714 assert_eq!(tx.id(), rx.id());
1715
1716 let _weak_tx = tx.downgrade();
1718 assert_eq!(tx.weak_count(), 1);
1719 assert_eq!(rx.sender_weak_count(), 1);
1720
1721 tx.send(42).unwrap();
1723 assert_eq!(rx.len(), 1);
1724 assert!(!rx.is_empty());
1725
1726 assert_eq!(rx.try_recv().unwrap(), 42);
1728 assert_eq!(rx.len(), 0);
1729 assert!(rx.is_empty());
1730 }
1731
1732 #[test]
1735 fn test_sender_drop_behavior_comprehensive() {
1736 use alloc::sync::Arc;
1737 use core::sync::atomic::{AtomicUsize, Ordering};
1738
1739 #[derive(Debug)]
1740 struct DropTracker {
1741 #[allow(dead_code)]
1742 id: usize,
1743 counter: Arc<AtomicUsize>,
1744 }
1745
1746 impl Drop for DropTracker {
1747 fn drop(&mut self) {
1748 self.counter.fetch_add(1, Ordering::SeqCst);
1749 }
1750 }
1751
1752 {
1754 let drop_counter = Arc::new(AtomicUsize::new(0));
1755 const NUM_MESSAGES: usize = 50;
1756
1757 let (tx, mut rx) = unbounded_channel::<DropTracker>();
1758 let tx2 = tx.clone();
1759 let tx3 = tx.clone();
1760
1761 for i in 0..NUM_MESSAGES {
1763 let msg = DropTracker {
1764 id: i,
1765 counter: Arc::clone(&drop_counter),
1766 };
1767
1768 match i % 3 {
1770 0 => tx.send(msg).unwrap(),
1771 1 => tx2.send(msg).unwrap(),
1772 2 => tx3.send(msg).unwrap(),
1773 _ => unreachable!(),
1774 }
1775 }
1776
1777 let received_count = 15;
1779 for _ in 0..received_count {
1780 let _msg = rx.try_recv().unwrap();
1781 }
1782
1783 assert_eq!(rx.len(), NUM_MESSAGES - received_count);
1784 assert!(!rx.is_closed());
1785
1786 drop(tx);
1788 drop(tx2);
1789 drop(tx3);
1790
1791 assert!(rx.is_closed());
1793
1794 let mut remaining_received = 0;
1796 while let Ok(_msg) = rx.try_recv() {
1797 remaining_received += 1;
1798 }
1799
1800 assert_eq!(remaining_received, NUM_MESSAGES - received_count);
1801
1802 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1804
1805 drop(rx);
1807
1808 assert_eq!(
1810 drop_counter.load(Ordering::SeqCst),
1811 NUM_MESSAGES,
1812 "All messages should be dropped when senders and receiver are dropped"
1813 );
1814 }
1815
1816 {
1818 let drop_counter = Arc::new(AtomicUsize::new(0));
1819 const NUM_MESSAGES: usize = 30;
1820
1821 let (tx, mut rx) = unbounded_channel::<DropTracker>();
1822 let tx2 = tx.clone();
1823
1824 for i in 0..NUM_MESSAGES {
1826 let msg = DropTracker {
                    id: i + 100,
                    counter: Arc::clone(&drop_counter),
1829 };
1830
1831 if i % 2 == 0 {
1832 tx.send(msg).unwrap();
1833 } else {
1834 tx2.send(msg).unwrap();
1835 }
1836
1837 let _received = rx.try_recv().unwrap();
1839 }
1840
1841 assert!(rx.is_empty());
1843 assert!(!rx.is_closed());
1844 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1845
1846 drop(tx);
1848 drop(tx2);
1849
1850 assert!(rx.is_empty());
1852 assert!(rx.is_closed());
1853 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1854
1855 drop(rx);
1856
1857 assert_eq!(
1859 drop_counter.load(Ordering::SeqCst),
1860 NUM_MESSAGES,
1861 "All messages should be dropped when received"
1862 );
1863 }
1864
1865 {
1867 let drop_counter = Arc::new(AtomicUsize::new(0));
1868 const NUM_MESSAGES: usize = 40;
1869
1870 let (tx, mut rx) = unbounded_channel::<DropTracker>();
1871 let tx2 = tx.clone();
1872 let weak_tx = tx.downgrade();
1873
1874 for i in 0..NUM_MESSAGES / 2 {
1876 let msg = DropTracker {
                    id: i + 200,
                    counter: Arc::clone(&drop_counter),
1879 };
1880 tx.send(msg).unwrap();
1881 }
1882
1883 drop(tx);
            assert!(!rx.is_closed());

            for i in NUM_MESSAGES / 2..NUM_MESSAGES {
1889 let msg = DropTracker {
1890 id: i + 200,
1891 counter: Arc::clone(&drop_counter),
1892 };
1893 tx2.send(msg).unwrap();
1894 }
1895
1896 let upgraded = weak_tx.upgrade();
1898 assert!(upgraded.is_some());
            drop(upgraded);
            drop(tx2);
1903
1904 let mut received_all = 0;
1906 while let Ok(_msg) = rx.try_recv() {
1907 received_all += 1;
1908 }
1909
1910 assert_eq!(received_all, NUM_MESSAGES);
1911
1912 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1914 assert!(rx.is_closed());
1915
1916 assert!(weak_tx.upgrade().is_none());
1918
1919 drop(rx);
1920 drop(weak_tx);
1921
1922 assert_eq!(
1924 drop_counter.load(Ordering::SeqCst),
1925 NUM_MESSAGES,
1926 "All messages should be dropped with gradual sender drop"
1927 );
1928 }
1929 }
1930}