remoc/rch/watch/
receiver.rs

1use futures::{ready, FutureExt, Stream};
2use serde::{Deserialize, Serialize};
3use std::{
4    error::Error,
5    fmt,
6    marker::PhantomData,
7    mem,
8    pin::Pin,
9    task::{Context, Poll},
10};
11use tokio_util::sync::ReusableBoxFuture;
12
13use super::{
14    super::{
15        base::{self, PortDeserializer, PortSerializer},
16        RemoteSendError, DEFAULT_MAX_ITEM_SIZE,
17    },
18    Ref,
19};
20use crate::{chmux, codec, RemoteSend};
21
22/// An error occurred during receiving over a watch channel.
23#[derive(Clone, Debug, Serialize, Deserialize)]
24pub enum RecvError {
25    /// Receiving from a remote endpoint failed.
26    RemoteReceive(base::RecvError),
27    /// Connecting a sent channel failed.
28    RemoteConnect(chmux::ConnectError),
29    /// Listening for a connection from a received channel failed.
30    RemoteListen(chmux::ListenerError),
31}
32
33impl fmt::Display for RecvError {
34    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
35        match self {
36            Self::RemoteReceive(err) => write!(f, "receive error: {err}"),
37            Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
38            Self::RemoteListen(err) => write!(f, "listen error: {err}"),
39        }
40    }
41}
42
43impl Error for RecvError {}
44
45impl RecvError {
46    /// Returns whether the error is final, i.e. no further receive operation can succeed.
47    pub fn is_final(&self) -> bool {
48        match self {
49            Self::RemoteReceive(err) => err.is_final(),
50            Self::RemoteConnect(_) | Self::RemoteListen(_) => true,
51        }
52    }
53}
54
55/// An error occurred during waiting for a change on a watch channel.
56#[derive(Clone, Debug, Serialize, Deserialize)]
57pub enum ChangedError {
58    /// The sender has been dropped or the connection has been lost.
59    Closed,
60}
61
62impl ChangedError {
63    /// True, if remote endpoint has closed channel.
64    #[deprecated = "a remoc::rch::watch::ChangedError is always due to closure"]
65    pub fn is_closed(&self) -> bool {
66        true
67    }
68}
69
70impl fmt::Display for ChangedError {
71    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72        match self {
73            Self::Closed => write!(f, "closed"),
74        }
75    }
76}
77
78impl Error for ChangedError {}
79
80/// Receive values from the associated [Sender](super::Sender),
81/// which may be located on a remote endpoint.
82///
83/// Instances are created by the [channel](super::channel) function.
84///
85/// This can be converted into a [Stream](futures::Stream) of values by wrapping it into
86/// a [ReceiverStream].
87#[derive(Clone)]
88pub struct Receiver<T, Codec = codec::Default, const MAX_ITEM_SIZE: usize = DEFAULT_MAX_ITEM_SIZE> {
89    rx: tokio::sync::watch::Receiver<Result<T, RecvError>>,
90    remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
91    remote_max_item_size: Option<usize>,
92    _codec: PhantomData<Codec>,
93}
94
95impl<T, Codec, const MAX_ITEM_SIZE: usize> fmt::Debug for Receiver<T, Codec, MAX_ITEM_SIZE> {
96    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
97        f.debug_struct("Receiver").finish()
98    }
99}
100
101/// Watch receiver in transport.
102#[derive(Serialize, Deserialize)]
103pub(crate) struct TransportedReceiver<T, Codec> {
104    /// chmux port number.
105    port: u32,
106    /// Current data value.
107    data: Result<T, RecvError>,
108    /// Data codec.
109    codec: PhantomData<Codec>,
110    /// Maximum item size.
111    #[serde(default)]
112    max_item_size: u64,
113}
114
115impl<T, Codec, const MAX_ITEM_SIZE: usize> Receiver<T, Codec, MAX_ITEM_SIZE> {
116    pub(crate) fn new(
117        rx: tokio::sync::watch::Receiver<Result<T, RecvError>>,
118        remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
119        remote_max_item_size: Option<usize>,
120    ) -> Self {
121        Self { rx, remote_send_err_tx, remote_max_item_size, _codec: PhantomData }
122    }
123
124    /// Returns a reference to the most recently received value.
125    #[inline]
126    pub fn borrow(&self) -> Result<Ref<'_, T>, RecvError> {
127        let ref_res = self.rx.borrow();
128        match &*ref_res {
129            Ok(_) => Ok(Ref(ref_res)),
130            Err(err) => Err(err.clone()),
131        }
132    }
133
134    /// Returns a reference to the most recently received value and mark that value as seen.
135    #[inline]
136    pub fn borrow_and_update(&mut self) -> Result<Ref<'_, T>, RecvError> {
137        let ref_res = self.rx.borrow_and_update();
138        match &*ref_res {
139            Ok(_) => Ok(Ref(ref_res)),
140            Err(err) => Err(err.clone()),
141        }
142    }
143
144    /// Wait for a change notification, then mark the newest value as seen.
145    #[inline]
146    pub async fn changed(&mut self) -> Result<(), ChangedError> {
147        self.rx.changed().await.map_err(|_| ChangedError::Closed)
148    }
149
150    /// Maximum allowed item size in bytes when receiving items.
151    pub fn max_item_size(&self) -> usize {
152        MAX_ITEM_SIZE
153    }
154
155    /// Sets the maximum allowed item size in bytes when receiving items.
156    pub fn set_max_item_size<const NEW_MAX_ITEM_SIZE: usize>(mut self) -> Receiver<T, Codec, NEW_MAX_ITEM_SIZE> {
157        Receiver {
158            rx: mem::replace(
159                &mut self.rx,
160                tokio::sync::watch::channel(Err(RecvError::RemoteConnect(chmux::ConnectError::ChMux))).1,
161            ),
162            remote_send_err_tx: self.remote_send_err_tx.clone(),
163            remote_max_item_size: self.remote_max_item_size,
164            _codec: PhantomData,
165        }
166    }
167
168    /// The maximum item size of the remote sender.
169    ///
170    /// If this is larger than [max_item_size](Self::max_item_size) sending of oversized
171    /// items will succeed but receiving will fail with a
172    /// [MaxItemSizeExceeded error](base::RecvError::MaxItemSizeExceeded).
173    pub fn remote_max_item_size(&self) -> Option<usize> {
174        self.remote_max_item_size
175    }
176}
177
178impl<T, Codec, const MAX_ITEM_SIZE: usize> Drop for Receiver<T, Codec, MAX_ITEM_SIZE> {
179    fn drop(&mut self) {
180        // empty
181    }
182}
183
184impl<T, Codec, const MAX_ITEM_SIZE: usize> Serialize for Receiver<T, Codec, MAX_ITEM_SIZE>
185where
186    T: RemoteSend + Sync + Clone,
187    Codec: codec::Codec,
188{
189    /// Serializes this receiver for sending over a chmux channel.
190    #[inline]
191    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
192    where
193        S: serde::Serializer,
194    {
195        // Prepare channel for takeover.
196        let rx = self.rx.clone();
197        let remote_send_err_tx = self.remote_send_err_tx.clone();
198
199        let port = PortSerializer::connect(|connect| {
200            async move {
201                // Establish chmux channel.
202                let (raw_tx, raw_rx) = match connect.await {
203                    Ok(tx_rx) => tx_rx,
204                    Err(err) => {
205                        let _ = remote_send_err_tx.send(RemoteSendError::Connect(err));
206                        return;
207                    }
208                };
209
210                super::send_impl::<T, Codec>(rx, raw_tx, raw_rx, remote_send_err_tx, MAX_ITEM_SIZE).await;
211            }
212            .boxed()
213        })?;
214
215        // Encode chmux port number in transport type and serialize it.
216        let data = self.rx.borrow().clone();
217        let transported = TransportedReceiver::<T, Codec> {
218            port,
219            data,
220            max_item_size: self.max_item_size().try_into().unwrap_or(u64::MAX),
221            codec: PhantomData,
222        };
223        transported.serialize(serializer)
224    }
225}
226
227impl<'de, T, Codec, const MAX_ITEM_SIZE: usize> Deserialize<'de> for Receiver<T, Codec, MAX_ITEM_SIZE>
228where
229    T: RemoteSend + Sync,
230    Codec: codec::Codec,
231{
232    /// Deserializes the receiver after it has been received over a chmux channel.
233    #[inline]
234    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
235    where
236        D: serde::Deserializer<'de>,
237    {
238        // Get chmux port number from deserialized transport type.
239        let TransportedReceiver { port, data, max_item_size, .. } =
240            TransportedReceiver::<T, Codec>::deserialize(deserializer)?;
241
242        let max_item_size = usize::try_from(max_item_size).unwrap_or(usize::MAX);
243        if max_item_size > MAX_ITEM_SIZE {
244            tracing::debug!(
245                "Watch receiver maximum item size is {MAX_ITEM_SIZE} bytes, \
246                 but remote endpoint expects at least {max_item_size} bytes"
247            );
248        }
249
250        // Create channels.
251        let (tx, rx) = tokio::sync::watch::channel(data);
252        let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::mpsc::unbounded_channel();
253
254        PortDeserializer::accept(port, |local_port, request| {
255            async move {
256                // Accept chmux connection request.
257                let (raw_tx, raw_rx) = match request.accept_from(local_port).await {
258                    Ok(tx_rx) => tx_rx,
259                    Err(err) => {
260                        let _ = tx.send(Err(RecvError::RemoteListen(err)));
261                        return;
262                    }
263                };
264
265                super::recv_impl::<T, Codec>(tx, raw_tx, raw_rx, remote_send_err_rx, None, MAX_ITEM_SIZE).await;
266            }
267            .boxed()
268        })?;
269
270        Ok(Self::new(rx, remote_send_err_tx, Some(max_item_size)))
271    }
272}
273
274/// A wrapper around a watch [Receiver] that implements [Stream](futures::Stream).
275///
276/// This stream will always start by yielding the current value when it is polled,
277/// regardless of whether it was the initial value or sent afterwards.
278///
279/// Note that intermediate values may be missed due to the nature of watch channels.
280pub struct ReceiverStream<T, Codec = codec::Default, const MAX_ITEM_SIZE: usize = DEFAULT_MAX_ITEM_SIZE> {
281    inner: ReusableBoxFuture<'static, (Result<(), ChangedError>, Receiver<T, Codec, MAX_ITEM_SIZE>)>,
282}
283
284impl<T, Codec, const MAX_ITEM_SIZE: usize> fmt::Debug for ReceiverStream<T, Codec, MAX_ITEM_SIZE> {
285    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
286        f.debug_struct("ReceiverStream").finish()
287    }
288}
289
290impl<T, Codec, const MAX_ITEM_SIZE: usize> ReceiverStream<T, Codec, MAX_ITEM_SIZE>
291where
292    T: RemoteSend + Sync,
293    Codec: Send + 'static,
294{
295    /// Creates a new `ReceiverStream`.
296    pub fn new(rx: Receiver<T, Codec, MAX_ITEM_SIZE>) -> Self {
297        Self { inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }) }
298    }
299
300    async fn make_future(
301        mut rx: Receiver<T, Codec, MAX_ITEM_SIZE>,
302    ) -> (Result<(), ChangedError>, Receiver<T, Codec, MAX_ITEM_SIZE>) {
303        let result = rx.changed().await;
304        (result, rx)
305    }
306}
307
308impl<T, Codec, const MAX_ITEM_SIZE: usize> Stream for ReceiverStream<T, Codec, MAX_ITEM_SIZE>
309where
310    T: Clone + RemoteSend + Sync,
311    Codec: Send + 'static,
312{
313    type Item = Result<T, RecvError>;
314
315    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
316        let (result, mut rx) = ready!(self.inner.poll(cx));
317        match result {
318            Ok(()) => {
319                let received = rx.borrow_and_update().map(|v| v.clone());
320                self.inner.set(Self::make_future(rx));
321                Poll::Ready(Some(received))
322            }
323            Err(_) => {
324                self.inner.set(Self::make_future(rx));
325                Poll::Ready(None)
326            }
327        }
328    }
329}
330
331impl<T, Codec, const MAX_ITEM_SIZE: usize> Unpin for ReceiverStream<T, Codec, MAX_ITEM_SIZE> {}
332
333impl<T, Codec, const MAX_ITEM_SIZE: usize> From<Receiver<T, Codec, MAX_ITEM_SIZE>>
334    for ReceiverStream<T, Codec, MAX_ITEM_SIZE>
335where
336    T: RemoteSend + Sync,
337    Codec: Send + 'static,
338{
339    fn from(recv: Receiver<T, Codec, MAX_ITEM_SIZE>) -> Self {
340        Self::new(recv)
341    }
342}