theta_sync/lib.rs

#![no_std]

extern crate alloc;

#[cfg(test)]
#[path = "attacks/mod.rs"]
pub mod attacks;

use alloc::{boxed::Box, sync::Arc};
use core::{
    cell::UnsafeCell,
    fmt,
    future::Future,
    hash::{Hash, Hasher},
    mem::{ManuallyDrop, MaybeUninit},
    pin::Pin,
    ptr,
    sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
    task::{Context, Poll, Waker},
};

/// Block capacity, chosen per platform for memory efficiency:
/// 32 messages on 64-bit targets, 16 on 32-bit targets (matching Tokio).
#[cfg(target_pointer_width = "64")]
const BLOCK_CAP: usize = 32;
#[cfg(target_pointer_width = "32")]
const BLOCK_CAP: usize = 16;
28
29/// Error returned when sending on a closed channel
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub struct SendError<T>(pub T);
32
33impl<T> fmt::Display for SendError<T> {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        write!(f, "sending on a closed channel")
36    }
37}
38
/// Error returned by `try_recv` when no value could be received
40#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub enum TryRecvError {
42    /// The channel is currently empty
43    Empty,
44    /// The channel is closed and empty
45    Disconnected,
46}
47
48impl fmt::Display for TryRecvError {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        match self {
51            TryRecvError::Empty => write!(f, "channel is empty"),
52            TryRecvError::Disconnected => write!(f, "channel is disconnected"),
53        }
54    }
55}
56
57/// Creates an unbounded MPSC channel
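///
/// # Examples
///
/// A minimal usage sketch using the non-blocking receive path (the async
/// [`UnboundedReceiver::recv`] behaves the same way on an executor):
///
/// ```
/// use theta_sync::unbounded_channel;
///
/// let (tx, mut rx) = unbounded_channel::<u32>();
/// tx.send(7).unwrap();
/// assert_eq!(rx.try_recv().unwrap(), 7);
/// ```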
58pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
59    let block = Block::new(0);
60    let block_ptr = Box::into_raw(Box::new(block));
61
62    let shared = Arc::new(Shared {
63        head: AtomicPtr::new(block_ptr),
64        tail: AtomicPtr::new(block_ptr),
65        rx_waker: AtomicPtr::new(ptr::null_mut()),
66        waker_lock: AtomicBool::new(false),
67        num_senders: AtomicUsize::new(1),
68        num_weak_senders: AtomicUsize::new(0),
69        closed: AtomicBool::new(false),
70    });
71
72    let sender = UnboundedSender {
73        shared: Arc::clone(&shared),
74    };
75    let receiver = UnboundedReceiver {
76        shared,
77        recv_index: 0,
78    };
79
80    (sender, receiver)
81}
82
83/// Block structure for the linked list
84struct Block<T> {
85    /// Next block in the linked list
86    next: AtomicPtr<Block<T>>,
87    /// Starting index for this block
88    start_index: usize,
89    /// Values stored in this block
90    values: UnsafeCell<[MaybeUninit<ManuallyDrop<T>>; BLOCK_CAP]>,
91    /// Bit set indicating which slots are ready
92    ready_slots: AtomicUsize,
93    /// Number of values written to this block
94    len: AtomicUsize,
95}
96
97impl<T> Block<T> {
98    fn new(start_index: usize) -> Self {
99        Self {
100            next: AtomicPtr::new(ptr::null_mut()),
101            start_index,
102            values: UnsafeCell::new([const { MaybeUninit::uninit() }; BLOCK_CAP]),
103            ready_slots: AtomicUsize::new(0),
104            len: AtomicUsize::new(0),
105        }
106    }
107
108    /// Returns the relative index within this block for the given global index
109    fn relative_index(&self, index: usize) -> Option<usize> {
110        if index >= self.start_index && index < self.start_index + BLOCK_CAP {
111            Some(index - self.start_index)
112        } else {
113            None
114        }
115    }
116
    /// Writes a value into the specified slot.
    /// Returns `Ok(())` on success, or `Err(value)` if the index is out of range
    /// or the slot is already occupied.
119    fn write(&self, relative_index: usize, value: T) -> Result<(), T> {
120        if relative_index >= BLOCK_CAP {
121            return Err(value);
122        }
123
        let mask = 1 << relative_index;

        // The slot index was reserved exclusively in `send()` via `len.fetch_add`,
        // so no other sender writes here. If the ready bit is somehow already set,
        // hand the value back instead of overwriting a live slot.
        if self.ready_slots.load(Ordering::Acquire) & mask != 0 {
            return Err(value);
        }

        // Write the value first, without forming a reference to the whole array.
        unsafe {
            let base = self.values.get() as *mut MaybeUninit<ManuallyDrop<T>>;
            base.add(relative_index)
                .write(MaybeUninit::new(ManuallyDrop::new(value)));
        }

        // ...then publish it by setting the ready bit. `Release` pairs with the
        // reader's `Acquire`, so the value is visible before the bit is observed.
        self.ready_slots.fetch_or(mask, Ordering::Release);

        // `len` is not incremented here; the slot was already reserved in `send()`.
        Ok(())
142    }
143
    /// Reads and claims the value in the specified slot.
    /// Returns `None` if the index is out of range or the slot is not ready.
146    fn read(&self, relative_index: usize) -> Option<T> {
147        if relative_index >= BLOCK_CAP {
148            return None;
149        }
150
151        let mask = 1 << relative_index;
152
153        // Try to clear the ready bit atomically to claim the slot
154        let prev_ready = self.ready_slots.fetch_and(!mask, Ordering::AcqRel);
155
156        if prev_ready & mask == 0 {
            // The bit was already clear, so there is nothing to claim here
158            return None;
159        }
160
161        unsafe {
162            let values = &*self.values.get();
163            Some(ManuallyDrop::into_inner(values[relative_index].assume_init_read()))
164        }
165    }
166
167    /// Checks if a slot is ready
168    fn is_ready(&self, relative_index: usize) -> bool {
169        if relative_index >= BLOCK_CAP {
170            return false;
171        }
172
173        let mask = 1 << relative_index;
174        self.ready_slots.load(Ordering::Acquire) & mask != 0
175    }
176
177    /// Returns the number of ready slots
178    fn ready_count(&self) -> usize {
179        self.ready_slots.load(Ordering::Acquire).count_ones() as usize
180    }
181}
182
183impl<T> Drop for Block<T> {
184    fn drop(&mut self) {
185        let ready = self.ready_slots.load(Ordering::Relaxed);
186        unsafe {
187            let values = &mut *self.values.get();
188            for i in 0..BLOCK_CAP {
189                if ready & (1 << i) != 0 {
190                    ManuallyDrop::drop(values[i].assume_init_mut());
191                }
192            }
193        }
194    }
195}
196
197/// Shared state between senders and receiver
198struct Shared<T> {
199    /// Pointer to the head block (where new values are written)
200    head: AtomicPtr<Block<T>>,
201    /// Pointer to the tail block (where values are read)
202    tail: AtomicPtr<Block<T>>,
    /// Waker for the receiver task, stored behind an atomic pointer for `no_std` compatibility
204    rx_waker: AtomicPtr<Waker>,
205    /// Atomic flag to prevent concurrent waker access
206    waker_lock: AtomicBool,
207    /// Number of active senders (strong references)
208    num_senders: AtomicUsize,
209    /// Number of weak senders
210    num_weak_senders: AtomicUsize,
211    /// Channel closed flag
212    closed: AtomicBool,
213}
214
215impl<T> Shared<T> {
216    /// Atomically take and wake the stored waker
217    fn wake_receiver(&self) {
218        // Quick check if there's even a waker to avoid unnecessary locking
219        if self.rx_waker.load(Ordering::Acquire).is_null() {
220            return; // No waker, skip the expensive spin lock
221        }
222        
223        // Spin lock to ensure exclusive access to waker
224        while self.waker_lock.compare_exchange_weak(
225            false, 
226            true, 
227            Ordering::Acquire, 
228            Ordering::Relaxed
229        ).is_err() {
230            core::hint::spin_loop();
231        }
232        
233        // Take the waker if it exists
234        let waker_ptr = self.rx_waker.swap(ptr::null_mut(), Ordering::Acquire);
235        if !waker_ptr.is_null() {
236            let waker = unsafe { Box::from_raw(waker_ptr) };
237            // Release lock before waking to avoid holding it during wake
238            self.waker_lock.store(false, Ordering::Release);
239            waker.wake();
240        } else {
241            // Release lock
242            self.waker_lock.store(false, Ordering::Release);
243        }
244    }
245    
246    /// Atomically store a new waker
247    fn store_waker(&self, waker: Waker) {
248        // Spin lock to ensure exclusive access to waker  
249        while self.waker_lock.compare_exchange_weak(
250            false,
251            true,
252            Ordering::Acquire,
253            Ordering::Relaxed
254        ).is_err() {
255            core::hint::spin_loop();
256        }
257        
258        // Replace the old waker
259        let new_waker_ptr = Box::into_raw(Box::new(waker));
260        let old_waker_ptr = self.rx_waker.swap(new_waker_ptr, Ordering::Release);
261        
262        // Clean up old waker if it existed
263        if !old_waker_ptr.is_null() {
264            unsafe { drop(Box::from_raw(old_waker_ptr)) };
265        }
266        
267        // Release lock
268        self.waker_lock.store(false, Ordering::Release);
269    }
270}
271
272unsafe impl<T: Send> Send for Shared<T> {}
273unsafe impl<T: Send> Sync for Shared<T> {}
274
275impl<T> Drop for Shared<T> {
276    fn drop(&mut self) {
277        // Clean up waker
278        let waker_ptr = self.rx_waker.load(Ordering::Relaxed);
279        if !waker_ptr.is_null() {
280            unsafe { drop(Box::from_raw(waker_ptr)) };
281        }
282        
283        // Clean up all blocks in the linked list
284        let mut current = self.tail.load(Ordering::Relaxed);
285        while !current.is_null() {
286            let block = unsafe { Box::from_raw(current) };
287            current = block.next.load(Ordering::Relaxed);
288        }
289    }
290}
291
292/// The sending half of an unbounded MPSC channel
293pub struct UnboundedSender<T> {
294    shared: Arc<Shared<T>>,
295}
296
297/// An unbounded sender that does not prevent the channel from being closed.
298///
299/// If all [`UnboundedSender`] instances of a channel were dropped and only
300/// `WeakUnboundedSender` instances remain, the channel is closed.
301///
302/// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using
303/// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None`
304/// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`.
305///
306/// [`UnboundedSender`]: UnboundedSender
307/// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade
308///
309/// # Examples
310///
311/// ```
312/// use theta_sync::unbounded_channel;
313///
314/// let (tx, _rx) = unbounded_channel::<i32>();
315/// let tx_weak = tx.downgrade();
316///
317/// // Upgrading will succeed because `tx` still exists.
318/// assert!(tx_weak.upgrade().is_some());
319///
320/// // If we drop `tx`, then it will fail.
321/// drop(tx);
322/// assert!(tx_weak.upgrade().is_none());
323/// ```
324pub struct WeakUnboundedSender<T> {
325    shared: Arc<Shared<T>>,
326}
327
328impl<T> Clone for WeakUnboundedSender<T> {
329    fn clone(&self) -> Self {
330        self.shared.num_weak_senders.fetch_add(1, Ordering::Relaxed);
331        Self {
332            shared: Arc::clone(&self.shared),
333        }
334    }
335}
336
337impl<T> Drop for WeakUnboundedSender<T> {
338    fn drop(&mut self) {
339        self.shared.num_weak_senders.fetch_sub(1, Ordering::AcqRel);
340    }
341}
342
343impl<T> Clone for UnboundedSender<T> {
344    fn clone(&self) -> Self {
345        self.shared.num_senders.fetch_add(1, Ordering::Relaxed);
346        Self {
347            shared: Arc::clone(&self.shared),
348        }
349    }
350}
351
352impl<T> Drop for UnboundedSender<T> {
353    fn drop(&mut self) {
354        let prev_count = self.shared.num_senders.fetch_sub(1, Ordering::AcqRel);
355        if prev_count == 1 {
356            // Last sender dropped, close the channel
357            self.shared.closed.store(true, Ordering::Release);
358
359            // Wake up the receiver
360            self.shared.wake_receiver();
361        }
362    }
363}
364
365impl<T> UnboundedSender<T> {
366    /// Sends a value on this channel
367    ///
368    /// This method never blocks. It will return an error if the receiver has been dropped.
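    ///
    /// # Examples
    ///
    /// A small sketch of the success and failure paths:
    ///
    /// ```
    /// use theta_sync::unbounded_channel;
    ///
    /// let (tx, rx) = unbounded_channel::<&str>();
    /// assert!(tx.send("hello").is_ok());
    ///
    /// // Dropping the receiver closes the channel, so further sends fail.
    /// drop(rx);
    /// assert!(tx.send("world").is_err());
    /// ```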
369    pub fn send(&self, mut value: T) -> Result<(), SendError<T>> {
370        if self.shared.closed.load(Ordering::Acquire) {
371            return Err(SendError(value));
372        }
373
374        let mut attempts = 0;
375        loop {
376            let head_ptr = self.shared.head.load(Ordering::Acquire);
377            let head = unsafe { &*head_ptr };
378
379            // Atomically allocate the next slot to maintain FIFO ordering
380            let slot_idx = head.len.fetch_add(1, Ordering::AcqRel);
381            
382            if slot_idx < BLOCK_CAP {
383                // We have a valid slot index, try to write to it
384                // This should always succeed since we atomically allocated the slot
385                match head.write(slot_idx, value) {
386                    Ok(()) => {
387                        // Successfully written, wake receiver
388                        self.shared.wake_receiver();
389                        return Ok(());
390                    }
                    Err(returned_value) => {
                        // The reserved slot was unexpectedly marked ready already.
                        // Roll back the reservation and retry with a fresh slot.
                        value = returned_value;
                        head.len.fetch_sub(1, Ordering::AcqRel);
                        continue;
398                    }
399                }
400            } else {
                // Block is full; clamp the counter so it cannot grow without bound
402                head.len.store(BLOCK_CAP, Ordering::Release);
403            }
404
405            // Block is full, need to allocate a new one
406            let next_ptr = head.next.load(Ordering::Acquire);
407            if next_ptr.is_null() {
408                // Try to allocate and link a new block
409                let new_block = Box::into_raw(Box::new(Block::new(head.start_index + BLOCK_CAP)));
410
411                match head.next.compare_exchange_weak(
412                    ptr::null_mut(),
413                    new_block,
414                    Ordering::AcqRel,
415                    Ordering::Acquire,
416                ) {
417                    Ok(_) => {
418                        // Successfully linked new block, update head
419                        self.shared.head.store(new_block, Ordering::Release);
420                    }
421                    Err(_) => {
422                        // Someone else allocated a block, free ours
423                        unsafe { drop(Box::from_raw(new_block)) };
424                    }
425                }
426            } else {
427                // Move head to the next block
428                self.shared
429                    .head
430                    .compare_exchange_weak(head_ptr, next_ptr, Ordering::AcqRel, Ordering::Acquire)
431                    .ok();
432            }
433
434            attempts += 1;
435            if attempts > 1000 {
                // Under pathological contention, emit a spin hint and keep retrying
437                core::hint::spin_loop();
438                attempts = 0;
439            }
440        }
441    }
442
443    /// Returns a unique identifier for this channel based on the shared pointer address
444    pub fn id(&self) -> ChannelId {
445        ChannelId(Arc::as_ptr(&self.shared) as usize)
446    }
447
448    /// Checks if the channel is closed
449    pub fn is_closed(&self) -> bool {
450        self.shared.closed.load(Ordering::Acquire)
451    }
452
453    /// Returns true if this sender and another sender send to the same channel
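    ///
    /// # Examples
    ///
    /// A small sketch mirroring `test_same_channel` below:
    ///
    /// ```
    /// use theta_sync::unbounded_channel;
    ///
    /// let (tx1, _rx1) = unbounded_channel::<i32>();
    /// let tx2 = tx1.clone();
    /// let (tx3, _rx3) = unbounded_channel::<i32>();
    ///
    /// assert!(tx1.same_channel(&tx2));
    /// assert!(!tx1.same_channel(&tx3));
    /// ```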
454    pub fn same_channel(&self, other: &Self) -> bool {
455        Arc::ptr_eq(&self.shared, &other.shared)
456    }
457
458    /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count
459    /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the
460    /// channel were dropped and only `WeakUnboundedSender` instances remain,
461    /// the channel is closed.
    #[must_use = "downgrade creates a WeakUnboundedSender without destroying the original sender"]
463    pub fn downgrade(&self) -> WeakUnboundedSender<T> {
464        self.shared.num_weak_senders.fetch_add(1, Ordering::Relaxed);
465        WeakUnboundedSender {
466            shared: Arc::clone(&self.shared),
467        }
468    }
469
470    /// Returns the number of [`UnboundedSender`] handles.
471    pub fn strong_count(&self) -> usize {
472        self.shared.num_senders.load(Ordering::Acquire)
473    }
474
475    /// Returns the number of [`WeakUnboundedSender`] handles.
476    pub fn weak_count(&self) -> usize {
477        self.shared.num_weak_senders.load(Ordering::Acquire)
478    }
479
    /// Completes when the receiver has been dropped.
    ///
    /// This lets producers find out when interest in the produced values has been
    /// lost, so they can stop doing work immediately.
    ///
    /// Note: this implementation does not register a waker, so the future only
    /// resolves when it happens to be polled after the channel has closed.
483    pub async fn closed(&self) {
484        ClosedFuture { sender: self }.await
485    }
486}
487
488impl<T> WeakUnboundedSender<T> {
489    /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`].
    /// Returns `Some` if at least one `UnboundedSender` instance is still alive,
    /// and `None` otherwise.
492    pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
493        let mut count = self.shared.num_senders.load(Ordering::Acquire);
494
495        loop {
496            if count == 0 {
497                // No strong senders remaining, cannot upgrade
498                return None;
499            }
500
501            match self.shared.num_senders.compare_exchange_weak(
502                count,
503                count + 1,
504                Ordering::AcqRel,
505                Ordering::Acquire,
506            ) {
507                Ok(_) => {
508                    return Some(UnboundedSender {
509                        shared: Arc::clone(&self.shared),
510                    });
511                }
512                Err(actual) => count = actual,
513            }
514        }
515    }
516
517    /// Returns the number of [`UnboundedSender`] handles.
518    pub fn strong_count(&self) -> usize {
519        self.shared.num_senders.load(Ordering::Acquire)
520    }
521
522    /// Returns the number of [`WeakUnboundedSender`] handles.
523    pub fn weak_count(&self) -> usize {
524        self.shared.num_weak_senders.load(Ordering::Acquire)
525    }
526}
527
528impl<T> PartialEq for UnboundedSender<T> {
529    fn eq(&self, other: &Self) -> bool {
530        self.id() == other.id()
531    }
532}
533
534impl<T> Eq for UnboundedSender<T> {}
535
536impl<T> PartialOrd for UnboundedSender<T> {
537    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
538        Some(self.cmp(other))
539    }
540}
541
542impl<T> Ord for UnboundedSender<T> {
543    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
544        self.id().cmp(&other.id())
545    }
546}
547
548impl<T> Hash for UnboundedSender<T> {
549    fn hash<H: Hasher>(&self, state: &mut H) {
550        self.id().hash(state);
551    }
552}
553
554impl<T> PartialEq for WeakUnboundedSender<T> {
555    fn eq(&self, other: &Self) -> bool {
556        Arc::ptr_eq(&self.shared, &other.shared)
557    }
558}
559
560impl<T> Eq for WeakUnboundedSender<T> {}
561
562impl<T> PartialOrd for WeakUnboundedSender<T> {
563    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
564        Some(self.cmp(other))
565    }
566}
567
568impl<T> Ord for WeakUnboundedSender<T> {
569    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
570        let self_ptr = Arc::as_ptr(&self.shared) as usize;
571        let other_ptr = Arc::as_ptr(&other.shared) as usize;
572        self_ptr.cmp(&other_ptr)
573    }
574}
575
576impl<T> Hash for WeakUnboundedSender<T> {
577    fn hash<H: Hasher>(&self, state: &mut H) {
578        let ptr = Arc::as_ptr(&self.shared) as usize;
579        ptr.hash(state);
580    }
581}
582
583impl<T> fmt::Debug for UnboundedSender<T> {
584    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
585        f.debug_struct("UnboundedSender")
586            .field("id", &self.id())
587            .field("strong_count", &self.strong_count())
588            .field("weak_count", &self.weak_count())
589            .finish()
590    }
591}
592
593impl<T> fmt::Debug for WeakUnboundedSender<T> {
594    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
595        f.debug_struct("WeakUnboundedSender")
596            .field("strong_count", &self.strong_count())
597            .field("weak_count", &self.weak_count())
598            .finish()
599    }
600}
601
602/// The receiving half of an unbounded MPSC channel
603pub struct UnboundedReceiver<T> {
604    shared: Arc<Shared<T>>,
605    /// Current position for reading
606    recv_index: usize,
607}
608
609impl<T> UnboundedReceiver<T> {
    /// Receives the next value from the channel, waiting until one is available.
    ///
    /// Returns `None` once the channel is closed and no buffered values remain.
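    ///
    /// # Examples
    ///
    /// A compile-only sketch (an async executor is needed to actually drive it):
    ///
    /// ```no_run
    /// use theta_sync::unbounded_channel;
    ///
    /// async fn demo() {
    ///     let (tx, mut rx) = unbounded_channel::<u32>();
    ///     tx.send(1).unwrap();
    ///     assert_eq!(rx.recv().await, Some(1));
    ///     drop(tx);
    ///     assert_eq!(rx.recv().await, None);
    /// }
    /// ```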
611    pub async fn recv(&mut self) -> Option<T> {
612        RecvFuture { receiver: self }.await
613    }
614
615    /// Attempts to receive a value without blocking
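    ///
    /// # Examples
    ///
    /// A sketch of the three possible outcomes:
    ///
    /// ```
    /// use theta_sync::{unbounded_channel, TryRecvError};
    ///
    /// let (tx, mut rx) = unbounded_channel::<i32>();
    /// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    ///
    /// tx.send(1).unwrap();
    /// assert_eq!(rx.try_recv(), Ok(1));
    ///
    /// drop(tx);
    /// assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    /// ```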
616    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
617        loop {
618            let tail_ptr = self.shared.tail.load(Ordering::Acquire);
619            let tail = unsafe { &*tail_ptr };
620
621            if let Some(relative_idx) = tail.relative_index(self.recv_index) {
622                // We're in the current tail block
623                if tail.is_ready(relative_idx) {
624                    if let Some(value) = tail.read(relative_idx) {
625                        self.recv_index += 1;
626                        return Ok(value);
627                    }
628                }

                // Nothing ready at our position yet. The tail only advances once
                // this block has been fully consumed (handled in the branch below),
                // so a block is never freed while a sender may still write into it.
654            } else {
                // We have consumed this entire block; advance the tail to the next one
656                let next_ptr = tail.next.load(Ordering::Acquire);
657                if !next_ptr.is_null() {
658                    if self
659                        .shared
660                        .tail
661                        .compare_exchange(tail_ptr, next_ptr, Ordering::AcqRel, Ordering::Acquire)
662                        .is_ok()
663                    {
664                        unsafe { drop(Box::from_raw(tail_ptr)) };
665                    }
666                    continue;
667                }
668            }
669
670            // No data available
671            if self.shared.closed.load(Ordering::Acquire)
672                && self.shared.num_senders.load(Ordering::Acquire) == 0
673            {
674                return Err(TryRecvError::Disconnected);
675            }
676
677            return Err(TryRecvError::Empty);
678        }
679    }
680
681    /// Returns a unique identifier for this channel based on the shared pointer address
682    pub fn id(&self) -> ChannelId {
683        ChannelId(Arc::as_ptr(&self.shared) as usize)
684    }
685
686    /// Checks if the channel is closed and empty
687    pub fn is_closed(&self) -> bool {
688        self.shared.closed.load(Ordering::Acquire)
689            && self.shared.num_senders.load(Ordering::Acquire) == 0
690    }
691
692    /// Checks if the channel is currently empty
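    ///
    /// # Examples
    ///
    /// A small sketch of the empty / non-empty transitions:
    ///
    /// ```
    /// use theta_sync::unbounded_channel;
    ///
    /// let (tx, mut rx) = unbounded_channel::<i32>();
    /// assert!(rx.is_empty());
    ///
    /// tx.send(42).unwrap();
    /// assert!(!rx.is_empty());
    ///
    /// assert_eq!(rx.try_recv().unwrap(), 42);
    /// assert!(rx.is_empty());
    /// ```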
693    pub fn is_empty(&self) -> bool {
        // `try_recv` needs `&mut self`, so instead peek at the channel state
        // without consuming anything or modifying the receiver.
697
698        let tail_ptr = self.shared.tail.load(Ordering::Acquire);
699        let tail = unsafe { &*tail_ptr };
700
701        if let Some(relative_idx) = tail.relative_index(self.recv_index) {
702            // Check if there's data ready at our current position
703            if tail.is_ready(relative_idx) {
704                return false; // Not empty
705            }
706        }
707
708        // Check if channel is disconnected
709        if self.shared.closed.load(Ordering::Acquire)
710            && self.shared.num_senders.load(Ordering::Acquire) == 0
711        {
712            return true; // Empty and disconnected
713        }
714
715        // Assume empty if we can't find ready data at current position
716        true
717    }
718
    /// Closes the receiving half of the channel without dropping it.
    ///
    /// Senders will get an error on subsequent sends, but values already buffered
    /// in the channel can still be received.
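    ///
    /// # Examples
    ///
    /// A sketch of closing while values are still buffered:
    ///
    /// ```
    /// use theta_sync::unbounded_channel;
    ///
    /// let (tx, mut rx) = unbounded_channel::<i32>();
    /// tx.send(1).unwrap();
    /// rx.close();
    ///
    /// assert!(tx.send(2).is_err());
    /// assert_eq!(rx.try_recv().unwrap(), 1);
    /// ```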
720    pub fn close(&mut self) {
721        self.shared.closed.store(true, Ordering::Release);
722    }
723
724    /// Returns the number of [`UnboundedSender`] handles.
725    pub fn sender_strong_count(&self) -> usize {
726        self.shared.num_senders.load(Ordering::Acquire)
727    }
728
729    /// Returns the number of [`WeakUnboundedSender`] handles.
730    pub fn sender_weak_count(&self) -> usize {
731        self.shared.num_weak_senders.load(Ordering::Acquire)
732    }
733
734    /// Returns the number of messages in the channel.
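    ///
    /// # Examples
    ///
    /// A small sketch of length tracking (see also `test_len_method` below):
    ///
    /// ```
    /// use theta_sync::unbounded_channel;
    ///
    /// let (tx, rx) = unbounded_channel::<i32>();
    /// assert_eq!(rx.len(), 0);
    ///
    /// tx.send(1).unwrap();
    /// tx.send(2).unwrap();
    /// assert_eq!(rx.len(), 2);
    /// ```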
735    pub fn len(&self) -> usize {
736        // Count ready messages across all blocks
737        let mut count = 0;
738        let mut current = self.shared.tail.load(Ordering::Acquire);
739        
740        while !current.is_null() {
741            let block = unsafe { &*current };
742            count += block.ready_count();
743            current = block.next.load(Ordering::Acquire);
744        }
745        
746        count
747    }
748
749    fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        match self.try_recv() {
            Ok(value) => Poll::Ready(Some(value)),
            Err(TryRecvError::Disconnected) => Poll::Ready(None),
            Err(TryRecvError::Empty) => {
                // Register the waker, then check again: a value may have arrived
                // (and its wake-up been missed) between the failed `try_recv`
                // and the waker being stored.
                self.shared.store_waker(cx.waker().clone());
                match self.try_recv() {
                    Ok(value) => Poll::Ready(Some(value)),
                    Err(TryRecvError::Disconnected) => Poll::Ready(None),
                    Err(TryRecvError::Empty) => Poll::Pending,
                }
            }
        }
759    }
760}
761
762impl<T> Drop for UnboundedReceiver<T> {
763    fn drop(&mut self) {
764        self.shared.closed.store(true, Ordering::Release);
765    }
766}
767
768/// Future returned by `UnboundedReceiver::recv()`
769struct RecvFuture<'a, T> {
770    receiver: &'a mut UnboundedReceiver<T>,
771}
772
773impl<'a, T> Future for RecvFuture<'a, T> {
774    type Output = Option<T>;
775
776    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
777        self.receiver.poll_recv(cx)
778    }
779}
780
781/// Future returned by `UnboundedSender::closed()`
782struct ClosedFuture<'a, T> {
783    sender: &'a UnboundedSender<T>,
784}
785
786impl<'a, T> Future for ClosedFuture<'a, T> {
787    type Output = ();
788
789    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
790        if self.sender.is_closed() {
791            Poll::Ready(())
792        } else {
            // No waker is registered here, so this future is only resolved if it
            // happens to be polled again after the channel closes; see the note on
            // `closed()`. This is a known simplification.
795            Poll::Pending
796        }
797    }
798}
799
800/// Unique identifier for a channel based on its memory address
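///
/// # Examples
///
/// A sketch of comparing channel endpoints by id:
///
/// ```
/// use theta_sync::unbounded_channel;
///
/// let (tx, rx) = unbounded_channel::<i32>();
/// assert_eq!(tx.id(), rx.id());
/// ```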
801#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
802pub struct ChannelId(usize);
803
804impl Hash for ChannelId {
805    fn hash<H: Hasher>(&self, state: &mut H) {
806        self.0.hash(state);
807    }
808}
809
810impl fmt::Display for ChannelId {
811    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
812        write!(f, "ChannelId(0x{:x})", self.0)
813    }
814}
815
816#[cfg(test)]  
817mod tests {
818    use super::*;
819    use alloc::{vec, vec::Vec};
820
821    #[test]
822    fn test_basic_send_recv() {
823        let (tx, mut rx) = unbounded_channel::<i32>();
824
825        tx.send(1).unwrap();
826        tx.send(2).unwrap();
827        tx.send(3).unwrap();
828
829        assert_eq!(rx.try_recv().unwrap(), 1);
830        assert_eq!(rx.try_recv().unwrap(), 2);
831        assert_eq!(rx.try_recv().unwrap(), 3);
832        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
833    }
834
835    #[test]
836    fn test_channel_id() {
837        let (tx1, rx1) = unbounded_channel::<i32>();
838        let (tx2, rx2) = unbounded_channel::<i32>();
839
840        assert_eq!(tx1.id(), rx1.id());
841        assert_ne!(tx1.id(), tx2.id());
842        assert_ne!(rx1.id(), rx2.id());
843    }
844
845    #[test]
846    fn test_clone_sender() {
847        let (tx, mut rx) = unbounded_channel::<i32>();
848        let tx2 = tx.clone();
849
850        tx.send(1).unwrap();
851        tx2.send(2).unwrap();
852
853        assert_eq!(rx.try_recv().unwrap(), 1);
854        assert_eq!(rx.try_recv().unwrap(), 2);
855    }
856
857    #[test]
858    fn test_large_number_of_messages() {
859        let (tx, mut rx) = unbounded_channel::<usize>();
860
861        // Send more messages than a single block can hold
862        for i in 0..100 {
863            tx.send(i).unwrap();
864        }
865
866        // Receive all messages
867        for i in 0..100 {
868            assert_eq!(rx.try_recv().unwrap(), i);
869        }
870    }
871
872    #[test]
873    fn test_drop_sender_closes_channel() {
874        let (tx, mut rx) = unbounded_channel::<i32>();
875
876        tx.send(42).unwrap();
877        drop(tx);
878
879        assert_eq!(rx.try_recv().unwrap(), 42);
880        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
881    }
882
883    #[test]
884    fn test_same_channel() {
885        let (tx1, _rx) = unbounded_channel::<i32>();
886        let tx2 = tx1.clone();
887        let (tx3, _rx2) = unbounded_channel::<i32>();
888
889        assert!(tx1.same_channel(&tx2));
890        assert!(!tx1.same_channel(&tx3));
891    }
892
893    // === EDGE CASES AND INTENSIVE TESTS ===
894
895    #[test]
896    fn test_stress_many_messages() {
897        let (tx, mut rx) = unbounded_channel::<usize>();
898        const NUM_MESSAGES: usize = 10_000;
899
900        // Send many messages
901        for i in 0..NUM_MESSAGES {
902            tx.send(i).unwrap();
903        }
904
905        // Receive all messages in order
906        for i in 0..NUM_MESSAGES {
907            assert_eq!(rx.try_recv().unwrap(), i);
908        }
909
910        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
911    }
912
913    #[test]
914    fn test_send_recv_interleaved() {
915        let (tx, mut rx) = unbounded_channel::<i32>();
916
917        // Interleave sends and receives
918        let mut expected_recv = 0;
919        for i in 0..100 {
920            tx.send(i).unwrap();
921            if i % 2 == 0 {
922                assert_eq!(rx.try_recv().unwrap(), expected_recv);
923                expected_recv += 1;
924            }
925        }
926
927        // Receive remaining messages
928        while let Ok(value) = rx.try_recv() {
929            assert_eq!(value, expected_recv);
930            expected_recv += 1;
931        }
932        
933        assert_eq!(expected_recv, 100); // Should have received all 100 messages
934    }
935
936    #[test]
937    fn test_drop_receiver_while_sending() {
938        let (tx, rx) = unbounded_channel::<i32>();
939
940        // Send some messages
941        tx.send(1).unwrap();
942        tx.send(2).unwrap();
943
944        // Drop receiver
945        drop(rx);
946
947        // Further sends should fail
948        assert!(matches!(tx.send(3), Err(SendError(3))));
949        assert!(tx.is_closed());
950    }
951
952    #[test]
953    fn test_multiple_sender_drops() {
954        let (tx, mut rx) = unbounded_channel::<i32>();
955        let tx2 = tx.clone();
956        let tx3 = tx.clone();
957
958        tx.send(1).unwrap();
959        tx2.send(2).unwrap();
960        tx3.send(3).unwrap();
961
962        // Drop senders one by one
963        drop(tx);
964        assert!(!rx.is_closed());
965
966        drop(tx2);
967        assert!(!rx.is_closed());
968
969        // Last sender drop should close channel
970        drop(tx3);
971
972        // Receive existing messages
973        assert_eq!(rx.try_recv().unwrap(), 1);
974        assert_eq!(rx.try_recv().unwrap(), 2);
975        assert_eq!(rx.try_recv().unwrap(), 3);
976
977        // Channel should be disconnected
978        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
979        assert!(rx.is_closed());
980    }
981
982    #[test]
983    fn test_zero_sized_types() {
984        #[derive(Debug, PartialEq)]
985        struct ZeroSized;
986
987        let (tx, mut rx) = unbounded_channel::<ZeroSized>();
988
989        tx.send(ZeroSized).unwrap();
990        tx.send(ZeroSized).unwrap();
991
992        assert_eq!(rx.try_recv().unwrap(), ZeroSized);
993        assert_eq!(rx.try_recv().unwrap(), ZeroSized);
994        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
995    }
996
997    #[test]
998    fn test_large_types() {
999        #[derive(Debug, PartialEq)]
1000        struct LargeType([u8; 1024]);
1001
1002        let (tx, mut rx) = unbounded_channel::<LargeType>();
1003        let large_value = LargeType([42; 1024]);
1004
1005        tx.send(large_value).unwrap();
1006        let received = rx.try_recv().unwrap();
1007        assert_eq!(received.0[0], 42);
1008        assert_eq!(received.0[1023], 42);
1009    }
1010
1011    #[test]
1012    fn test_unwind_safety_basic() {
1013        #[derive(Debug)]
1014        struct PanicOnDrop(bool);
1015        impl Drop for PanicOnDrop {
1016            fn drop(&mut self) {
1017                if self.0 {
                    // In no_std we avoid actually panicking in Drop,
                    // so this test only exercises the structure.
1020                }
1021            }
1022        }
1023
1024        let (tx, mut rx) = unbounded_channel::<PanicOnDrop>();
1025
1026        // Send a value that won't panic on drop
1027        tx.send(PanicOnDrop(false)).unwrap();
1028
1029        // Channel should work normally
1030        assert_eq!(rx.try_recv().unwrap().0, false);
1031        tx.send(PanicOnDrop(false)).unwrap();
1032        assert_eq!(rx.try_recv().unwrap().0, false);
1033    }
1034
1035    #[test]
1036    fn test_block_boundary_conditions() {
1037        let (tx, mut rx) = unbounded_channel::<usize>();
1038
1039        // Send exactly BLOCK_CAP messages to fill first block
1040        for i in 0..BLOCK_CAP {
1041            tx.send(i).unwrap();
1042        }
1043
1044        // Send one more to trigger new block allocation
1045        tx.send(BLOCK_CAP).unwrap();
1046
1047        // Receive all messages
1048        for i in 0..=BLOCK_CAP {
1049            assert_eq!(rx.try_recv().unwrap(), i);
1050        }
1051
1052        // Send messages across multiple blocks
1053        for i in 0..(BLOCK_CAP * 3) {
1054            tx.send(i).unwrap();
1055        }
1056
1057        for i in 0..(BLOCK_CAP * 3) {
1058            assert_eq!(rx.try_recv().unwrap(), i);
1059        }
1060    }
1061
1062    #[test]
1063    fn test_receiver_state_consistency() {
1064        let (tx, mut rx) = unbounded_channel::<i32>();
1065
1066        // Test empty state
1067        assert!(rx.is_empty());
1068        assert!(!rx.is_closed());
1069
1070        // Send and test non-empty state
1071        tx.send(42).unwrap();
1072        assert!(!rx.is_empty());
1073        assert!(!rx.is_closed());
1074
1075        // Receive and test empty again
1076        assert_eq!(rx.try_recv().unwrap(), 42);
1077        assert!(rx.is_empty());
1078        assert!(!rx.is_closed());
1079
1080        // Close channel and test closed state
1081        drop(tx);
1082        assert!(rx.is_empty());
1083        assert!(rx.is_closed());
1084        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1085    }
1086
1087    #[test]
1088    fn test_manual_close() {
1089        let (tx, mut rx) = unbounded_channel::<i32>();
1090
1091        tx.send(1).unwrap();
1092        tx.send(2).unwrap();
1093
1094        // Manually close receiver
1095        rx.close();
1096
1097        // Should still be able to receive existing messages
1098        assert_eq!(rx.try_recv().unwrap(), 1);
1099        assert_eq!(rx.try_recv().unwrap(), 2);
1100
1101        // But sender should see channel as closed
1102        assert!(tx.is_closed());
1103        assert!(matches!(tx.send(3), Err(SendError(3))));
1104    }
1105
1106    #[test]
1107    fn test_channel_id_consistency() {
1108        // Test that sender and receiver from same channel have same ID
1109        let (tx, rx) = unbounded_channel::<i32>();
1110        assert_eq!(tx.id(), rx.id());
1111
1112        // Test that different channels have different IDs (at creation time)
1113        let (tx2, rx2) = unbounded_channel::<i32>();
1114        assert_eq!(tx2.id(), rx2.id());
1115
1116        // Note: Different channels may reuse memory addresses after deallocation,
1117        // so we only test that senders/receivers from the same channel match
1118        let tx_clone = tx.clone();
1119        assert_eq!(tx.id(), tx_clone.id());
1120        assert!(tx.same_channel(&tx_clone));
1121        assert!(!tx.same_channel(&tx2));
1122    }
1123
1124    #[test]
1125    fn test_drop_semantics() {
1126        use alloc::rc::Rc;
1127
1128        let drop_count = Rc::new(core::cell::RefCell::new(0));
1129
1130        #[derive(Debug)]
1131        struct DropCounter(Rc<core::cell::RefCell<i32>>);
1132        impl Drop for DropCounter {
1133            fn drop(&mut self) {
1134                *self.0.borrow_mut() += 1;
1135            }
1136        }
1137
1138        let (tx, mut rx) = unbounded_channel::<DropCounter>();
1139
1140        // Send some values
1141        tx.send(DropCounter(drop_count.clone())).unwrap();
1142        tx.send(DropCounter(drop_count.clone())).unwrap();
1143        tx.send(DropCounter(drop_count.clone())).unwrap();
1144
1145        assert_eq!(*drop_count.borrow(), 0); // Nothing dropped yet
1146
1147        // Receive one value
1148        let _value1 = rx.try_recv().unwrap();
1149        assert_eq!(*drop_count.borrow(), 0); // Still holding reference
1150
1151        drop(_value1);
1152        assert_eq!(*drop_count.borrow(), 1); // One dropped
1153
1154        // Drop the channel with remaining values
1155        drop(tx);
1156        drop(rx);
1157
1158        assert_eq!(*drop_count.borrow(), 3); // All should be dropped
1159    }
1160
1161    #[test]
1162    fn test_memory_safety_after_close() {
1163        let (tx, mut rx) = unbounded_channel::<Vec<u8>>();
1164
1165        // Send some data
1166        tx.send(vec![1, 2, 3]).unwrap();
1167        tx.send(vec![4, 5, 6]).unwrap();
1168
1169        // Close the receiver
1170        rx.close();
1171
1172        // Sender should fail
1173        assert!(matches!(tx.send(vec![7, 8, 9]), Err(_)));
1174
1175        // But we should still be able to receive existing data
1176        assert_eq!(rx.try_recv().unwrap(), vec![1, 2, 3]);
1177        assert_eq!(rx.try_recv().unwrap(), vec![4, 5, 6]);
1178    }
1179
1180    #[test]
1181    fn test_ordering_guarantees() {
1182        let (tx, mut rx) = unbounded_channel::<usize>();
1183
1184        // Send messages in order
1185        for i in 0..1000 {
1186            tx.send(i).unwrap();
1187        }
1188
1189        // Should receive in the same order
1190        for i in 0..1000 {
1191            assert_eq!(rx.try_recv().unwrap(), i);
1192        }
1193    }
1194
1195    #[test]
1196    fn test_empty_channel_operations() {
1197        let (tx, mut rx) = unbounded_channel::<i32>();
1198
1199        // Operations on empty channel
1200        assert!(rx.is_empty());
1201        assert!(!rx.is_closed());
1202        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1203
1204        // Close and test again
1205        drop(tx);
1206        assert!(rx.is_empty());
1207        assert!(rx.is_closed());
1208        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1209    }
1210
1211    #[test]
1212    fn test_channel_reuse_after_empty() {
1213        let (tx, mut rx) = unbounded_channel::<i32>();
1214
1215        // Send, receive, repeat multiple times
1216        for round in 0..10 {
1217            for i in 0..10 {
1218                tx.send(round * 10 + i).unwrap();
1219            }
1220
1221            for i in 0..10 {
1222                assert_eq!(rx.try_recv().unwrap(), round * 10 + i);
1223            }
1224
1225            assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1226        }
1227    }
1228
1229    #[test]
1230    fn test_mixed_operation_patterns() {
1231        let (tx, mut rx) = unbounded_channel::<usize>();
1232
1233        // Pattern: send some, receive some, repeat
1234        let mut next_send = 0;
1235        let mut next_recv = 0;
1236
1237        for _ in 0..100 {
1238            // Send 1-5 messages
1239            let send_count = (next_send % 5) + 1;
1240            for _ in 0..send_count {
1241                tx.send(next_send).unwrap();
1242                next_send += 1;
1243            }
1244
1245            // Receive 1-3 messages (if available)
1246            let recv_count = (next_recv % 3) + 1;
1247            for _ in 0..recv_count {
1248                if let Ok(value) = rx.try_recv() {
1249                    assert_eq!(value, next_recv);
1250                    next_recv += 1;
1251                } else {
1252                    break;
1253                }
1254            }
1255        }
1256
1257        // Receive remaining messages
1258        while let Ok(value) = rx.try_recv() {
1259            assert_eq!(value, next_recv);
1260            next_recv += 1;
1261        }
1262
1263        assert_eq!(next_send, next_recv);
1264    }
1265
1266    // === WEAK SENDER TESTS ===
1267
1268    #[test]
1269    fn test_weak_sender_basic() {
1270        let (tx, mut rx) = unbounded_channel::<i32>();
1271
1272        // Create weak sender
1273        let weak_tx = tx.downgrade();
1274
1275        // Upgrade should succeed while strong sender exists
1276        let upgraded_tx = weak_tx.upgrade().unwrap();
1277
1278        // Send through upgraded sender
1279        upgraded_tx.send(42).unwrap();
1280        assert_eq!(rx.try_recv().unwrap(), 42);
1281
1282        // Drop original strong sender
1283        drop(tx);
1284
1285        // Should still work with upgraded sender
1286        upgraded_tx.send(43).unwrap();
1287        assert_eq!(rx.try_recv().unwrap(), 43);
1288
1289        // Drop upgraded sender
1290        drop(upgraded_tx);
1291
1292        // Now upgrade should fail
1293        assert!(weak_tx.upgrade().is_none());
1294    }
1295
1296    #[test]
1297    fn test_weak_sender_upgrade_failure() {
1298        let (tx, _rx) = unbounded_channel::<i32>();
1299        let weak_tx = tx.downgrade();
1300
1301        // Drop strong sender
1302        drop(tx);
1303
1304        // Upgrade should fail
1305        assert!(weak_tx.upgrade().is_none());
1306    }
1307
1308    #[test]
1309    fn test_weak_sender_counts() {
1310        let (tx, rx) = unbounded_channel::<i32>();
1311
1312        // Initial counts
1313        assert_eq!(tx.strong_count(), 1);
1314        assert_eq!(tx.weak_count(), 0);
1315        assert_eq!(rx.sender_strong_count(), 1);
1316        assert_eq!(rx.sender_weak_count(), 0);
1317
1318        // Create weak sender
1319        let weak_tx = tx.downgrade();
1320        assert_eq!(tx.strong_count(), 1);
1321        assert_eq!(tx.weak_count(), 1);
1322        assert_eq!(weak_tx.strong_count(), 1);
1323        assert_eq!(weak_tx.weak_count(), 1);
1324        assert_eq!(rx.sender_strong_count(), 1);
1325        assert_eq!(rx.sender_weak_count(), 1);
1326
1327        // Clone strong sender
1328        let tx2 = tx.clone();
1329        assert_eq!(tx.strong_count(), 2);
1330        assert_eq!(tx.weak_count(), 1);
1331        assert_eq!(tx2.strong_count(), 2);
1332        assert_eq!(weak_tx.strong_count(), 2);
1333        assert_eq!(weak_tx.weak_count(), 1);
1334        assert_eq!(rx.sender_strong_count(), 2);
1335        assert_eq!(rx.sender_weak_count(), 1);
1336
1337        // Clone weak sender
1338        let weak_tx2 = weak_tx.clone();
1339        assert_eq!(tx.strong_count(), 2);
1340        assert_eq!(tx.weak_count(), 2);
1341        assert_eq!(weak_tx.weak_count(), 2);
1342        assert_eq!(weak_tx2.weak_count(), 2);
1343        assert_eq!(rx.sender_strong_count(), 2);
1344        assert_eq!(rx.sender_weak_count(), 2);
1345
1346        // Drop weak sender
1347        drop(weak_tx);
1348        assert_eq!(tx.weak_count(), 1);
1349        assert_eq!(weak_tx2.weak_count(), 1);
1350        assert_eq!(rx.sender_weak_count(), 1);
1351
1352        // Drop strong sender
1353        drop(tx);
1354        assert_eq!(tx2.strong_count(), 1);
1355        assert_eq!(weak_tx2.strong_count(), 1);
1356        assert_eq!(rx.sender_strong_count(), 1);
1357
1358        // Drop last strong sender
1359        drop(tx2);
1360        assert_eq!(weak_tx2.strong_count(), 0);
1361        assert_eq!(weak_tx2.weak_count(), 1);
1362        assert_eq!(rx.sender_strong_count(), 0);
1363        assert_eq!(rx.sender_weak_count(), 1);
1364
1365        // Upgrade should now fail
1366        assert!(weak_tx2.upgrade().is_none());
1367    }
1368
1369    #[test]
1370    fn test_weak_sender_channel_close() {
1371        let (tx, rx) = unbounded_channel::<i32>();
1372        let weak_tx = tx.downgrade();
1373
1374        // Drop strong sender
1375        drop(tx);
1376
1377        // Channel should be closed
1378        assert!(rx.is_closed());
1379
1380        // Weak sender should not be able to upgrade
1381        assert!(weak_tx.upgrade().is_none());
1382    }
1383
1384    #[test]
1385    fn test_sender_ordering_and_equality() {
1386        let (tx1, _rx1) = unbounded_channel::<i32>();
1387        let (tx2, _rx2) = unbounded_channel::<i32>();
1388
1389        let tx1_clone = tx1.clone();
1390        let weak_tx1 = tx1.downgrade();
1391        let weak_tx2 = tx2.downgrade();
1392
1393        // Same channel senders should be equal
1394        assert_eq!(tx1, tx1_clone);
1395
1396        // Different channels should not be equal
1397        assert_ne!(tx1, tx2);
1398
1399        // Weak senders from same channel should be equal
1400        assert_eq!(weak_tx1, weak_tx1.clone());
1401
1402        // Weak senders from different channels should not be equal
1403        assert_ne!(weak_tx1, weak_tx2);
1404
1405        // Test ordering (consistent but not necessarily meaningful)
1406        let ordering1 = tx1.cmp(&tx2);
1407        let ordering2 = tx1.cmp(&tx2);
1408        assert_eq!(ordering1, ordering2); // Should be consistent
1409
        // Test collection behavior (equal senders collapse to a single entry)
1411        use alloc::collections::BTreeSet;
1412        let mut set = BTreeSet::new();
1413        set.insert(tx1.clone());
1414        set.insert(tx1_clone.clone());
1415        assert_eq!(set.len(), 1); // Same sender, so only one in set
1416
1417        set.insert(tx2.clone());
1418        assert_eq!(set.len(), 2); // Different sender, so now two in set
1419    }
1420
1421    #[test]
1422    fn test_weak_sender_multiple_upgrades() {
1423        let (tx, mut rx) = unbounded_channel::<i32>();
1424        let weak_tx = tx.downgrade();
1425
1426        // Multiple upgrades should work
1427        let upgraded1 = weak_tx.upgrade().unwrap();
1428        let upgraded2 = weak_tx.upgrade().unwrap();
1429
1430        upgraded1.send(1).unwrap();
1431        upgraded2.send(2).unwrap();
1432
1433        assert_eq!(rx.try_recv().unwrap(), 1);
1434        assert_eq!(rx.try_recv().unwrap(), 2);
1435
1436        // Drop original and one upgrade
1437        drop(tx);
1438        drop(upgraded1);
1439
1440        // Should still be able to upgrade and send
1441        let upgraded3 = weak_tx.upgrade().unwrap();
1442        upgraded3.send(3).unwrap();
1443        assert_eq!(rx.try_recv().unwrap(), 3);
1444
1445        // Drop remaining senders
1446        drop(upgraded2);
1447        drop(upgraded3);
1448
1449        // Now upgrade should fail
1450        assert!(weak_tx.upgrade().is_none());
1451    }
1452
1453    #[test]
1454    fn test_sender_hash_collections() {
1455        use alloc::collections::BTreeSet;
1456
1457        let (tx1, _rx1) = unbounded_channel::<i32>();
1458        let (tx2, _rx2) = unbounded_channel::<i32>();
1459        let tx1_clone = tx1.clone();
1460
        // Use a BTreeSet (available in alloc) in place of a HashSet
1462        let mut set = BTreeSet::new();
1463
1464        // Insert original sender
1465        set.insert(tx1.clone());
1466        assert_eq!(set.len(), 1);
1467
1468        // Insert clone of same sender - should not increase size
1469        set.insert(tx1_clone);
1470        assert_eq!(set.len(), 1);
1471
1472        // Insert different sender - should increase size
1473        set.insert(tx2);
1474        assert_eq!(set.len(), 2);
1475
1476        // Test weak senders
1477        let weak_tx1 = tx1.downgrade();
1478        let weak_tx1_clone = weak_tx1.clone();
1479
1480        let mut weak_set = BTreeSet::new();
1481        weak_set.insert(weak_tx1);
1482        weak_set.insert(weak_tx1_clone); // Should not increase size
1483        assert_eq!(weak_set.len(), 1);
1484    }
1485
1486    // === NEW TOKIO-COMPATIBLE TESTS ===
1487
1488    #[test]
1489    fn test_len_method() {
1490        let (tx, rx) = unbounded_channel::<i32>();
1491        
1492        // Empty channel
1493        assert_eq!(rx.len(), 0);
1494        assert!(rx.is_empty());
1495        
1496        // Send some messages
1497        tx.send(1).unwrap();
1498        assert_eq!(rx.len(), 1);
1499        assert!(!rx.is_empty());
1500        
1501        tx.send(2).unwrap();
1502        tx.send(3).unwrap();
1503        assert_eq!(rx.len(), 3);
1504        assert!(!rx.is_empty());
1505    }
1506
1507    /// Compare drop behavior with Tokio's implementation
1508    /// Tests whether unread messages are properly dropped when receiver is dropped
1509    #[test]
1510    fn test_tokio_drop_behavior_compatibility() {
1511        use alloc::sync::Arc;
1512        use core::sync::atomic::{AtomicUsize, Ordering};
1513
1514        // Drop counter to track when messages are dropped
1515        #[derive(Debug)]
1516        struct DropCounter {
1517            id: usize,
1518            counter: Arc<AtomicUsize>,
1519        }
1520
1521        impl Drop for DropCounter {
1522            fn drop(&mut self) {
1523                self.counter.fetch_add(1, Ordering::SeqCst);
1524            }
1525        }
1526
1527        const NUM_MESSAGES: usize = 100; // Smaller for unit test
1528        
1529        // Test our implementation
1530        let our_drop_counter = Arc::new(AtomicUsize::new(0));
1531        {
1532            let (tx, mut rx) = unbounded_channel::<DropCounter>();
1533            
1534            // Send messages
1535            for i in 0..NUM_MESSAGES {
1536                let msg = DropCounter {
1537                    id: i,
1538                    counter: Arc::clone(&our_drop_counter),
1539                };
1540                tx.send(msg).unwrap();
1541            }
1542            
1543            // Receive only first few messages
1544            for _ in 0..10 {
1545                let _msg = rx.try_recv().unwrap();
1546                // Let them drop immediately
1547            }
1548            
1549            // Drop receiver - this should drop all remaining messages
1550            drop(rx);
1551            drop(tx);
1552        }
1553        
1554        let our_dropped_count = our_drop_counter.load(Ordering::SeqCst);
1555        
1556        // All messages should have been dropped
1557        assert_eq!(our_dropped_count, NUM_MESSAGES, 
1558                  "Our implementation should drop all {} messages", NUM_MESSAGES);
1559    }
1560
1561    /// Test the exact same condition as described in Tokio docs:
1562    /// "If the Receiver handle is dropped, then messages can no longer be read out of the channel.
1563    /// In this case, all further attempts to send will result in an error. Additionally,
1564    /// all unread messages will be drained from the channel and dropped."
1565    #[test]
1566    fn test_tokio_exact_drop_condition() {
1567        use alloc::sync::Arc;
1568        use core::sync::atomic::{AtomicUsize, Ordering};
1569        use alloc::vec::Vec;
1570
1571        #[derive(Debug)]
1572        struct DropTracker {
1573            id: usize,
1574            counter: Arc<AtomicUsize>,
1575        }
1576
1577        impl Drop for DropTracker {
1578            fn drop(&mut self) {
1579                self.counter.fetch_add(1, Ordering::SeqCst);
1580            }
1581        }
1582
1583        const NUM_MESSAGES: usize = 50; // Smaller for unit test
1584        let drop_counter = Arc::new(AtomicUsize::new(0));
1585
1586        let (tx, mut rx) = unbounded_channel::<DropTracker>();
1587        let tx_clone = tx.clone();
1588
1589        // Fill the channel with messages
1590        for i in 0..NUM_MESSAGES {
1591            let msg = DropTracker {
1592                id: i,
1593                counter: Arc::clone(&drop_counter),
1594            };
1595            tx.send(msg).unwrap();
1596        }
1597
1598        // Receive some messages (but not all)
1599        let received_before_drop = 10;
1600        for _ in 0..received_before_drop {
1601            let _msg = rx.try_recv().unwrap();
1602        }
1603
1604        // Test condition 1: Drop receiver while messages remain
1605        drop(rx);
1606
1607        // Test condition 2: Further attempts to send should result in error
1608        let mut send_errors = 0;
1609        let mut failed_messages = Vec::new();
1610        
1611        // Try to send more messages after receiver is dropped  
1612        for i in NUM_MESSAGES..NUM_MESSAGES + 10 {
1613            let msg = DropTracker {
1614                id: i,
1615                counter: Arc::clone(&drop_counter),
1616            };
1617            
1618            match tx_clone.send(msg) {
1619                Ok(_) => {
1620                    // This shouldn't happen if receiver is properly dropped
1621                    panic!("Send succeeded after receiver drop - this violates Tokio behavior");
1622                }
1623                Err(send_error) => {
1624                    send_errors += 1;
1625                    failed_messages.push(send_error.0); // Extract the message from SendError
1626                }
1627            }
1628        }
1629
1630        // Drop the senders to clean up
1631        drop(tx);
1632        drop(tx_clone);
1633        
1634        // Drop the failed messages explicitly to ensure they're counted
1635        drop(failed_messages);
1636
1637        let final_drop_count = drop_counter.load(Ordering::SeqCst);
1638
1639        // Verify the Tokio documented behavior:
1640        // 1. All unread messages are drained and dropped
1641        // 2. All received messages are dropped
1642        // 3. All failed send messages are dropped
1643        let expected_total_drops = NUM_MESSAGES + send_errors;
1644        assert_eq!(final_drop_count, expected_total_drops, 
1645                  "Expected {} drops (original {} + failed sends {}), got {}", 
1646                  expected_total_drops, NUM_MESSAGES, send_errors, final_drop_count);
1647        
1648        // 4. Further send attempts result in errors
1649        assert!(send_errors > 0, "Send attempts after receiver drop should fail");
1650    }

    #[test]
    fn test_tokio_api_compatibility() {
        let (tx, mut rx) = unbounded_channel::<i32>();

        // Test basic Tokio-like APIs
        assert!(!tx.is_closed());
        assert!(!rx.is_closed());
        assert!(rx.is_empty());
        assert_eq!(rx.len(), 0);

        // Test sender counts
        assert_eq!(tx.strong_count(), 1);
        assert_eq!(tx.weak_count(), 0);
        assert_eq!(rx.sender_strong_count(), 1);
        assert_eq!(rx.sender_weak_count(), 0);

        // Test channel identification
        assert!(tx.same_channel(&tx));
        assert_eq!(tx.id(), rx.id());

        // Test weak sender creation
        let _weak_tx = tx.downgrade();
        assert_eq!(tx.weak_count(), 1);
        assert_eq!(rx.sender_weak_count(), 1);

        // Test message sending and length tracking
        tx.send(42).unwrap();
        assert_eq!(rx.len(), 1);
        assert!(!rx.is_empty());

        // Test message receiving
        assert_eq!(rx.try_recv().unwrap(), 42);
        assert_eq!(rx.len(), 0);
        assert!(rx.is_empty());
    }
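
    /// Hedged sketch (added for illustration): a weak sender created with
    /// `downgrade` should be upgradable into a working strong sender while the
    /// channel is open. The strong-count values asserted here are assumptions
    /// extrapolated from `test_tokio_api_compatibility`.
    #[test]
    fn test_weak_sender_upgrade_roundtrip_sketch() {
        let (tx, mut rx) = unbounded_channel::<u32>();
        let weak_tx = tx.downgrade();
        assert_eq!(tx.weak_count(), 1);

        // Upgrading succeeds while at least one strong sender is alive,
        // and the upgraded handle counts as an additional strong sender.
        let upgraded = weak_tx.upgrade().expect("upgrade should succeed while tx is alive");
        assert_eq!(tx.strong_count(), 2);

        // The upgraded handle behaves like any other sender.
        upgraded.send(7).unwrap();
        assert_eq!(rx.try_recv().unwrap(), 7);

        drop(upgraded);
        assert_eq!(tx.strong_count(), 1);
    }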

    /// Test drop behavior when all senders are dropped (with and without remaining messages).
    /// This complements the receiver drop tests and checks the drop accounting against
    /// Tokio's documented semantics.
    #[test]
    fn test_sender_drop_behavior_comprehensive() {
        use alloc::sync::Arc;
        use core::sync::atomic::{AtomicUsize, Ordering};

        #[derive(Debug)]
        struct DropTracker {
            id: usize,
            counter: Arc<AtomicUsize>,
        }

        impl Drop for DropTracker {
            fn drop(&mut self) {
                self.counter.fetch_add(1, Ordering::SeqCst);
            }
        }

        // Test Case 1: All senders dropped with remaining messages in channel
        {
            let drop_counter = Arc::new(AtomicUsize::new(0));
            const NUM_MESSAGES: usize = 50;

            let (tx, mut rx) = unbounded_channel::<DropTracker>();
            let tx2 = tx.clone();
            let tx3 = tx.clone();

            // Send messages from multiple senders
            for i in 0..NUM_MESSAGES {
                let msg = DropTracker {
                    id: i,
                    counter: Arc::clone(&drop_counter),
                };

                // Distribute sends across different senders
                match i % 3 {
                    0 => tx.send(msg).unwrap(),
                    1 => tx2.send(msg).unwrap(),
                    2 => tx3.send(msg).unwrap(),
                    _ => unreachable!(),
                }
            }

            // Receive only some messages
            let received_count = 15;
            for _ in 0..received_count {
                let _msg = rx.try_recv().unwrap();
            }

            assert_eq!(rx.len(), NUM_MESSAGES - received_count);
            assert!(!rx.is_closed());

            // Drop all senders - this should make the channel disconnected
            drop(tx);
            drop(tx2);
            drop(tx3);

            // Channel should now be closed
            assert!(rx.is_closed());

            // Should still be able to receive remaining messages
            let mut remaining_received = 0;
            while let Ok(_msg) = rx.try_recv() {
                remaining_received += 1;
            }

            assert_eq!(remaining_received, NUM_MESSAGES - received_count);

            // Now channel should show disconnected
            assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));

            // Drop receiver to clean up remaining messages (if any)
            drop(rx);

            // All messages should be dropped
            assert_eq!(drop_counter.load(Ordering::SeqCst), NUM_MESSAGES,
                      "All messages should be dropped when senders and receiver are dropped");
        }

        // Test Case 2: All senders dropped with empty channel
        {
            let drop_counter = Arc::new(AtomicUsize::new(0));
            const NUM_MESSAGES: usize = 30;

            let (tx, mut rx) = unbounded_channel::<DropTracker>();
            let tx2 = tx.clone();

            // Send and immediately receive all messages
            for i in 0..NUM_MESSAGES {
                let msg = DropTracker {
                    id: i + 100, // Different ID range
                    counter: Arc::clone(&drop_counter),
                };

                if i % 2 == 0 {
                    tx.send(msg).unwrap();
                } else {
                    tx2.send(msg).unwrap();
                }

                // Immediately receive
                let _received = rx.try_recv().unwrap();
            }

            // Channel should be empty but not closed
            assert!(rx.is_empty());
            assert!(!rx.is_closed());
            assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));

            // Drop all senders
            drop(tx);
            drop(tx2);

            // Channel should now be closed and empty
            assert!(rx.is_empty());
            assert!(rx.is_closed());
            assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));

            drop(rx);

            // All messages should be dropped (they were all received and dropped)
            assert_eq!(drop_counter.load(Ordering::SeqCst), NUM_MESSAGES,
                      "All messages should be dropped when received");
        }

        // Test Case 3: Gradual sender drop with weak senders
        {
            let drop_counter = Arc::new(AtomicUsize::new(0));
            const NUM_MESSAGES: usize = 40;

            let (tx, mut rx) = unbounded_channel::<DropTracker>();
            let tx2 = tx.clone();
            let weak_tx = tx.downgrade();

            // Send some messages
            for i in 0..NUM_MESSAGES / 2 {
                let msg = DropTracker {
                    id: i + 200, // Different ID range
                    counter: Arc::clone(&drop_counter),
                };
                tx.send(msg).unwrap();
            }

            // Drop one strong sender
            drop(tx);
            assert!(!rx.is_closed()); // Still have tx2

            // Send more messages with the remaining sender
            for i in NUM_MESSAGES / 2..NUM_MESSAGES {
                let msg = DropTracker {
                    id: i + 200,
                    counter: Arc::clone(&drop_counter),
                };
                tx2.send(msg).unwrap();
            }

            // Weak sender should still be able to upgrade
            let upgraded = weak_tx.upgrade();
            assert!(upgraded.is_some());
            drop(upgraded); // Important: drop the upgraded sender

            // Drop last strong sender
            drop(tx2);

            // Receive all messages first
            let mut received_all = 0;
            while let Ok(_msg) = rx.try_recv() {
                received_all += 1;
            }

            assert_eq!(received_all, NUM_MESSAGES);

            // Now channel should show as disconnected and closed
            assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
            assert!(rx.is_closed());

            // Weak sender should no longer be able to upgrade
            assert!(weak_tx.upgrade().is_none());

            drop(rx);
            drop(weak_tx);

            // All messages should be dropped
            assert_eq!(drop_counter.load(Ordering::SeqCst), NUM_MESSAGES,
                      "All messages should be dropped with gradual sender drop");
        }
    }
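
    /// Hedged sketch (added for illustration): send enough messages to spill
    /// past a single block (`BLOCK_CAP` slots) and drain them again. This
    /// assumes single-sender FIFO delivery, as with Tokio's unbounded channel;
    /// the exact message count is arbitrary beyond exceeding two blocks.
    #[test]
    fn test_block_boundary_fifo_sketch() {
        let (tx, mut rx) = unbounded_channel::<usize>();
        let total = BLOCK_CAP * 2 + 3; // forces at least two follow-on block allocations

        for i in 0..total {
            tx.send(i).unwrap();
        }
        assert_eq!(rx.len(), total);

        // Messages should come back in the order they were sent.
        for expected in 0..total {
            assert_eq!(rx.try_recv().unwrap(), expected);
        }
        assert!(rx.is_empty());
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
    }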
}