Skip to main content

zng_task/
channel.rs

1//! Communication channels.
2//!
3//! Use [`bounded`], [`unbounded`] and [`rendezvous`] to create channels for use across threads in the same process.
4//! Use [`ipc_unbounded`] to create channels that work across processes.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use zng_task::{self as task, channel};
10//! # use zng_unit::*;
11//!
12//! let (sender, receiver) = channel::bounded(5);
13//!
14//! task::spawn(async move {
15//!     task::deadline(5.secs()).await;
16//!     if let Err(e) = sender.send("Data!").await {
17//!         eprintln!("no receiver connected, did not send message: '{e}'")
18//!     }
19//! });
20//! task::spawn(async move {
21//!     match receiver.recv().await {
22//!         Ok(msg) => println!("{msg}"),
23//!         Err(_) => eprintln!("no message in channel and no sender connected"),
24//!     }
25//! });
26//! ```
27
28use std::{fmt, sync::Arc, time::Duration};
29
30use zng_time::{Deadline, INSTANT};
31
32mod ipc;
33pub use ipc::{IpcReceiver, IpcSender, IpcValue, NamedIpcReceiver, NamedIpcSender, ipc_unbounded};
34
35mod ipc_bytes;
36mod ipc_bytes_cast;
37#[cfg(ipc)]
38mod ipc_bytes_memmap;
39mod ipc_bytes_mut;
40mod ipc_read;
41pub use ipc_bytes::{IpcBytes, IpcBytesIntoIter, WeakIpcBytes};
42pub use ipc_bytes_cast::{IpcBytesCast, IpcBytesCastIntoIter, IpcBytesMutCast};
43pub use ipc_bytes_mut::{IpcBytesMut, IpcBytesWriter, IpcBytesWriterBlocking};
44pub use ipc_read::{IpcRead, IpcReadBlocking, IpcReadHandle};
45
46#[cfg(ipc)]
47pub use ipc_bytes::{is_ipc_serialization, with_ipc_serialization};
48
49mod ipc_file;
50pub use ipc_file::IpcFileHandle;
51
52use zng_txt::ToTxt;
53
54/// The transmitting end of a channel.
55///
56/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
57pub struct Sender<T>(flume::Sender<T>);
58impl<T> fmt::Debug for Sender<T> {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        write!(f, "Sender<{}>", pretty_type_name::pretty_type_name::<T>())
61    }
62}
63impl<T> Clone for Sender<T> {
64    fn clone(&self) -> Self {
65        Sender(self.0.clone())
66    }
67}
68impl<T> From<flume::Sender<T>> for Sender<T> {
69    fn from(s: flume::Sender<T>) -> Self {
70        Sender(s)
71    }
72}
73impl<T> From<Sender<T>> for flume::Sender<T> {
74    fn from(s: Sender<T>) -> Self {
75        s.0
76    }
77}
78impl<T> Sender<T> {
79    /// Send a value into the channel.
80    ///
81    /// Waits until there is space in the channel buffer.
82    ///
83    /// Returns an error if all receivers have been dropped.
84    pub async fn send(&self, msg: T) -> Result<(), ChannelError> {
85        self.0.send_async(msg).await?;
86        Ok(())
87    }
88
89    /// Send a value into the channel.
90    ///
91    /// Waits until there is space in the channel buffer or the `deadline` is reached.
92    ///
93    /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
94    pub async fn send_deadline(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
95        match super::with_deadline(self.send(msg), deadline).await {
96            Ok(r) => match r {
97                Ok(_) => Ok(()),
98                Err(e) => Err(e),
99            },
100            Err(_) => Err(ChannelError::Timeout),
101        }
102    }
103
104    /// Send a value into the channel.
105    ///
106    /// Blocks until there is space in the channel buffer.
107    ///
108    /// Returns an error if all receivers have been dropped.
109    pub fn send_blocking(&self, msg: T) -> Result<(), ChannelError> {
110        self.0.send(msg)?;
111        Ok(())
112    }
113
114    /// Send a value into the channel.
115    ///
116    /// Blocks until there is space in the channel buffer or the `deadline` is reached.
117    ///
118    /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
119    pub fn send_deadline_blocking(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
120        super::block_on(self.send_deadline(msg, deadline))
121    }
122
123    /// Gets if the channel has no pending messages.
124    ///
125    /// Note that [`rendezvous`] channels are always empty.
126    pub fn is_empty(&self) -> bool {
127        self.0.is_empty()
128    }
129}
130
131/// The receiving end of a channel.
132///
133/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
134///
135/// # Work Stealing
136///
137/// Cloning the receiver **does not** turn this channel into a broadcast channel.
138/// Each message will only be received by a single receiver. You can use this to
139/// to implement work stealing.
140pub struct Receiver<T>(flume::Receiver<T>);
141impl<T> fmt::Debug for Receiver<T> {
142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143        write!(f, "Receiver<{}>", pretty_type_name::pretty_type_name::<T>())
144    }
145}
146impl<T> Clone for Receiver<T> {
147    fn clone(&self) -> Self {
148        Receiver(self.0.clone())
149    }
150}
151impl<T> Receiver<T> {
152    /// Wait for an incoming value from the channel associated with this receiver.
153    ///
154    /// Returns an error if all senders have been dropped.
155    pub async fn recv(&self) -> Result<T, ChannelError> {
156        let r = self.0.recv_async().await?;
157        Ok(r)
158    }
159
160    /// Wait for an incoming value from the channel associated with this receiver.
161    ///
162    /// Returns an error if all senders have been dropped or the `deadline` is reached.
163    pub async fn recv_deadline(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
164        match super::with_deadline(self.recv(), deadline).await {
165            Ok(r) => match r {
166                Ok(m) => Ok(m),
167                e => e,
168            },
169            Err(_) => Err(ChannelError::Timeout),
170        }
171    }
172
173    /// Wait for an incoming value from the channel associated with this receiver.
174    ///
175    /// Returns an error if all senders have been dropped.
176    pub fn recv_blocking(&self) -> Result<T, ChannelError> {
177        let r = self.0.recv()?;
178        Ok(r)
179    }
180
181    /// Block for an incoming value from the channel associated with this receiver.
182    ///
183    /// Returns an error if all senders have been dropped or the `deadline` is reached.
184    pub fn recv_deadline_blocking(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
185        self.recv_deadline_blocking_impl(deadline.into())
186    }
187    fn recv_deadline_blocking_impl(&self, deadline: Deadline) -> Result<T, ChannelError> {
188        // Improve timeout precision because this is used in the app main loop and timers are implemented using it
189
190        const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
191        const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
192
193        loop {
194            if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
195                if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
196                    // manual time is probably desynced from `Instant`, so we use `recv_timeout` that
197                    // is slightly less precise, but an app in manual mode probably does not care.
198                    match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
199                        Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
200                        interrupt => return interrupt.map_err(ChannelError::from),
201                    }
202                } else if d > WORST_SLEEP_ERR {
203                    // probably sleeps here.
204                    #[cfg(not(target_arch = "wasm32"))]
205                    match self.0.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
206                        Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
207                        interrupt => return interrupt.map_err(ChannelError::from),
208                    }
209
210                    #[cfg(target_arch = "wasm32")] // this actually panics because flume tries to use Instant::now
211                    match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
212                        Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
213                        interrupt => return interrupt.map_err(ChannelError::from),
214                    }
215                } else if d > WORST_SPIN_ERR {
216                    let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
217
218                    // try_recv spin
219                    while !spin_deadline.has_elapsed() {
220                        match self.0.try_recv() {
221                            Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
222                            interrupt => return interrupt.map_err(ChannelError::from),
223                        }
224                    }
225                    continue; // continue to timeout spin
226                } else {
227                    // last millis spin for better timeout precision
228                    while !deadline.has_elapsed() {
229                        std::thread::yield_now();
230                    }
231                    return Err(ChannelError::Timeout);
232                }
233            } else {
234                return Err(ChannelError::Timeout);
235            }
236        }
237    }
238
239    /// Returns the next incoming message in the channel or `None`.
240    pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
241        match self.0.try_recv() {
242            Ok(r) => Ok(Some(r)),
243            Err(e) => match e {
244                flume::TryRecvError::Empty => Ok(None),
245                flume::TryRecvError::Disconnected => Err(ChannelError::disconnected()),
246            },
247        }
248    }
249
250    /// Create a blocking iterator that receives until a channel error.
251    pub fn iter(&self) -> impl Iterator<Item = T> {
252        self.0.iter()
253    }
254
255    /// Iterate over all the pending incoming messages in the channel, until the channel is empty or error.
256    pub fn try_iter(&self) -> impl Iterator<Item = T> {
257        self.0.try_iter()
258    }
259
260    /// Gets if the channel has no pending messages.
261    ///
262    /// Note that [`rendezvous`] channels are always empty.
263    pub fn is_empty(&self) -> bool {
264        self.0.is_empty()
265    }
266}
267
268/// Create a channel with no maximum capacity.
269///
270/// Unbound channels always [`send`] messages immediately, never yielding on await.
271/// If the messages are not [received] they accumulate in the channel buffer.
272///
273/// # Examples
274///
275/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
276/// rapidly consumes all messages in the buffer and new messages as they are send.
277///
278/// ```no_run
279/// use zng_task::{self as task, channel};
280/// # use zng_unit::*;
281///
282/// let (sender, receiver) = channel::unbounded();
283///
284/// task::spawn(async move {
285///     for msg in ["Hello!", "Are you still there?"].into_iter().cycle() {
286///         task::deadline(300.ms()).await;
287///         if let Err(e) = sender.send(msg).await {
288///             eprintln!("no receiver connected, the message `{e}` was not send");
289///             break;
290///         }
291///     }
292/// });
293/// task::spawn(async move {
294///     task::deadline(5.secs()).await;
295///
296///     loop {
297///         match receiver.recv().await {
298///             Ok(msg) => println!("{msg}"),
299///             Err(_) => {
300///                 eprintln!("no message in channel and no sender connected");
301///                 break;
302///             }
303///         }
304///     }
305/// });
306/// ```
307///
308/// Note that you don't need to `.await` on [`send`] as there is always space in the channel buffer.
309///
310/// [`send`]: Sender::send
311/// [received]: Receiver::recv
312/// [spawns]: crate::spawn
313pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
314    let (s, r) = flume::unbounded();
315    (Sender(s), Receiver(r))
316}
317
318/// Create a channel with a maximum capacity.
319///
320/// Bounded channels [`send`] until the channel reaches its capacity then it awaits until a message
321/// is [received] before sending another message.
322///
323/// # Examples
324///
325/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
326/// rapidly consumes the 2 messages in the buffer and unblocks the sender to send more messages.
327///
328/// ```no_run
329/// use zng_task::{self as task, channel};
330/// # use zng_unit::*;
331///
332/// let (sender, receiver) = channel::bounded(2);
333///
334/// task::spawn(async move {
335///     for msg in ["Hello!", "Data!"].into_iter().cycle() {
336///         task::deadline(300.ms()).await;
337///         if let Err(e) = sender.send(msg).await {
338///             eprintln!("no receiver connected, the message `{e}` was not send");
339///             break;
340///         }
341///     }
342/// });
343/// task::spawn(async move {
344///     task::deadline(5.secs()).await;
345///
346///     loop {
347///         match receiver.recv().await {
348///             Ok(msg) => println!("{msg}"),
349///             Err(_) => {
350///                 eprintln!("no message in channel and no sender connected");
351///                 break;
352///             }
353///         }
354///     }
355/// });
356/// ```
357///
358/// [`send`]: Sender::send
359/// [received]: Receiver::recv
360/// [spawns]: crate::spawn
361pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
362    let (s, r) = flume::bounded(capacity);
363    (Sender(s), Receiver(r))
364}
365
366/// Create a [`bounded`] channel with `0` capacity.
367///
368/// Rendezvous channels always awaits until the message is [received] to *return* from [`send`], there is no buffer.
369///
370/// # Examples
371///
372/// The example [spawns] two parallel tasks, the sender and receiver *handshake* when transferring the message, the
373/// receiver takes 2 seconds to receive, so the sender takes 2 seconds to send.
374///
375/// ```no_run
376/// use zng_task::{self as task, channel};
377/// # use zng_unit::*;
378/// # use std::time::*;
379/// # use zng_time::*;
380///
381/// let (sender, receiver) = channel::rendezvous();
382///
383/// task::spawn(async move {
384///     loop {
385///         let t = INSTANT.now();
386///
387///         if let Err(e) = sender.send("the stuff").await {
388///             eprintln!(r#"failed to send "{}", no receiver connected"#, e);
389///             break;
390///         }
391///
392///         assert!(t.elapsed() >= 2.secs());
393///     }
394/// });
395/// task::spawn(async move {
396///     loop {
397///         task::deadline(2.secs()).await;
398///
399///         match receiver.recv().await {
400///             Ok(msg) => println!(r#"got "{msg}""#),
401///             Err(_) => {
402///                 eprintln!("no sender connected");
403///                 break;
404///             }
405///         }
406///     }
407/// });
408/// ```
409///
410/// [`send`]: Sender::send
411/// [received]: Receiver::recv
412/// [spawns]: crate::spawn
413pub fn rendezvous<T>() -> (Sender<T>, Receiver<T>) {
414    bounded::<T>(0)
415}
416
/// Error during channel send or receive.
#[derive(Debug, Clone)]
pub enum ChannelError {
    /// Channel has disconnected.
    Disconnected {
        /// Inner error that caused disconnection.
        ///
        /// Is `None` if disconnection was due to endpoint dropping or if the error happened at the other endpoint.
        cause: Option<Arc<dyn std::error::Error + Send + Sync + 'static>>,
    },
    /// Deadline elapsed before message could be sent/received.
    Timeout,
}
impl ChannelError {
    /// New [`Disconnected`] error without a cause, for a channel that disconnected due to endpoint drop.
    ///
    /// [`Disconnected`]: ChannelError::Disconnected
    pub fn disconnected() -> Self {
        Self::Disconnected { cause: None }
    }

    /// New [`Disconnected`] error caused by another error.
    ///
    /// The `cause` is shared in an [`Arc`] and is returned by [`source`].
    ///
    /// [`Disconnected`]: ChannelError::Disconnected
    /// [`source`]: std::error::Error::source
    pub fn disconnected_by(cause: impl std::error::Error + Send + Sync + 'static) -> Self {
        Self::Disconnected {
            cause: Some(Arc::new(cause)),
        }
    }
}
impl fmt::Display for ChannelError {
    /// Writes a short description of the error, including the cause message if there is one.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Disconnected { cause: Some(e) } => write!(f, "channel disconnected due to, {e}"),
            Self::Disconnected { cause: None } => f.write_str("channel disconnected"),
            Self::Timeout => f.write_str("deadline elapsed before message could be transferred"),
        }
    }
}
impl std::error::Error for ChannelError {
    /// Returns the disconnection cause, if the error has one.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let Self::Disconnected { cause: Some(inner) } = self else {
            return None;
        };
        Some(inner)
    }
}
463impl PartialEq for ChannelError {
464    fn eq(&self, other: &Self) -> bool {
465        match (self, other) {
466            (Self::Disconnected { cause: l_cause }, Self::Disconnected { cause: r_cause }) => match (l_cause, r_cause) {
467                (None, None) => true,
468                (Some(a), Some(b)) => a.to_txt() == b.to_txt(),
469                _ => false,
470            },
471            _ => core::mem::discriminant(self) == core::mem::discriminant(other),
472        }
473    }
474}
475impl Eq for ChannelError {}
476impl From<flume::RecvError> for ChannelError {
477    fn from(value: flume::RecvError) -> Self {
478        match value {
479            flume::RecvError::Disconnected => ChannelError::disconnected(),
480        }
481    }
482}
483impl From<flume::RecvTimeoutError> for ChannelError {
484    fn from(value: flume::RecvTimeoutError) -> Self {
485        match value {
486            flume::RecvTimeoutError::Timeout => ChannelError::Timeout,
487            flume::RecvTimeoutError::Disconnected => ChannelError::disconnected(),
488        }
489    }
490}
491impl<T> From<flume::SendError<T>> for ChannelError {
492    fn from(_: flume::SendError<T>) -> Self {
493        ChannelError::disconnected()
494    }
495}
496impl From<flume::TryRecvError> for ChannelError {
497    fn from(value: flume::TryRecvError) -> Self {
498        match value {
499            flume::TryRecvError::Empty => ChannelError::Timeout,
500            flume::TryRecvError::Disconnected => ChannelError::disconnected(),
501        }
502    }
503}