remoc/rch/broadcast/
receiver.rs

1use futures::{Stream, ready};
2use serde::{Deserialize, Serialize};
3use std::{
4    convert::TryFrom,
5    error::Error,
6    fmt,
7    pin::Pin,
8    task::{Context, Poll},
9};
10use tokio_util::sync::ReusableBoxFuture;
11
12use super::{
13    super::{DEFAULT_BUFFER, DEFAULT_MAX_ITEM_SIZE, base, mpsc},
14    BroadcastMsg,
15};
16use crate::{RemoteSend, chmux, codec};
17
/// An error occurred during receiving over a broadcast channel.
///
/// This is the error type returned by [`Receiver::recv`].
/// Use [`is_final`](Self::is_final) to find out whether receiving again can succeed.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RecvError {
    /// There are no more active senders, implying no further messages will ever be sent.
    Closed,
    /// The receiver lagged too far behind.
    ///
    /// Attempting to receive again will return the oldest message still retained by the channel.
    Lagged,
    /// Receiving from a remote endpoint failed.
    RemoteReceive(base::RecvError),
    /// Connecting a sent channel failed.
    RemoteConnect(chmux::ConnectError),
    /// Listening for a connection from a received channel failed.
    RemoteListen(chmux::ListenerError),
}
34
35impl RecvError {
36    /// True, if all senders have been dropped.
37    pub fn is_closed(&self) -> bool {
38        matches!(self, Self::Closed)
39    }
40
41    /// True, if the receiver has lagged behind and messages have been lost.
42    pub fn is_lagged(&self) -> bool {
43        matches!(self, Self::Lagged)
44    }
45
46    /// Returns whether the error is final, i.e. no further receive operation can succeed.
47    pub fn is_final(&self) -> bool {
48        match self {
49            Self::RemoteReceive(err) => err.is_final(),
50            Self::Closed | Self::RemoteConnect(_) | Self::RemoteListen(_) => true,
51            Self::Lagged => false,
52        }
53    }
54}
55
56impl fmt::Display for RecvError {
57    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
58        match self {
59            Self::Closed => write!(f, "channel closed"),
60            Self::Lagged => write!(f, "receiver lagged behind"),
61            Self::RemoteReceive(err) => write!(f, "receive error: {err}"),
62            Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
63            Self::RemoteListen(err) => write!(f, "listen error: {err}"),
64        }
65    }
66}
67
68impl From<mpsc::RecvError> for RecvError {
69    fn from(err: mpsc::RecvError) -> Self {
70        match err {
71            mpsc::RecvError::RemoteReceive(err) => Self::RemoteReceive(err),
72            mpsc::RecvError::RemoteConnect(err) => Self::RemoteConnect(err),
73            mpsc::RecvError::RemoteListen(err) => Self::RemoteListen(err),
74        }
75    }
76}
77
78impl TryFrom<TryRecvError> for RecvError {
79    type Error = TryRecvError;
80
81    fn try_from(err: TryRecvError) -> Result<Self, Self::Error> {
82        match err {
83            TryRecvError::Empty => Err(TryRecvError::Empty),
84            TryRecvError::Closed => Ok(Self::Closed),
85            TryRecvError::Lagged => Ok(Self::Lagged),
86            TryRecvError::RemoteReceive(err) => Ok(Self::RemoteReceive(err)),
87            TryRecvError::RemoteConnect(err) => Ok(Self::RemoteConnect(err)),
88            TryRecvError::RemoteListen(err) => Ok(Self::RemoteListen(err)),
89        }
90    }
91}
92
// Marker impl: `RecvError` uses only the default methods of the `Error` trait.
impl Error for RecvError {}
94
/// An error occurred during trying to receive over a broadcast channel.
///
/// This is the error type returned by [`Receiver::try_recv`].
/// Use [`is_final`](Self::is_final) to find out whether receiving again can succeed.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum TryRecvError {
    /// The channel is currently empty. There are still active senders, so data may yet become available.
    Empty,
    /// There are no more active senders, implying no further messages will ever be sent.
    Closed,
    /// The receiver lagged too far behind.
    ///
    /// Attempting to receive again will return the oldest message still retained by the channel.
    Lagged,
    /// Receiving from a remote endpoint failed.
    RemoteReceive(base::RecvError),
    /// Connecting a sent channel failed.
    RemoteConnect(chmux::ConnectError),
    /// Listening for a connection from a received channel failed.
    RemoteListen(chmux::ListenerError),
}
113
114impl TryRecvError {
115    /// True, if no value is currently present.
116    pub fn is_empty(&self) -> bool {
117        matches!(self, Self::Empty)
118    }
119
120    /// True, if all senders have been dropped.
121    pub fn is_closed(&self) -> bool {
122        matches!(self, Self::Closed)
123    }
124
125    /// True, if the receiver has lagged behind and messages have been lost.
126    pub fn is_lagged(&self) -> bool {
127        matches!(self, Self::Lagged)
128    }
129
130    /// Returns whether the error is final, i.e. no further receive operation can succeed.
131    pub fn is_final(&self) -> bool {
132        match self {
133            Self::RemoteReceive(err) => err.is_final(),
134            Self::Closed | Self::RemoteConnect(_) | Self::RemoteListen(_) => true,
135            Self::Empty | Self::Lagged => false,
136        }
137    }
138}
139
140impl fmt::Display for TryRecvError {
141    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142        match self {
143            Self::Empty => write!(f, "channel empty"),
144            Self::Closed => write!(f, "channel closed"),
145            Self::Lagged => write!(f, "receiver lagged behind"),
146            Self::RemoteReceive(err) => write!(f, "receive error: {err}"),
147            Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
148            Self::RemoteListen(err) => write!(f, "listen error: {err}"),
149        }
150    }
151}
152
153impl From<mpsc::RecvError> for TryRecvError {
154    fn from(err: mpsc::RecvError) -> Self {
155        match err {
156            mpsc::RecvError::RemoteReceive(err) => Self::RemoteReceive(err),
157            mpsc::RecvError::RemoteConnect(err) => Self::RemoteConnect(err),
158            mpsc::RecvError::RemoteListen(err) => Self::RemoteListen(err),
159        }
160    }
161}
162
163impl From<mpsc::TryRecvError> for TryRecvError {
164    fn from(err: mpsc::TryRecvError) -> Self {
165        match err {
166            mpsc::TryRecvError::RemoteReceive(err) => Self::RemoteReceive(err),
167            mpsc::TryRecvError::RemoteConnect(err) => Self::RemoteConnect(err),
168            mpsc::TryRecvError::RemoteListen(err) => Self::RemoteListen(err),
169            mpsc::TryRecvError::Closed => Self::Closed,
170            mpsc::TryRecvError::Empty => Self::Empty,
171        }
172    }
173}
174
175impl From<RecvError> for TryRecvError {
176    fn from(err: RecvError) -> Self {
177        match err {
178            RecvError::Closed => Self::Closed,
179            RecvError::Lagged => Self::Lagged,
180            RecvError::RemoteReceive(err) => Self::RemoteReceive(err),
181            RecvError::RemoteConnect(err) => Self::RemoteConnect(err),
182            RecvError::RemoteListen(err) => Self::RemoteListen(err),
183        }
184    }
185}
186
// Marker impl: `TryRecvError` uses only the default methods of the `Error` trait.
impl Error for TryRecvError {}
188
/// Receiving-half of the broadcast channel.
///
/// Can be sent over a remote channel.
///
/// This can be converted into a [Stream](futures::Stream) of values by wrapping it into
/// a [ReceiverStream].
///
/// `MAX_ITEM_SIZE` can be changed using [`set_max_item_size`](Self::set_max_item_size).
#[derive(Serialize, Deserialize)]
#[serde(bound(serialize = "T: RemoteSend, Codec: codec::Codec"))]
#[serde(bound(deserialize = "T: RemoteSend, Codec: codec::Codec"))]
pub struct Receiver<
    T,
    Codec = codec::Default,
    const BUFFER: usize = DEFAULT_BUFFER,
    const MAX_ITEM_SIZE: usize = DEFAULT_MAX_ITEM_SIZE,
> {
    // Underlying mpsc receiver; each message is either a value or a lag notification
    // wrapped in `BroadcastMsg`.
    rx: mpsc::Receiver<BroadcastMsg<T>, Codec, BUFFER, MAX_ITEM_SIZE>,
}
206
207impl<T, Codec, const BUFFER: usize, const MAX_ITEM_SIZE: usize> fmt::Debug
208    for Receiver<T, Codec, BUFFER, MAX_ITEM_SIZE>
209{
210    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
211        f.debug_struct("Receiver").finish()
212    }
213}
214
215impl<T, Codec, const BUFFER: usize, const MAX_ITEM_SIZE: usize> Receiver<T, Codec, BUFFER, MAX_ITEM_SIZE>
216where
217    T: RemoteSend,
218    Codec: codec::Codec,
219{
220    pub(crate) fn new(rx: mpsc::Receiver<BroadcastMsg<T>, Codec, BUFFER, MAX_ITEM_SIZE>) -> Self {
221        Self { rx }
222    }
223
224    /// Receives the next value for this receiver.
225    pub async fn recv(&mut self) -> Result<T, RecvError> {
226        match self.rx.recv().await {
227            Ok(Some(BroadcastMsg::Value(value))) => Ok(value),
228            Ok(Some(BroadcastMsg::Lagged)) => Err(RecvError::Lagged),
229            Ok(None) => Err(RecvError::Closed),
230            Err(err) => Err(err.into()),
231        }
232    }
233
234    /// Attempts to return a pending value on this receiver without awaiting.
235    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
236        match self.rx.try_recv() {
237            Ok(BroadcastMsg::Value(value)) => Ok(value),
238            Ok(BroadcastMsg::Lagged) => Err(TryRecvError::Lagged),
239            Err(err) => Err(err.into()),
240        }
241    }
242
243    /// The maximum item size in bytes.
244    pub fn max_item_size(&self) -> usize {
245        self.rx.max_item_size()
246    }
247
248    /// Sets the maximum item size in bytes.
249    pub fn set_max_item_size<const NEW_MAX_ITEM_SIZE: usize>(
250        self,
251    ) -> Receiver<T, Codec, BUFFER, NEW_MAX_ITEM_SIZE> {
252        Receiver { rx: self.rx.set_max_item_size() }
253    }
254
255    /// The maximum item size of the remote sender.
256    ///
257    /// If this is larger than [max_item_size](Self::max_item_size) sending of oversized
258    /// items will succeed but receiving will fail with a
259    /// [MaxItemSizeExceeded error](base::RecvError::MaxItemSizeExceeded).
260    pub fn remote_max_item_size(&self) -> Option<usize> {
261        self.rx.remote_max_item_size()
262    }
263}
264
/// An error occurred during receiving over a broadcast channel receiver stream.
///
/// This is the error type of the items yielded by [`ReceiverStream`].
/// Unlike [`RecvError`] it has no `Closed` variant, since a closed channel
/// ends the stream instead of yielding an error.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum StreamError {
    /// The receiver stream lagged too far behind.
    ///
    /// The next value will be the oldest message still retained by the channel.
    Lagged,
    /// Receiving from a remote endpoint failed.
    RemoteReceive(base::RecvError),
    /// Connecting a sent channel failed.
    RemoteConnect(chmux::ConnectError),
    /// Listening for a connection from a received channel failed.
    RemoteListen(chmux::ListenerError),
}
279
280impl StreamError {
281    /// True, if the receiver has lagged behind and messages have been lost.
282    pub fn is_lagged(&self) -> bool {
283        matches!(self, Self::Lagged)
284    }
285
286    /// Returns whether the error is final, i.e. no further receive operation can succeed.
287    pub fn is_final(&self) -> bool {
288        match self {
289            Self::RemoteReceive(err) => err.is_final(),
290            Self::RemoteConnect(_) | Self::RemoteListen(_) => true,
291            Self::Lagged => false,
292        }
293    }
294}
295
296impl fmt::Display for StreamError {
297    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
298        match self {
299            Self::Lagged => write!(f, "receiver lagged behind"),
300            Self::RemoteReceive(err) => write!(f, "receive error: {err}"),
301            Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
302            Self::RemoteListen(err) => write!(f, "listen error: {err}"),
303        }
304    }
305}
306
307impl TryFrom<RecvError> for StreamError {
308    type Error = RecvError;
309    fn try_from(err: RecvError) -> Result<Self, Self::Error> {
310        match err {
311            RecvError::Lagged => Ok(Self::Lagged),
312            RecvError::RemoteReceive(err) => Ok(Self::RemoteReceive(err)),
313            RecvError::RemoteConnect(err) => Ok(Self::RemoteConnect(err)),
314            RecvError::RemoteListen(err) => Ok(Self::RemoteListen(err)),
315            other => Err(other),
316        }
317    }
318}
319
// Marker impl: `StreamError` uses only the default methods of the `Error` trait.
impl Error for StreamError {}
321
/// A wrapper around a broadcast [Receiver] that implements [Stream](futures::Stream).
///
/// The wrapped receiver uses the default `MAX_ITEM_SIZE`.
pub struct ReceiverStream<T, Codec = codec::Default, const BUFFER: usize = DEFAULT_BUFFER> {
    // Future of the receive operation in progress. It resolves to the receive result
    // together with the receiver, which is then used to start the next receive operation.
    #[allow(clippy::type_complexity)]
    inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T, Codec, BUFFER>)>,
}
327
328impl<T, Codec> fmt::Debug for ReceiverStream<T, Codec> {
329    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
330        f.debug_struct("ReceiverStream").finish()
331    }
332}
333
334impl<T, Codec, const BUFFER: usize> ReceiverStream<T, Codec, BUFFER>
335where
336    T: RemoteSend + Sync,
337    Codec: codec::Codec,
338{
339    /// Creates a new `ReceiverStream`.
340    pub fn new(rx: Receiver<T, Codec, BUFFER>) -> Self {
341        Self { inner: ReusableBoxFuture::new(Self::make_future(rx)) }
342    }
343
344    async fn make_future(
345        mut rx: Receiver<T, Codec, BUFFER>,
346    ) -> (Result<T, RecvError>, Receiver<T, Codec, BUFFER>) {
347        let result = rx.recv().await;
348        (result, rx)
349    }
350}
351
352impl<T: Clone, Codec, const BUFFER: usize> Stream for ReceiverStream<T, Codec, BUFFER>
353where
354    T: RemoteSend + Sync,
355    Codec: codec::Codec,
356{
357    type Item = Result<T, StreamError>;
358
359    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
360        let (result, rx) = ready!(self.inner.poll(cx));
361        self.inner.set(Self::make_future(rx));
362        match result {
363            Ok(value) => Poll::Ready(Some(Ok(value))),
364            Err(RecvError::Closed) => Poll::Ready(None),
365            Err(err) => Poll::Ready(Some(Err(StreamError::try_from(err).unwrap()))),
366        }
367    }
368}
369
// Declared unconditionally, i.e. without requiring `T` or `Codec` to be `Unpin`.
impl<T, Codec, const BUFFER: usize> Unpin for ReceiverStream<T, Codec, BUFFER> {}
371
372impl<T, Codec, const BUFFER: usize> From<Receiver<T, Codec, BUFFER>> for ReceiverStream<T, Codec, BUFFER>
373where
374    T: RemoteSend + Sync,
375    Codec: codec::Codec,
376{
377    fn from(recv: Receiver<T, Codec, BUFFER>) -> Self {
378        Self::new(recv)
379    }
380}