zng_task/channel.rs
1//! Communication channels.
2//!
3//! Use [`bounded`], [`unbounded`] and [`rendezvous`] to create channels for use across threads in the same process.
4//! Use [`ipc_unbounded`] to create channels that work across processes.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use zng_task::{self as task, channel};
10//! # use zng_unit::*;
11//!
12//! let (sender, receiver) = channel::bounded(5);
13//!
14//! task::spawn(async move {
15//! task::deadline(5.secs()).await;
16//! if let Err(e) = sender.send("Data!").await {
17//! eprintln!("no receiver connected, did not send message: '{e}'")
18//! }
19//! });
20//! task::spawn(async move {
21//! match receiver.recv().await {
22//! Ok(msg) => println!("{msg}"),
23//! Err(_) => eprintln!("no message in channel and no sender connected"),
24//! }
25//! });
26//! ```
27
28use std::{fmt, sync::Arc, time::Duration};
29
30use zng_time::{Deadline, INSTANT};
31
32mod ipc;
33pub use ipc::{IpcReceiver, IpcSender, IpcValue, NamedIpcReceiver, NamedIpcSender, ipc_unbounded};
34
35mod ipc_bytes;
36mod ipc_bytes_cast;
37#[cfg(ipc)]
38mod ipc_bytes_memmap;
39mod ipc_bytes_mut;
40mod ipc_read;
41pub use ipc_bytes::{IpcBytes, IpcBytesIntoIter, WeakIpcBytes};
42pub use ipc_bytes_cast::{IpcBytesCast, IpcBytesCastIntoIter, IpcBytesMutCast};
43pub use ipc_bytes_mut::{IpcBytesMut, IpcBytesWriter, IpcBytesWriterBlocking};
44pub use ipc_read::{IpcRead, IpcReadBlocking, IpcReadHandle};
45
46#[cfg(ipc)]
47pub use ipc_bytes::{is_ipc_serialization, with_ipc_serialization};
48
49mod ipc_file;
50pub use ipc_file::IpcFileHandle;
51
52use zng_txt::ToTxt;
53
54/// The transmitting end of a channel.
55///
56/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
57pub struct Sender<T>(flume::Sender<T>);
58impl<T> fmt::Debug for Sender<T> {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 write!(f, "Sender<{}>", pretty_type_name::pretty_type_name::<T>())
61 }
62}
63impl<T> Clone for Sender<T> {
64 fn clone(&self) -> Self {
65 Sender(self.0.clone())
66 }
67}
68impl<T> From<flume::Sender<T>> for Sender<T> {
69 fn from(s: flume::Sender<T>) -> Self {
70 Sender(s)
71 }
72}
73impl<T> From<Sender<T>> for flume::Sender<T> {
74 fn from(s: Sender<T>) -> Self {
75 s.0
76 }
77}
78impl<T> Sender<T> {
79 /// Send a value into the channel.
80 ///
81 /// Waits until there is space in the channel buffer.
82 ///
83 /// Returns an error if all receivers have been dropped.
84 pub async fn send(&self, msg: T) -> Result<(), ChannelError> {
85 self.0.send_async(msg).await?;
86 Ok(())
87 }
88
89 /// Send a value into the channel.
90 ///
91 /// Waits until there is space in the channel buffer or the `deadline` is reached.
92 ///
93 /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
94 pub async fn send_deadline(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
95 match super::with_deadline(self.send(msg), deadline).await {
96 Ok(r) => match r {
97 Ok(_) => Ok(()),
98 Err(e) => Err(e),
99 },
100 Err(_) => Err(ChannelError::Timeout),
101 }
102 }
103
104 /// Send a value into the channel.
105 ///
106 /// Blocks until there is space in the channel buffer.
107 ///
108 /// Returns an error if all receivers have been dropped.
109 pub fn send_blocking(&self, msg: T) -> Result<(), ChannelError> {
110 self.0.send(msg)?;
111 Ok(())
112 }
113
114 /// Send a value into the channel.
115 ///
116 /// Blocks until there is space in the channel buffer or the `deadline` is reached.
117 ///
118 /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
119 pub fn send_deadline_blocking(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
120 super::block_on(self.send_deadline(msg, deadline))
121 }
122
123 /// Gets if the channel has no pending messages.
124 ///
125 /// Note that [`rendezvous`] channels are always empty.
126 pub fn is_empty(&self) -> bool {
127 self.0.is_empty()
128 }
129}
130
131/// The receiving end of a channel.
132///
133/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
134///
135/// # Work Stealing
136///
137/// Cloning the receiver **does not** turn this channel into a broadcast channel.
138/// Each message will only be received by a single receiver. You can use this to
139/// to implement work stealing.
140pub struct Receiver<T>(flume::Receiver<T>);
141impl<T> fmt::Debug for Receiver<T> {
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 write!(f, "Receiver<{}>", pretty_type_name::pretty_type_name::<T>())
144 }
145}
146impl<T> Clone for Receiver<T> {
147 fn clone(&self) -> Self {
148 Receiver(self.0.clone())
149 }
150}
151impl<T> Receiver<T> {
152 /// Wait for an incoming value from the channel associated with this receiver.
153 ///
154 /// Returns an error if all senders have been dropped.
155 pub async fn recv(&self) -> Result<T, ChannelError> {
156 let r = self.0.recv_async().await?;
157 Ok(r)
158 }
159
160 /// Wait for an incoming value from the channel associated with this receiver.
161 ///
162 /// Returns an error if all senders have been dropped or the `deadline` is reached.
163 pub async fn recv_deadline(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
164 match super::with_deadline(self.recv(), deadline).await {
165 Ok(r) => match r {
166 Ok(m) => Ok(m),
167 e => e,
168 },
169 Err(_) => Err(ChannelError::Timeout),
170 }
171 }
172
173 /// Wait for an incoming value from the channel associated with this receiver.
174 ///
175 /// Returns an error if all senders have been dropped.
176 pub fn recv_blocking(&self) -> Result<T, ChannelError> {
177 let r = self.0.recv()?;
178 Ok(r)
179 }
180
181 /// Block for an incoming value from the channel associated with this receiver.
182 ///
183 /// Returns an error if all senders have been dropped or the `deadline` is reached.
184 pub fn recv_deadline_blocking(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
185 self.recv_deadline_blocking_impl(deadline.into())
186 }
187 fn recv_deadline_blocking_impl(&self, deadline: Deadline) -> Result<T, ChannelError> {
188 // Improve timeout precision because this is used in the app main loop and timers are implemented using it
189
190 const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
191 const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
192
193 loop {
194 if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
195 if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
196 // manual time is probably desynced from `Instant`, so we use `recv_timeout` that
197 // is slightly less precise, but an app in manual mode probably does not care.
198 match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
199 Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
200 interrupt => return interrupt.map_err(ChannelError::from),
201 }
202 } else if d > WORST_SLEEP_ERR {
203 // probably sleeps here.
204 #[cfg(not(target_arch = "wasm32"))]
205 match self.0.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
206 Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
207 interrupt => return interrupt.map_err(ChannelError::from),
208 }
209
210 #[cfg(target_arch = "wasm32")] // this actually panics because flume tries to use Instant::now
211 match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
212 Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
213 interrupt => return interrupt.map_err(ChannelError::from),
214 }
215 } else if d > WORST_SPIN_ERR {
216 let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
217
218 // try_recv spin
219 while !spin_deadline.has_elapsed() {
220 match self.0.try_recv() {
221 Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
222 interrupt => return interrupt.map_err(ChannelError::from),
223 }
224 }
225 continue; // continue to timeout spin
226 } else {
227 // last millis spin for better timeout precision
228 while !deadline.has_elapsed() {
229 std::thread::yield_now();
230 }
231 return Err(ChannelError::Timeout);
232 }
233 } else {
234 return Err(ChannelError::Timeout);
235 }
236 }
237 }
238
239 /// Returns the next incoming message in the channel or `None`.
240 pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
241 match self.0.try_recv() {
242 Ok(r) => Ok(Some(r)),
243 Err(e) => match e {
244 flume::TryRecvError::Empty => Ok(None),
245 flume::TryRecvError::Disconnected => Err(ChannelError::disconnected()),
246 },
247 }
248 }
249
250 /// Create a blocking iterator that receives until a channel error.
251 pub fn iter(&self) -> impl Iterator<Item = T> {
252 self.0.iter()
253 }
254
255 /// Iterate over all the pending incoming messages in the channel, until the channel is empty or error.
256 pub fn try_iter(&self) -> impl Iterator<Item = T> {
257 self.0.try_iter()
258 }
259
260 /// Gets if the channel has no pending messages.
261 ///
262 /// Note that [`rendezvous`] channels are always empty.
263 pub fn is_empty(&self) -> bool {
264 self.0.is_empty()
265 }
266}
267
268/// Create a channel with no maximum capacity.
269///
270/// Unbound channels always [`send`] messages immediately, never yielding on await.
271/// If the messages are not [received] they accumulate in the channel buffer.
272///
273/// # Examples
274///
275/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
276/// rapidly consumes all messages in the buffer and new messages as they are send.
277///
278/// ```no_run
279/// use zng_task::{self as task, channel};
280/// # use zng_unit::*;
281///
282/// let (sender, receiver) = channel::unbounded();
283///
284/// task::spawn(async move {
285/// for msg in ["Hello!", "Are you still there?"].into_iter().cycle() {
286/// task::deadline(300.ms()).await;
287/// if let Err(e) = sender.send(msg).await {
288/// eprintln!("no receiver connected, the message `{e}` was not send");
289/// break;
290/// }
291/// }
292/// });
293/// task::spawn(async move {
294/// task::deadline(5.secs()).await;
295///
296/// loop {
297/// match receiver.recv().await {
298/// Ok(msg) => println!("{msg}"),
299/// Err(_) => {
300/// eprintln!("no message in channel and no sender connected");
301/// break;
302/// }
303/// }
304/// }
305/// });
306/// ```
307///
308/// Note that you don't need to `.await` on [`send`] as there is always space in the channel buffer.
309///
310/// [`send`]: Sender::send
311/// [received]: Receiver::recv
312/// [spawns]: crate::spawn
313pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
314 let (s, r) = flume::unbounded();
315 (Sender(s), Receiver(r))
316}
317
318/// Create a channel with a maximum capacity.
319///
320/// Bounded channels [`send`] until the channel reaches its capacity then it awaits until a message
321/// is [received] before sending another message.
322///
323/// # Examples
324///
325/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
326/// rapidly consumes the 2 messages in the buffer and unblocks the sender to send more messages.
327///
328/// ```no_run
329/// use zng_task::{self as task, channel};
330/// # use zng_unit::*;
331///
332/// let (sender, receiver) = channel::bounded(2);
333///
334/// task::spawn(async move {
335/// for msg in ["Hello!", "Data!"].into_iter().cycle() {
336/// task::deadline(300.ms()).await;
337/// if let Err(e) = sender.send(msg).await {
338/// eprintln!("no receiver connected, the message `{e}` was not send");
339/// break;
340/// }
341/// }
342/// });
343/// task::spawn(async move {
344/// task::deadline(5.secs()).await;
345///
346/// loop {
347/// match receiver.recv().await {
348/// Ok(msg) => println!("{msg}"),
349/// Err(_) => {
350/// eprintln!("no message in channel and no sender connected");
351/// break;
352/// }
353/// }
354/// }
355/// });
356/// ```
357///
358/// [`send`]: Sender::send
359/// [received]: Receiver::recv
360/// [spawns]: crate::spawn
361pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
362 let (s, r) = flume::bounded(capacity);
363 (Sender(s), Receiver(r))
364}
365
366/// Create a [`bounded`] channel with `0` capacity.
367///
368/// Rendezvous channels always awaits until the message is [received] to *return* from [`send`], there is no buffer.
369///
370/// # Examples
371///
372/// The example [spawns] two parallel tasks, the sender and receiver *handshake* when transferring the message, the
373/// receiver takes 2 seconds to receive, so the sender takes 2 seconds to send.
374///
375/// ```no_run
376/// use zng_task::{self as task, channel};
377/// # use zng_unit::*;
378/// # use std::time::*;
379/// # use zng_time::*;
380///
381/// let (sender, receiver) = channel::rendezvous();
382///
383/// task::spawn(async move {
384/// loop {
385/// let t = INSTANT.now();
386///
387/// if let Err(e) = sender.send("the stuff").await {
388/// eprintln!(r#"failed to send "{}", no receiver connected"#, e);
389/// break;
390/// }
391///
392/// assert!(t.elapsed() >= 2.secs());
393/// }
394/// });
395/// task::spawn(async move {
396/// loop {
397/// task::deadline(2.secs()).await;
398///
399/// match receiver.recv().await {
400/// Ok(msg) => println!(r#"got "{msg}""#),
401/// Err(_) => {
402/// eprintln!("no sender connected");
403/// break;
404/// }
405/// }
406/// }
407/// });
408/// ```
409///
410/// [`send`]: Sender::send
411/// [received]: Receiver::recv
412/// [spawns]: crate::spawn
413pub fn rendezvous<T>() -> (Sender<T>, Receiver<T>) {
414 bounded::<T>(0)
415}
416
/// Error during channel send or receive.
#[derive(Debug, Clone)]
pub enum ChannelError {
    /// Channel has disconnected.
    Disconnected {
        /// Inner error that caused disconnection.
        ///
        /// Is `None` if disconnection was due to endpoint dropping or if the error happened at the other endpoint.
        cause: Option<Arc<dyn std::error::Error + Send + Sync + 'static>>,
    },
    /// Deadline elapsed before message could be sent/received.
    Timeout,
}
impl ChannelError {
    /// Channel has disconnected due to endpoint drop.
    ///
    /// The returned error has no `cause`.
    pub fn disconnected() -> Self {
        ChannelError::Disconnected { cause: None }
    }

    /// New disconnection error caused by another `cause` error.
    ///
    /// The `cause` is reported by [`source`](std::error::Error::source) and included in the display text.
    pub fn disconnected_by(cause: impl std::error::Error + Send + Sync + 'static) -> Self {
        ChannelError::Disconnected {
            cause: Some(Arc::new(cause)),
        }
    }
}
impl fmt::Display for ChannelError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChannelError::Disconnected { cause: Some(e) } => write!(f, "channel disconnected due to, {e}"),
            ChannelError::Disconnected { cause: None } => write!(f, "channel disconnected"),
            ChannelError::Timeout => write!(f, "deadline elapsed before message could be transferred"),
        }
    }
}
impl std::error::Error for ChannelError {
    /// Returns the error that caused the disconnection, if there is one.
    ///
    /// The returned reference is the error given to [`ChannelError::disconnected_by`], so it can be
    /// downcast to its concrete type.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Disconnected { cause: Some(e) } => {
                // Deref through the `Arc` explicitly. `Arc<T>` also implements `Error`, so coercing
                // `&Arc<dyn Error + ..>` directly would expose the `Arc` as the source's concrete type
                // and `downcast_ref` to the real cause type would always fail.
                let inner: &(dyn std::error::Error + 'static) = &**e;
                Some(inner)
            }
            Self::Disconnected { cause: None } | Self::Timeout => None,
        }
    }
}
463impl PartialEq for ChannelError {
464 fn eq(&self, other: &Self) -> bool {
465 match (self, other) {
466 (Self::Disconnected { cause: l_cause }, Self::Disconnected { cause: r_cause }) => match (l_cause, r_cause) {
467 (None, None) => true,
468 (Some(a), Some(b)) => a.to_txt() == b.to_txt(),
469 _ => false,
470 },
471 _ => core::mem::discriminant(self) == core::mem::discriminant(other),
472 }
473 }
474}
475impl Eq for ChannelError {}
476impl From<flume::RecvError> for ChannelError {
477 fn from(value: flume::RecvError) -> Self {
478 match value {
479 flume::RecvError::Disconnected => ChannelError::disconnected(),
480 }
481 }
482}
483impl From<flume::RecvTimeoutError> for ChannelError {
484 fn from(value: flume::RecvTimeoutError) -> Self {
485 match value {
486 flume::RecvTimeoutError::Timeout => ChannelError::Timeout,
487 flume::RecvTimeoutError::Disconnected => ChannelError::disconnected(),
488 }
489 }
490}
491impl<T> From<flume::SendError<T>> for ChannelError {
492 fn from(_: flume::SendError<T>) -> Self {
493 ChannelError::disconnected()
494 }
495}
496impl From<flume::TryRecvError> for ChannelError {
497 fn from(value: flume::TryRecvError) -> Self {
498 match value {
499 flume::TryRecvError::Empty => ChannelError::Timeout,
500 flume::TryRecvError::Disconnected => ChannelError::disconnected(),
501 }
502 }
503}