1use futures::{ready, FutureExt, Stream};
2use serde::{Deserialize, Serialize};
3use std::{
4 error::Error,
5 fmt,
6 marker::PhantomData,
7 mem,
8 pin::Pin,
9 task::{Context, Poll},
10};
11use tokio_util::sync::ReusableBoxFuture;
12
13use super::{
14 super::{
15 base::{self, PortDeserializer, PortSerializer},
16 RemoteSendError, DEFAULT_MAX_ITEM_SIZE,
17 },
18 Ref,
19};
20use crate::{chmux, codec, RemoteSend};
21
/// An error reported while receiving a value from a remote watch channel.
///
/// The current value of a [`Receiver`] is stored as `Result<T, RecvError>`, so
/// this error can be observed through [`Receiver::borrow`] and
/// [`Receiver::borrow_and_update`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RecvError {
    /// Receiving from the remote endpoint failed; wraps the base channel error.
    RemoteReceive(base::RecvError),
    /// Establishing the connection failed; wraps the multiplexer connect error.
    RemoteConnect(chmux::ConnectError),
    /// Accepting the connection for a received channel failed; wraps the
    /// multiplexer listener error.
    RemoteListen(chmux::ListenerError),
}
32
33impl fmt::Display for RecvError {
34 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
35 match self {
36 Self::RemoteReceive(err) => write!(f, "receive error: {err}"),
37 Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
38 Self::RemoteListen(err) => write!(f, "listen error: {err}"),
39 }
40 }
41}
42
// Uses only the default `Error` methods, so `source()` reports nothing; the
// wrapped error's message is already part of the `Display` output.
impl Error for RecvError {}
44
45impl RecvError {
46 pub fn is_final(&self) -> bool {
48 match self {
49 Self::RemoteReceive(err) => err.is_final(),
50 Self::RemoteConnect(_) | Self::RemoteListen(_) => true,
51 }
52 }
53}
54
/// Error returned by [`Receiver::changed`] when waiting for a change fails.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ChangedError {
    /// The underlying watch channel reported an error while waiting for a
    /// change notification.
    Closed,
}
61
62impl ChangedError {
63 #[deprecated = "a remoc::rch::watch::ChangedError is always due to closure"]
65 pub fn is_closed(&self) -> bool {
66 true
67 }
68}
69
70impl fmt::Display for ChangedError {
71 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72 match self {
73 Self::Closed => write!(f, "closed"),
74 }
75 }
76}
77
// Uses only the default `Error` methods; this error wraps no underlying cause.
impl Error for ChangedError {}
79
/// Receiving half of a watch channel that can be sent to a remote endpoint.
///
/// `T` is the value type, `Codec` the codec used when the receiver is
/// serialized, and `MAX_ITEM_SIZE` the largest item size (in bytes) this
/// receiver accepts.
#[derive(Clone)]
pub struct Receiver<T, Codec = codec::Default, const MAX_ITEM_SIZE: usize = DEFAULT_MAX_ITEM_SIZE> {
    /// Local watch channel holding the latest value or the latest receive error.
    rx: tokio::sync::watch::Receiver<Result<T, RecvError>>,
    /// Queue on which errors that occur while forwarding to a remote endpoint
    /// are reported (for example a failed connect during serialization).
    remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
    /// Maximum item size announced by the remote endpoint; set to `Some` when
    /// the receiver is created by deserialization.
    remote_max_item_size: Option<usize>,
    /// Ties the otherwise unused `Codec` type parameter to the struct.
    _codec: PhantomData<Codec>,
}
94
95impl<T, Codec, const MAX_ITEM_SIZE: usize> fmt::Debug for Receiver<T, Codec, MAX_ITEM_SIZE> {
96 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
97 f.debug_struct("Receiver").finish()
98 }
99}
100
/// Serialized representation of a [`Receiver`] as it is sent to a remote endpoint.
///
/// NOTE(review): field order is presumably significant for non-self-describing
/// codecs — do not reorder without confirming.
#[derive(Serialize, Deserialize)]
pub(crate) struct TransportedReceiver<T, Codec> {
    /// Port number obtained from the port serializer; the receiving side
    /// accepts the connection on it.
    port: u32,
    /// Value (or error) held by the channel at the time of serialization.
    data: Result<T, RecvError>,
    /// Ties the `Codec` type parameter to the struct.
    codec: PhantomData<Codec>,
    /// Maximum item size of the serializing side. Falls back to the `u64`
    /// default (0) when the field is absent from the input.
    #[serde(default)]
    max_item_size: u64,
}
114
115impl<T, Codec, const MAX_ITEM_SIZE: usize> Receiver<T, Codec, MAX_ITEM_SIZE> {
116 pub(crate) fn new(
117 rx: tokio::sync::watch::Receiver<Result<T, RecvError>>,
118 remote_send_err_tx: tokio::sync::mpsc::UnboundedSender<RemoteSendError>,
119 remote_max_item_size: Option<usize>,
120 ) -> Self {
121 Self { rx, remote_send_err_tx, remote_max_item_size, _codec: PhantomData }
122 }
123
124 #[inline]
126 pub fn borrow(&self) -> Result<Ref<'_, T>, RecvError> {
127 let ref_res = self.rx.borrow();
128 match &*ref_res {
129 Ok(_) => Ok(Ref(ref_res)),
130 Err(err) => Err(err.clone()),
131 }
132 }
133
134 #[inline]
136 pub fn borrow_and_update(&mut self) -> Result<Ref<'_, T>, RecvError> {
137 let ref_res = self.rx.borrow_and_update();
138 match &*ref_res {
139 Ok(_) => Ok(Ref(ref_res)),
140 Err(err) => Err(err.clone()),
141 }
142 }
143
144 #[inline]
146 pub async fn changed(&mut self) -> Result<(), ChangedError> {
147 self.rx.changed().await.map_err(|_| ChangedError::Closed)
148 }
149
150 pub fn max_item_size(&self) -> usize {
152 MAX_ITEM_SIZE
153 }
154
155 pub fn set_max_item_size<const NEW_MAX_ITEM_SIZE: usize>(mut self) -> Receiver<T, Codec, NEW_MAX_ITEM_SIZE> {
157 Receiver {
158 rx: mem::replace(
159 &mut self.rx,
160 tokio::sync::watch::channel(Err(RecvError::RemoteConnect(chmux::ConnectError::ChMux))).1,
161 ),
162 remote_send_err_tx: self.remote_send_err_tx.clone(),
163 remote_max_item_size: self.remote_max_item_size,
164 _codec: PhantomData,
165 }
166 }
167
168 pub fn remote_max_item_size(&self) -> Option<usize> {
174 self.remote_max_item_size
175 }
176}
177
impl<T, Codec, const MAX_ITEM_SIZE: usize> Drop for Receiver<T, Codec, MAX_ITEM_SIZE> {
    fn drop(&mut self) {
        // Intentionally empty: no cleanup is performed here.
        // Because this impl exists, fields cannot be moved out of a `Receiver`
        // by value, which is why `set_max_item_size` swaps and clones them.
        // NOTE(review): the reason for keeping an empty `Drop` impl is not
        // visible in this file — confirm before removing it.
    }
}
183
184impl<T, Codec, const MAX_ITEM_SIZE: usize> Serialize for Receiver<T, Codec, MAX_ITEM_SIZE>
185where
186 T: RemoteSend + Sync + Clone,
187 Codec: codec::Codec,
188{
189 #[inline]
191 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
192 where
193 S: serde::Serializer,
194 {
195 let rx = self.rx.clone();
197 let remote_send_err_tx = self.remote_send_err_tx.clone();
198
199 let port = PortSerializer::connect(|connect| {
200 async move {
201 let (raw_tx, raw_rx) = match connect.await {
203 Ok(tx_rx) => tx_rx,
204 Err(err) => {
205 let _ = remote_send_err_tx.send(RemoteSendError::Connect(err));
206 return;
207 }
208 };
209
210 super::send_impl::<T, Codec>(rx, raw_tx, raw_rx, remote_send_err_tx, MAX_ITEM_SIZE).await;
211 }
212 .boxed()
213 })?;
214
215 let data = self.rx.borrow().clone();
217 let transported = TransportedReceiver::<T, Codec> {
218 port,
219 data,
220 max_item_size: self.max_item_size().try_into().unwrap_or(u64::MAX),
221 codec: PhantomData,
222 };
223 transported.serialize(serializer)
224 }
225}
226
227impl<'de, T, Codec, const MAX_ITEM_SIZE: usize> Deserialize<'de> for Receiver<T, Codec, MAX_ITEM_SIZE>
228where
229 T: RemoteSend + Sync,
230 Codec: codec::Codec,
231{
232 #[inline]
234 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
235 where
236 D: serde::Deserializer<'de>,
237 {
238 let TransportedReceiver { port, data, max_item_size, .. } =
240 TransportedReceiver::<T, Codec>::deserialize(deserializer)?;
241
242 let max_item_size = usize::try_from(max_item_size).unwrap_or(usize::MAX);
243 if max_item_size > MAX_ITEM_SIZE {
244 tracing::debug!(
245 "Watch receiver maximum item size is {MAX_ITEM_SIZE} bytes, \
246 but remote endpoint expects at least {max_item_size} bytes"
247 );
248 }
249
250 let (tx, rx) = tokio::sync::watch::channel(data);
252 let (remote_send_err_tx, remote_send_err_rx) = tokio::sync::mpsc::unbounded_channel();
253
254 PortDeserializer::accept(port, |local_port, request| {
255 async move {
256 let (raw_tx, raw_rx) = match request.accept_from(local_port).await {
258 Ok(tx_rx) => tx_rx,
259 Err(err) => {
260 let _ = tx.send(Err(RecvError::RemoteListen(err)));
261 return;
262 }
263 };
264
265 super::recv_impl::<T, Codec>(tx, raw_tx, raw_rx, remote_send_err_rx, None, MAX_ITEM_SIZE).await;
266 }
267 .boxed()
268 })?;
269
270 Ok(Self::new(rx, remote_send_err_tx, Some(max_item_size)))
271 }
272}
273
/// A [`Stream`] adapter around a watch [`Receiver`].
///
/// Each item is the channel's value (or receive error) observed after a
/// change notification.
pub struct ReceiverStream<T, Codec = codec::Default, const MAX_ITEM_SIZE: usize = DEFAULT_MAX_ITEM_SIZE> {
    /// Future that resolves to the outcome of waiting for the next change
    /// together with the receiver itself, so the receiver can be re-armed
    /// after each item.
    inner: ReusableBoxFuture<'static, (Result<(), ChangedError>, Receiver<T, Codec, MAX_ITEM_SIZE>)>,
}
283
284impl<T, Codec, const MAX_ITEM_SIZE: usize> fmt::Debug for ReceiverStream<T, Codec, MAX_ITEM_SIZE> {
285 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
286 f.debug_struct("ReceiverStream").finish()
287 }
288}
289
290impl<T, Codec, const MAX_ITEM_SIZE: usize> ReceiverStream<T, Codec, MAX_ITEM_SIZE>
291where
292 T: RemoteSend + Sync,
293 Codec: Send + 'static,
294{
295 pub fn new(rx: Receiver<T, Codec, MAX_ITEM_SIZE>) -> Self {
297 Self { inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }) }
298 }
299
300 async fn make_future(
301 mut rx: Receiver<T, Codec, MAX_ITEM_SIZE>,
302 ) -> (Result<(), ChangedError>, Receiver<T, Codec, MAX_ITEM_SIZE>) {
303 let result = rx.changed().await;
304 (result, rx)
305 }
306}
307
308impl<T, Codec, const MAX_ITEM_SIZE: usize> Stream for ReceiverStream<T, Codec, MAX_ITEM_SIZE>
309where
310 T: Clone + RemoteSend + Sync,
311 Codec: Send + 'static,
312{
313 type Item = Result<T, RecvError>;
314
315 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
316 let (result, mut rx) = ready!(self.inner.poll(cx));
317 match result {
318 Ok(()) => {
319 let received = rx.borrow_and_update().map(|v| v.clone());
320 self.inner.set(Self::make_future(rx));
321 Poll::Ready(Some(received))
322 }
323 Err(_) => {
324 self.inner.set(Self::make_future(rx));
325 Poll::Ready(None)
326 }
327 }
328 }
329}
330
// Unconditional `Unpin`: `poll_next` mutates `inner` through `Pin<&mut Self>`,
// which requires `Self: Unpin` regardless of `T` and `Codec`.
// NOTE(review): presumably sound because the wrapped future is heap-allocated
// inside `ReusableBoxFuture` — confirm against the tokio-util documentation.
impl<T, Codec, const MAX_ITEM_SIZE: usize> Unpin for ReceiverStream<T, Codec, MAX_ITEM_SIZE> {}
332
333impl<T, Codec, const MAX_ITEM_SIZE: usize> From<Receiver<T, Codec, MAX_ITEM_SIZE>>
334 for ReceiverStream<T, Codec, MAX_ITEM_SIZE>
335where
336 T: RemoteSend + Sync,
337 Codec: Send + 'static,
338{
339 fn from(recv: Receiver<T, Codec, MAX_ITEM_SIZE>) -> Self {
340 Self::new(recv)
341 }
342}