theta_sync/
lib.rs

1#![no_std]
2
3extern crate alloc;
4
5#[cfg(test)]
6#[path = "attacks/mod.rs"]
7pub mod attacks;
8
9use alloc::{boxed::Box, sync::Arc};
10use core::{
11    cell::UnsafeCell,
12    fmt,
13    future::Future,
14    hash::{Hash, Hasher},
15    mem::{ManuallyDrop, MaybeUninit},
16    pin::Pin,
17    ptr,
18    sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
19    task::{Context, Poll, Waker},
20};
21
/// Block capacity, chosen per target so the per-slot ready bits fit in one `AtomicUsize`:
/// 32 messages on 64-bit targets, 16 on 32-bit targets (the same sizes Tokio uses)
24#[cfg(target_pointer_width = "64")]
25const BLOCK_CAP: usize = 32;
26#[cfg(target_pointer_width = "32")]
27const BLOCK_CAP: usize = 16;
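
// `ready_slots` tracks slot readiness as a bitmask in a single `AtomicUsize`, so the
// block capacity must never exceed the pointer width; this compile-time guard makes
// that assumption explicit.
const _: () = assert!(BLOCK_CAP <= usize::BITS as usize);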
28
29/// Error returned when sending on a closed channel
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub struct SendError<T>(pub T);
32
33impl<T> fmt::Display for SendError<T> {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        write!(f, "sending on a closed channel")
36    }
37}
38
39/// Error returned when receiving fails
40#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub enum TryRecvError {
42    /// The channel is currently empty
43    Empty,
44    /// The channel is closed and empty
45    Disconnected,
46}
47
48impl fmt::Display for TryRecvError {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        match self {
51            TryRecvError::Empty => write!(f, "channel is empty"),
52            TryRecvError::Disconnected => write!(f, "channel is disconnected"),
53        }
54    }
55}
56
57/// Creates an unbounded MPSC channel
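///
/// # Examples
///
/// A minimal sketch of sending and receiving on the same task:
///
/// ```
/// use theta_sync::unbounded_channel;
///
/// let (tx, mut rx) = unbounded_channel::<u32>();
/// tx.send(7).unwrap();
/// assert_eq!(rx.try_recv().unwrap(), 7);
/// ```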
58pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
59    let block = Block::new(0);
60    let block_ptr = Box::into_raw(Box::new(block));
61
62    let shared = Arc::new(Shared {
63        head: AtomicPtr::new(block_ptr),
64        tail: AtomicPtr::new(block_ptr),
65        rx_waker: AtomicPtr::new(ptr::null_mut()),
66        waker_lock: AtomicBool::new(false),
67        num_senders: AtomicUsize::new(1),
68        num_weak_senders: AtomicUsize::new(0),
69        closed: AtomicBool::new(false),
70    });
71
72    let sender = UnboundedSender {
73        shared: Arc::clone(&shared),
74    };
75    let receiver = UnboundedReceiver {
76        shared,
77        recv_index: 0,
78    };
79
80    (sender, receiver)
81}
82
83/// Block structure for the linked list
84struct Block<T> {
85    /// Next block in the linked list
86    next: AtomicPtr<Block<T>>,
87    /// Starting index for this block
88    start_index: usize,
89    /// Values stored in this block
90    values: UnsafeCell<[MaybeUninit<ManuallyDrop<T>>; BLOCK_CAP]>,
91    /// Bit set indicating which slots are ready
92    ready_slots: AtomicUsize,
93    /// Number of values written to this block
94    len: AtomicUsize,
95}
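
// Slot lifecycle: `send()` reserves a slot index with `len.fetch_add`, the value is
// written into that slot, and the slot is published by setting its bit in `ready_slots`;
// the receiver claims a published slot by clearing the bit and reading the value out.
// Blocks are linked through `next` as they fill up.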
96
97impl<T> Block<T> {
98    fn new(start_index: usize) -> Self {
99        Self {
100            next: AtomicPtr::new(ptr::null_mut()),
101            start_index,
102            values: UnsafeCell::new([const { MaybeUninit::uninit() }; BLOCK_CAP]),
103            ready_slots: AtomicUsize::new(0),
104            len: AtomicUsize::new(0),
105        }
106    }
107
108    /// Returns the relative index within this block for the given global index
109    fn relative_index(&self, index: usize) -> Option<usize> {
110        if index >= self.start_index && index < self.start_index + BLOCK_CAP {
111            Some(index - self.start_index)
112        } else {
113            None
114        }
115    }
116
    /// Writes a value to the specified slot.
    /// Returns `Ok(())` on success, or `Err(value)` if the slot is already occupied.
    fn write(&self, relative_index: usize, value: T) -> Result<(), T> {
        if relative_index >= BLOCK_CAP {
            return Err(value);
        }

        let mask = 1 << relative_index;

        // `send()` hands out each slot index exactly once via `len.fetch_add`,
        // so this writer owns the slot. Bail out if it is somehow occupied.
        if self.ready_slots.load(Ordering::Acquire) & mask != 0 {
            return Err(value);
        }

        // Write the value first, then publish it by setting the ready bit with
        // Release ordering, so a reader that observes the bit also observes the
        // initialized value.
        unsafe {
            let values = &mut *self.values.get();
            values[relative_index].write(ManuallyDrop::new(value));
        }
        self.ready_slots.fetch_or(mask, Ordering::Release);

        // `len` is not incremented here; `send()` already reserved this slot.
        Ok(())
    }
143
144    /// Reads a value from the specified slot
145    /// Returns None if slot is empty or not ready
146    fn read(&self, relative_index: usize) -> Option<T> {
147        if relative_index >= BLOCK_CAP {
148            return None;
149        }
150
151        let mask = 1 << relative_index;
152
153        // Try to clear the ready bit atomically to claim the slot
154        let prev_ready = self.ready_slots.fetch_and(!mask, Ordering::AcqRel);
155
156        if prev_ready & mask == 0 {
            // Slot was not ready (the bit was already clear), so there is nothing to claim
158            return None;
159        }
160
161        unsafe {
162            let values = &*self.values.get();
163            Some(ManuallyDrop::into_inner(
164                values[relative_index].assume_init_read(),
165            ))
166        }
167    }
168
169    /// Checks if a slot is ready
170    fn is_ready(&self, relative_index: usize) -> bool {
171        if relative_index >= BLOCK_CAP {
172            return false;
173        }
174
175        let mask = 1 << relative_index;
176        self.ready_slots.load(Ordering::Acquire) & mask != 0
177    }
178
179    /// Returns the number of ready slots
180    fn ready_count(&self) -> usize {
181        self.ready_slots.load(Ordering::Acquire).count_ones() as usize
182    }
183}
184
185impl<T> Drop for Block<T> {
186    fn drop(&mut self) {
187        let ready = self.ready_slots.load(Ordering::Relaxed);
188        unsafe {
189            let values = &mut *self.values.get();
190            for i in 0..BLOCK_CAP {
191                if ready & (1 << i) != 0 {
192                    ManuallyDrop::drop(values[i].assume_init_mut());
193                }
194            }
195        }
196    }
197}
198
199/// Shared state between senders and receiver
200struct Shared<T> {
201    /// Pointer to the head block (where new values are written)
202    head: AtomicPtr<Block<T>>,
203    /// Pointer to the tail block (where values are read)
204    tail: AtomicPtr<Block<T>>,
    /// Waker for the receiver task, stored behind an atomic pointer for `no_std` compatibility
206    rx_waker: AtomicPtr<Waker>,
207    /// Atomic flag to prevent concurrent waker access
208    waker_lock: AtomicBool,
209    /// Number of active senders (strong references)
210    num_senders: AtomicUsize,
211    /// Number of weak senders
212    num_weak_senders: AtomicUsize,
213    /// Channel closed flag
214    closed: AtomicBool,
215}
216
217impl<T> Shared<T> {
218    /// Atomically take and wake the stored waker
219    fn wake_receiver(&self) {
220        // Quick check if there's even a waker to avoid unnecessary locking
221        if self.rx_waker.load(Ordering::Acquire).is_null() {
222            return; // No waker, skip the expensive spin lock
223        }
224
225        // Spin lock to ensure exclusive access to waker
226        while self
227            .waker_lock
228            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
229            .is_err()
230        {
231            core::hint::spin_loop();
232        }
233
234        // Take the waker if it exists
235        let waker_ptr = self.rx_waker.swap(ptr::null_mut(), Ordering::Acquire);
236        if !waker_ptr.is_null() {
237            let waker = unsafe { Box::from_raw(waker_ptr) };
238            // Release lock before waking to avoid holding it during wake
239            self.waker_lock.store(false, Ordering::Release);
240            waker.wake();
241        } else {
242            // Release lock
243            self.waker_lock.store(false, Ordering::Release);
244        }
245    }
246
247    /// Atomically store a new waker
248    fn store_waker(&self, waker: Waker) {
249        // Spin lock to ensure exclusive access to waker
250        while self
251            .waker_lock
252            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
253            .is_err()
254        {
255            core::hint::spin_loop();
256        }
257
258        // Replace the old waker
259        let new_waker_ptr = Box::into_raw(Box::new(waker));
260        let old_waker_ptr = self.rx_waker.swap(new_waker_ptr, Ordering::Release);
261
262        // Clean up old waker if it existed
263        if !old_waker_ptr.is_null() {
264            unsafe { drop(Box::from_raw(old_waker_ptr)) };
265        }
266
267        // Release lock
268        self.waker_lock.store(false, Ordering::Release);
269    }
270}
271
272unsafe impl<T: Send> Send for Shared<T> {}
273unsafe impl<T: Send> Sync for Shared<T> {}
274
275impl<T> Drop for Shared<T> {
276    fn drop(&mut self) {
277        // Clean up waker
278        let waker_ptr = self.rx_waker.load(Ordering::Relaxed);
279        if !waker_ptr.is_null() {
280            unsafe { drop(Box::from_raw(waker_ptr)) };
281        }
282
283        // Clean up all blocks in the linked list
284        let mut current = self.tail.load(Ordering::Relaxed);
285        while !current.is_null() {
286            let block = unsafe { Box::from_raw(current) };
287            current = block.next.load(Ordering::Relaxed);
288        }
289    }
290}
291
292/// The sending half of an unbounded MPSC channel
293pub struct UnboundedSender<T> {
294    shared: Arc<Shared<T>>,
295}
296
297/// An unbounded sender that does not prevent the channel from being closed.
298///
299/// If all [`UnboundedSender`] instances of a channel were dropped and only
300/// `WeakUnboundedSender` instances remain, the channel is closed.
301///
302/// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using
303/// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None`
304/// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`.
305///
306/// [`UnboundedSender`]: UnboundedSender
307/// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade
308///
309/// # Examples
310///
311/// ```
312/// use theta_sync::unbounded_channel;
313///
314/// let (tx, _rx) = unbounded_channel::<i32>();
315/// let tx_weak = tx.downgrade();
316///
317/// // Upgrading will succeed because `tx` still exists.
318/// assert!(tx_weak.upgrade().is_some());
319///
320/// // If we drop `tx`, then it will fail.
321/// drop(tx);
322/// assert!(tx_weak.upgrade().is_none());
323/// ```
324pub struct WeakUnboundedSender<T> {
325    shared: Arc<Shared<T>>,
326}
327
328impl<T> Clone for WeakUnboundedSender<T> {
329    fn clone(&self) -> Self {
330        self.shared.num_weak_senders.fetch_add(1, Ordering::Relaxed);
331        Self {
332            shared: Arc::clone(&self.shared),
333        }
334    }
335}
336
337impl<T> Drop for WeakUnboundedSender<T> {
338    fn drop(&mut self) {
339        self.shared.num_weak_senders.fetch_sub(1, Ordering::AcqRel);
340    }
341}
342
343impl<T> Clone for UnboundedSender<T> {
344    fn clone(&self) -> Self {
345        self.shared.num_senders.fetch_add(1, Ordering::Relaxed);
346        Self {
347            shared: Arc::clone(&self.shared),
348        }
349    }
350}
351
352impl<T> Drop for UnboundedSender<T> {
353    fn drop(&mut self) {
354        let prev_count = self.shared.num_senders.fetch_sub(1, Ordering::AcqRel);
355        if prev_count == 1 {
356            // Last sender dropped, close the channel
357            self.shared.closed.store(true, Ordering::Release);
358
359            // Wake up the receiver
360            self.shared.wake_receiver();
361        }
362    }
363}
364
365impl<T> UnboundedSender<T> {
366    /// Sends a value on this channel
367    ///
368    /// This method never blocks. It will return an error if the receiver has been dropped.
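    ///
    /// # Examples
    ///
    /// A short sketch; sends fail only after the receiving half has been dropped:
    ///
    /// ```
    /// use theta_sync::unbounded_channel;
    ///
    /// let (tx, rx) = unbounded_channel::<&str>();
    /// assert!(tx.send("hello").is_ok());
    /// drop(rx);
    /// assert!(tx.send("world").is_err());
    /// ```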
369    pub fn send(&self, mut value: T) -> Result<(), SendError<T>> {
370        if self.shared.closed.load(Ordering::Acquire) {
371            return Err(SendError(value));
372        }
373
374        let mut attempts = 0;
375        loop {
376            let head_ptr = self.shared.head.load(Ordering::Acquire);
377            let head = unsafe { &*head_ptr };
378
379            // Atomically allocate the next slot to maintain FIFO ordering
380            let slot_idx = head.len.fetch_add(1, Ordering::AcqRel);
381
382            if slot_idx < BLOCK_CAP {
383                // We have a valid slot index, try to write to it
384                // This should always succeed since we atomically allocated the slot
385                match head.write(slot_idx, value) {
386                    Ok(()) => {
387                        // Successfully written, wake receiver
388                        self.shared.wake_receiver();
389                        return Ok(());
390                    }
                    Err(returned_value) => {
                        // This should not happen, since each slot index is handed out exactly
                        // once; if it does, roll back the reservation and retry with a new one.
                        value = returned_value;
                        head.len.fetch_sub(1, Ordering::AcqRel);
                        continue;
                    }
399                }
400            } else {
                // Block is full; cap the counter at BLOCK_CAP so repeated reservations cannot overflow it
402                head.len.store(BLOCK_CAP, Ordering::Release);
403            }
404
405            // Block is full, need to allocate a new one
406            let next_ptr = head.next.load(Ordering::Acquire);
407            if next_ptr.is_null() {
408                // Try to allocate and link a new block
409                let new_block = Box::into_raw(Box::new(Block::new(head.start_index + BLOCK_CAP)));
410
411                match head.next.compare_exchange_weak(
412                    ptr::null_mut(),
413                    new_block,
414                    Ordering::AcqRel,
415                    Ordering::Acquire,
416                ) {
417                    Ok(_) => {
418                        // Successfully linked new block, update head
419                        self.shared.head.store(new_block, Ordering::Release);
420                    }
421                    Err(_) => {
422                        // Someone else allocated a block, free ours
423                        unsafe { drop(Box::from_raw(new_block)) };
424                    }
425                }
426            } else {
427                // Move head to the next block
428                self.shared
429                    .head
430                    .compare_exchange_weak(head_ptr, next_ptr, Ordering::AcqRel, Ordering::Acquire)
431                    .ok();
432            }
433
434            attempts += 1;
435            if attempts > 1000 {
                // Under heavy contention, emit a spin hint before continuing to retry
437                core::hint::spin_loop();
438                attempts = 0;
439            }
440        }
441    }
442
443    /// Returns a unique identifier for this channel based on the shared pointer address
444    pub fn id(&self) -> usize {
445        Arc::as_ptr(&self.shared) as usize
446    }
447
448    /// Checks if the channel is closed
449    pub fn is_closed(&self) -> bool {
450        self.shared.closed.load(Ordering::Acquire)
451    }
452
453    /// Returns true if this sender and another sender send to the same channel
454    pub fn same_channel(&self, other: &Self) -> bool {
455        Arc::ptr_eq(&self.shared, &other.shared)
456    }
457
458    /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count
459    /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the
460    /// channel were dropped and only `WeakUnboundedSender` instances remain,
461    /// the channel is closed.
    #[must_use = "Downgrade creates a WeakUnboundedSender without destroying the original non-weak sender."]
463    pub fn downgrade(&self) -> WeakUnboundedSender<T> {
464        self.shared.num_weak_senders.fetch_add(1, Ordering::Relaxed);
465        WeakUnboundedSender {
466            shared: Arc::clone(&self.shared),
467        }
468    }
469
470    /// Returns the number of [`UnboundedSender`] handles.
471    pub fn strong_count(&self) -> usize {
472        self.shared.num_senders.load(Ordering::Acquire)
473    }
474
475    /// Returns the number of [`WeakUnboundedSender`] handles.
476    pub fn weak_count(&self) -> usize {
477        self.shared.num_weak_senders.load(Ordering::Acquire)
478    }
479
480    /// Completes when the receiver has dropped.
481    ///
    /// This allows producers to be notified when interest in the produced values has been
    /// canceled, so they can stop doing work immediately.
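    ///
    /// # Examples
    ///
    /// A sketch of the intended use; here the receiver is already gone, so the
    /// future resolves on its first poll:
    ///
    /// ```
    /// use theta_sync::unbounded_channel;
    ///
    /// async fn wait_for_receiver_drop() {
    ///     let (tx, rx) = unbounded_channel::<i32>();
    ///     drop(rx);
    ///     tx.closed().await;
    /// }
    /// ```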
483    pub async fn closed(&self) {
484        ClosedFuture { sender: self }.await
485    }
486}
487
488impl<T> WeakUnboundedSender<T> {
489    /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`].
490    /// This will return `Some` if there are other `UnboundedSender` instances alive and
491    /// the channel wasn't previously dropped, otherwise `None` is returned.
492    pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
493        let mut count = self.shared.num_senders.load(Ordering::Acquire);
494
495        loop {
496            if count == 0 {
497                // No strong senders remaining, cannot upgrade
498                return None;
499            }
500
501            match self.shared.num_senders.compare_exchange_weak(
502                count,
503                count + 1,
504                Ordering::AcqRel,
505                Ordering::Acquire,
506            ) {
507                Ok(_) => {
508                    return Some(UnboundedSender {
509                        shared: Arc::clone(&self.shared),
510                    });
511                }
512                Err(actual) => count = actual,
513            }
514        }
515    }
516
517    /// Returns the number of [`UnboundedSender`] handles.
518    pub fn strong_count(&self) -> usize {
519        self.shared.num_senders.load(Ordering::Acquire)
520    }
521
522    /// Returns the number of [`WeakUnboundedSender`] handles.
523    pub fn weak_count(&self) -> usize {
524        self.shared.num_weak_senders.load(Ordering::Acquire)
525    }
526}
527
528impl<T> PartialEq for UnboundedSender<T> {
529    fn eq(&self, other: &Self) -> bool {
530        self.id() == other.id()
531    }
532}
533
534impl<T> Eq for UnboundedSender<T> {}
535
536impl<T> PartialOrd for UnboundedSender<T> {
537    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
538        Some(self.cmp(other))
539    }
540}
541
542impl<T> Ord for UnboundedSender<T> {
543    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
544        self.id().cmp(&other.id())
545    }
546}
547
548impl<T> Hash for UnboundedSender<T> {
549    fn hash<H: Hasher>(&self, state: &mut H) {
550        self.id().hash(state);
551    }
552}
553
554impl<T> PartialEq for WeakUnboundedSender<T> {
555    fn eq(&self, other: &Self) -> bool {
556        Arc::ptr_eq(&self.shared, &other.shared)
557    }
558}
559
560impl<T> Eq for WeakUnboundedSender<T> {}
561
562impl<T> PartialOrd for WeakUnboundedSender<T> {
563    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
564        Some(self.cmp(other))
565    }
566}
567
568impl<T> Ord for WeakUnboundedSender<T> {
569    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
570        let self_ptr = Arc::as_ptr(&self.shared) as usize;
571        let other_ptr = Arc::as_ptr(&other.shared) as usize;
572        self_ptr.cmp(&other_ptr)
573    }
574}
575
576impl<T> Hash for WeakUnboundedSender<T> {
577    fn hash<H: Hasher>(&self, state: &mut H) {
578        let ptr = Arc::as_ptr(&self.shared) as usize;
579        ptr.hash(state);
580    }
581}
582
583impl<T> fmt::Debug for UnboundedSender<T> {
584    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
585        f.debug_struct("UnboundedSender")
586            .field("id", &self.id())
587            .field("strong_count", &self.strong_count())
588            .field("weak_count", &self.weak_count())
589            .finish()
590    }
591}
592
593impl<T> fmt::Debug for WeakUnboundedSender<T> {
594    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
595        f.debug_struct("WeakUnboundedSender")
596            .field("strong_count", &self.strong_count())
597            .field("weak_count", &self.weak_count())
598            .finish()
599    }
600}
601
602/// The receiving half of an unbounded MPSC channel
603pub struct UnboundedReceiver<T> {
604    shared: Arc<Shared<T>>,
605    /// Current position for reading
606    recv_index: usize,
607}
608
609impl<T> UnboundedReceiver<T> {
610    /// Receives a value from the channel asynchronously
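    ///
    /// Returns `None` once all senders have been dropped and every buffered message
    /// has been received.
    ///
    /// # Examples
    ///
    /// A sketch of a receive loop (any async executor will do):
    ///
    /// ```
    /// use theta_sync::UnboundedReceiver;
    ///
    /// async fn drain(mut rx: UnboundedReceiver<i32>) {
    ///     while let Some(value) = rx.recv().await {
    ///         let _ = value; // process the message
    ///     }
    /// }
    /// ```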
611    pub async fn recv(&mut self) -> Option<T> {
612        RecvFuture { receiver: self }.await
613    }
614
615    /// Attempts to receive a value without blocking
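    ///
    /// # Examples
    ///
    /// A short sketch distinguishing the two error cases:
    ///
    /// ```
    /// use theta_sync::{unbounded_channel, TryRecvError};
    ///
    /// let (tx, mut rx) = unbounded_channel::<i32>();
    /// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    ///
    /// tx.send(5).unwrap();
    /// assert_eq!(rx.try_recv(), Ok(5));
    ///
    /// drop(tx);
    /// assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    /// ```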
616    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
617        loop {
618            let tail_ptr = self.shared.tail.load(Ordering::Acquire);
619            let tail = unsafe { &*tail_ptr };
620
621            if let Some(relative_idx) = tail.relative_index(self.recv_index) {
622                // We're in the current tail block
623                if tail.is_ready(relative_idx) {
624                    if let Some(value) = tail.read(relative_idx) {
625                        self.recv_index += 1;
626                        return Ok(value);
627                    }
628                }
629
630                // Check if we need to move to the next block
631                if relative_idx == BLOCK_CAP - 1
632                    || tail.ready_count() == tail.len.load(Ordering::Acquire)
633                {
634                    let next_ptr = tail.next.load(Ordering::Acquire);
635                    if !next_ptr.is_null() {
636                        // Move to next block and try to free the current one
637                        if self
638                            .shared
639                            .tail
640                            .compare_exchange(
641                                tail_ptr,
642                                next_ptr,
643                                Ordering::AcqRel,
644                                Ordering::Acquire,
645                            )
646                            .is_ok()
647                        {
648                            // Successfully moved tail, free the old block
649                            unsafe { drop(Box::from_raw(tail_ptr)) };
650                        }
651                        continue;
652                    }
653                }
654            } else {
655                // We're behind the current tail block, advance
656                let next_ptr = tail.next.load(Ordering::Acquire);
657                if !next_ptr.is_null() {
658                    if self
659                        .shared
660                        .tail
661                        .compare_exchange(tail_ptr, next_ptr, Ordering::AcqRel, Ordering::Acquire)
662                        .is_ok()
663                    {
664                        unsafe { drop(Box::from_raw(tail_ptr)) };
665                    }
666                    continue;
667                }
668            }
669
670            // No data available
671            if self.shared.closed.load(Ordering::Acquire)
672                && self.shared.num_senders.load(Ordering::Acquire) == 0
673            {
674                return Err(TryRecvError::Disconnected);
675            }
676
677            return Err(TryRecvError::Empty);
678        }
679    }
680
681    /// Returns a unique identifier for this channel based on the shared pointer address
682    pub fn id(&self) -> usize {
683        Arc::as_ptr(&self.shared) as usize
684    }
685
686    /// Checks if the channel is closed and empty
687    pub fn is_closed(&self) -> bool {
688        self.shared.closed.load(Ordering::Acquire)
689            && self.shared.num_senders.load(Ordering::Acquire) == 0
690    }
691
692    /// Checks if the channel is currently empty
693    pub fn is_empty(&self) -> bool {
        // This method takes `&self`, so it cannot call `try_recv` (which needs `&mut self`).
        // Instead, inspect the channel state at the current read position without
        // modifying the receiver.
697
698        let tail_ptr = self.shared.tail.load(Ordering::Acquire);
699        let tail = unsafe { &*tail_ptr };
700
701        if let Some(relative_idx) = tail.relative_index(self.recv_index) {
702            // Check if there's data ready at our current position
703            if tail.is_ready(relative_idx) {
704                return false; // Not empty
705            }
706        }
707
708        // Check if channel is disconnected
709        if self.shared.closed.load(Ordering::Acquire)
710            && self.shared.num_senders.load(Ordering::Acquire) == 0
711        {
712            return true; // Empty and disconnected
713        }
714
        // No ready message at the current read position; report empty (this check does not scan ahead into later blocks)
716        true
717    }
718
719    /// Closes the receiving half of the channel without dropping it
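    ///
    /// After calling this, senders observe the channel as closed and further sends fail,
    /// while messages already in the channel can still be received.
    ///
    /// # Examples
    ///
    /// A sketch mirroring the `test_manual_close` case below:
    ///
    /// ```
    /// use theta_sync::unbounded_channel;
    ///
    /// let (tx, mut rx) = unbounded_channel::<i32>();
    /// tx.send(1).unwrap();
    /// rx.close();
    ///
    /// assert!(tx.send(2).is_err()); // new sends are rejected
    /// assert_eq!(rx.try_recv().unwrap(), 1); // buffered message is still readable
    /// ```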
720    pub fn close(&mut self) {
721        self.shared.closed.store(true, Ordering::Release);
722    }
723
724    /// Returns the number of [`UnboundedSender`] handles.
725    pub fn sender_strong_count(&self) -> usize {
726        self.shared.num_senders.load(Ordering::Acquire)
727    }
728
729    /// Returns the number of [`WeakUnboundedSender`] handles.
730    pub fn sender_weak_count(&self) -> usize {
731        self.shared.num_weak_senders.load(Ordering::Acquire)
732    }
733
734    /// Returns the number of messages in the channel.
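    ///
    /// # Examples
    ///
    /// A short sketch:
    ///
    /// ```
    /// use theta_sync::unbounded_channel;
    ///
    /// let (tx, mut rx) = unbounded_channel::<i32>();
    /// assert_eq!(rx.len(), 0);
    ///
    /// tx.send(1).unwrap();
    /// tx.send(2).unwrap();
    /// assert_eq!(rx.len(), 2);
    ///
    /// rx.try_recv().unwrap();
    /// assert_eq!(rx.len(), 1);
    /// ```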
735    pub fn len(&self) -> usize {
736        // Count ready messages across all blocks
737        let mut count = 0;
738        let mut current = self.shared.tail.load(Ordering::Acquire);
739
740        while !current.is_null() {
741            let block = unsafe { &*current };
742            count += block.ready_count();
743            current = block.next.load(Ordering::Acquire);
744        }
745
746        count
747    }
748
749    fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
750        match self.try_recv() {
751            Ok(value) => Poll::Ready(Some(value)),
752            Err(TryRecvError::Disconnected) => Poll::Ready(None),
753            Err(TryRecvError::Empty) => {
754                // Store waker and return Pending
755                self.shared.store_waker(cx.waker().clone());
756                Poll::Pending
757            }
758        }
759    }
760}
761
762impl<T> Drop for UnboundedReceiver<T> {
763    fn drop(&mut self) {
764        self.shared.closed.store(true, Ordering::Release);
765    }
766}
767
768/// Future returned by `UnboundedReceiver::recv()`
769struct RecvFuture<'a, T> {
770    receiver: &'a mut UnboundedReceiver<T>,
771}
772
773impl<'a, T> Future for RecvFuture<'a, T> {
774    type Output = Option<T>;
775
776    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
777        self.receiver.poll_recv(cx)
778    }
779}
780
781/// Future returned by `UnboundedSender::closed()`
782struct ClosedFuture<'a, T> {
783    sender: &'a UnboundedSender<T>,
784}
785
786impl<'a, T> Future for ClosedFuture<'a, T> {
787    type Output = ();
788
789    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
790        if self.sender.is_closed() {
791            Poll::Ready(())
792        } else {
            // A complete implementation would register `cx.waker()` here and wake it when the
            // channel closes; this simplified version just returns `Pending`, so completion is
            // only observed if the future happens to be polled again after the channel closes.
795            Poll::Pending
796        }
797    }
798}
799
800#[cfg(test)]
801mod tests {
802    use super::*;
803    use alloc::{vec, vec::Vec};
804
805    #[test]
806    fn test_basic_send_recv() {
807        let (tx, mut rx) = unbounded_channel::<i32>();
808
809        tx.send(1).unwrap();
810        tx.send(2).unwrap();
811        tx.send(3).unwrap();
812
813        assert_eq!(rx.try_recv().unwrap(), 1);
814        assert_eq!(rx.try_recv().unwrap(), 2);
815        assert_eq!(rx.try_recv().unwrap(), 3);
816        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
817    }
818
819    #[test]
820    fn test_channel_id() {
821        let (tx1, rx1) = unbounded_channel::<i32>();
822        let (tx2, rx2) = unbounded_channel::<i32>();
823
824        assert_eq!(tx1.id(), rx1.id());
825        assert_ne!(tx1.id(), tx2.id());
826        assert_ne!(rx1.id(), rx2.id());
827    }
828
829    #[test]
830    fn test_clone_sender() {
831        let (tx, mut rx) = unbounded_channel::<i32>();
832        let tx2 = tx.clone();
833
834        tx.send(1).unwrap();
835        tx2.send(2).unwrap();
836
837        assert_eq!(rx.try_recv().unwrap(), 1);
838        assert_eq!(rx.try_recv().unwrap(), 2);
839    }
840
841    #[test]
842    fn test_large_number_of_messages() {
843        let (tx, mut rx) = unbounded_channel::<usize>();
844
845        // Send more messages than a single block can hold
846        for i in 0..100 {
847            tx.send(i).unwrap();
848        }
849
850        // Receive all messages
851        for i in 0..100 {
852            assert_eq!(rx.try_recv().unwrap(), i);
853        }
854    }
855
856    #[test]
857    fn test_drop_sender_closes_channel() {
858        let (tx, mut rx) = unbounded_channel::<i32>();
859
860        tx.send(42).unwrap();
861        drop(tx);
862
863        assert_eq!(rx.try_recv().unwrap(), 42);
864        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
865    }
866
867    #[test]
868    fn test_same_channel() {
869        let (tx1, _rx) = unbounded_channel::<i32>();
870        let tx2 = tx1.clone();
871        let (tx3, _rx2) = unbounded_channel::<i32>();
872
873        assert!(tx1.same_channel(&tx2));
874        assert!(!tx1.same_channel(&tx3));
875    }
876
877    // === EDGE CASES AND INTENSIVE TESTS ===
878
879    #[test]
880    fn test_stress_many_messages() {
881        let (tx, mut rx) = unbounded_channel::<usize>();
882        const NUM_MESSAGES: usize = 10_000;
883
884        // Send many messages
885        for i in 0..NUM_MESSAGES {
886            tx.send(i).unwrap();
887        }
888
889        // Receive all messages in order
890        for i in 0..NUM_MESSAGES {
891            assert_eq!(rx.try_recv().unwrap(), i);
892        }
893
894        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
895    }
896
897    #[test]
898    fn test_send_recv_interleaved() {
899        let (tx, mut rx) = unbounded_channel::<i32>();
900
901        // Interleave sends and receives
902        let mut expected_recv = 0;
903        for i in 0..100 {
904            tx.send(i).unwrap();
905            if i % 2 == 0 {
906                assert_eq!(rx.try_recv().unwrap(), expected_recv);
907                expected_recv += 1;
908            }
909        }
910
911        // Receive remaining messages
912        while let Ok(value) = rx.try_recv() {
913            assert_eq!(value, expected_recv);
914            expected_recv += 1;
915        }
916
917        assert_eq!(expected_recv, 100); // Should have received all 100 messages
918    }
919
920    #[test]
921    fn test_drop_receiver_while_sending() {
922        let (tx, rx) = unbounded_channel::<i32>();
923
924        // Send some messages
925        tx.send(1).unwrap();
926        tx.send(2).unwrap();
927
928        // Drop receiver
929        drop(rx);
930
931        // Further sends should fail
932        assert!(matches!(tx.send(3), Err(SendError(3))));
933        assert!(tx.is_closed());
934    }
935
936    #[test]
937    fn test_multiple_sender_drops() {
938        let (tx, mut rx) = unbounded_channel::<i32>();
939        let tx2 = tx.clone();
940        let tx3 = tx.clone();
941
942        tx.send(1).unwrap();
943        tx2.send(2).unwrap();
944        tx3.send(3).unwrap();
945
946        // Drop senders one by one
947        drop(tx);
948        assert!(!rx.is_closed());
949
950        drop(tx2);
951        assert!(!rx.is_closed());
952
953        // Last sender drop should close channel
954        drop(tx3);
955
956        // Receive existing messages
957        assert_eq!(rx.try_recv().unwrap(), 1);
958        assert_eq!(rx.try_recv().unwrap(), 2);
959        assert_eq!(rx.try_recv().unwrap(), 3);
960
961        // Channel should be disconnected
962        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
963        assert!(rx.is_closed());
964    }
965
966    #[test]
967    fn test_zero_sized_types() {
968        #[derive(Debug, PartialEq)]
969        struct ZeroSized;
970
971        let (tx, mut rx) = unbounded_channel::<ZeroSized>();
972
973        tx.send(ZeroSized).unwrap();
974        tx.send(ZeroSized).unwrap();
975
976        assert_eq!(rx.try_recv().unwrap(), ZeroSized);
977        assert_eq!(rx.try_recv().unwrap(), ZeroSized);
978        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
979    }
980
981    #[test]
982    fn test_large_types() {
983        #[derive(Debug, PartialEq)]
984        struct LargeType([u8; 1024]);
985
986        let (tx, mut rx) = unbounded_channel::<LargeType>();
987        let large_value = LargeType([42; 1024]);
988
989        tx.send(large_value).unwrap();
990        let received = rx.try_recv().unwrap();
991        
992        // Verify the entire array, not just first and last bytes
993        assert_eq!(received.0.len(), 1024);
994        for &byte in &received.0 {
995            assert_eq!(byte, 42, "Large type data corruption detected");
996        }
997        
998        // Test multiple large messages to ensure no interference
999        let large_value2 = LargeType([123; 1024]);
1000        let large_value3 = LargeType([255; 1024]);
1001        
1002        tx.send(large_value2).unwrap();
1003        tx.send(large_value3).unwrap();
1004        
1005        let received2 = rx.try_recv().unwrap();
1006        let received3 = rx.try_recv().unwrap();
1007        
1008        for &byte in &received2.0 {
1009            assert_eq!(byte, 123, "Second large message corrupted");
1010        }
1011        for &byte in &received3.0 {
1012            assert_eq!(byte, 255, "Third large message corrupted");
1013        }
1014    }
1015
1016    #[test]
1017    fn test_unwind_safety_basic() {
1018        // Test that the channel remains functional even with types that might panic on drop
1019        #[derive(Debug)]
1020        struct ConditionalPanic(bool);
1021        impl Drop for ConditionalPanic {
1022            fn drop(&mut self) {
1023                // We can't actually panic in no_std tests, but we can simulate
1024                // the structure of panic-prone types
1025                if self.0 {
1026                    // Simulate resource cleanup that might fail
1027                    // In real scenarios, this could be file I/O, network calls, etc.
1028                }
1029            }
1030        }
1031
1032        let (tx, mut rx) = unbounded_channel::<ConditionalPanic>();
1033
1034        // Send values that simulate both safe and potentially panicking drops
1035        tx.send(ConditionalPanic(false)).unwrap(); // Safe drop
1036        tx.send(ConditionalPanic(true)).unwrap();  // Potentially panicking drop
1037        tx.send(ConditionalPanic(false)).unwrap(); // Safe drop again
1038
1039        // Channel should work normally regardless of drop behavior
1040        assert_eq!(rx.try_recv().unwrap().0, false);
1041        assert_eq!(rx.try_recv().unwrap().0, true);
1042        assert_eq!(rx.try_recv().unwrap().0, false);
1043        
1044        // Test that we can continue using the channel after potentially problematic drops
1045        tx.send(ConditionalPanic(false)).unwrap();
1046        assert_eq!(rx.try_recv().unwrap().0, false);
1047        
1048        // Channel should remain functional
1049        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1050        assert!(!rx.is_closed());
1051    }
1052
1053    #[test]
1054    fn test_block_boundary_conditions() {
1055        let (tx, mut rx) = unbounded_channel::<usize>();
1056
1057        // Send exactly BLOCK_CAP messages to fill first block
1058        for i in 0..BLOCK_CAP {
1059            tx.send(i).unwrap();
1060        }
1061
1062        // Send one more to trigger new block allocation
1063        tx.send(BLOCK_CAP).unwrap();
1064
1065        // Receive all messages
1066        for i in 0..=BLOCK_CAP {
1067            assert_eq!(rx.try_recv().unwrap(), i);
1068        }
1069
1070        // Send messages across multiple blocks
1071        for i in 0..(BLOCK_CAP * 3) {
1072            tx.send(i).unwrap();
1073        }
1074
1075        for i in 0..(BLOCK_CAP * 3) {
1076            assert_eq!(rx.try_recv().unwrap(), i);
1077        }
1078    }
1079
1080    #[test]
1081    fn test_receiver_state_consistency() {
1082        let (tx, mut rx) = unbounded_channel::<i32>();
1083
1084        // Test empty state
1085        assert!(rx.is_empty());
1086        assert!(!rx.is_closed());
1087
1088        // Send and test non-empty state
1089        tx.send(42).unwrap();
1090        assert!(!rx.is_empty());
1091        assert!(!rx.is_closed());
1092
1093        // Receive and test empty again
1094        assert_eq!(rx.try_recv().unwrap(), 42);
1095        assert!(rx.is_empty());
1096        assert!(!rx.is_closed());
1097
1098        // Close channel and test closed state
1099        drop(tx);
1100        assert!(rx.is_empty());
1101        assert!(rx.is_closed());
1102        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1103    }
1104
1105    #[test]
1106    fn test_manual_close() {
1107        let (tx, mut rx) = unbounded_channel::<i32>();
1108
1109        tx.send(1).unwrap();
1110        tx.send(2).unwrap();
1111
1112        // Manually close receiver
1113        rx.close();
1114
1115        // Should still be able to receive existing messages
1116        assert_eq!(rx.try_recv().unwrap(), 1);
1117        assert_eq!(rx.try_recv().unwrap(), 2);
1118
1119        // But sender should see channel as closed
1120        assert!(tx.is_closed());
1121        assert!(matches!(tx.send(3), Err(SendError(3))));
1122    }
1123
1124    #[test]
1125    fn test_channel_id_consistency() {
1126        // Test that sender and receiver from same channel have same ID
1127        let (tx, rx) = unbounded_channel::<i32>();
1128        assert_eq!(tx.id(), rx.id());
1129
1130        // Test that different channels have different IDs (at creation time)
1131        let (tx2, rx2) = unbounded_channel::<i32>();
1132        assert_eq!(tx2.id(), rx2.id());
1133
1134        // Note: Different channels may reuse memory addresses after deallocation,
1135        // so we only test that senders/receivers from the same channel match
1136        let tx_clone = tx.clone();
1137        assert_eq!(tx.id(), tx_clone.id());
1138        assert!(tx.same_channel(&tx_clone));
1139        assert!(!tx.same_channel(&tx2));
1140    }
1141
1142    #[test]
1143    fn test_drop_semantics() {
1144        use alloc::rc::Rc;
1145
1146        let drop_count = Rc::new(core::cell::RefCell::new(0));
1147
1148        #[derive(Debug)]
1149        struct DropCounter(Rc<core::cell::RefCell<i32>>);
1150        impl Drop for DropCounter {
1151            fn drop(&mut self) {
1152                *self.0.borrow_mut() += 1;
1153            }
1154        }
1155
1156        let (tx, mut rx) = unbounded_channel::<DropCounter>();
1157
1158        // Send some values
1159        tx.send(DropCounter(drop_count.clone())).unwrap();
1160        tx.send(DropCounter(drop_count.clone())).unwrap();
1161        tx.send(DropCounter(drop_count.clone())).unwrap();
1162
1163        assert_eq!(*drop_count.borrow(), 0); // Nothing dropped yet
1164
1165        // Receive one value
1166        let _value1 = rx.try_recv().unwrap();
1167        assert_eq!(*drop_count.borrow(), 0); // Still holding reference
1168
1169        drop(_value1);
1170        assert_eq!(*drop_count.borrow(), 1); // One dropped
1171
1172        // Drop the channel with remaining values
1173        drop(tx);
1174        drop(rx);
1175
1176        assert_eq!(*drop_count.borrow(), 3); // All should be dropped
1177    }
1178
1179    #[test]
1180    fn test_memory_safety_after_close() {
1181        let (tx, mut rx) = unbounded_channel::<Vec<u8>>();
1182
1183        // Send some data
1184        tx.send(vec![1, 2, 3]).unwrap();
1185        tx.send(vec![4, 5, 6]).unwrap();
1186
1187        // Close the receiver
1188        rx.close();
1189
1190        // Sender should fail
1191        assert!(matches!(tx.send(vec![7, 8, 9]), Err(_)));
1192
1193        // But we should still be able to receive existing data
1194        assert_eq!(rx.try_recv().unwrap(), vec![1, 2, 3]);
1195        assert_eq!(rx.try_recv().unwrap(), vec![4, 5, 6]);
1196    }
1197
1198    #[test]
1199    fn test_ordering_guarantees() {
1200        let (tx, mut rx) = unbounded_channel::<usize>();
1201
1202        // Send messages in order
1203        for i in 0..1000 {
1204            tx.send(i).unwrap();
1205        }
1206
1207        // Should receive in the same order
1208        for i in 0..1000 {
1209            assert_eq!(rx.try_recv().unwrap(), i);
1210        }
1211    }
1212
1213    #[test]
1214    fn test_empty_channel_operations() {
1215        let (tx, mut rx) = unbounded_channel::<i32>();
1216
1217        // Operations on empty channel
1218        assert!(rx.is_empty());
1219        assert!(!rx.is_closed());
1220        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1221
1222        // Close and test again
1223        drop(tx);
1224        assert!(rx.is_empty());
1225        assert!(rx.is_closed());
1226        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1227    }
1228
1229    #[test]
1230    fn test_channel_reuse_after_empty() {
1231        let (tx, mut rx) = unbounded_channel::<i32>();
1232
1233        // Send, receive, repeat multiple times
1234        for round in 0..10 {
1235            for i in 0..10 {
1236                tx.send(round * 10 + i).unwrap();
1237            }
1238
1239            for i in 0..10 {
1240                assert_eq!(rx.try_recv().unwrap(), round * 10 + i);
1241            }
1242
1243            assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1244        }
1245    }
1246
1247    #[test]
1248    fn test_mixed_operation_patterns() {
1249        let (tx, mut rx) = unbounded_channel::<usize>();
1250
1251        // Pattern: send some, receive some, repeat
1252        let mut next_send = 0;
1253        let mut next_recv = 0;
1254
1255        for _ in 0..100 {
1256            // Send 1-5 messages
1257            let send_count = (next_send % 5) + 1;
1258            for _ in 0..send_count {
1259                tx.send(next_send).unwrap();
1260                next_send += 1;
1261            }
1262
1263            // Receive 1-3 messages (if available)
1264            let recv_count = (next_recv % 3) + 1;
1265            for _ in 0..recv_count {
1266                if let Ok(value) = rx.try_recv() {
1267                    assert_eq!(value, next_recv);
1268                    next_recv += 1;
1269                } else {
1270                    break;
1271                }
1272            }
1273        }
1274
1275        // Receive remaining messages
1276        while let Ok(value) = rx.try_recv() {
1277            assert_eq!(value, next_recv);
1278            next_recv += 1;
1279        }
1280
1281        assert_eq!(next_send, next_recv);
1282    }
1283
1284    // === WEAK SENDER TESTS ===
1285
1286    #[test]
1287    fn test_weak_sender_basic() {
1288        let (tx, mut rx) = unbounded_channel::<i32>();
1289
1290        // Create weak sender
1291        let weak_tx = tx.downgrade();
1292
1293        // Upgrade should succeed while strong sender exists
1294        let upgraded_tx = weak_tx.upgrade().unwrap();
1295
1296        // Send through upgraded sender
1297        upgraded_tx.send(42).unwrap();
1298        assert_eq!(rx.try_recv().unwrap(), 42);
1299
1300        // Drop original strong sender
1301        drop(tx);
1302
1303        // Should still work with upgraded sender
1304        upgraded_tx.send(43).unwrap();
1305        assert_eq!(rx.try_recv().unwrap(), 43);
1306
1307        // Drop upgraded sender
1308        drop(upgraded_tx);
1309
1310        // Now upgrade should fail
1311        assert!(weak_tx.upgrade().is_none());
1312    }
1313
1314    #[test]
1315    fn test_weak_sender_upgrade_failure() {
1316        let (tx, _rx) = unbounded_channel::<i32>();
1317        let weak_tx = tx.downgrade();
1318
1319        // Drop strong sender
1320        drop(tx);
1321
1322        // Upgrade should fail
1323        assert!(weak_tx.upgrade().is_none());
1324    }
1325
1326    #[test]
1327    fn test_weak_sender_counts() {
1328        let (tx, rx) = unbounded_channel::<i32>();
1329
1330        // Initial counts
1331        assert_eq!(tx.strong_count(), 1);
1332        assert_eq!(tx.weak_count(), 0);
1333        assert_eq!(rx.sender_strong_count(), 1);
1334        assert_eq!(rx.sender_weak_count(), 0);
1335
1336        // Create weak sender
1337        let weak_tx = tx.downgrade();
1338        assert_eq!(tx.strong_count(), 1);
1339        assert_eq!(tx.weak_count(), 1);
1340        assert_eq!(weak_tx.strong_count(), 1);
1341        assert_eq!(weak_tx.weak_count(), 1);
1342        assert_eq!(rx.sender_strong_count(), 1);
1343        assert_eq!(rx.sender_weak_count(), 1);
1344
1345        // Clone strong sender
1346        let tx2 = tx.clone();
1347        assert_eq!(tx.strong_count(), 2);
1348        assert_eq!(tx.weak_count(), 1);
1349        assert_eq!(tx2.strong_count(), 2);
1350        assert_eq!(weak_tx.strong_count(), 2);
1351        assert_eq!(weak_tx.weak_count(), 1);
1352        assert_eq!(rx.sender_strong_count(), 2);
1353        assert_eq!(rx.sender_weak_count(), 1);
1354
1355        // Clone weak sender
1356        let weak_tx2 = weak_tx.clone();
1357        assert_eq!(tx.strong_count(), 2);
1358        assert_eq!(tx.weak_count(), 2);
1359        assert_eq!(weak_tx.weak_count(), 2);
1360        assert_eq!(weak_tx2.weak_count(), 2);
1361        assert_eq!(rx.sender_strong_count(), 2);
1362        assert_eq!(rx.sender_weak_count(), 2);
1363
1364        // Drop weak sender
1365        drop(weak_tx);
1366        assert_eq!(tx.weak_count(), 1);
1367        assert_eq!(weak_tx2.weak_count(), 1);
1368        assert_eq!(rx.sender_weak_count(), 1);
1369
1370        // Drop strong sender
1371        drop(tx);
1372        assert_eq!(tx2.strong_count(), 1);
1373        assert_eq!(weak_tx2.strong_count(), 1);
1374        assert_eq!(rx.sender_strong_count(), 1);
1375
1376        // Drop last strong sender
1377        drop(tx2);
1378        assert_eq!(weak_tx2.strong_count(), 0);
1379        assert_eq!(weak_tx2.weak_count(), 1);
1380        assert_eq!(rx.sender_strong_count(), 0);
1381        assert_eq!(rx.sender_weak_count(), 1);
1382
1383        // Upgrade should now fail
1384        assert!(weak_tx2.upgrade().is_none());
1385    }
1386
1387    #[test]
1388    fn test_weak_sender_channel_close() {
1389        let (tx, rx) = unbounded_channel::<i32>();
1390        let weak_tx = tx.downgrade();
1391
1392        // Drop strong sender
1393        drop(tx);
1394
1395        // Channel should be closed
1396        assert!(rx.is_closed());
1397
1398        // Weak sender should not be able to upgrade
1399        assert!(weak_tx.upgrade().is_none());
1400    }
1401
1402    #[test]
1403    fn test_sender_ordering_and_equality() {
1404        let (tx1, _rx1) = unbounded_channel::<i32>();
1405        let (tx2, _rx2) = unbounded_channel::<i32>();
1406
1407        let tx1_clone = tx1.clone();
1408        let weak_tx1 = tx1.downgrade();
1409        let weak_tx2 = tx2.downgrade();
1410
1411        // Same channel senders should be equal
1412        assert_eq!(tx1, tx1_clone);
1413
1414        // Different channels should not be equal
1415        assert_ne!(tx1, tx2);
1416
1417        // Weak senders from same channel should be equal
1418        assert_eq!(weak_tx1, weak_tx1.clone());
1419
1420        // Weak senders from different channels should not be equal
1421        assert_ne!(weak_tx1, weak_tx2);
1422
1423        // Test ordering (consistent but not necessarily meaningful)
1424        let ordering1 = tx1.cmp(&tx2);
1425        let ordering2 = tx1.cmp(&tx2);
1426        assert_eq!(ordering1, ordering2); // Should be consistent
1427
        // Test set membership via `Ord` (BTreeSet, since no_std has no default hasher)
1429        use alloc::collections::BTreeSet;
1430        let mut set = BTreeSet::new();
1431        set.insert(tx1.clone());
1432        set.insert(tx1_clone.clone());
1433        assert_eq!(set.len(), 1); // Same sender, so only one in set
1434
1435        set.insert(tx2.clone());
1436        assert_eq!(set.len(), 2); // Different sender, so now two in set
1437    }
1438
1439    #[test]
1440    fn test_weak_sender_multiple_upgrades() {
1441        let (tx, mut rx) = unbounded_channel::<i32>();
1442        let weak_tx = tx.downgrade();
1443
1444        // Multiple upgrades should work
1445        let upgraded1 = weak_tx.upgrade().unwrap();
1446        let upgraded2 = weak_tx.upgrade().unwrap();
1447
1448        upgraded1.send(1).unwrap();
1449        upgraded2.send(2).unwrap();
1450
1451        assert_eq!(rx.try_recv().unwrap(), 1);
1452        assert_eq!(rx.try_recv().unwrap(), 2);
1453
1454        // Drop original and one upgrade
1455        drop(tx);
1456        drop(upgraded1);
1457
1458        // Should still be able to upgrade and send
1459        let upgraded3 = weak_tx.upgrade().unwrap();
1460        upgraded3.send(3).unwrap();
1461        assert_eq!(rx.try_recv().unwrap(), 3);
1462
1463        // Drop remaining senders
1464        drop(upgraded2);
1465        drop(upgraded3);
1466
1467        // Now upgrade should fail
1468        assert!(weak_tx.upgrade().is_none());
1469    }
1470
1471    #[test]
1472    fn test_sender_hash_collections() {
1473        use alloc::collections::BTreeSet;
1474
1475        let (tx1, _rx1) = unbounded_channel::<i32>();
1476        let (tx2, _rx2) = unbounded_channel::<i32>();
1477        let tx1_clone = tx1.clone();
1478
1479        // Test with HashSet equivalent (BTreeSet in no_std)
1480        let mut set = BTreeSet::new();
1481
1482        // Insert original sender
1483        set.insert(tx1.clone());
1484        assert_eq!(set.len(), 1);
1485
1486        // Insert clone of same sender - should not increase size
1487        set.insert(tx1_clone);
1488        assert_eq!(set.len(), 1);
1489
1490        // Insert different sender - should increase size
1491        set.insert(tx2);
1492        assert_eq!(set.len(), 2);
1493
1494        // Test weak senders
1495        let weak_tx1 = tx1.downgrade();
1496        let weak_tx1_clone = weak_tx1.clone();
1497
1498        let mut weak_set = BTreeSet::new();
1499        weak_set.insert(weak_tx1);
1500        weak_set.insert(weak_tx1_clone); // Should not increase size
1501        assert_eq!(weak_set.len(), 1);
1502    }
1503
1504    // === NEW TOKIO-COMPATIBLE TESTS ===
1505
1506    #[test]
1507    fn test_len_method() {
1508        let (tx, mut rx) = unbounded_channel::<i32>();
1509
1510        // Empty channel
1511        assert_eq!(rx.len(), 0);
1512        assert!(rx.is_empty());
1513
1514        // Send some messages and verify len increases
1515        tx.send(1).unwrap();
1516        assert_eq!(rx.len(), 1);
1517        assert!(!rx.is_empty());
1518
1519        tx.send(2).unwrap();
1520        tx.send(3).unwrap();
1521        assert_eq!(rx.len(), 3);
1522        assert!(!rx.is_empty());
1523        
1524        // Receive messages and verify len decreases
1525        assert_eq!(rx.try_recv().unwrap(), 1);
1526        assert_eq!(rx.len(), 2);
1527        assert!(!rx.is_empty());
1528        
1529        assert_eq!(rx.try_recv().unwrap(), 2);
1530        assert_eq!(rx.len(), 1);
1531        assert!(!rx.is_empty());
1532        
1533        assert_eq!(rx.try_recv().unwrap(), 3);
1534        assert_eq!(rx.len(), 0);
1535        assert!(rx.is_empty());
1536        
1537        // Verify empty channel state
1538        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1539    }
1540
    /// Check drop behavior against Tokio's documented semantics:
    /// unread messages must be dropped once the receiver and all senders are gone
1543    #[test]
1544    fn test_tokio_drop_behavior_compatibility() {
1545        use alloc::sync::Arc;
1546        use core::sync::atomic::{AtomicUsize, Ordering};
1547
1548        // Drop counter to track when messages are dropped
1549        #[derive(Debug)]
1550        struct DropCounter {
1551            #[allow(dead_code)]
1552            id: usize,
1553            counter: Arc<AtomicUsize>,
1554        }
1555
1556        impl Drop for DropCounter {
1557            fn drop(&mut self) {
1558                self.counter.fetch_add(1, Ordering::SeqCst);
1559            }
1560        }
1561
1562        const NUM_MESSAGES: usize = 100; // Smaller for unit test
1563
1564        // Test our implementation
1565        let our_drop_counter = Arc::new(AtomicUsize::new(0));
1566        {
1567            let (tx, mut rx) = unbounded_channel::<DropCounter>();
1568
1569            // Send messages
1570            for i in 0..NUM_MESSAGES {
1571                let msg = DropCounter {
1572                    id: i,
1573                    counter: Arc::clone(&our_drop_counter),
1574                };
1575                tx.send(msg).unwrap();
1576            }
1577
1578            // Receive only first few messages
1579            for _ in 0..10 {
1580                let _msg = rx.try_recv().unwrap();
1581                // Let them drop immediately
1582            }
1583
1584            // Drop receiver - this should drop all remaining messages
1585            drop(rx);
1586            drop(tx);
1587        }
1588
1589        let our_dropped_count = our_drop_counter.load(Ordering::SeqCst);
1590
1591        // All messages should have been dropped
1592        assert_eq!(
1593            our_dropped_count, NUM_MESSAGES,
1594            "Our implementation should drop all {} messages",
1595            NUM_MESSAGES
1596        );
1597    }
1598
    /// Test the exact same condition as described in the Tokio docs:
    /// "If the Receiver handle is dropped, then messages can no longer be read out of the channel.
    /// In this case, all further attempts to send will result in an error. Additionally,
    /// all unread messages will be drained from the channel and dropped."
    /// A minimal sketch of recovering the rejected value from `SendError` follows this test.
    #[test]
    fn test_tokio_exact_drop_condition() {
        use alloc::sync::Arc;
        use alloc::vec::Vec;
        use core::sync::atomic::{AtomicUsize, Ordering};

        #[derive(Debug)]
        struct DropTracker {
            #[allow(dead_code)]
            id: usize,
            counter: Arc<AtomicUsize>,
        }

        impl Drop for DropTracker {
            fn drop(&mut self) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
        }

        const NUM_MESSAGES: usize = 50; // Kept small for a unit test
        let drop_counter = Arc::new(AtomicUsize::new(0));

        let (tx, mut rx) = unbounded_channel::<DropTracker>();
        let tx_clone = tx.clone();

        // Fill the channel with messages
        for i in 0..NUM_MESSAGES {
            let msg = DropTracker {
                id: i,
                counter: Arc::clone(&drop_counter),
            };
            tx.send(msg).unwrap();
        }

        // Receive some messages (but not all)
        let received_before_drop = 10;
        for _ in 0..received_before_drop {
            let _msg = rx.try_recv().unwrap();
        }

        // Test condition 1: Drop the receiver while messages remain
        drop(rx);

        // Test condition 2: Further attempts to send should result in an error
        let mut send_errors = 0;
        let mut failed_messages = Vec::new();

        // Try to send more messages after the receiver is dropped
        for i in NUM_MESSAGES..NUM_MESSAGES + 10 {
            let msg = DropTracker {
                id: i,
                counter: Arc::clone(&drop_counter),
            };

            match tx_clone.send(msg) {
                Ok(_) => {
                    // This shouldn't happen if the receiver is properly dropped
                    panic!("Send succeeded after receiver drop - this violates Tokio behavior");
                }
                Err(send_error) => {
                    send_errors += 1;
                    failed_messages.push(send_error.0); // Extract the message from the SendError
                }
            }
        }

        // Drop the senders to clean up
        drop(tx);
        drop(tx_clone);

        // Drop the failed messages explicitly to ensure they're counted
        drop(failed_messages);

        let final_drop_count = drop_counter.load(Ordering::SeqCst);

        // Verify the Tokio-documented behavior:
        // 1. All unread messages are drained and dropped
        // 2. All received messages are dropped
        // 3. All failed send messages are dropped
        let expected_total_drops = NUM_MESSAGES + send_errors;
        assert_eq!(
            final_drop_count, expected_total_drops,
            "Expected {} drops (original {} + failed sends {}), got {}",
            expected_total_drops, NUM_MESSAGES, send_errors, final_drop_count
        );

        // 4. Further send attempts result in errors
        assert!(
            send_errors > 0,
            "Send attempts after receiver drop should fail"
        );
    }

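    /// Illustrative sketch (not part of the original suite): `SendError<T>` is
    /// a tuple struct with a public field, so a caller can recover the value
    /// that failed to send - the same pattern the test above relies on via
    /// `send_error.0`.
    #[test]
    fn test_send_error_value_recovery_sketch() {
        let (tx, rx) = unbounded_channel::<&'static str>();

        // Dropping the receiver closes the channel for senders.
        drop(rx);

        // The rejected value comes back inside the error, so nothing is lost.
        let err = tx.send("payload").unwrap_err();
        assert_eq!(err, SendError("payload"));
        assert_eq!(err.0, "payload");
    }
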
    #[test]
    fn test_tokio_api_compatibility() {
        let (tx, mut rx) = unbounded_channel::<i32>();

        // Test basic Tokio-like APIs
        assert!(!tx.is_closed());
        assert!(!rx.is_closed());
        assert!(rx.is_empty());
        assert_eq!(rx.len(), 0);

        // Test sender counts
        assert_eq!(tx.strong_count(), 1);
        assert_eq!(tx.weak_count(), 0);
        assert_eq!(rx.sender_strong_count(), 1);
        assert_eq!(rx.sender_weak_count(), 0);

        // Test channel identification (a clone-vs-distinct-channel sketch follows this test)
        assert!(tx.same_channel(&tx));
        assert_eq!(tx.id(), rx.id());

        // Test weak sender creation
        let _weak_tx = tx.downgrade();
        assert_eq!(tx.weak_count(), 1);
        assert_eq!(rx.sender_weak_count(), 1);

        // Test message sending and length tracking
        tx.send(42).unwrap();
        assert_eq!(rx.len(), 1);
        assert!(!rx.is_empty());

        // Test message receiving
        assert_eq!(rx.try_recv().unwrap(), 42);
        assert_eq!(rx.len(), 0);
        assert!(rx.is_empty());
    }

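    /// Illustrative sketch (not part of the original suite): `same_channel`
    /// and `id()` across clones and across independent channels, extending the
    /// identity checks in `test_tokio_api_compatibility`. Assumes, as in the
    /// Tokio-style API, that clones of a sender share the underlying channel
    /// while separately created channels report distinct identities.
    #[test]
    fn test_channel_identity_sketch() {
        let (tx_a, rx_a) = unbounded_channel::<u32>();
        let (tx_b, rx_b) = unbounded_channel::<u32>();

        // A clone refers to the same underlying channel as the original.
        let tx_a2 = tx_a.clone();
        assert!(tx_a.same_channel(&tx_a2));
        assert_eq!(tx_a.id(), rx_a.id());

        // Two independently created channels are distinct.
        assert!(!tx_a.same_channel(&tx_b));
        assert_ne!(tx_a.id(), tx_b.id());
        assert_ne!(rx_a.id(), rx_b.id());
    }
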
    /// Test drop behavior when all senders are dropped (with and without remaining messages).
    /// This complements the receiver drop tests and ensures full Tokio compatibility.
    /// A focused weak-sender counting sketch follows this test.
    #[test]
    fn test_sender_drop_behavior_comprehensive() {
        use alloc::sync::Arc;
        use core::sync::atomic::{AtomicUsize, Ordering};

        #[derive(Debug)]
        struct DropTracker {
            #[allow(dead_code)]
            id: usize,
            counter: Arc<AtomicUsize>,
        }

        impl Drop for DropTracker {
            fn drop(&mut self) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
        }

        // Test Case 1: All senders dropped with remaining messages in the channel
        {
            let drop_counter = Arc::new(AtomicUsize::new(0));
            const NUM_MESSAGES: usize = 50;

            let (tx, mut rx) = unbounded_channel::<DropTracker>();
            let tx2 = tx.clone();
            let tx3 = tx.clone();

            // Send messages from multiple senders
            for i in 0..NUM_MESSAGES {
                let msg = DropTracker {
                    id: i,
                    counter: Arc::clone(&drop_counter),
                };

                // Distribute sends across different senders
                match i % 3 {
                    0 => tx.send(msg).unwrap(),
                    1 => tx2.send(msg).unwrap(),
                    2 => tx3.send(msg).unwrap(),
                    _ => unreachable!(),
                }
            }

            // Receive only some messages
            let received_count = 15;
            for _ in 0..received_count {
                let _msg = rx.try_recv().unwrap();
            }

            assert_eq!(rx.len(), NUM_MESSAGES - received_count);
            assert!(!rx.is_closed());

            // Drop all senders - this should make the channel disconnected
            drop(tx);
            drop(tx2);
            drop(tx3);

            // The channel should now be closed
            assert!(rx.is_closed());

            // Should still be able to receive the remaining messages
            let mut remaining_received = 0;
            while let Ok(_msg) = rx.try_recv() {
                remaining_received += 1;
            }

            assert_eq!(remaining_received, NUM_MESSAGES - received_count);

            // Now the channel should report disconnected
            assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));

            // Drop the receiver to clean up remaining messages (if any)
            drop(rx);

            // All messages should be dropped
            assert_eq!(
                drop_counter.load(Ordering::SeqCst),
                NUM_MESSAGES,
                "All messages should be dropped when senders and receiver are dropped"
            );
        }

        // Test Case 2: All senders dropped with an empty channel
        {
            let drop_counter = Arc::new(AtomicUsize::new(0));
            const NUM_MESSAGES: usize = 30;

            let (tx, mut rx) = unbounded_channel::<DropTracker>();
            let tx2 = tx.clone();

            // Send and immediately receive all messages
            for i in 0..NUM_MESSAGES {
                let msg = DropTracker {
                    id: i + 100, // Different ID range
                    counter: Arc::clone(&drop_counter),
                };

                if i % 2 == 0 {
                    tx.send(msg).unwrap();
                } else {
                    tx2.send(msg).unwrap();
                }

                // Immediately receive
                let _received = rx.try_recv().unwrap();
            }

            // The channel should be empty but not closed
            assert!(rx.is_empty());
            assert!(!rx.is_closed());
            assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));

            // Drop all senders
            drop(tx);
            drop(tx2);

            // The channel should now be closed and empty
            assert!(rx.is_empty());
            assert!(rx.is_closed());
            assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));

            drop(rx);

            // All messages should be dropped (they were all received and dropped)
            assert_eq!(
                drop_counter.load(Ordering::SeqCst),
                NUM_MESSAGES,
                "All messages should be dropped when received"
            );
        }

        // Test Case 3: Gradual sender drop with weak senders
        {
            let drop_counter = Arc::new(AtomicUsize::new(0));
            const NUM_MESSAGES: usize = 40;

            let (tx, mut rx) = unbounded_channel::<DropTracker>();
            let tx2 = tx.clone();
            let weak_tx = tx.downgrade();

            // Send some messages
            for i in 0..NUM_MESSAGES / 2 {
                let msg = DropTracker {
                    id: i + 200, // Different ID range
                    counter: Arc::clone(&drop_counter),
                };
                tx.send(msg).unwrap();
            }

            // Drop one strong sender
            drop(tx);
            assert!(!rx.is_closed()); // Still have tx2

            // Send more messages with the remaining sender
            for i in NUM_MESSAGES / 2..NUM_MESSAGES {
                let msg = DropTracker {
                    id: i + 200,
                    counter: Arc::clone(&drop_counter),
                };
                tx2.send(msg).unwrap();
            }

            // The weak sender should still be able to upgrade
            let upgraded = weak_tx.upgrade();
            assert!(upgraded.is_some());
            drop(upgraded); // Important: drop the upgraded sender

            // Drop the last strong sender
            drop(tx2);

            // Receive all messages first
            let mut received_all = 0;
            while let Ok(_msg) = rx.try_recv() {
                received_all += 1;
            }

            assert_eq!(received_all, NUM_MESSAGES);

            // Now the channel should report disconnected and closed
            assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
            assert!(rx.is_closed());

            // The weak sender should no longer be able to upgrade
            assert!(weak_tx.upgrade().is_none());

            drop(rx);
            drop(weak_tx);

            // All messages should be dropped
            assert_eq!(
                drop_counter.load(Ordering::SeqCst),
                NUM_MESSAGES,
                "All messages should be dropped with gradual sender drop"
            );
        }
    }
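
    /// Illustrative sketch (not part of the original suite): the expected
    /// strong/weak bookkeeping around `downgrade`/`upgrade`, assuming an
    /// upgraded weak sender counts as a regular strong sender (the behavior
    /// the comprehensive test above relies on when it drops the upgraded
    /// handle before dropping `tx2`).
    #[test]
    fn test_weak_sender_counts_sketch() {
        let (tx, rx) = unbounded_channel::<u8>();
        assert_eq!(tx.strong_count(), 1);
        assert_eq!(tx.weak_count(), 0);

        // Downgrading registers one weak sender without adding a strong one.
        let weak = tx.downgrade();
        assert_eq!(tx.strong_count(), 1);
        assert_eq!(tx.weak_count(), 1);

        // Upgrading while a strong sender is alive yields another strong sender.
        let strong2 = weak.upgrade().expect("upgrade should succeed");
        assert_eq!(tx.strong_count(), 2);

        // Dropping every strong sender closes the channel for the receiver...
        drop(strong2);
        drop(tx);
        assert!(rx.is_closed());

        // ...and the weak sender can no longer upgrade.
        assert!(weak.upgrade().is_none());
    }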
}