Skip to main content

nexus_channel/
lib.rs

1//! High-performance bounded SPSC channel for low-latency systems.
2//!
3//! This crate provides a blocking single-producer single-consumer channel
4//! optimized for trading systems and other latency-critical workloads.
5//!
6//! For MPSC (multi-producer), use [`crossbeam-channel`](https://docs.rs/crossbeam-channel)
7//! which is well-optimized for that use case. For raw MPSC queue performance without
8//! blocking semantics, see [`nexus-queue::mpsc`](https://docs.rs/nexus-queue).
9//!
10//! # Design
11//!
12//! The channel uses a three-phase backoff strategy that minimizes syscall overhead:
13//!
14//! 1. **Fast path**: Try the operation immediately
//! 2. **Backoff**: Spin with exponential backoff using `crossbeam_utils::Backoff`
16//! 3. **Park**: Sleep until woken by the other end
17//!
18//! The key optimization is *conditional parking*: we only issue expensive unpark
19//! syscalls when the other end has actually gone to sleep. This dramatically
20//! reduces tail latency compared to channels that unpark unconditionally.
21//!
22//! # Quick Start
23//!
24//! ```
25//! use nexus_channel::channel;
26//!
27//! let (tx, rx) = channel::<u64>(1024);
28//!
29//! tx.send(42).unwrap();
30//! assert_eq!(rx.recv().unwrap(), 42);
31//! ```
32//!
33//! # Timeout Support
34//!
35//! ```
36//! use nexus_channel::{channel, RecvTimeoutError};
37//! use std::time::Duration;
38//!
39//! let (tx, rx) = channel::<u64>(4);
40//!
41//! match rx.recv_timeout(Duration::from_millis(100)) {
42//!     Ok(value) => println!("got {}", value),
43//!     Err(RecvTimeoutError::Timeout) => println!("timed out"),
44//!     Err(RecvTimeoutError::Disconnected) => println!("sender dropped"),
45//! }
46//! ```
47//!
48//! # Performance
49//!
50//! Benchmarked against `crossbeam-channel` on Intel Core Ultra 7 @ 2.7GHz,
51//! pinned to physical cores with turbo disabled:
52//!
53//! | Metric | nexus-channel | crossbeam-channel | Improvement |
54//! |--------|---------------|-------------------|-------------|
55//! | p50 latency | 665 cycles | 1344 cycles | **2.0x** |
56//! | p999 latency | 2501 cycles | 37023 cycles | **14.8x** |
57//! | Throughput | 64 M msgs/sec | 34 M msgs/sec | **1.9x** |
58//!
59//! The large p999 improvement comes from avoiding unnecessary syscalls.
60
61#![deny(unsafe_op_in_unsafe_fn)]
62#![warn(missing_docs, missing_debug_implementations)]
63
64use core::fmt;
65use std::mem::ManuallyDrop;
66use std::sync::Arc;
67use std::sync::atomic::{AtomicBool, Ordering};
68use std::time::{Duration, Instant};
69
70use crossbeam_utils::sync::{Parker, Unparker};
71use crossbeam_utils::{Backoff, CachePadded};
72use nexus_queue::Full;
73use nexus_queue::spsc::{Consumer, Producer, ring_buffer};
74
75// Re-export spsc module for backwards compatibility
76pub mod spsc {
77    //! Single-producer single-consumer bounded channel (re-export).
78    //!
79    //! This module re-exports the channel types from the crate root for
80    //! backwards compatibility. You can also use `nexus_channel::channel()`
81    //! directly.
82
83    pub use crate::{Receiver, Sender, channel, channel_with_config};
84}
85
86// ============================================================================
87// Channel Creation
88// ============================================================================
89
/// Default number of backoff snooze iterations before parking.
///
/// [`channel`] forwards this value to [`channel_with_config`] as its
/// `snooze_iters` argument.
const DEFAULT_SNOOZE_ITERS: usize = 8;
92
/// Shared state between sender and receiver.
///
/// Holds one "I am about to park" flag per endpoint. Each endpoint sets its own
/// flag around parking, and the opposite endpoint reads it to decide whether an
/// `unpark` call is needed. Both flags are wrapped in `CachePadded`.
struct Shared {
    /// Set by `Sender::send` before it parks and cleared afterwards; read by
    /// the receiver in `notify_sender`.
    sender_parked: CachePadded<AtomicBool>,
    /// Set by `Receiver::recv` / `recv_timeout` before parking and cleared
    /// afterwards; read by the sender in `notify_receiver`.
    receiver_parked: CachePadded<AtomicBool>,
}
98
99/// Creates a bounded SPSC channel with the given capacity.
100///
101/// Returns a `(Sender, Receiver)` pair. The actual capacity will be rounded
102/// up to the next power of two.
103///
104/// # Panics
105///
106/// Panics if `capacity` is 0.
107///
108/// # Example
109///
110/// ```
111/// use nexus_channel::channel;
112///
113/// let (tx, rx) = channel::<String>(100);
114///
115/// tx.send("hello".to_string()).unwrap();
116/// assert_eq!(rx.recv().unwrap(), "hello");
117/// ```
118pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
119    channel_with_config(capacity, DEFAULT_SNOOZE_ITERS)
120}
121
122/// Creates a bounded SPSC channel with custom backoff configuration.
123///
124/// # Arguments
125///
126/// * `capacity` - Maximum number of messages the channel can hold (rounded to power of 2)
127/// * `snooze_iters` - Number of backoff iterations before parking. Higher values
128///   burn more CPU but reduce latency for bursty workloads.
129///
130/// # Panics
131///
132/// Panics if `capacity` is 0.
133pub fn channel_with_config<T>(capacity: usize, snooze_iters: usize) -> (Sender<T>, Receiver<T>) {
134    let (producer, consumer) = ring_buffer(capacity);
135
136    let shared = Arc::new(Shared {
137        sender_parked: CachePadded::new(AtomicBool::new(false)),
138        receiver_parked: CachePadded::new(AtomicBool::new(false)),
139    });
140
141    let sender_parker = Parker::new();
142    let sender_unparker = sender_parker.unparker().clone();
143
144    let receiver_parker = Parker::new();
145    let receiver_unparker = receiver_parker.unparker().clone();
146
147    (
148        Sender {
149            producer: ManuallyDrop::new(producer),
150            shared: Arc::clone(&shared),
151            parker: sender_parker,
152            receiver_unparker,
153            snooze_iters,
154        },
155        Receiver {
156            consumer: ManuallyDrop::new(consumer),
157            shared,
158            parker: receiver_parker,
159            sender_unparker,
160            snooze_iters,
161        },
162    )
163}
164
165// ============================================================================
166// Sender
167// ============================================================================
168
/// The sending half of an SPSC channel.
///
/// Messages can be sent with [`send`](Sender::send) (blocking) or
/// [`try_send`](Sender::try_send) (non-blocking).
pub struct Sender<T> {
    /// Producer side of the underlying ring buffer. Wrapped in `ManuallyDrop`
    /// so that `Drop for Sender` can drop it explicitly before unparking the
    /// receiver.
    producer: ManuallyDrop<Producer<T>>,
    /// Parked flags shared with the receiver.
    shared: Arc<Shared>,
    /// Parker this sender blocks on while the channel is full.
    parker: Parker,
    /// Handle used to wake the receiver.
    receiver_unparker: Unparker,
    /// Number of backoff iterations `send` performs before parking.
    snooze_iters: usize,
}
180
impl<T> Sender<T> {
    /// Sends a message into the channel, blocking if necessary.
    ///
    /// If the channel is full, this method will block until space is available
    /// or the receiver disconnects. Waiting proceeds in three stages: one
    /// immediate attempt, then up to `snooze_iters` attempts separated by
    /// `Backoff::snooze`, then a loop that parks the thread between attempts.
    ///
    /// # Errors
    ///
    /// Returns `Err(SendError(value))` if the receiver has been dropped.
    #[inline]
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        if self.producer.is_disconnected() {
            return cold_send_err(value);
        }

        // `push` hands the value back inside `Full` when it fails, so the
        // message is threaded through `val` from one attempt to the next.
        let mut val = value;

        // Fast path: a single attempt with no waiting.
        match self.producer.push(val) {
            Ok(()) => {
                self.notify_receiver();
                return Ok(());
            }
            Err(Full(v)) => val = v,
        }

        // Backoff phase: bounded number of snooze-then-retry rounds.
        let backoff = Backoff::new();
        for _ in 0..self.snooze_iters {
            backoff.snooze();

            if self.producer.is_disconnected() {
                return cold_send_err(val);
            }

            match self.producer.push(val) {
                Ok(()) => {
                    self.notify_receiver();
                    return Ok(());
                }
                Err(Full(v)) => val = v,
            }
        }

        // Park phase: sleep until the receiver (or its drop) wakes us.
        loop {
            // Publish the parked flag *before* the last re-check, so that a
            // receiver popping after this point finds the flag set in
            // `notify_sender` and issues an unpark.
            //
            // NOTE(review): this handshake relies on the SeqCst flag store/load
            // being ordered against the queue's own index accesses inside
            // `push`/`pop`. Those orderings live in `nexus_queue` and are not
            // visible here — confirm a lost wakeup is impossible.
            self.shared.sender_parked.store(true, Ordering::SeqCst);

            if self.producer.is_disconnected() {
                self.shared.sender_parked.store(false, Ordering::Relaxed);
                return cold_send_err(val);
            }

            // Re-check after raising the flag: space may have appeared between
            // the previous attempt and the store above.
            match self.producer.push(val) {
                Ok(()) => {
                    self.shared.sender_parked.store(false, Ordering::Relaxed);
                    self.notify_receiver();
                    return Ok(());
                }
                Err(Full(v)) => val = v,
            }

            self.parker.park();
            self.shared.sender_parked.store(false, Ordering::Relaxed);

            // Woken up: either space was freed or the receiver was dropped.
            // If neither is the case the loop raises the flag and parks again.
            if self.producer.is_disconnected() {
                return cold_send_err(val);
            }

            match self.producer.push(val) {
                Ok(()) => {
                    self.notify_receiver();
                    return Ok(());
                }
                Err(Full(v)) => val = v,
            }
        }
    }

    /// Attempts to send a message without blocking.
    ///
    /// Returns immediately with:
    /// - `Ok(())` if the message was sent
    /// - `Err(TrySendError::Full(value))` if the channel is full
    /// - `Err(TrySendError::Disconnected(value))` if the receiver was dropped
    #[inline]
    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
        if self.producer.is_disconnected() {
            return cold_try_send_disconnected(value);
        }

        match self.producer.push(value) {
            Ok(()) => {
                self.notify_receiver();
                Ok(())
            }
            Err(Full(v)) => Err(TrySendError::Full(v)),
        }
    }

    /// Wakes the receiver, but only if it has flagged itself as parked.
    ///
    /// Skipping the `unpark` call when the flag is clear is the "conditional
    /// parking" optimization described in the crate-level docs.
    #[inline]
    fn notify_receiver(&self) {
        if self.shared.receiver_parked.load(Ordering::SeqCst) {
            self.receiver_unparker.unpark();
        }
    }

    /// Returns `true` if the receiver has been dropped.
    #[inline]
    pub fn is_disconnected(&self) -> bool {
        self.producer.is_disconnected()
    }

    /// Returns the capacity of the channel.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.producer.capacity()
    }
}
298
299impl<T> fmt::Debug for Sender<T> {
300    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301        f.debug_struct("Sender")
302            .field("capacity", &self.capacity())
303            .field("disconnected", &self.is_disconnected())
304            .finish_non_exhaustive()
305    }
306}
307
impl<T> Drop for Sender<T> {
    /// Disconnects the channel, then wakes the receiver so it can observe the
    /// disconnect.
    fn drop(&mut self) {
        // SAFETY: producer is valid (ManuallyDrop preserves it) and is not
        // touched again after this call. We drop it here to trigger the
        // Producer's disconnect logic before unparking the receiver, so the
        // receiver sees is_disconnected() == true after waking.
        unsafe { ManuallyDrop::drop(&mut self.producer) };
        // Unconditional: unlike `notify_receiver`, this does not consult the
        // `receiver_parked` flag.
        self.receiver_unparker.unpark();
    }
}
317
318// ============================================================================
319// Receiver
320// ============================================================================
321
/// The receiving half of an SPSC channel.
///
/// Messages can be received with [`recv`](Receiver::recv) (blocking),
/// [`recv_timeout`](Receiver::recv_timeout) (blocking with timeout), or
/// [`try_recv`](Receiver::try_recv) (non-blocking).
pub struct Receiver<T> {
    /// Consumer side of the underlying ring buffer. Wrapped in `ManuallyDrop`
    /// so that `Drop for Receiver` can drop it explicitly before unparking the
    /// sender.
    consumer: ManuallyDrop<Consumer<T>>,
    /// Parked flags shared with the sender.
    shared: Arc<Shared>,
    /// Parker this receiver blocks on while the channel is empty.
    parker: Parker,
    /// Handle used to wake the sender.
    sender_unparker: Unparker,
    /// Number of backoff iterations the blocking receives perform before parking.
    snooze_iters: usize,
}
334
335impl<T> Receiver<T> {
336    /// Receives a message from the channel, blocking if necessary.
337    ///
338    /// If the channel is empty, this method will block until a message arrives
339    /// or the sender disconnects.
340    ///
341    /// Returns `Err(RecvError)` if the sender has been dropped and no messages
342    /// remain in the channel.
343    #[inline]
344    pub fn recv(&self) -> Result<T, RecvError> {
345        // Fast path
346        if let Some(v) = self.consumer.pop() {
347            self.notify_sender();
348            return Ok(v);
349        }
350
351        // Backoff phase
352        let backoff = Backoff::new();
353        for _ in 0..self.snooze_iters {
354            backoff.snooze();
355
356            if let Some(v) = self.consumer.pop() {
357                self.notify_sender();
358                return Ok(v);
359            }
360
361            if self.consumer.is_disconnected() {
362                return self.consumer.pop().ok_or(RecvError);
363            }
364        }
365
366        // Park phase
367        loop {
368            self.shared.receiver_parked.store(true, Ordering::SeqCst);
369
370            if let Some(v) = self.consumer.pop() {
371                self.shared.receiver_parked.store(false, Ordering::Relaxed);
372                self.notify_sender();
373                return Ok(v);
374            }
375
376            if self.consumer.is_disconnected() {
377                self.shared.receiver_parked.store(false, Ordering::Relaxed);
378                return cold_recv_err();
379            }
380
381            self.parker.park();
382            self.shared.receiver_parked.store(false, Ordering::Relaxed);
383
384            if let Some(v) = self.consumer.pop() {
385                self.notify_sender();
386                return Ok(v);
387            }
388
389            if self.consumer.is_disconnected() {
390                return cold_recv_err();
391            }
392        }
393    }
394
395    /// Receives a message from the channel, blocking for at most `timeout`.
396    ///
397    /// Returns:
398    /// - `Ok(value)` if a message was received
399    /// - `Err(RecvTimeoutError::Timeout)` if the timeout elapsed
400    /// - `Err(RecvTimeoutError::Disconnected)` if the sender was dropped
401    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
402        let deadline = Instant::now() + timeout;
403
404        // Fast path
405        if let Some(v) = self.consumer.pop() {
406            self.notify_sender();
407            return Ok(v);
408        }
409
410        // Backoff phase
411        let backoff = Backoff::new();
412        for _ in 0..self.snooze_iters {
413            if Instant::now() >= deadline {
414                return Err(RecvTimeoutError::Timeout);
415            }
416
417            backoff.snooze();
418
419            if let Some(v) = self.consumer.pop() {
420                self.notify_sender();
421                return Ok(v);
422            }
423
424            if self.consumer.is_disconnected() {
425                return self.consumer.pop().ok_or(RecvTimeoutError::Disconnected);
426            }
427        }
428
429        // Park phase with timeout
430        loop {
431            let now = Instant::now();
432            if now >= deadline {
433                return Err(RecvTimeoutError::Timeout);
434            }
435
436            self.shared.receiver_parked.store(true, Ordering::SeqCst);
437
438            if let Some(v) = self.consumer.pop() {
439                self.shared.receiver_parked.store(false, Ordering::Relaxed);
440                self.notify_sender();
441                return Ok(v);
442            }
443
444            if self.consumer.is_disconnected() {
445                self.shared.receiver_parked.store(false, Ordering::Relaxed);
446                return cold_recv_timeout_disconnected();
447            }
448
449            let remaining = deadline - now;
450            self.parker.park_timeout(remaining);
451            self.shared.receiver_parked.store(false, Ordering::Relaxed);
452
453            if let Some(v) = self.consumer.pop() {
454                self.notify_sender();
455                return Ok(v);
456            }
457
458            if self.consumer.is_disconnected() {
459                return cold_recv_timeout_disconnected();
460            }
461        }
462    }
463
464    /// Attempts to receive a message without blocking.
465    ///
466    /// Returns immediately with:
467    /// - `Ok(value)` if a message was available
468    /// - `Err(TryRecvError::Empty)` if the channel is empty
469    /// - `Err(TryRecvError::Disconnected)` if the sender was dropped and channel is empty
470    #[inline]
471    #[allow(clippy::option_if_let_else)]
472    pub fn try_recv(&self) -> Result<T, TryRecvError> {
473        match self.consumer.pop() {
474            Some(v) => {
475                self.notify_sender();
476                Ok(v)
477            }
478            None => {
479                if self.consumer.is_disconnected() {
480                    cold_try_recv_disconnected()
481                } else {
482                    Err(TryRecvError::Empty)
483                }
484            }
485        }
486    }
487
488    #[inline]
489    fn notify_sender(&self) {
490        if self.shared.sender_parked.load(Ordering::SeqCst) {
491            self.sender_unparker.unpark();
492        }
493    }
494
495    /// Returns `true` if the sender has been dropped.
496    #[inline]
497    pub fn is_disconnected(&self) -> bool {
498        self.consumer.is_disconnected()
499    }
500
501    /// Returns the capacity of the channel.
502    #[inline]
503    pub fn capacity(&self) -> usize {
504        self.consumer.capacity()
505    }
506}
507
508impl<T> fmt::Debug for Receiver<T> {
509    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
510        f.debug_struct("Receiver")
511            .field("capacity", &self.capacity())
512            .field("disconnected", &self.is_disconnected())
513            .finish_non_exhaustive()
514    }
515}
516
impl<T> Drop for Receiver<T> {
    /// Disconnects the channel, then wakes the sender so it can observe the
    /// disconnect.
    fn drop(&mut self) {
        // SAFETY: consumer is valid (ManuallyDrop preserves it) and is not
        // touched again after this call. We drop it here to trigger the
        // Consumer's disconnect logic before unparking the sender, so the
        // sender sees is_disconnected() == true after waking.
        unsafe { ManuallyDrop::drop(&mut self.consumer) };
        // Unconditional: unlike `notify_sender`, this does not consult the
        // `sender_parked` flag.
        self.sender_unparker.unpark();
    }
}
526
527// ============================================================================
528// Error Types
529// ============================================================================
530
/// Error produced by a blocking send when the channel is disconnected.
///
/// The rejected message travels inside the error (field `0`), so the caller
/// can take it back instead of losing it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error and hands back the message that could not be sent.
    pub fn into_inner(self) -> T {
        let Self(message) = self;
        message
    }
}

impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed description; the payload is not included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("channel disconnected")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}
551
/// Error produced by a blocking receive when the channel is disconnected.
///
/// It is reported once the sender has been dropped and no messages remain in
/// the channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;

impl fmt::Display for RecvError {
    /// Writes a fixed description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("channel disconnected")
    }
}

impl std::error::Error for RecvError {}
566
/// Failure modes of a non-blocking send (`try_send`).
///
/// Both variants carry the message that was not sent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// No free slot was available; the channel is still connected.
    Full(T),

    /// The receiving half has been dropped.
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and hands back the message that could not be sent.
    pub fn into_inner(self) -> T {
        match self {
            Self::Full(message) => message,
            Self::Disconnected(message) => message,
        }
    }

    /// Reports whether this is the `Full` variant.
    pub fn is_full(&self) -> bool {
        match self {
            Self::Full(_) => true,
            Self::Disconnected(_) => false,
        }
    }

    /// Reports whether this is the `Disconnected` variant.
    pub fn is_disconnected(&self) -> bool {
        match self {
            Self::Full(_) => false,
            Self::Disconnected(_) => true,
        }
    }
}

impl<T> fmt::Display for TrySendError<T> {
    /// Writes a fixed description per variant; the payload is not included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Full(_) => "channel full",
            Self::Disconnected(_) => "channel disconnected",
        };
        f.write_str(text)
    }
}

impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
606
/// Failure modes of a non-blocking receive (`try_recv`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// Nothing is queued right now; the channel is still connected.
    Empty,

    /// The sending half has been dropped and no messages remain.
    Disconnected,
}

impl TryRecvError {
    /// Reports whether this is the `Empty` variant.
    pub fn is_empty(&self) -> bool {
        match self {
            Self::Empty => true,
            Self::Disconnected => false,
        }
    }

    /// Reports whether this is the `Disconnected` variant.
    pub fn is_disconnected(&self) -> bool {
        match self {
            Self::Empty => false,
            Self::Disconnected => true,
        }
    }
}

impl fmt::Display for TryRecvError {
    /// Writes a fixed description per variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Empty => "channel empty",
            Self::Disconnected => "channel disconnected",
        };
        f.write_str(text)
    }
}

impl std::error::Error for TryRecvError {}
639
/// Failure modes of a receive with a time limit (`recv_timeout`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvTimeoutError {
    /// The time limit passed before any message arrived.
    Timeout,

    /// The sending half has been dropped and no messages remain.
    Disconnected,
}

impl RecvTimeoutError {
    /// Reports whether this is the `Timeout` variant.
    pub fn is_timeout(&self) -> bool {
        match self {
            Self::Timeout => true,
            Self::Disconnected => false,
        }
    }

    /// Reports whether this is the `Disconnected` variant.
    pub fn is_disconnected(&self) -> bool {
        match self {
            Self::Timeout => false,
            Self::Disconnected => true,
        }
    }
}

impl fmt::Display for RecvTimeoutError {
    /// Writes a fixed description per variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Timeout => "timed out",
            Self::Disconnected => "channel disconnected",
        };
        f.write_str(text)
    }
}

impl std::error::Error for RecvTimeoutError {}
672
673// ============================================================================
674// Cold error constructors
675// ============================================================================
676
/// Builds the `Err(SendError(val))` result for a blocking send.
///
/// Marked `#[cold]` to tell the compiler this path is unlikely to be taken.
#[cold]
fn cold_send_err<T>(val: T) -> Result<(), SendError<T>> {
    Err(SendError(val))
}
681
/// Builds the `Err(TrySendError::Disconnected(val))` result for `try_send`.
///
/// Marked `#[cold]` to tell the compiler this path is unlikely to be taken.
#[cold]
fn cold_try_send_disconnected<T>(val: T) -> Result<(), TrySendError<T>> {
    Err(TrySendError::Disconnected(val))
}
686
/// Builds the `Err(RecvError)` result for a blocking receive.
///
/// Marked `#[cold]` to tell the compiler this path is unlikely to be taken.
#[cold]
fn cold_recv_err<T>() -> Result<T, RecvError> {
    Err(RecvError)
}
691
/// Builds the `Err(TryRecvError::Disconnected)` result for `try_recv`.
///
/// Marked `#[cold]` to tell the compiler this path is unlikely to be taken.
#[cold]
fn cold_try_recv_disconnected<T>() -> Result<T, TryRecvError> {
    Err(TryRecvError::Disconnected)
}
696
/// Builds the `Err(RecvTimeoutError::Disconnected)` result for `recv_timeout`.
///
/// Marked `#[cold]` to tell the compiler this path is unlikely to be taken.
#[cold]
fn cold_recv_timeout_disconnected<T>() -> Result<T, RecvTimeoutError> {
    Err(RecvTimeoutError::Disconnected)
}
701
702// ============================================================================
703// Tests
704// ============================================================================
705
706#[cfg(test)]
707mod tests {
708    use super::*;
709    use std::sync::atomic::AtomicUsize;
710    use std::thread;
711
712    // ============================================================================
713    // Basic Operations
714    // ============================================================================
715
716    #[test]
717    fn basic_send_recv() {
718        let (tx, rx) = channel::<u64>(4);
719
720        tx.send(1).unwrap();
721        tx.send(2).unwrap();
722        tx.send(3).unwrap();
723
724        assert_eq!(rx.recv().unwrap(), 1);
725        assert_eq!(rx.recv().unwrap(), 2);
726        assert_eq!(rx.recv().unwrap(), 3);
727    }
728
729    #[test]
730    fn try_send_try_recv() {
731        let (tx, rx) = channel::<u64>(2);
732
733        assert!(tx.try_send(1).is_ok());
734        assert!(tx.try_send(2).is_ok());
735        assert!(matches!(tx.try_send(3), Err(TrySendError::Full(3))));
736
737        assert_eq!(rx.try_recv().unwrap(), 1);
738        assert_eq!(rx.try_recv().unwrap(), 2);
739        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
740    }
741
742    #[test]
743    fn send_fills_then_recv_drains() {
744        let (tx, rx) = channel::<u64>(4);
745
746        for i in 0..4 {
747            tx.try_send(i).unwrap();
748        }
749        assert!(matches!(tx.try_send(99), Err(TrySendError::Full(99))));
750
751        for i in 0..4 {
752            assert_eq!(rx.recv().unwrap(), i);
753        }
754        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
755    }
756
757    // ============================================================================
758    // Timeout Operations
759    // ============================================================================
760
761    #[test]
762    fn recv_timeout_success() {
763        let (tx, rx) = channel::<u64>(4);
764
765        tx.send(42).unwrap();
766
767        let result = rx.recv_timeout(Duration::from_millis(100));
768        assert_eq!(result.unwrap(), 42);
769    }
770
771    #[test]
772    fn recv_timeout_expires() {
773        let (_tx, rx) = channel::<u64>(4);
774
775        let start = Instant::now();
776        let result = rx.recv_timeout(Duration::from_millis(50));
777
778        assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
779        assert!(start.elapsed() >= Duration::from_millis(50));
780    }
781
782    #[test]
783    fn recv_timeout_disconnected() {
784        let (tx, rx) = channel::<u64>(4);
785
786        drop(tx);
787
788        let result = rx.recv_timeout(Duration::from_millis(100));
789        assert!(matches!(result, Err(RecvTimeoutError::Disconnected)));
790    }
791
792    #[test]
793    fn recv_timeout_data_arrives() {
794        let (tx, rx) = channel::<u64>(4);
795
796        let handle = thread::spawn(move || {
797            thread::sleep(Duration::from_millis(25));
798            tx.send(42).unwrap();
799        });
800
801        let result = rx.recv_timeout(Duration::from_millis(100));
802        assert_eq!(result.unwrap(), 42);
803
804        handle.join().unwrap();
805    }
806
807    #[test]
808    fn recv_timeout_disconnect_while_waiting() {
809        let (tx, rx) = channel::<u64>(4);
810
811        let handle = thread::spawn(move || {
812            thread::sleep(Duration::from_millis(25));
813            drop(tx);
814        });
815
816        let result = rx.recv_timeout(Duration::from_millis(100));
817        assert!(matches!(result, Err(RecvTimeoutError::Disconnected)));
818
819        handle.join().unwrap();
820    }
821
822    // ============================================================================
823    // Disconnection
824    // ============================================================================
825
826    #[test]
827    fn recv_returns_error_when_sender_dropped() {
828        let (tx, rx) = channel::<u64>(4);
829
830        drop(tx);
831
832        assert!(rx.recv().is_err());
833        assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
834    }
835
836    #[test]
837    fn recv_drains_before_error_when_sender_dropped() {
838        let (tx, rx) = channel::<u64>(4);
839
840        tx.send(1).unwrap();
841        tx.send(2).unwrap();
842        drop(tx);
843
844        assert_eq!(rx.recv().unwrap(), 1);
845        assert_eq!(rx.recv().unwrap(), 2);
846        assert!(rx.recv().is_err());
847    }
848
849    #[test]
850    fn send_returns_error_when_receiver_dropped() {
851        let (tx, rx) = channel::<u64>(4);
852
853        drop(rx);
854
855        assert!(tx.send(1).is_err());
856        assert!(matches!(tx.try_send(1), Err(TrySendError::Disconnected(1))));
857    }
858
859    #[test]
860    fn is_disconnected_sender() {
861        let (tx, rx) = channel::<u64>(4);
862
863        assert!(!tx.is_disconnected());
864        drop(rx);
865        assert!(tx.is_disconnected());
866    }
867
868    #[test]
869    fn is_disconnected_receiver() {
870        let (tx, rx) = channel::<u64>(4);
871
872        assert!(!rx.is_disconnected());
873        drop(tx);
874        assert!(rx.is_disconnected());
875    }
876
877    // ============================================================================
878    // Cross-Thread Basic
879    // ============================================================================
880
881    #[test]
882    fn cross_thread_single_message() {
883        let (tx, rx) = channel::<u64>(4);
884
885        let handle = thread::spawn(move || rx.recv().unwrap());
886
887        tx.send(42).unwrap();
888
889        assert_eq!(handle.join().unwrap(), 42);
890    }
891
892    #[test]
893    fn cross_thread_multiple_messages() {
894        let (tx, rx) = channel::<u64>(4);
895
896        let handle = thread::spawn(move || {
897            let mut sum = 0;
898            for _ in 0..100 {
899                sum += rx.recv().unwrap();
900            }
901            sum
902        });
903
904        for i in 0..100 {
905            tx.send(i).unwrap();
906        }
907
908        let sum = handle.join().unwrap();
909        assert_eq!(sum, 99 * 100 / 2);
910    }
911
912    // ============================================================================
913    // FIFO Ordering
914    // ============================================================================
915
916    #[test]
917    fn fifo_ordering_single_thread() {
918        let (tx, rx) = channel::<u64>(8);
919
920        for i in 0..8 {
921            tx.try_send(i).unwrap();
922        }
923
924        for i in 0..8 {
925            assert_eq!(rx.recv().unwrap(), i);
926        }
927    }
928
929    #[test]
930    fn fifo_ordering_cross_thread() {
931        let (tx, rx) = channel::<u64>(64);
932
933        let handle = thread::spawn(move || {
934            let mut expected = 0u64;
935            while expected < 10_000 {
936                let val = rx.recv().unwrap();
937                assert_eq!(val, expected, "FIFO order violated");
938                expected += 1;
939            }
940        });
941
942        for i in 0..10_000 {
943            tx.send(i).unwrap();
944        }
945
946        handle.join().unwrap();
947    }
948
949    // ============================================================================
950    // Blocking Behavior
951    // ============================================================================
952
953    #[test]
954    fn recv_blocks_until_send() {
955        let (tx, rx) = channel::<u64>(4);
956
957        let start = Instant::now();
958
959        let handle = thread::spawn(move || rx.recv().unwrap());
960
961        thread::sleep(Duration::from_millis(50));
962        tx.send(42).unwrap();
963
964        let val = handle.join().unwrap();
965        assert_eq!(val, 42);
966        assert!(start.elapsed() >= Duration::from_millis(50));
967    }
968
969    #[test]
970    fn send_blocks_until_recv() {
971        let (tx, rx) = channel::<u64>(2);
972
973        // Fill the buffer
974        tx.try_send(1).unwrap();
975        tx.try_send(2).unwrap();
976
977        let start = Instant::now();
978
979        let handle = thread::spawn(move || {
980            tx.send(3).unwrap(); // Should block
981            tx
982        });
983
984        thread::sleep(Duration::from_millis(50));
985        rx.recv().unwrap(); // Free up space
986
987        let _ = handle.join().unwrap();
988        assert!(start.elapsed() >= Duration::from_millis(50));
989    }
990
991    // ============================================================================
992    // Wake on Disconnect
993    // ============================================================================
994
995    #[test]
996    fn recv_wakes_on_sender_drop() {
997        let (tx, rx) = channel::<u64>(4);
998
999        let handle = thread::spawn(move || {
1000            let result = rx.recv();
1001            assert!(result.is_err());
1002        });
1003
1004        thread::sleep(Duration::from_millis(50));
1005        drop(tx);
1006
1007        // Should complete, not hang
1008        handle.join().unwrap();
1009    }
1010
1011    #[test]
1012    fn send_wakes_on_receiver_drop() {
1013        let (tx, rx) = channel::<u64>(1);
1014
1015        tx.try_send(1).unwrap(); // Fill it
1016
1017        let handle = thread::spawn(move || {
1018            let result = tx.send(2); // Should block then error
1019            assert!(result.is_err());
1020        });
1021
1022        thread::sleep(Duration::from_millis(50));
1023        drop(rx);
1024
1025        // Should complete, not hang
1026        handle.join().unwrap();
1027    }
1028
1029    // ============================================================================
1030    // Capacity Edge Cases
1031    // ============================================================================
1032
1033    #[test]
1034    fn capacity_one() {
1035        let (tx, rx) = channel::<u64>(1);
1036
1037        for i in 0..100 {
1038            tx.send(i).unwrap();
1039            assert_eq!(rx.recv().unwrap(), i);
1040        }
1041    }
1042
1043    #[test]
1044    fn capacity_one_cross_thread() {
1045        let (tx, rx) = channel::<u64>(1);
1046
1047        let handle = thread::spawn(move || {
1048            for _ in 0..1000 {
1049                rx.recv().unwrap();
1050            }
1051        });
1052
1053        for i in 0..1000 {
1054            tx.send(i).unwrap();
1055        }
1056
1057        handle.join().unwrap();
1058    }
1059
1060    // ============================================================================
1061    // Drop Behavior
1062    // ============================================================================
1063
1064    #[test]
1065    fn values_dropped_on_channel_drop() {
1066        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
1067
1068        #[derive(Debug)]
1069        struct DropCounter;
1070        impl Drop for DropCounter {
1071            fn drop(&mut self) {
1072                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
1073            }
1074        }
1075
1076        DROP_COUNT.store(0, Ordering::SeqCst);
1077
1078        let (tx, rx) = channel::<DropCounter>(4);
1079
1080        tx.try_send(DropCounter).unwrap();
1081        tx.try_send(DropCounter).unwrap();
1082        tx.try_send(DropCounter).unwrap();
1083
1084        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);
1085
1086        drop(tx);
1087        drop(rx);
1088
1089        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
1090    }
1091
1092    #[test]
1093    fn failed_send_returns_value() {
1094        let (tx, rx) = channel::<String>(1);
1095
1096        tx.try_send("hello".to_string()).unwrap();
1097
1098        let err = tx.try_send("world".to_string());
1099        match err {
1100            Err(TrySendError::Full(s)) => assert_eq!(s, "world"),
1101            _ => panic!("expected Full error"),
1102        }
1103
1104        drop(rx);
1105
1106        let err = tx.try_send("test".to_string());
1107        match err {
1108            Err(TrySendError::Disconnected(s)) => assert_eq!(s, "test"),
1109            _ => panic!("expected Disconnected error"),
1110        }
1111    }
1112
1113    // ============================================================================
1114    // Special Types
1115    // ============================================================================
1116
1117    #[test]
1118    fn zero_sized_type() {
1119        let (tx, rx) = channel::<()>(4);
1120
1121        tx.send(()).unwrap();
1122        tx.send(()).unwrap();
1123
1124        assert_eq!(rx.recv().unwrap(), ());
1125        assert_eq!(rx.recv().unwrap(), ());
1126    }
1127
1128    #[test]
1129    fn large_message_type() {
1130        #[derive(Clone, PartialEq, Debug)]
1131        struct LargeMessage {
1132            data: [u8; 4096],
1133        }
1134
1135        let (tx, rx) = channel::<LargeMessage>(4);
1136
1137        let msg = LargeMessage { data: [42u8; 4096] };
1138        tx.send(msg).unwrap();
1139
1140        let received = rx.recv().unwrap();
1141        assert_eq!(received.data[0], 42);
1142        assert_eq!(received.data[4095], 42);
1143    }
1144
1145    // ============================================================================
1146    // Multiple Laps
1147    // ============================================================================
1148
1149    #[test]
1150    fn many_laps_single_thread() {
1151        let (tx, rx) = channel::<u64>(4);
1152
1153        // 1000 messages through 4-slot buffer = 250 laps
1154        for i in 0..1000 {
1155            tx.send(i).unwrap();
1156            assert_eq!(rx.recv().unwrap(), i);
1157        }
1158    }
1159
1160    #[test]
1161    fn many_laps_cross_thread() {
1162        const COUNT: u64 = 100_000;
1163
1164        let (tx, rx) = channel::<u64>(4); // Small buffer, many laps
1165
1166        let producer = thread::spawn(move || {
1167            for i in 0..COUNT {
1168                tx.send(i).unwrap();
1169            }
1170        });
1171
1172        let consumer = thread::spawn(move || {
1173            let mut expected = 0u64;
1174            while expected < COUNT {
1175                let val = rx.recv().unwrap();
1176                assert_eq!(val, expected);
1177                expected += 1;
1178            }
1179        });
1180
1181        producer.join().unwrap();
1182        consumer.join().unwrap();
1183    }
1184
1185    // ============================================================================
1186    // Stress Tests
1187    // ============================================================================
1188
1189    #[test]
1190    fn stress_high_volume() {
1191        const COUNT: u64 = 100_000;
1192
1193        let (tx, rx) = channel::<u64>(1024);
1194
1195        let producer = thread::spawn(move || {
1196            for i in 0..COUNT {
1197                tx.send(i).unwrap();
1198            }
1199        });
1200
1201        let consumer = thread::spawn(move || {
1202            let mut sum = 0u64;
1203            for _ in 0..COUNT {
1204                sum = sum.wrapping_add(rx.recv().unwrap());
1205            }
1206            sum
1207        });
1208
1209        producer.join().unwrap();
1210        let sum = consumer.join().unwrap();
1211        assert_eq!(sum, COUNT * (COUNT - 1) / 2);
1212    }
1213
1214    #[test]
1215    fn stress_small_buffer() {
1216        const COUNT: u64 = 10_000;
1217
1218        let (tx, rx) = channel::<u64>(4);
1219
1220        let producer = thread::spawn(move || {
1221            for i in 0..COUNT {
1222                tx.send(i).unwrap();
1223            }
1224        });
1225
1226        let consumer = thread::spawn(move || {
1227            let mut received = 0u64;
1228            while received < COUNT {
1229                rx.recv().unwrap();
1230                received += 1;
1231            }
1232            received
1233        });
1234
1235        producer.join().unwrap();
1236        let received = consumer.join().unwrap();
1237        assert_eq!(received, COUNT);
1238    }
1239
1240    #[test]
1241    fn stress_capacity_one_high_volume() {
1242        const COUNT: u64 = 10_000;
1243
1244        let (tx, rx) = channel::<u64>(1);
1245
1246        let producer = thread::spawn(move || {
1247            for i in 0..COUNT {
1248                tx.send(i).unwrap();
1249            }
1250        });
1251
1252        let consumer = thread::spawn(move || {
1253            let mut expected = 0u64;
1254            while expected < COUNT {
1255                let val = rx.recv().unwrap();
1256                assert_eq!(val, expected);
1257                expected += 1;
1258            }
1259        });
1260
1261        producer.join().unwrap();
1262        consumer.join().unwrap();
1263    }
1264
1265    // ============================================================================
1266    // Ping-Pong Tests (exercises park/unpark heavily)
1267    // ============================================================================
1268
1269    #[test]
1270    fn ping_pong_basic() {
1271        let (tx1, rx1) = channel::<u64>(1);
1272        let (tx2, rx2) = channel::<u64>(1);
1273
1274        let handle = thread::spawn(move || {
1275            for i in 0..1000 {
1276                let val = rx1.recv().unwrap();
1277                assert_eq!(val, i);
1278                tx2.send(i).unwrap();
1279            }
1280        });
1281
1282        for i in 0..1000 {
1283            tx1.send(i).unwrap();
1284            let val = rx2.recv().unwrap();
1285            assert_eq!(val, i);
1286        }
1287
1288        handle.join().unwrap();
1289    }
1290
1291    #[test]
1292    fn ping_pong_high_iterations() {
1293        let (tx1, rx1) = channel::<u64>(1);
1294        let (tx2, rx2) = channel::<u64>(1);
1295
1296        let handle = thread::spawn(move || {
1297            for i in 0..10_000 {
1298                let val = rx1.recv().unwrap();
1299                assert_eq!(val, i);
1300                tx2.send(i * 2).unwrap();
1301            }
1302        });
1303
1304        for i in 0..10_000 {
1305            tx1.send(i).unwrap();
1306            let val = rx2.recv().unwrap();
1307            assert_eq!(val, i * 2);
1308        }
1309
1310        handle.join().unwrap();
1311    }
1312
1313    // ============================================================================
1314    // Deadlock Prevention Tests
1315    // ============================================================================
1316
1317    #[test]
1318    fn no_deadlock_alternating() {
1319        let (tx, rx) = channel::<u64>(1);
1320
1321        let handle = thread::spawn(move || {
1322            for i in 0..1000u64 {
1323                tx.send(i).unwrap();
1324            }
1325        });
1326
1327        for _ in 0..1000 {
1328            rx.recv().unwrap();
1329        }
1330
1331        handle.join().unwrap();
1332    }
1333
1334    #[test]
1335    fn no_deadlock_burst_then_drain() {
1336        let (tx, rx) = channel::<u64>(8);
1337
1338        for round in 0..100 {
1339            // Burst
1340            for i in 0..8 {
1341                tx.try_send(round * 8 + i).unwrap();
1342            }
1343            // Drain
1344            for i in 0..8 {
1345                assert_eq!(rx.recv().unwrap(), round * 8 + i);
1346            }
1347        }
1348    }
1349
1350    #[test]
1351    fn no_deadlock_concurrent_full_empty_transitions() {
1352        let (tx, rx) = channel::<u64>(2);
1353
1354        let producer = thread::spawn(move || {
1355            for i in 0..10_000u64 {
1356                tx.send(i).unwrap();
1357            }
1358        });
1359
1360        let consumer = thread::spawn(move || {
1361            for _ in 0..10_000 {
1362                rx.recv().unwrap();
1363            }
1364        });
1365
1366        producer.join().unwrap();
1367        consumer.join().unwrap();
1368    }
1369
1370    #[test]
1371    fn no_deadlock_disconnect_while_blocked_recv() {
1372        let (tx, rx) = channel::<u64>(1);
1373
1374        let handle = thread::spawn(move || {
1375            // Will block waiting for data
1376            let result = rx.recv();
1377            assert!(result.is_err()); // Should error, not deadlock
1378        });
1379
1380        thread::sleep(Duration::from_millis(50));
1381        drop(tx); // Disconnect while receiver is blocked
1382
1383        handle.join().unwrap();
1384    }
1385
1386    #[test]
1387    fn no_deadlock_disconnect_while_blocked_send() {
1388        let (tx, rx) = channel::<u64>(1);
1389        tx.try_send(1).unwrap(); // Fill it
1390
1391        let handle = thread::spawn(move || {
1392            // Will block waiting for space
1393            let result = tx.send(2);
1394            assert!(result.is_err()); // Should error, not deadlock
1395        });
1396
1397        thread::sleep(Duration::from_millis(50));
1398        drop(rx); // Disconnect while sender is blocked
1399
1400        handle.join().unwrap();
1401    }
1402
1403    // ============================================================================
1404    // Stress: Rapid Park/Unpark Cycles
1405    // ============================================================================
1406
1407    #[test]
1408    fn stress_rapid_park_unpark_sender() {
1409        let (tx, rx) = channel::<u64>(1);
1410
1411        let handle = thread::spawn(move || {
1412            for _ in 0..10_000 {
1413                rx.recv().unwrap();
1414            }
1415        });
1416
1417        for i in 0..10_000 {
1418            tx.send(i).unwrap();
1419        }
1420
1421        handle.join().unwrap();
1422    }
1423
1424    #[test]
1425    fn stress_rapid_park_unpark_receiver() {
1426        let (tx, rx) = channel::<u64>(1);
1427
1428        let handle = thread::spawn(move || {
1429            for i in 0..10_000 {
1430                tx.send(i).unwrap();
1431            }
1432        });
1433
1434        for _ in 0..10_000 {
1435            rx.recv().unwrap();
1436        }
1437
1438        handle.join().unwrap();
1439    }
1440
1441    #[test]
1442    fn stress_park_unpark_both_sides() {
1443        // Both sender and receiver will park repeatedly
1444        let (tx, rx) = channel::<u64>(1);
1445
1446        let sender = thread::spawn(move || {
1447            for i in 0..50_000 {
1448                tx.send(i).unwrap();
1449            }
1450        });
1451
1452        let receiver = thread::spawn(move || {
1453            let mut count = 0;
1454            for _ in 0..50_000 {
1455                rx.recv().unwrap();
1456                count += 1;
1457            }
1458            count
1459        });
1460
1461        sender.join().unwrap();
1462        assert_eq!(receiver.join().unwrap(), 50_000);
1463    }
1464
1465    // ============================================================================
1466    // Timed Tests (ensure no indefinite blocking)
1467    // ============================================================================
1468
1469    #[test]
1470    fn completes_in_reasonable_time() {
1471        use std::sync::mpsc;
1472
1473        let (done_tx, done_rx) = mpsc::channel();
1474
1475        let handle = thread::spawn(move || {
1476            let (tx, rx) = channel::<u64>(1);
1477
1478            let h = thread::spawn(move || {
1479                for i in 0..1000 {
1480                    tx.send(i).unwrap();
1481                }
1482            });
1483
1484            for _ in 0..1000 {
1485                rx.recv().unwrap();
1486            }
1487
1488            h.join().unwrap();
1489            done_tx.send(()).unwrap();
1490        });
1491
1492        // Should complete in well under a second
1493        let result = done_rx.recv_timeout(Duration::from_secs(5));
1494        assert!(result.is_ok(), "Test timed out - possible deadlock!");
1495
1496        handle.join().unwrap();
1497    }
1498
1499    #[test]
1500    fn does_not_hang_on_disconnect_during_recv() {
1501        let done = Arc::new(AtomicBool::new(false));
1502        let done_clone = done.clone();
1503
1504        let (tx, rx) = channel::<u64>(4);
1505
1506        let handle = thread::spawn(move || {
1507            let _ = rx.recv(); // Will block, then return Err on disconnect
1508            done_clone.store(true, Ordering::SeqCst);
1509        });
1510
1511        thread::sleep(Duration::from_millis(50));
1512        assert!(!done.load(Ordering::SeqCst)); // Still blocked
1513
1514        drop(tx);
1515
1516        handle.join().unwrap();
1517        assert!(done.load(Ordering::SeqCst)); // Completed
1518    }
1519
1520    #[test]
1521    fn does_not_hang_on_disconnect_during_send() {
1522        let done = Arc::new(AtomicBool::new(false));
1523        let done_clone = done.clone();
1524
1525        let (tx, rx) = channel::<u64>(1);
1526        tx.try_send(1).unwrap(); // Fill it
1527
1528        let handle = thread::spawn(move || {
1529            let _ = tx.send(2); // Will block, then return Err on disconnect
1530            done_clone.store(true, Ordering::SeqCst);
1531        });
1532
1533        thread::sleep(Duration::from_millis(50));
1534        assert!(!done.load(Ordering::SeqCst)); // Still blocked
1535
1536        drop(rx);
1537
1538        handle.join().unwrap();
1539        assert!(done.load(Ordering::SeqCst)); // Completed
1540    }
1541
1542    // ============================================================================
1543    // Rapid Connect/Disconnect
1544    // ============================================================================
1545
1546    #[test]
1547    fn rapid_channel_creation() {
1548        for _ in 0..1000 {
1549            let (tx, rx) = channel::<u64>(4);
1550            tx.try_send(1).unwrap();
1551            assert_eq!(rx.recv().unwrap(), 1);
1552        }
1553    }
1554
1555    #[test]
1556    fn rapid_disconnect() {
1557        for _ in 0..1000 {
1558            let (tx, rx) = channel::<u64>(4);
1559            drop(tx);
1560            drop(rx);
1561        }
1562    }
1563}