af_core/
channel.rs

1// Copyright © 2020 Alexandra Frydl
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! A multi-producer, multi-consumer channel.
8
9use crate::prelude::*;
10
11use async_channel::TryRecvError;
12
/// A cloneable receiver for a channel.
///
/// This is a thin wrapper around an [`async_channel::Receiver`]. Instances are
/// created in pairs with a [`Sender`] by [`with_capacity()`] or
/// [`unbounded()`].
pub struct Receiver<T> {
  // The wrapped `async_channel` receiver that every method delegates to.
  rx: async_channel::Receiver<T>,
}
17
/// A cloneable sender for a channel.
///
/// This is a thin wrapper around an [`async_channel::Sender`]. Instances are
/// created in pairs with a [`Receiver`] by [`with_capacity()`] or
/// [`unbounded()`].
pub struct Sender<T> {
  // The wrapped `async_channel` sender that every method delegates to.
  tx: async_channel::Sender<T>,
}
22
/// An error indicating that the channel is closed.
///
/// Returned by [`Receiver::recv()`] and [`Receiver::try_recv()`]. It carries
/// no data; its display text is the fixed message in the `#[error]` attribute.
#[derive(Clone, Copy, Debug, Default, Error)]
#[error("Channel is closed.")]
pub struct ClosedError;
27
/// An error returned from a [`Sender::send()`] or [`Sender::try_send()`] call.
///
/// The error hands the unsent message back to the caller in `msg`, so it is
/// not lost when sending fails.
///
/// `Clone` and `Copy` are derived, so they are only available when `M` itself
/// is `Clone` / `Copy`. `Debug` and `Display` are implemented manually
/// elsewhere in this module and format only the `reason`.
#[derive(Clone, Copy)]
pub struct SendError<M> {
  /// The message that failed to send.
  pub msg: M,
  /// The reason for this error.
  pub reason: SendErrorReason,
}
36
/// The reason a [`SendError`] was returned.
#[derive(Clone, Copy, Debug, Eq, Error, PartialEq)]
pub enum SendErrorReason {
  /// The channel is closed, so the message could not be delivered.
  #[error("Channel is closed.")]
  Closed,
  /// The channel had no free capacity. Within this module, only
  /// [`Sender::try_send()`] produces this reason.
  #[error("Channel is full.")]
  Full,
}
45
46/// Creates a channel with a buffer of a given capacity.
47pub fn with_capacity<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
48  let (tx, rx) = async_channel::bounded(capacity);
49
50  (Sender { tx }, Receiver { rx })
51}
52
53/// Creates an channel whose buffer can grow unbounded.
54pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
55  let (tx, rx) = async_channel::unbounded();
56
57  (Sender { tx }, Receiver { rx })
58}
59
60impl<T> Receiver<T> {
61  /// Returns `true` if the channel is closed.
62  ///
63  /// The channel is closed if all [`Sender`] clones are dropped.
64  pub fn is_closed(&self) -> bool {
65    self.rx.is_closed()
66  }
67
68  /// Waits for an available message in the channel and receives it.
69  pub async fn recv(&self) -> Result<T, ClosedError> {
70    self.rx.recv().await.map_err(|_| ClosedError)
71  }
72
73  /// Attempts to immediately receive an available message from the channel.
74  ///
75  /// If the channel is empty, this function returns `None`.
76  pub fn try_recv(&self) -> Result<Option<T>, ClosedError> {
77    match self.rx.try_recv() {
78      Ok(msg) => Ok(Some(msg)),
79      Err(TryRecvError::Empty) => Ok(None),
80      Err(TryRecvError::Closed) => Err(ClosedError),
81    }
82  }
83}
84
85impl<T> Sender<T> {
86  /// Returns `true` if the channel is closed.
87  ///
88  /// The channel is closed if all [`Receiver`] clones are dropped.
89  pub fn is_closed(&self) -> bool {
90    self.tx.is_closed()
91  }
92
93  /// Waits for available space in the channel and then sends a message.
94  ///
95  /// If the channel is closed before the message can be sent, this function
96  /// returns a [`SendError`] containing the failed message.
97  pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
98    self
99      .tx
100      .send(message)
101      .await
102      .map_err(|err| SendError { msg: err.0, reason: SendErrorReason::Closed })
103  }
104
105  /// Attempts to send a message to the channel immediately.
106  ///
107  /// If the channel is closed or full, this function returns a [`SendError`]
108  /// containing the failed message.
109  pub fn try_send(&self, message: T) -> Result<(), SendError<T>> {
110    self.tx.try_send(message).map_err(|err| match err {
111      async_channel::TrySendError::Full(msg) => SendError { msg, reason: SendErrorReason::Full },
112      async_channel::TrySendError::Closed(msg) => {
113        SendError { msg, reason: SendErrorReason::Closed }
114      }
115    })
116  }
117}
118
119// Manually implement `Clone` for all `T`.
120
121impl<T> Clone for Receiver<T> {
122  fn clone(&self) -> Self {
123    Self { rx: self.rx.clone() }
124  }
125}
126
127impl<T> Clone for Sender<T> {
128  fn clone(&self) -> Self {
129    Self { tx: self.tx.clone() }
130  }
131}
132
// Implement `Error`, `Debug`, and `Display` for `SendError`.
134
// Marks `SendError` as a standard error for any message type `M`. No methods
// are overridden, so the trait's default behavior applies. The `Debug` and
// `Display` impls it needs are written by hand in this module without any
// bounds on `M`.
impl<M> std::error::Error for SendError<M> {}
136
137impl<M> Debug for SendError<M> {
138  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
139    Debug::fmt(&self.reason, f)
140  }
141}
142
143impl<M> Display for SendError<M> {
144  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
145    Display::fmt(&self.reason, f)
146  }
147}