1#![no_std]
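//! An unbounded MPSC channel for `no_std` + `alloc` targets, built on a
//! singly linked list of fixed-size slot blocks and modeled on the API of
//! `tokio::sync::mpsc`'s unbounded channel.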
2
3extern crate alloc;
4
5#[cfg(test)]
6#[path = "attacks/mod.rs"]
7pub mod attacks;
8
9use alloc::{boxed::Box, sync::Arc};
10use core::{
11 cell::UnsafeCell,
12 fmt,
13 future::Future,
14 hash::{Hash, Hasher},
15 mem::{ManuallyDrop, MaybeUninit},
16 pin::Pin,
17 ptr,
18 sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
19 task::{Context, Poll, Waker},
20};
21
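/// Number of value slots per block. Each slot's ready flag is one bit in a
/// single `usize` bitmask, so the capacity must not exceed the pointer width.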
22#[cfg(target_pointer_width = "64")]
25const BLOCK_CAP: usize = 32;
26#[cfg(target_pointer_width = "32")]
27const BLOCK_CAP: usize = 16;
28
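/// Error returned by [`UnboundedSender::send`] when the channel is closed;
/// the value that could not be sent is handed back to the caller.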
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub struct SendError<T>(pub T);
32
33impl<T> fmt::Display for SendError<T> {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 write!(f, "sending on a closed channel")
36 }
37}
38
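/// Error returned by [`UnboundedReceiver::try_recv`] when no value can be
/// returned immediately.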
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub enum TryRecvError {
42 Empty,
44 Disconnected,
46}
47
48impl fmt::Display for TryRecvError {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 match self {
51 TryRecvError::Empty => write!(f, "channel is empty"),
52 TryRecvError::Disconnected => write!(f, "channel is disconnected"),
53 }
54 }
55}
56
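/// Creates an unbounded channel, returning its sender and receiver halves.
///
/// The sender may be cloned freely; the receiver is unique. A minimal usage
/// sketch (not compiled as a doctest in this `no_std` crate):
///
/// ```ignore
/// let (tx, mut rx) = unbounded_channel::<u32>();
/// tx.send(7).unwrap();
/// assert_eq!(rx.try_recv().unwrap(), 7);
/// ```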
57pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
59 let block = Block::new(0);
60 let block_ptr = Box::into_raw(Box::new(block));
61
62 let shared = Arc::new(Shared {
63 head: AtomicPtr::new(block_ptr),
64 tail: AtomicPtr::new(block_ptr),
65 rx_waker: AtomicPtr::new(ptr::null_mut()),
66 waker_lock: AtomicBool::new(false),
67 num_senders: AtomicUsize::new(1),
68 num_weak_senders: AtomicUsize::new(0),
69 closed: AtomicBool::new(false),
70 });
71
72 let sender = UnboundedSender {
73 shared: Arc::clone(&shared),
74 };
75 let receiver = UnboundedReceiver {
76 shared,
77 recv_index: 0,
78 };
79
80 (sender, receiver)
81}
82
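/// A fixed-capacity segment of the channel's singly linked list of buffers.
/// Senders fill slots in the newest block; the receiver drains the oldest.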
83struct Block<T> {
85 next: AtomicPtr<Block<T>>,
87 start_index: usize,
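    /// Slot storage; a slot may only be read after its ready bit is set.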
89 values: UnsafeCell<[MaybeUninit<ManuallyDrop<T>>; BLOCK_CAP]>,
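    /// Bitmask with one bit per slot, set once that slot's value is published.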
91 ready_slots: AtomicUsize,
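    /// Number of slots claimed by senders in this block, capped at `BLOCK_CAP`.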
93 len: AtomicUsize,
95}
96
97impl<T> Block<T> {
98 fn new(start_index: usize) -> Self {
99 Self {
100 next: AtomicPtr::new(ptr::null_mut()),
101 start_index,
102 values: UnsafeCell::new([const { MaybeUninit::uninit() }; BLOCK_CAP]),
103 ready_slots: AtomicUsize::new(0),
104 len: AtomicUsize::new(0),
105 }
106 }
107
108 fn relative_index(&self, index: usize) -> Option<usize> {
110 if index >= self.start_index && index < self.start_index + BLOCK_CAP {
111 Some(index - self.start_index)
112 } else {
113 None
114 }
115 }
116
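
    /// Writes `value` into the slot at `relative_index` and publishes it.
    /// Hands the value back if the index is out of range or already occupied.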
    fn write(&self, relative_index: usize, value: T) -> Result<(), T> {
        if relative_index >= BLOCK_CAP {
            return Err(value);
        }

        let mask = 1 << relative_index;

        // Refuse to write into a slot that is already published. Slot indices
        // are handed out uniquely via `len.fetch_add`, so this is defensive.
        if self.ready_slots.load(Ordering::Acquire) & mask != 0 {
            return Err(value);
        }

        // Initialize the slot first, then publish its ready bit with `Release`
        // ordering so a reader that observes the bit also observes the value.
        unsafe {
            let values = &mut *self.values.get();
            values[relative_index].write(ManuallyDrop::new(value));
            self.ready_slots.fetch_or(mask, Ordering::Release);
138 }
139
140 Ok(())
142 }
143
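    /// Takes the value out of the slot at `relative_index`, clearing its ready
    /// bit. Returns `None` if the slot holds no published value.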
144 fn read(&self, relative_index: usize) -> Option<T> {
147 if relative_index >= BLOCK_CAP {
148 return None;
149 }
150
151 let mask = 1 << relative_index;
152
153 let prev_ready = self.ready_slots.fetch_and(!mask, Ordering::AcqRel);
155
156 if prev_ready & mask == 0 {
157 return None;
159 }
160
161 unsafe {
162 let values = &*self.values.get();
163 Some(ManuallyDrop::into_inner(values[relative_index].assume_init_read()))
164 }
165 }
166
167 fn is_ready(&self, relative_index: usize) -> bool {
169 if relative_index >= BLOCK_CAP {
170 return false;
171 }
172
173 let mask = 1 << relative_index;
174 self.ready_slots.load(Ordering::Acquire) & mask != 0
175 }
176
177 fn ready_count(&self) -> usize {
179 self.ready_slots.load(Ordering::Acquire).count_ones() as usize
180 }
181}
182
183impl<T> Drop for Block<T> {
184 fn drop(&mut self) {
185 let ready = self.ready_slots.load(Ordering::Relaxed);
186 unsafe {
187 let values = &mut *self.values.get();
188 for i in 0..BLOCK_CAP {
189 if ready & (1 << i) != 0 {
190 ManuallyDrop::drop(values[i].assume_init_mut());
191 }
192 }
193 }
194 }
195}
196
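/// State shared by all senders and the receiver of one channel.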
197struct Shared<T> {
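    /// Newest block, into which senders write.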
199 head: AtomicPtr<Block<T>>,
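    /// Oldest block, from which the receiver reads.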
201 tail: AtomicPtr<Block<T>>,
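    /// Waker registered by the receiver, stored as a raw `Box<Waker>` pointer
    /// (null when no waker is registered).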
203 rx_waker: AtomicPtr<Waker>,
205 waker_lock: AtomicBool,
207 num_senders: AtomicUsize,
209 num_weak_senders: AtomicUsize,
211 closed: AtomicBool,
213}
214
215impl<T> Shared<T> {
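    /// Wakes the receiver if it has registered a waker, consuming that waker.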
216 fn wake_receiver(&self) {
218 if self.rx_waker.load(Ordering::Acquire).is_null() {
            return;
        }
222
223 while self.waker_lock.compare_exchange_weak(
225 false,
226 true,
227 Ordering::Acquire,
228 Ordering::Relaxed
229 ).is_err() {
230 core::hint::spin_loop();
231 }
232
233 let waker_ptr = self.rx_waker.swap(ptr::null_mut(), Ordering::Acquire);
235 if !waker_ptr.is_null() {
236 let waker = unsafe { Box::from_raw(waker_ptr) };
237 self.waker_lock.store(false, Ordering::Release);
239 waker.wake();
240 } else {
241 self.waker_lock.store(false, Ordering::Release);
243 }
244 }
245
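    /// Registers the receiver's waker, dropping any previously stored one.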
246 fn store_waker(&self, waker: Waker) {
248 while self.waker_lock.compare_exchange_weak(
250 false,
251 true,
252 Ordering::Acquire,
253 Ordering::Relaxed
254 ).is_err() {
255 core::hint::spin_loop();
256 }
257
258 let new_waker_ptr = Box::into_raw(Box::new(waker));
260 let old_waker_ptr = self.rx_waker.swap(new_waker_ptr, Ordering::Release);
261
262 if !old_waker_ptr.is_null() {
264 unsafe { drop(Box::from_raw(old_waker_ptr)) };
265 }
266
267 self.waker_lock.store(false, Ordering::Release);
269 }
270}
271
272unsafe impl<T: Send> Send for Shared<T> {}
273unsafe impl<T: Send> Sync for Shared<T> {}
274
275impl<T> Drop for Shared<T> {
276 fn drop(&mut self) {
277 let waker_ptr = self.rx_waker.load(Ordering::Relaxed);
279 if !waker_ptr.is_null() {
280 unsafe { drop(Box::from_raw(waker_ptr)) };
281 }
282
283 let mut current = self.tail.load(Ordering::Relaxed);
285 while !current.is_null() {
286 let block = unsafe { Box::from_raw(current) };
287 current = block.next.load(Ordering::Relaxed);
288 }
289 }
290}
291
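/// The sending half of the channel. Clones share the same channel; the channel
/// closes once every strong sender has been dropped.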
292pub struct UnboundedSender<T> {
294 shared: Arc<Shared<T>>,
295}
296
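/// A sender handle that does not keep the channel open; it must be upgraded
/// back into an [`UnboundedSender`] before it can send.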
297pub struct WeakUnboundedSender<T> {
325 shared: Arc<Shared<T>>,
326}
327
328impl<T> Clone for WeakUnboundedSender<T> {
329 fn clone(&self) -> Self {
330 self.shared.num_weak_senders.fetch_add(1, Ordering::Relaxed);
331 Self {
332 shared: Arc::clone(&self.shared),
333 }
334 }
335}
336
337impl<T> Drop for WeakUnboundedSender<T> {
338 fn drop(&mut self) {
339 self.shared.num_weak_senders.fetch_sub(1, Ordering::AcqRel);
340 }
341}
342
343impl<T> Clone for UnboundedSender<T> {
344 fn clone(&self) -> Self {
345 self.shared.num_senders.fetch_add(1, Ordering::Relaxed);
346 Self {
347 shared: Arc::clone(&self.shared),
348 }
349 }
350}
351
352impl<T> Drop for UnboundedSender<T> {
353 fn drop(&mut self) {
354 let prev_count = self.shared.num_senders.fetch_sub(1, Ordering::AcqRel);
355 if prev_count == 1 {
356 self.shared.closed.store(true, Ordering::Release);
358
359 self.shared.wake_receiver();
361 }
362 }
363}
364
365impl<T> UnboundedSender<T> {
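    /// Sends a value without ever blocking. Fails with [`SendError`] if the
    /// channel has been closed, returning the value to the caller.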
366 pub fn send(&self, mut value: T) -> Result<(), SendError<T>> {
370 if self.shared.closed.load(Ordering::Acquire) {
371 return Err(SendError(value));
372 }
373
374 let mut attempts = 0;
375 loop {
376 let head_ptr = self.shared.head.load(Ordering::Acquire);
377 let head = unsafe { &*head_ptr };
378
379 let slot_idx = head.len.fetch_add(1, Ordering::AcqRel);
381
382 if slot_idx < BLOCK_CAP {
383 match head.write(slot_idx, value) {
386 Ok(()) => {
387 self.shared.wake_receiver();
389 return Ok(());
390 }
391 Err(returned_value) => {
392 value = returned_value;
                        head.len.fetch_sub(1, Ordering::AcqRel);
                        continue;
398 }
399 }
400 } else {
401 head.len.store(BLOCK_CAP, Ordering::Release);
403 }
404
405 let next_ptr = head.next.load(Ordering::Acquire);
407 if next_ptr.is_null() {
408 let new_block = Box::into_raw(Box::new(Block::new(head.start_index + BLOCK_CAP)));
410
411 match head.next.compare_exchange_weak(
412 ptr::null_mut(),
413 new_block,
414 Ordering::AcqRel,
415 Ordering::Acquire,
416 ) {
417 Ok(_) => {
418 self.shared.head.store(new_block, Ordering::Release);
420 }
421 Err(_) => {
422 unsafe { drop(Box::from_raw(new_block)) };
424 }
425 }
426 } else {
427 self.shared
429 .head
430 .compare_exchange_weak(head_ptr, next_ptr, Ordering::AcqRel, Ordering::Acquire)
431 .ok();
432 }
433
434 attempts += 1;
435 if attempts > 1000 {
436 core::hint::spin_loop();
438 attempts = 0;
439 }
440 }
441 }
442
443 pub fn id(&self) -> ChannelId {
445 ChannelId(Arc::as_ptr(&self.shared) as usize)
446 }
447
448 pub fn is_closed(&self) -> bool {
450 self.shared.closed.load(Ordering::Acquire)
451 }
452
453 pub fn same_channel(&self, other: &Self) -> bool {
455 Arc::ptr_eq(&self.shared, &other.shared)
456 }
457
458 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
463 pub fn downgrade(&self) -> WeakUnboundedSender<T> {
464 self.shared.num_weak_senders.fetch_add(1, Ordering::Relaxed);
465 WeakUnboundedSender {
466 shared: Arc::clone(&self.shared),
467 }
468 }
469
470 pub fn strong_count(&self) -> usize {
472 self.shared.num_senders.load(Ordering::Acquire)
473 }
474
475 pub fn weak_count(&self) -> usize {
477 self.shared.num_weak_senders.load(Ordering::Acquire)
478 }
479
480 pub async fn closed(&self) {
484 ClosedFuture { sender: self }.await
485 }
486}
487
488impl<T> WeakUnboundedSender<T> {
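    /// Attempts to turn this weak handle back into a strong sender. Returns
    /// `None` once every strong sender has been dropped.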
489 pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
493 let mut count = self.shared.num_senders.load(Ordering::Acquire);
494
495 loop {
496 if count == 0 {
497 return None;
499 }
500
501 match self.shared.num_senders.compare_exchange_weak(
502 count,
503 count + 1,
504 Ordering::AcqRel,
505 Ordering::Acquire,
506 ) {
507 Ok(_) => {
508 return Some(UnboundedSender {
509 shared: Arc::clone(&self.shared),
510 });
511 }
512 Err(actual) => count = actual,
513 }
514 }
515 }
516
517 pub fn strong_count(&self) -> usize {
519 self.shared.num_senders.load(Ordering::Acquire)
520 }
521
522 pub fn weak_count(&self) -> usize {
524 self.shared.num_weak_senders.load(Ordering::Acquire)
525 }
526}
527
528impl<T> PartialEq for UnboundedSender<T> {
529 fn eq(&self, other: &Self) -> bool {
530 self.id() == other.id()
531 }
532}
533
534impl<T> Eq for UnboundedSender<T> {}
535
536impl<T> PartialOrd for UnboundedSender<T> {
537 fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
538 Some(self.cmp(other))
539 }
540}
541
542impl<T> Ord for UnboundedSender<T> {
543 fn cmp(&self, other: &Self) -> core::cmp::Ordering {
544 self.id().cmp(&other.id())
545 }
546}
547
548impl<T> Hash for UnboundedSender<T> {
549 fn hash<H: Hasher>(&self, state: &mut H) {
550 self.id().hash(state);
551 }
552}
553
554impl<T> PartialEq for WeakUnboundedSender<T> {
555 fn eq(&self, other: &Self) -> bool {
556 Arc::ptr_eq(&self.shared, &other.shared)
557 }
558}
559
560impl<T> Eq for WeakUnboundedSender<T> {}
561
562impl<T> PartialOrd for WeakUnboundedSender<T> {
563 fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
564 Some(self.cmp(other))
565 }
566}
567
568impl<T> Ord for WeakUnboundedSender<T> {
569 fn cmp(&self, other: &Self) -> core::cmp::Ordering {
570 let self_ptr = Arc::as_ptr(&self.shared) as usize;
571 let other_ptr = Arc::as_ptr(&other.shared) as usize;
572 self_ptr.cmp(&other_ptr)
573 }
574}
575
576impl<T> Hash for WeakUnboundedSender<T> {
577 fn hash<H: Hasher>(&self, state: &mut H) {
578 let ptr = Arc::as_ptr(&self.shared) as usize;
579 ptr.hash(state);
580 }
581}
582
583impl<T> fmt::Debug for UnboundedSender<T> {
584 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
585 f.debug_struct("UnboundedSender")
586 .field("id", &self.id())
587 .field("strong_count", &self.strong_count())
588 .field("weak_count", &self.weak_count())
589 .finish()
590 }
591}
592
593impl<T> fmt::Debug for WeakUnboundedSender<T> {
594 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
595 f.debug_struct("WeakUnboundedSender")
596 .field("strong_count", &self.strong_count())
597 .field("weak_count", &self.weak_count())
598 .finish()
599 }
600}
601
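/// The receiving half of the channel; only one receiver exists per channel.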
602pub struct UnboundedReceiver<T> {
604 shared: Arc<Shared<T>>,
605 recv_index: usize,
607}
608
609impl<T> UnboundedReceiver<T> {
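    /// Receives the next value, waiting if the channel is currently empty, and
    /// resolving to `None` once the channel is closed and fully drained.
    /// A minimal usage sketch (assuming an async context):
    ///
    /// ```ignore
    /// while let Some(value) = rx.recv().await {
    ///     // handle `value`
    /// }
    /// ```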
610 pub async fn recv(&mut self) -> Option<T> {
612 RecvFuture { receiver: self }.await
613 }
614
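    /// Attempts to receive the next value without waiting.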
615 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
617 loop {
618 let tail_ptr = self.shared.tail.load(Ordering::Acquire);
619 let tail = unsafe { &*tail_ptr };
620
621 if let Some(relative_idx) = tail.relative_index(self.recv_index) {
622 if tail.is_ready(relative_idx) {
624 if let Some(value) = tail.read(relative_idx) {
625 self.recv_index += 1;
626 return Ok(value);
627 }
628 }
629
630 if relative_idx == BLOCK_CAP - 1
632 || tail.ready_count() == tail.len.load(Ordering::Acquire)
633 {
634 let next_ptr = tail.next.load(Ordering::Acquire);
635 if !next_ptr.is_null() {
636 if self
638 .shared
639 .tail
640 .compare_exchange(
641 tail_ptr,
642 next_ptr,
643 Ordering::AcqRel,
644 Ordering::Acquire,
645 )
646 .is_ok()
647 {
648 unsafe { drop(Box::from_raw(tail_ptr)) };
650 }
651 continue;
652 }
653 }
654 } else {
655 let next_ptr = tail.next.load(Ordering::Acquire);
657 if !next_ptr.is_null() {
658 if self
659 .shared
660 .tail
661 .compare_exchange(tail_ptr, next_ptr, Ordering::AcqRel, Ordering::Acquire)
662 .is_ok()
663 {
664 unsafe { drop(Box::from_raw(tail_ptr)) };
665 }
666 continue;
667 }
668 }
669
670 if self.shared.closed.load(Ordering::Acquire)
672 && self.shared.num_senders.load(Ordering::Acquire) == 0
673 {
674 return Err(TryRecvError::Disconnected);
675 }
676
677 return Err(TryRecvError::Empty);
678 }
679 }
680
681 pub fn id(&self) -> ChannelId {
683 ChannelId(Arc::as_ptr(&self.shared) as usize)
684 }
685
686 pub fn is_closed(&self) -> bool {
688 self.shared.closed.load(Ordering::Acquire)
689 && self.shared.num_senders.load(Ordering::Acquire) == 0
690 }
691
    pub fn is_empty(&self) -> bool {
        let tail_ptr = self.shared.tail.load(Ordering::Acquire);
        let tail = unsafe { &*tail_ptr };

        // Fast path: the next value to receive is already published in the
        // current tail block.
        if let Some(relative_idx) = tail.relative_index(self.recv_index) {
            if tail.is_ready(relative_idx) {
                return false;
            }
        }

        // Slow path: scan the remaining blocks for any published value.
        self.len() == 0
    }
718
719 pub fn close(&mut self) {
721 self.shared.closed.store(true, Ordering::Release);
722 }
723
724 pub fn sender_strong_count(&self) -> usize {
726 self.shared.num_senders.load(Ordering::Acquire)
727 }
728
729 pub fn sender_weak_count(&self) -> usize {
731 self.shared.num_weak_senders.load(Ordering::Acquire)
732 }
733
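    /// Returns the number of values that were sent but not yet received.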
734 pub fn len(&self) -> usize {
736 let mut count = 0;
738 let mut current = self.shared.tail.load(Ordering::Acquire);
739
740 while !current.is_null() {
741 let block = unsafe { &*current };
742 count += block.ready_count();
743 current = block.next.load(Ordering::Acquire);
744 }
745
746 count
747 }
748
    fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        match self.try_recv() {
            Ok(value) => Poll::Ready(Some(value)),
            Err(TryRecvError::Disconnected) => Poll::Ready(None),
            Err(TryRecvError::Empty) => {
                self.shared.store_waker(cx.waker().clone());
                // Check again after registering the waker so a value sent in
                // the meantime (whose wake-up we may have missed) is not lost.
                match self.try_recv() {
                    Ok(value) => Poll::Ready(Some(value)),
                    Err(TryRecvError::Disconnected) => Poll::Ready(None),
                    Err(TryRecvError::Empty) => Poll::Pending,
                }
            }
        }
    }
760}
761
762impl<T> Drop for UnboundedReceiver<T> {
763 fn drop(&mut self) {
764 self.shared.closed.store(true, Ordering::Release);
765 }
766}
767
768struct RecvFuture<'a, T> {
770 receiver: &'a mut UnboundedReceiver<T>,
771}
772
773impl<'a, T> Future for RecvFuture<'a, T> {
774 type Output = Option<T>;
775
776 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
777 self.receiver.poll_recv(cx)
778 }
779}
780
781struct ClosedFuture<'a, T> {
783 sender: &'a UnboundedSender<T>,
784}
785
786impl<'a, T> Future for ClosedFuture<'a, T> {
787 type Output = ();
788
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.sender.is_closed() {
            Poll::Ready(())
        } else {
            // No close-notification waker exists on the sender side, so ask to
            // be polled again instead of returning `Pending` with no wake-up.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
797 }
798}
799
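/// Opaque identifier shared by every handle of one channel, derived from the
/// address of the channel's shared state.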
800#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
802pub struct ChannelId(usize);
803
804impl Hash for ChannelId {
805 fn hash<H: Hasher>(&self, state: &mut H) {
806 self.0.hash(state);
807 }
808}
809
810impl fmt::Display for ChannelId {
811 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
812 write!(f, "ChannelId(0x{:x})", self.0)
813 }
814}
815
816#[cfg(test)]
817mod tests {
818 use super::*;
819 use alloc::{vec, vec::Vec};
820
821 #[test]
822 fn test_basic_send_recv() {
823 let (tx, mut rx) = unbounded_channel::<i32>();
824
825 tx.send(1).unwrap();
826 tx.send(2).unwrap();
827 tx.send(3).unwrap();
828
829 assert_eq!(rx.try_recv().unwrap(), 1);
830 assert_eq!(rx.try_recv().unwrap(), 2);
831 assert_eq!(rx.try_recv().unwrap(), 3);
832 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
833 }
834
835 #[test]
836 fn test_channel_id() {
837 let (tx1, rx1) = unbounded_channel::<i32>();
838 let (tx2, rx2) = unbounded_channel::<i32>();
839
840 assert_eq!(tx1.id(), rx1.id());
841 assert_ne!(tx1.id(), tx2.id());
842 assert_ne!(rx1.id(), rx2.id());
843 }
844
845 #[test]
846 fn test_clone_sender() {
847 let (tx, mut rx) = unbounded_channel::<i32>();
848 let tx2 = tx.clone();
849
850 tx.send(1).unwrap();
851 tx2.send(2).unwrap();
852
853 assert_eq!(rx.try_recv().unwrap(), 1);
854 assert_eq!(rx.try_recv().unwrap(), 2);
855 }
856
857 #[test]
858 fn test_large_number_of_messages() {
859 let (tx, mut rx) = unbounded_channel::<usize>();
860
861 for i in 0..100 {
863 tx.send(i).unwrap();
864 }
865
866 for i in 0..100 {
868 assert_eq!(rx.try_recv().unwrap(), i);
869 }
870 }
871
872 #[test]
873 fn test_drop_sender_closes_channel() {
874 let (tx, mut rx) = unbounded_channel::<i32>();
875
876 tx.send(42).unwrap();
877 drop(tx);
878
879 assert_eq!(rx.try_recv().unwrap(), 42);
880 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
881 }
882
883 #[test]
884 fn test_same_channel() {
885 let (tx1, _rx) = unbounded_channel::<i32>();
886 let tx2 = tx1.clone();
887 let (tx3, _rx2) = unbounded_channel::<i32>();
888
889 assert!(tx1.same_channel(&tx2));
890 assert!(!tx1.same_channel(&tx3));
891 }
892
893 #[test]
896 fn test_stress_many_messages() {
897 let (tx, mut rx) = unbounded_channel::<usize>();
898 const NUM_MESSAGES: usize = 10_000;
899
900 for i in 0..NUM_MESSAGES {
902 tx.send(i).unwrap();
903 }
904
905 for i in 0..NUM_MESSAGES {
907 assert_eq!(rx.try_recv().unwrap(), i);
908 }
909
910 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
911 }
912
913 #[test]
914 fn test_send_recv_interleaved() {
915 let (tx, mut rx) = unbounded_channel::<i32>();
916
917 let mut expected_recv = 0;
919 for i in 0..100 {
920 tx.send(i).unwrap();
921 if i % 2 == 0 {
922 assert_eq!(rx.try_recv().unwrap(), expected_recv);
923 expected_recv += 1;
924 }
925 }
926
927 while let Ok(value) = rx.try_recv() {
929 assert_eq!(value, expected_recv);
930 expected_recv += 1;
931 }
932
        assert_eq!(expected_recv, 100);
    }
935
936 #[test]
937 fn test_drop_receiver_while_sending() {
938 let (tx, rx) = unbounded_channel::<i32>();
939
940 tx.send(1).unwrap();
942 tx.send(2).unwrap();
943
944 drop(rx);
946
947 assert!(matches!(tx.send(3), Err(SendError(3))));
949 assert!(tx.is_closed());
950 }
951
952 #[test]
953 fn test_multiple_sender_drops() {
954 let (tx, mut rx) = unbounded_channel::<i32>();
955 let tx2 = tx.clone();
956 let tx3 = tx.clone();
957
958 tx.send(1).unwrap();
959 tx2.send(2).unwrap();
960 tx3.send(3).unwrap();
961
962 drop(tx);
964 assert!(!rx.is_closed());
965
966 drop(tx2);
967 assert!(!rx.is_closed());
968
969 drop(tx3);
971
972 assert_eq!(rx.try_recv().unwrap(), 1);
974 assert_eq!(rx.try_recv().unwrap(), 2);
975 assert_eq!(rx.try_recv().unwrap(), 3);
976
977 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
979 assert!(rx.is_closed());
980 }
981
982 #[test]
983 fn test_zero_sized_types() {
984 #[derive(Debug, PartialEq)]
985 struct ZeroSized;
986
987 let (tx, mut rx) = unbounded_channel::<ZeroSized>();
988
989 tx.send(ZeroSized).unwrap();
990 tx.send(ZeroSized).unwrap();
991
992 assert_eq!(rx.try_recv().unwrap(), ZeroSized);
993 assert_eq!(rx.try_recv().unwrap(), ZeroSized);
994 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
995 }
996
997 #[test]
998 fn test_large_types() {
999 #[derive(Debug, PartialEq)]
1000 struct LargeType([u8; 1024]);
1001
1002 let (tx, mut rx) = unbounded_channel::<LargeType>();
1003 let large_value = LargeType([42; 1024]);
1004
1005 tx.send(large_value).unwrap();
1006 let received = rx.try_recv().unwrap();
1007 assert_eq!(received.0[0], 42);
1008 assert_eq!(received.0[1023], 42);
1009 }
1010
1011 #[test]
1012 fn test_unwind_safety_basic() {
1013 #[derive(Debug)]
1014 struct PanicOnDrop(bool);
1015 impl Drop for PanicOnDrop {
1016 fn drop(&mut self) {
1017 if self.0 {
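                    // This branch is never taken: the test only constructs
                    // `PanicOnDrop(false)`.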
1018 }
1021 }
1022 }
1023
1024 let (tx, mut rx) = unbounded_channel::<PanicOnDrop>();
1025
1026 tx.send(PanicOnDrop(false)).unwrap();
1028
1029 assert_eq!(rx.try_recv().unwrap().0, false);
1031 tx.send(PanicOnDrop(false)).unwrap();
1032 assert_eq!(rx.try_recv().unwrap().0, false);
1033 }
1034
1035 #[test]
1036 fn test_block_boundary_conditions() {
1037 let (tx, mut rx) = unbounded_channel::<usize>();
1038
1039 for i in 0..BLOCK_CAP {
1041 tx.send(i).unwrap();
1042 }
1043
1044 tx.send(BLOCK_CAP).unwrap();
1046
1047 for i in 0..=BLOCK_CAP {
1049 assert_eq!(rx.try_recv().unwrap(), i);
1050 }
1051
1052 for i in 0..(BLOCK_CAP * 3) {
1054 tx.send(i).unwrap();
1055 }
1056
1057 for i in 0..(BLOCK_CAP * 3) {
1058 assert_eq!(rx.try_recv().unwrap(), i);
1059 }
1060 }
1061
1062 #[test]
1063 fn test_receiver_state_consistency() {
1064 let (tx, mut rx) = unbounded_channel::<i32>();
1065
1066 assert!(rx.is_empty());
1068 assert!(!rx.is_closed());
1069
1070 tx.send(42).unwrap();
1072 assert!(!rx.is_empty());
1073 assert!(!rx.is_closed());
1074
1075 assert_eq!(rx.try_recv().unwrap(), 42);
1077 assert!(rx.is_empty());
1078 assert!(!rx.is_closed());
1079
1080 drop(tx);
1082 assert!(rx.is_empty());
1083 assert!(rx.is_closed());
1084 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1085 }
1086
1087 #[test]
1088 fn test_manual_close() {
1089 let (tx, mut rx) = unbounded_channel::<i32>();
1090
1091 tx.send(1).unwrap();
1092 tx.send(2).unwrap();
1093
1094 rx.close();
1096
1097 assert_eq!(rx.try_recv().unwrap(), 1);
1099 assert_eq!(rx.try_recv().unwrap(), 2);
1100
1101 assert!(tx.is_closed());
1103 assert!(matches!(tx.send(3), Err(SendError(3))));
1104 }
1105
1106 #[test]
1107 fn test_channel_id_consistency() {
1108 let (tx, rx) = unbounded_channel::<i32>();
1110 assert_eq!(tx.id(), rx.id());
1111
1112 let (tx2, rx2) = unbounded_channel::<i32>();
1114 assert_eq!(tx2.id(), rx2.id());
1115
1116 let tx_clone = tx.clone();
1119 assert_eq!(tx.id(), tx_clone.id());
1120 assert!(tx.same_channel(&tx_clone));
1121 assert!(!tx.same_channel(&tx2));
1122 }
1123
1124 #[test]
1125 fn test_drop_semantics() {
1126 use alloc::rc::Rc;
1127
1128 let drop_count = Rc::new(core::cell::RefCell::new(0));
1129
1130 #[derive(Debug)]
1131 struct DropCounter(Rc<core::cell::RefCell<i32>>);
1132 impl Drop for DropCounter {
1133 fn drop(&mut self) {
1134 *self.0.borrow_mut() += 1;
1135 }
1136 }
1137
1138 let (tx, mut rx) = unbounded_channel::<DropCounter>();
1139
1140 tx.send(DropCounter(drop_count.clone())).unwrap();
1142 tx.send(DropCounter(drop_count.clone())).unwrap();
1143 tx.send(DropCounter(drop_count.clone())).unwrap();
1144
        assert_eq!(*drop_count.borrow(), 0);

        let _value1 = rx.try_recv().unwrap();
        assert_eq!(*drop_count.borrow(), 0);

        drop(_value1);
        assert_eq!(*drop_count.borrow(), 1);

        drop(tx);
        drop(rx);

        assert_eq!(*drop_count.borrow(), 3);
    }
1160
1161 #[test]
1162 fn test_memory_safety_after_close() {
1163 let (tx, mut rx) = unbounded_channel::<Vec<u8>>();
1164
1165 tx.send(vec![1, 2, 3]).unwrap();
1167 tx.send(vec![4, 5, 6]).unwrap();
1168
1169 rx.close();
1171
1172 assert!(matches!(tx.send(vec![7, 8, 9]), Err(_)));
1174
1175 assert_eq!(rx.try_recv().unwrap(), vec![1, 2, 3]);
1177 assert_eq!(rx.try_recv().unwrap(), vec![4, 5, 6]);
1178 }
1179
1180 #[test]
1181 fn test_ordering_guarantees() {
1182 let (tx, mut rx) = unbounded_channel::<usize>();
1183
1184 for i in 0..1000 {
1186 tx.send(i).unwrap();
1187 }
1188
1189 for i in 0..1000 {
1191 assert_eq!(rx.try_recv().unwrap(), i);
1192 }
1193 }
1194
1195 #[test]
1196 fn test_empty_channel_operations() {
1197 let (tx, mut rx) = unbounded_channel::<i32>();
1198
1199 assert!(rx.is_empty());
1201 assert!(!rx.is_closed());
1202 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1203
1204 drop(tx);
1206 assert!(rx.is_empty());
1207 assert!(rx.is_closed());
1208 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1209 }
1210
1211 #[test]
1212 fn test_channel_reuse_after_empty() {
1213 let (tx, mut rx) = unbounded_channel::<i32>();
1214
1215 for round in 0..10 {
1217 for i in 0..10 {
1218 tx.send(round * 10 + i).unwrap();
1219 }
1220
1221 for i in 0..10 {
1222 assert_eq!(rx.try_recv().unwrap(), round * 10 + i);
1223 }
1224
1225 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1226 }
1227 }
1228
1229 #[test]
1230 fn test_mixed_operation_patterns() {
1231 let (tx, mut rx) = unbounded_channel::<usize>();
1232
1233 let mut next_send = 0;
1235 let mut next_recv = 0;
1236
1237 for _ in 0..100 {
1238 let send_count = (next_send % 5) + 1;
1240 for _ in 0..send_count {
1241 tx.send(next_send).unwrap();
1242 next_send += 1;
1243 }
1244
1245 let recv_count = (next_recv % 3) + 1;
1247 for _ in 0..recv_count {
1248 if let Ok(value) = rx.try_recv() {
1249 assert_eq!(value, next_recv);
1250 next_recv += 1;
1251 } else {
1252 break;
1253 }
1254 }
1255 }
1256
1257 while let Ok(value) = rx.try_recv() {
1259 assert_eq!(value, next_recv);
1260 next_recv += 1;
1261 }
1262
1263 assert_eq!(next_send, next_recv);
1264 }
1265
1266 #[test]
1269 fn test_weak_sender_basic() {
1270 let (tx, mut rx) = unbounded_channel::<i32>();
1271
1272 let weak_tx = tx.downgrade();
1274
1275 let upgraded_tx = weak_tx.upgrade().unwrap();
1277
1278 upgraded_tx.send(42).unwrap();
1280 assert_eq!(rx.try_recv().unwrap(), 42);
1281
1282 drop(tx);
1284
1285 upgraded_tx.send(43).unwrap();
1287 assert_eq!(rx.try_recv().unwrap(), 43);
1288
1289 drop(upgraded_tx);
1291
1292 assert!(weak_tx.upgrade().is_none());
1294 }
1295
1296 #[test]
1297 fn test_weak_sender_upgrade_failure() {
1298 let (tx, _rx) = unbounded_channel::<i32>();
1299 let weak_tx = tx.downgrade();
1300
1301 drop(tx);
1303
1304 assert!(weak_tx.upgrade().is_none());
1306 }
1307
1308 #[test]
1309 fn test_weak_sender_counts() {
1310 let (tx, rx) = unbounded_channel::<i32>();
1311
1312 assert_eq!(tx.strong_count(), 1);
1314 assert_eq!(tx.weak_count(), 0);
1315 assert_eq!(rx.sender_strong_count(), 1);
1316 assert_eq!(rx.sender_weak_count(), 0);
1317
1318 let weak_tx = tx.downgrade();
1320 assert_eq!(tx.strong_count(), 1);
1321 assert_eq!(tx.weak_count(), 1);
1322 assert_eq!(weak_tx.strong_count(), 1);
1323 assert_eq!(weak_tx.weak_count(), 1);
1324 assert_eq!(rx.sender_strong_count(), 1);
1325 assert_eq!(rx.sender_weak_count(), 1);
1326
1327 let tx2 = tx.clone();
1329 assert_eq!(tx.strong_count(), 2);
1330 assert_eq!(tx.weak_count(), 1);
1331 assert_eq!(tx2.strong_count(), 2);
1332 assert_eq!(weak_tx.strong_count(), 2);
1333 assert_eq!(weak_tx.weak_count(), 1);
1334 assert_eq!(rx.sender_strong_count(), 2);
1335 assert_eq!(rx.sender_weak_count(), 1);
1336
1337 let weak_tx2 = weak_tx.clone();
1339 assert_eq!(tx.strong_count(), 2);
1340 assert_eq!(tx.weak_count(), 2);
1341 assert_eq!(weak_tx.weak_count(), 2);
1342 assert_eq!(weak_tx2.weak_count(), 2);
1343 assert_eq!(rx.sender_strong_count(), 2);
1344 assert_eq!(rx.sender_weak_count(), 2);
1345
1346 drop(weak_tx);
1348 assert_eq!(tx.weak_count(), 1);
1349 assert_eq!(weak_tx2.weak_count(), 1);
1350 assert_eq!(rx.sender_weak_count(), 1);
1351
1352 drop(tx);
1354 assert_eq!(tx2.strong_count(), 1);
1355 assert_eq!(weak_tx2.strong_count(), 1);
1356 assert_eq!(rx.sender_strong_count(), 1);
1357
1358 drop(tx2);
1360 assert_eq!(weak_tx2.strong_count(), 0);
1361 assert_eq!(weak_tx2.weak_count(), 1);
1362 assert_eq!(rx.sender_strong_count(), 0);
1363 assert_eq!(rx.sender_weak_count(), 1);
1364
1365 assert!(weak_tx2.upgrade().is_none());
1367 }
1368
1369 #[test]
1370 fn test_weak_sender_channel_close() {
1371 let (tx, rx) = unbounded_channel::<i32>();
1372 let weak_tx = tx.downgrade();
1373
1374 drop(tx);
1376
1377 assert!(rx.is_closed());
1379
1380 assert!(weak_tx.upgrade().is_none());
1382 }
1383
1384 #[test]
1385 fn test_sender_ordering_and_equality() {
1386 let (tx1, _rx1) = unbounded_channel::<i32>();
1387 let (tx2, _rx2) = unbounded_channel::<i32>();
1388
1389 let tx1_clone = tx1.clone();
1390 let weak_tx1 = tx1.downgrade();
1391 let weak_tx2 = tx2.downgrade();
1392
1393 assert_eq!(tx1, tx1_clone);
1395
1396 assert_ne!(tx1, tx2);
1398
1399 assert_eq!(weak_tx1, weak_tx1.clone());
1401
1402 assert_ne!(weak_tx1, weak_tx2);
1404
1405 let ordering1 = tx1.cmp(&tx2);
1407 let ordering2 = tx1.cmp(&tx2);
        assert_eq!(ordering1, ordering2);

        use alloc::collections::BTreeSet;
        let mut set = BTreeSet::new();
        set.insert(tx1.clone());
        set.insert(tx1_clone.clone());
        assert_eq!(set.len(), 1);

        set.insert(tx2.clone());
        assert_eq!(set.len(), 2);
    }
1420
1421 #[test]
1422 fn test_weak_sender_multiple_upgrades() {
1423 let (tx, mut rx) = unbounded_channel::<i32>();
1424 let weak_tx = tx.downgrade();
1425
1426 let upgraded1 = weak_tx.upgrade().unwrap();
1428 let upgraded2 = weak_tx.upgrade().unwrap();
1429
1430 upgraded1.send(1).unwrap();
1431 upgraded2.send(2).unwrap();
1432
1433 assert_eq!(rx.try_recv().unwrap(), 1);
1434 assert_eq!(rx.try_recv().unwrap(), 2);
1435
1436 drop(tx);
1438 drop(upgraded1);
1439
1440 let upgraded3 = weak_tx.upgrade().unwrap();
1442 upgraded3.send(3).unwrap();
1443 assert_eq!(rx.try_recv().unwrap(), 3);
1444
1445 drop(upgraded2);
1447 drop(upgraded3);
1448
1449 assert!(weak_tx.upgrade().is_none());
1451 }
1452
1453 #[test]
1454 fn test_sender_hash_collections() {
1455 use alloc::collections::BTreeSet;
1456
1457 let (tx1, _rx1) = unbounded_channel::<i32>();
1458 let (tx2, _rx2) = unbounded_channel::<i32>();
1459 let tx1_clone = tx1.clone();
1460
1461 let mut set = BTreeSet::new();
1463
1464 set.insert(tx1.clone());
1466 assert_eq!(set.len(), 1);
1467
1468 set.insert(tx1_clone);
1470 assert_eq!(set.len(), 1);
1471
1472 set.insert(tx2);
1474 assert_eq!(set.len(), 2);
1475
1476 let weak_tx1 = tx1.downgrade();
1478 let weak_tx1_clone = weak_tx1.clone();
1479
1480 let mut weak_set = BTreeSet::new();
1481 weak_set.insert(weak_tx1);
        weak_set.insert(weak_tx1_clone);
        assert_eq!(weak_set.len(), 1);
1484 }
1485
1486 #[test]
1489 fn test_len_method() {
1490 let (tx, rx) = unbounded_channel::<i32>();
1491
1492 assert_eq!(rx.len(), 0);
1494 assert!(rx.is_empty());
1495
1496 tx.send(1).unwrap();
1498 assert_eq!(rx.len(), 1);
1499 assert!(!rx.is_empty());
1500
1501 tx.send(2).unwrap();
1502 tx.send(3).unwrap();
1503 assert_eq!(rx.len(), 3);
1504 assert!(!rx.is_empty());
1505 }
1506
1507 #[test]
1510 fn test_tokio_drop_behavior_compatibility() {
1511 use alloc::sync::Arc;
1512 use core::sync::atomic::{AtomicUsize, Ordering};
1513
1514 #[derive(Debug)]
1516 struct DropCounter {
1517 id: usize,
1518 counter: Arc<AtomicUsize>,
1519 }
1520
1521 impl Drop for DropCounter {
1522 fn drop(&mut self) {
1523 self.counter.fetch_add(1, Ordering::SeqCst);
1524 }
1525 }
1526
        const NUM_MESSAGES: usize = 100;
        let our_drop_counter = Arc::new(AtomicUsize::new(0));

1531 {
1532 let (tx, mut rx) = unbounded_channel::<DropCounter>();
1533
1534 for i in 0..NUM_MESSAGES {
1536 let msg = DropCounter {
1537 id: i,
1538 counter: Arc::clone(&our_drop_counter),
1539 };
1540 tx.send(msg).unwrap();
1541 }
1542
1543 for _ in 0..10 {
1545 let _msg = rx.try_recv().unwrap();
1546 }
1548
1549 drop(rx);
1551 drop(tx);
1552 }
1553
1554 let our_dropped_count = our_drop_counter.load(Ordering::SeqCst);
1555
1556 assert_eq!(our_dropped_count, NUM_MESSAGES,
1558 "Our implementation should drop all {} messages", NUM_MESSAGES);
1559 }
1560
1561 #[test]
1566 fn test_tokio_exact_drop_condition() {
1567 use alloc::sync::Arc;
1568 use core::sync::atomic::{AtomicUsize, Ordering};
1569 use alloc::vec::Vec;
1570
1571 #[derive(Debug)]
1572 struct DropTracker {
1573 id: usize,
1574 counter: Arc<AtomicUsize>,
1575 }
1576
1577 impl Drop for DropTracker {
1578 fn drop(&mut self) {
1579 self.counter.fetch_add(1, Ordering::SeqCst);
1580 }
1581 }
1582
        const NUM_MESSAGES: usize = 50;
        let drop_counter = Arc::new(AtomicUsize::new(0));
1585
1586 let (tx, mut rx) = unbounded_channel::<DropTracker>();
1587 let tx_clone = tx.clone();
1588
1589 for i in 0..NUM_MESSAGES {
1591 let msg = DropTracker {
1592 id: i,
1593 counter: Arc::clone(&drop_counter),
1594 };
1595 tx.send(msg).unwrap();
1596 }
1597
1598 let received_before_drop = 10;
1600 for _ in 0..received_before_drop {
1601 let _msg = rx.try_recv().unwrap();
1602 }
1603
1604 drop(rx);
1606
1607 let mut send_errors = 0;
1609 let mut failed_messages = Vec::new();
1610
1611 for i in NUM_MESSAGES..NUM_MESSAGES + 10 {
1613 let msg = DropTracker {
1614 id: i,
1615 counter: Arc::clone(&drop_counter),
1616 };
1617
1618 match tx_clone.send(msg) {
1619 Ok(_) => {
1620 panic!("Send succeeded after receiver drop - this violates Tokio behavior");
1622 }
1623 Err(send_error) => {
1624 send_errors += 1;
                    failed_messages.push(send_error.0);
                }
1627 }
1628 }
1629
1630 drop(tx);
1632 drop(tx_clone);
1633
1634 drop(failed_messages);
1636
1637 let final_drop_count = drop_counter.load(Ordering::SeqCst);
1638
1639 let expected_total_drops = NUM_MESSAGES + send_errors;
1644 assert_eq!(final_drop_count, expected_total_drops,
1645 "Expected {} drops (original {} + failed sends {}), got {}",
1646 expected_total_drops, NUM_MESSAGES, send_errors, final_drop_count);
1647
1648 assert!(send_errors > 0, "Send attempts after receiver drop should fail");
1650 }
1651
1652 #[test]
1653 fn test_tokio_api_compatibility() {
1654 let (tx, mut rx) = unbounded_channel::<i32>();
1655
1656 assert!(!tx.is_closed());
1658 assert!(!rx.is_closed());
1659 assert!(rx.is_empty());
1660 assert_eq!(rx.len(), 0);
1661
1662 assert_eq!(tx.strong_count(), 1);
1664 assert_eq!(tx.weak_count(), 0);
1665 assert_eq!(rx.sender_strong_count(), 1);
1666 assert_eq!(rx.sender_weak_count(), 0);
1667
1668 assert!(tx.same_channel(&tx));
1670 assert_eq!(tx.id(), rx.id());
1671
1672 let _weak_tx = tx.downgrade();
1674 assert_eq!(tx.weak_count(), 1);
1675 assert_eq!(rx.sender_weak_count(), 1);
1676
1677 tx.send(42).unwrap();
1679 assert_eq!(rx.len(), 1);
1680 assert!(!rx.is_empty());
1681
1682 assert_eq!(rx.try_recv().unwrap(), 42);
1684 assert_eq!(rx.len(), 0);
1685 assert!(rx.is_empty());
1686 }
1687
1688 #[test]
1691 fn test_sender_drop_behavior_comprehensive() {
1692 use alloc::sync::Arc;
1693 use core::sync::atomic::{AtomicUsize, Ordering};
1694
1695 #[derive(Debug)]
1696 struct DropTracker {
1697 id: usize,
1698 counter: Arc<AtomicUsize>,
1699 }
1700
1701 impl Drop for DropTracker {
1702 fn drop(&mut self) {
1703 self.counter.fetch_add(1, Ordering::SeqCst);
1704 }
1705 }
1706
1707 {
1709 let drop_counter = Arc::new(AtomicUsize::new(0));
1710 const NUM_MESSAGES: usize = 50;
1711
1712 let (tx, mut rx) = unbounded_channel::<DropTracker>();
1713 let tx2 = tx.clone();
1714 let tx3 = tx.clone();
1715
1716 for i in 0..NUM_MESSAGES {
1718 let msg = DropTracker {
1719 id: i,
1720 counter: Arc::clone(&drop_counter),
1721 };
1722
1723 match i % 3 {
1725 0 => tx.send(msg).unwrap(),
1726 1 => tx2.send(msg).unwrap(),
1727 2 => tx3.send(msg).unwrap(),
1728 _ => unreachable!(),
1729 }
1730 }
1731
1732 let received_count = 15;
1734 for _ in 0..received_count {
1735 let _msg = rx.try_recv().unwrap();
1736 }
1737
1738 assert_eq!(rx.len(), NUM_MESSAGES - received_count);
1739 assert!(!rx.is_closed());
1740
1741 drop(tx);
1743 drop(tx2);
1744 drop(tx3);
1745
1746 assert!(rx.is_closed());
1748
1749 let mut remaining_received = 0;
1751 while let Ok(_msg) = rx.try_recv() {
1752 remaining_received += 1;
1753 }
1754
1755 assert_eq!(remaining_received, NUM_MESSAGES - received_count);
1756
1757 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1759
1760 drop(rx);
1762
1763 assert_eq!(drop_counter.load(Ordering::SeqCst), NUM_MESSAGES,
1765 "All messages should be dropped when senders and receiver are dropped");
1766 }
1767
1768 {
1770 let drop_counter = Arc::new(AtomicUsize::new(0));
1771 const NUM_MESSAGES: usize = 30;
1772
1773 let (tx, mut rx) = unbounded_channel::<DropTracker>();
1774 let tx2 = tx.clone();
1775
1776 for i in 0..NUM_MESSAGES {
1778 let msg = DropTracker {
                    id: i + 100,
                    counter: Arc::clone(&drop_counter),
1781 };
1782
1783 if i % 2 == 0 {
1784 tx.send(msg).unwrap();
1785 } else {
1786 tx2.send(msg).unwrap();
1787 }
1788
1789 let _received = rx.try_recv().unwrap();
1791 }
1792
1793 assert!(rx.is_empty());
1795 assert!(!rx.is_closed());
1796 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1797
1798 drop(tx);
1800 drop(tx2);
1801
1802 assert!(rx.is_empty());
1804 assert!(rx.is_closed());
1805 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1806
1807 drop(rx);
1808
1809 assert_eq!(drop_counter.load(Ordering::SeqCst), NUM_MESSAGES,
1811 "All messages should be dropped when received");
1812 }
1813
1814 {
1816 let drop_counter = Arc::new(AtomicUsize::new(0));
1817 const NUM_MESSAGES: usize = 40;
1818
1819 let (tx, mut rx) = unbounded_channel::<DropTracker>();
1820 let tx2 = tx.clone();
1821 let weak_tx = tx.downgrade();
1822
1823 for i in 0..NUM_MESSAGES / 2 {
1825 let msg = DropTracker {
                    id: i + 200,
                    counter: Arc::clone(&drop_counter),
1828 };
1829 tx.send(msg).unwrap();
1830 }
1831
1832 drop(tx);
            assert!(!rx.is_closed());

            for i in NUM_MESSAGES / 2..NUM_MESSAGES {
1838 let msg = DropTracker {
1839 id: i + 200,
1840 counter: Arc::clone(&drop_counter),
1841 };
1842 tx2.send(msg).unwrap();
1843 }
1844
1845 let upgraded = weak_tx.upgrade();
1847 assert!(upgraded.is_some());
            drop(upgraded);
            drop(tx2);
1852
1853 let mut received_all = 0;
1855 while let Ok(_msg) = rx.try_recv() {
1856 received_all += 1;
1857 }
1858
1859 assert_eq!(received_all, NUM_MESSAGES);
1860
1861 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1863 assert!(rx.is_closed());
1864
1865 assert!(weak_tx.upgrade().is_none());
1867
1868 drop(rx);
1869 drop(weak_tx);
1870
1871 assert_eq!(drop_counter.load(Ordering::SeqCst), NUM_MESSAGES,
1873 "All messages should be dropped with gradual sender drop");
1874 }
1875 }
1876}