wasmrs_rx/
flux.rs

1use std::io::Write;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use std::task::Poll;
6
7use futures::AsyncRead;
8use futures::{Future, FutureExt, Stream};
9use wasmrs_runtime::unbounded_channel;
10use wasmrs_runtime::BoxFuture;
11use wasmrs_runtime::ConditionallySendSync;
12use wasmrs_runtime::UnboundedSender;
13
14use crate::Error;
15
16mod ops;
17pub use ops::*;
18mod receiver;
19pub use receiver::*;
20mod signal;
21pub use signal::*;
22mod observer;
23pub use observer::*;
24mod observable;
25pub use observable::*;
26
27type FutureResult<Item, Err> = BoxFuture<Result<Option<Result<Item, Err>>, Error>>;
28
29/// A pinned, boxed [Flux].
30pub type FluxBox<Item, Err> = Pin<Box<dyn Observable<Item, Err>>>;
31
32/// A [Future] that wraps a [Result] and can be used as a [Mono].
33pub trait MonoFuture<Item, Err>: Future<Output = Result<Item, Err>> + ConditionallySendSync {}
34
35#[allow(missing_debug_implementations)]
36#[must_use]
37/// An implementation of [Mono] as seen in RSocket and reactive streams. It is similar to a [Future<Output = Result<Item, Err>>] that can be pushed to after instantiation.
38pub struct Mono<Item, Err>
39where
40  Item: ConditionallySendSync,
41  Err: ConditionallySendSync + Sync,
42{
43  inner: Option<Pin<Box<dyn MonoFuture<Item, Err>>>>,
44  done: AtomicBool,
45}
46
47impl<Item, Err> Mono<Item, Err>
48where
49  Item: ConditionallySendSync,
50  Err: ConditionallySendSync + Sync,
51{
52  /// Create a new [Mono].
53  pub fn new() -> Self {
54    Self {
55      inner: None,
56      done: AtomicBool::new(false),
57    }
58  }
59
60  /// Create a [Pin<Box<Mono>>] from a [Mono].
61  #[must_use]
62  pub fn boxed(self) -> Pin<Box<Self>> {
63    Box::pin(self)
64  }
65
66  /// Create a [Mono] from a [Future].
67  pub fn from_future<Fut>(fut: Fut) -> Self
68  where
69    Fut: MonoFuture<Item, Err>,
70  {
71    Self {
72      inner: Some(Box::pin(fut)),
73      done: AtomicBool::new(false),
74    }
75  }
76
77  /// Create a new [Mono] that holds an [Err] value.
78  pub fn new_error(err: Err) -> Self {
79    Self {
80      inner: Some(Box::pin(futures::future::ready(Err(err)))),
81      done: AtomicBool::new(false),
82    }
83  }
84
85  /// Create a new [Mono] that holds an [Item].
86  pub fn new_success(ok: Item) -> Self {
87    Self {
88      inner: Some(Box::pin(futures::future::ready(Ok(ok)))),
89      done: AtomicBool::new(false),
90    }
91  }
92
93  /// Push an `Item` to the [Mono]
94  pub fn success(&mut self, ok: Item) {
95    assert!(self.inner.is_none(), "Can not push more than one value to a Mono");
96    self.inner = Some(Box::pin(futures::future::ready(Ok(ok))));
97  }
98
99  /// Push an `Error` to the [Mono]
100  pub fn error(&mut self, error: Err) {
101    assert!(self.inner.is_none(), "Can not push more than one value to a Mono");
102    self.inner = Some(Box::pin(futures::future::ready(Err(error))));
103  }
104}
105
106impl<Item, Err> Default for Mono<Item, Err>
107where
108  Item: ConditionallySendSync,
109  Err: ConditionallySendSync + Sync,
110{
111  fn default() -> Self {
112    Self::new()
113  }
114}
115
116impl<Item, Err> Stream for Mono<Item, Err>
117where
118  Item: ConditionallySendSync,
119  Err: ConditionallySendSync + Sync,
120{
121  type Item = Result<Item, Err>;
122
123  fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
124    if self.done.load(Ordering::SeqCst) {
125      return Poll::Ready(None);
126    }
127    let s = self.get_mut();
128    match s.inner.as_mut() {
129      Some(inner_future) => match inner_future.poll_unpin(cx) {
130        Poll::Ready(v) => {
131          s.done.store(true, Ordering::SeqCst);
132          Poll::Ready(Some(v))
133        }
134        Poll::Pending => Poll::Pending,
135      },
136      None => Poll::Pending,
137    }
138  }
139}
140
141impl<Item, Err, T> MonoFuture<Item, Err> for T
142where
143  T: Future<Output = Result<Item, Err>> + ConditionallySendSync,
144  Item: ConditionallySendSync,
145  Err: ConditionallySendSync,
146{
147}
148
149impl<Item, Err> Future for Mono<Item, Err>
150where
151  Item: ConditionallySendSync,
152  Err: ConditionallySendSync + Sync,
153{
154  type Output = Result<Item, Err>;
155
156  fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
157    self
158      .get_mut()
159      .inner
160      .as_mut()
161      .map_or_else(|| Poll::Pending, |inner_future| inner_future.poll_unpin(cx))
162  }
163}
164
165#[must_use]
166#[allow(missing_debug_implementations)]
167/// An implementation of the `Flux` as seen in RSocket and reactive streams. It is similar to a [Stream<Item = Result<Item, Err>>] or an unbounded channel.
168pub struct FluxChannel<Item, Err>
169where
170  Item: ConditionallySendSync,
171  Err: ConditionallySendSync,
172{
173  tx: UnboundedSender<Signal<Item, Err>>,
174  rx: FluxReceiver<Item, Err>,
175}
176
177impl<Item, Err> FluxChannel<Item, Err>
178where
179  Item: ConditionallySendSync,
180  Err: ConditionallySendSync,
181{
182  /// Create a new [Flux]
183  pub fn new() -> Self {
184    let (tx, rx) = unbounded_channel();
185
186    Self {
187      tx,
188      rx: FluxReceiver::new(rx),
189    }
190  }
191
192  /// Create a [Pin<Box<FluxChannel>>] from a [FluxChannel].
193  #[must_use]
194  pub fn boxed(self) -> Pin<Box<Self>> {
195    Box::pin(self)
196  }
197
198  /// Create a new [Flux] and return the parts, pre-separated.
199  pub fn new_parts() -> (Self, FluxReceiver<Item, Err>) {
200    let (tx, rx) = unbounded_channel();
201
202    (
203      Self {
204        tx,
205        rx: FluxReceiver::none(),
206      },
207      FluxReceiver::new(rx),
208    )
209  }
210
211  #[must_use]
212  /// Check if the [FluxChannel] is complete.
213  pub fn is_closed(&self) -> bool {
214    self.tx.is_closed()
215  }
216
217  #[must_use]
218  /// Await the next value in the [FluxChannel].
219  pub fn recv(&self) -> FutureResult<Item, Err>
220  where
221    Err: 'static + std::fmt::Debug,
222    Item: 'static + std::fmt::Debug,
223  {
224    let val = self.rx.recv();
225    Box::pin(async move { val.await })
226  }
227
228  /// Return and remove the receiving channel from this [Flux].
229  pub fn take_rx(&self) -> Result<FluxReceiver<Item, Err>, Error> {
230    self.rx.eject().ok_or(Error::ReceiverAlreadyGone)
231  }
232}
233
234impl<Item, Err> TryFrom<FluxChannel<Item, Err>> for FluxReceiver<Item, Err>
235where
236  Item: ConditionallySendSync,
237  Err: ConditionallySendSync,
238{
239  type Error = Error;
240
241  fn try_from(value: FluxChannel<Item, Err>) -> Result<Self, Self::Error> {
242    value.take_rx()
243  }
244}
245
246impl<Item, Err> Clone for FluxChannel<Item, Err>
247where
248  Item: ConditionallySendSync,
249  Err: ConditionallySendSync,
250{
251  fn clone(&self) -> Self {
252    Self {
253      tx: self.tx.clone(),
254      rx: self.rx.clone(),
255    }
256  }
257}
258
259impl<Err> AsyncRead for FluxChannel<Vec<u8>, Err>
260where
261  Err: ConditionallySendSync,
262{
263  fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
264    match Pin::new(&mut self.get_mut().rx).poll_next(cx) {
265      Poll::Ready(Some(Ok(item))) => {
266        let len = item.len();
267        let mut buf = std::io::Cursor::new(buf);
268        buf.write_all(&item).unwrap();
269        Poll::Ready(Ok(len))
270      }
271      Poll::Ready(Some(Err(_err))) => Poll::Ready(Err(std::io::Error::new(
272        std::io::ErrorKind::Other,
273        crate::Error::RecvFailed(98),
274      ))),
275      Poll::Ready(None) => Poll::Ready(Ok(0)),
276      Poll::Pending => Poll::Pending,
277    }
278  }
279}
280
281impl<Item, Err> Observable<Item, Err> for FluxChannel<Item, Err>
282where
283  Item: ConditionallySendSync,
284  Err: ConditionallySendSync,
285{
286}
287
288impl<Item, Err> Observer<Item, Err> for FluxChannel<Item, Err>
289where
290  Item: ConditionallySendSync,
291  Err: ConditionallySendSync,
292{
293  fn send_signal(&self, signal: Signal<Item, Err>) -> Result<(), Error> {
294    Ok(self.tx.send(signal)?)
295  }
296
297  fn is_complete(&self) -> bool {
298    self.tx.is_closed()
299  }
300
301  fn complete(&self) {
302    let _ = self.send_signal(Signal::Complete);
303  }
304}
305
306impl<Item, Err> Default for FluxChannel<Item, Err>
307where
308  Item: ConditionallySendSync,
309  Err: ConditionallySendSync,
310{
311  fn default() -> Self {
312    Self::new()
313  }
314}
315
316impl<Item, Err> Stream for FluxChannel<Item, Err>
317where
318  Item: ConditionallySendSync,
319  Err: ConditionallySendSync,
320{
321  type Item = Result<Item, Err>;
322
323  fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
324    self.rx.poll_recv(cx)
325  }
326}
327
328impl<Item, Err> From<Vec<Result<Item, Err>>> for FluxChannel<Item, Err>
329where
330  Item: ConditionallySendSync,
331  Err: ConditionallySendSync,
332{
333  fn from(value: Vec<Result<Item, Err>>) -> Self {
334    Self::from_iter(value.into_iter())
335  }
336}
337
338impl<Item, Err, const N: usize> From<[Result<Item, Err>; N]> for FluxChannel<Item, Err>
339where
340  Item: ConditionallySendSync,
341  Err: ConditionallySendSync,
342{
343  fn from(value: [Result<Item, Err>; N]) -> Self {
344    Self::from_iter(value.into_iter())
345  }
346}
347
348impl<Item, Err> FromIterator<Result<Item, Err>> for FluxChannel<Item, Err>
349where
350  Item: ConditionallySendSync,
351  Err: ConditionallySendSync,
352{
353  fn from_iter<T: IntoIterator<Item = Result<Item, Err>>>(iter: T) -> Self {
354    let (tx, _) = Self::new_parts();
355    for item in iter {
356      let _ = tx.send_result(item);
357    }
358    tx.complete();
359    tx
360  }
361}
362
363fn signal_into_result<Item, Err>(signal: Option<Signal<Item, Err>>) -> Option<Result<Item, Err>>
364where
365  Item: ConditionallySendSync,
366  Err: ConditionallySendSync,
367{
368  match signal {
369    Some(Signal::Complete) => None,
370    Some(Signal::Ok(v)) => Some(Ok(v)),
371    Some(Signal::Err(e)) => Some(Err(e)),
372    None => None,
373  }
374}
375
#[cfg(all(test, not(target_family = "wasm")))]
mod test {

  use anyhow::Result;
  use futures::StreamExt;

  use super::*;

  /// A value can be read through the channel itself, then through the receiver once
  /// it has been taken; taking the receiver a second time fails.
  #[tokio::test]
  async fn test_flux() -> Result<()> {
    let mut channel = FluxChannel::<u32, u32>::new();

    channel.send(1)?;
    let first = channel.next().await;
    assert_eq!(first, Some(Ok(1)));

    let receiver = channel.take_rx().unwrap();
    channel.send(2)?;
    let second = receiver.recv().await?;
    assert_eq!(second, Some(Ok(2)));

    let second_take = channel.take_rx();
    assert!(second_take.is_err());
    Ok(())
  }

  /// A value pushed into an empty `Mono` is what awaiting it returns.
  #[tokio::test]
  async fn test_mono() -> Result<()> {
    let mut single = Mono::<String, String>::new();
    single.success("Hello".to_owned());

    let outcome = single.await;
    assert_eq!(outcome, Ok("Hello".to_owned()));
    Ok(())
  }
}
408}