Skip to main content

nexus_async_rt/channel/
mod.rs

1//! Async channels for task communication.
2//!
3//! # Channel Types
4//!
5//! - [`local`] — bounded MPSC for single-threaded use. `!Send`, `!Sync`.
6//!   No atomics, no `Arc`, zero synchronization overhead.
7//! - [`mpsc`] — bounded MPSC for cross-thread use. `Sender: Clone + Send + Sync`,
8//!   `Receiver: Send`. Lock-free atomic queue (nexus-queue).
9//! - [`spsc`] — bounded SPSC for cross-thread use. `Sender: Send`,
10//!   `Receiver: Send`. Single producer, single consumer. Fastest cross-thread channel.
11//! - [`spsc_bytes`] — bounded SPSC byte channel. Variable-length `&[u8]` messages
12//!   over `nexus_logbuf::spsc`. Single producer, single consumer.
13//! - [`mpsc_bytes`] — bounded MPSC byte channel. Variable-length `&[u8]` messages
14//!   over `nexus_logbuf::mpsc`. `Sender: Clone + Send`, `Receiver: Send`.
15//!
16//! All must be created inside [`Runtime::block_on`](crate::Runtime::block_on).
17//!
18//! # Example
19//!
20//! ```ignore
21//! use nexus_async_rt::channel::local;
22//!
23//! // Inside block_on:
24//! let (tx, rx) = local::channel::<u64>(64);
25//!
26//! spawn_boxed(async move {
27//!     tx.send(42).await.unwrap();
28//! });
29//!
30//! let value = rx.recv().await.unwrap();
31//! assert_eq!(value, 42);
32//! ```
33
34pub mod local;
35pub mod mpsc;
36pub mod mpsc_bytes;
37pub mod spsc;
38pub mod spsc_bytes;
39
40use std::fmt;
41
42// =============================================================================
43// Error types
44// =============================================================================
45
46/// The receiver was dropped — channel is closed.
47///
48/// Contains the value that could not be sent.
49pub struct SendError<T>(pub T);
50
51impl<T> fmt::Debug for SendError<T> {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        f.write_str("SendError(..)")
54    }
55}
56
57impl<T> fmt::Display for SendError<T> {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.write_str("channel closed")
60    }
61}
62
63impl<T: fmt::Debug> std::error::Error for SendError<T> {}
64
65/// Error returned by [`Sender::try_send`](mpsc::Sender::try_send).
66pub enum TrySendError<T> {
67    /// The channel buffer is full.
68    Full(T),
69    /// The receiver was dropped — channel is closed.
70    Closed(T),
71}
72
73impl<T> TrySendError<T> {
74    /// Consume the error, returning the value that could not be sent.
75    pub fn into_inner(self) -> T {
76        match self {
77            Self::Full(v) | Self::Closed(v) => v,
78        }
79    }
80
81    /// Whether this is a `Full` error.
82    pub fn is_full(&self) -> bool {
83        matches!(self, Self::Full(_))
84    }
85
86    /// Whether this is a `Closed` error.
87    pub fn is_closed(&self) -> bool {
88        matches!(self, Self::Closed(_))
89    }
90}
91
92impl<T> fmt::Debug for TrySendError<T> {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        match self {
95            Self::Full(_) => f.write_str("Full(..)"),
96            Self::Closed(_) => f.write_str("Closed(..)"),
97        }
98    }
99}
100
101impl<T> fmt::Display for TrySendError<T> {
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        match self {
104            Self::Full(_) => f.write_str("channel full"),
105            Self::Closed(_) => f.write_str("channel closed"),
106        }
107    }
108}
109
110impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
111
112/// All senders were dropped — no more values will arrive.
113#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114pub struct RecvError;
115
116impl fmt::Display for RecvError {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        f.write_str("channel closed")
119    }
120}
121
122impl std::error::Error for RecvError {}
123
124/// Error returned by [`Receiver::try_recv`](mpsc::Receiver::try_recv).
125#[derive(Debug, Clone, Copy, PartialEq, Eq)]
126pub enum TryRecvError {
127    /// The channel buffer is empty.
128    Empty,
129    /// All senders were dropped — channel is closed.
130    Closed,
131}
132
133impl fmt::Display for TryRecvError {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        match self {
136            Self::Empty => f.write_str("channel empty"),
137            Self::Closed => f.write_str("channel closed"),
138        }
139    }
140}
141
142impl std::error::Error for TryRecvError {}