wasmrs_rx/flux/
receiver.rs

1use std::task::Poll;
2use std::{io::Write, pin::Pin};
3
4use futures::Stream;
5
6use super::{signal_into_result, FutureResult, Signal};
7use crate::{Error, FluxChannel, Observable, Observer};
8use wasmrs_runtime::{ConditionallySendSync, OptionalMut, UnboundedReceiver};
9
10#[must_use]
11#[allow(missing_debug_implementations)]
12/// The receving end-only of a [crate::Flux]
13pub struct FluxReceiver<Item, Err>
14where
15  Item: ConditionallySendSync,
16  Err: ConditionallySendSync,
17{
18  rx: OptionalMut<UnboundedReceiver<Signal<Item, Err>>>,
19}
20
21impl<Item, Err> FluxReceiver<Item, Err>
22where
23  Item: ConditionallySendSync,
24  Err: ConditionallySendSync,
25{
26  /// Create a new [FluxReceiver].
27  pub fn new(rx: UnboundedReceiver<Signal<Item, Err>>) -> Self {
28    Self {
29      rx: OptionalMut::new(rx),
30    }
31  }
32
33  /// Create a [Pin<Box<FluxReceiver>>] from a [FluxReceiver].
34  #[must_use]
35  pub fn boxed(self) -> Pin<Box<Self>> {
36    Box::pin(self)
37  }
38
39  /// Create a new [FluxReceiver] that is immediately closed.
40  pub fn none() -> Self {
41    Self {
42      rx: OptionalMut::none(),
43    }
44  }
45
46  /// Create a new [FluxReceiver] that is immediately closed with the passed item.
47  pub fn one<I, E>(item: Result<I, E>) -> FluxReceiver<I, E>
48  where
49    I: ConditionallySendSync,
50    E: ConditionallySendSync,
51  {
52    let (tx, rx) = FluxChannel::new_parts();
53    tx.send_result(item).unwrap();
54    rx
55  }
56}
57
58impl<Err> futures::io::AsyncRead for FluxReceiver<Vec<u8>, Err>
59where
60  Err: ConditionallySendSync,
61{
62  fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
63    match Pin::new(&mut self.get_mut()).poll_next(cx) {
64      Poll::Ready(Some(Ok(item))) => {
65        let len = item.len();
66        let mut buf = std::io::Cursor::new(buf);
67        buf.write_all(&item).unwrap();
68        Poll::Ready(Ok(len))
69      }
70      Poll::Ready(Some(Err(_err))) => Poll::Ready(Err(std::io::Error::new(
71        std::io::ErrorKind::Other,
72        crate::Error::RecvFailed(99),
73      ))),
74      Poll::Ready(None) => Poll::Ready(Ok(0)),
75      Poll::Pending => Poll::Pending,
76    }
77  }
78}
79
80impl<Err> futures::io::AsyncRead for FluxReceiver<bytes::Bytes, Err>
81where
82  Err: ConditionallySendSync,
83{
84  fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
85    match Pin::new(&mut self.get_mut()).poll_next(cx) {
86      Poll::Ready(Some(Ok(item))) => {
87        let len = item.len();
88        let mut buf = std::io::Cursor::new(buf);
89        buf.write_all(&item).unwrap();
90        Poll::Ready(Ok(len))
91      }
92      Poll::Ready(Some(Err(_err))) => Poll::Ready(Err(std::io::Error::new(
93        std::io::ErrorKind::Other,
94        crate::Error::RecvFailed(99),
95      ))),
96      Poll::Ready(None) => Poll::Ready(Ok(0)),
97      Poll::Pending => Poll::Pending,
98    }
99  }
100}
101
102impl<Item, Err> Clone for FluxReceiver<Item, Err>
103where
104  Item: ConditionallySendSync,
105  Err: ConditionallySendSync,
106{
107  fn clone(&self) -> Self {
108    Self { rx: self.rx.clone() }
109  }
110}
111
112impl<Item, Err> Observable<Item, Err> for FluxReceiver<Item, Err>
113where
114  Item: ConditionallySendSync,
115  Err: ConditionallySendSync,
116{
117}
118
119impl<Item, Err> FluxReceiver<Item, Err>
120where
121  Item: ConditionallySendSync,
122  Err: ConditionallySendSync,
123{
124  #[must_use]
125  /// Receive the next value from the [FluxReceiver].
126  pub fn recv(&self) -> FutureResult<Item, Err>
127  where
128    Err: ConditionallySendSync,
129    Item: ConditionallySendSync,
130  {
131    let root_rx = self.rx.clone();
132    let opt = root_rx.take();
133    Box::pin(async move {
134      match opt {
135        Some(mut rx) => {
136          let signal = rx.recv().await;
137          root_rx.insert(rx);
138          Ok(signal_into_result(signal))
139        }
140        None => Err(Error::RecvFailed(0)),
141      }
142    })
143  }
144
145  /// Poll the [FluxReceiver] to see if there is a value available.
146  pub fn poll_recv(&self, cx: &mut std::task::Context<'_>) -> Poll<Option<Result<Item, Err>>> {
147    let opt = self.rx.take();
148    opt.map_or(std::task::Poll::Ready(None), |mut rx| {
149      let poll = rx.poll_recv(cx);
150      match poll {
151        Poll::Ready(Some(Signal::Complete)) => Poll::Ready(None),
152        Poll::Ready(Some(Signal::Ok(v))) => {
153          self.rx.insert(rx);
154          Poll::Ready(Some(Ok(v)))
155        }
156        Poll::Ready(Some(Signal::Err(e))) => {
157          self.rx.insert(rx);
158          Poll::Ready(Some(Err(e)))
159        }
160        Poll::Ready(None) => Poll::Ready(None),
161        Poll::Pending => {
162          self.rx.insert(rx);
163          Poll::Pending
164        }
165      }
166    })
167  }
168
169  #[must_use]
170  /// Remove the inner channel from the [FluxReceiver]
171  pub fn eject(&self) -> Option<Self> {
172    self.rx.take().map(|inner| Self {
173      rx: OptionalMut::new(inner),
174    })
175  }
176}
177
178impl<Item, Err> Stream for FluxReceiver<Item, Err>
179where
180  Item: ConditionallySendSync,
181  Err: ConditionallySendSync,
182{
183  type Item = Result<Item, Err>;
184
185  fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
186    self.poll_recv(cx)
187  }
188}