wasmrs_rx/flux/ops/
pipe.rs

1use std::pin::Pin;
2use std::task::Poll;
3
4use futures::{Stream, TryStreamExt};
5use pin_project_lite::pin_project;
6
7use crate::flux::FluxChannel;
8use wasmrs_runtime::ConditionallySendSync;
9
10pin_project! {
11/// A [FluxPipe] is the result of piping one [Flux] into another.
12pub struct FluxPipe<Item, Err, From>
13where
14    Item: ConditionallySendSync,
15    Err: ConditionallySendSync,
16{
17    #[pin]
18    from: From,
19    to: FluxChannel<Item, Err>,
20}
21}
22
23impl<Item, Err, From> FluxPipe<Item, Err, From>
24where
25  Item: ConditionallySendSync,
26  Err: ConditionallySendSync,
27{
28  /// Create a new [FluxPipe]
29  pub fn new(from: From, to: FluxChannel<Item, Err>) -> Self {
30    Self { from, to }
31  }
32}
33
34impl<Item, Err, From> FluxPipe<Item, Err, From>
35where
36  Item: ConditionallySendSync,
37  Err: ConditionallySendSync,
38  From: Stream<Item = Result<Item, Err>>,
39{
40}
41
42impl<Item, Err, From> Stream for FluxPipe<Item, Err, From>
43where
44  Item: ConditionallySendSync,
45  Err: ConditionallySendSync,
46  From: Stream<Item = Result<Item, Err>> + Unpin,
47{
48  type Item = Result<Item, Err>;
49
50  fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
51    self.get_mut().from.try_poll_next_unpin(cx)
52  }
53}
#[cfg(all(test, not(target_family = "wasm")))]
mod test {

  use anyhow::Result;
  use futures::StreamExt;

  use super::*;
  use crate::flux::Observer;
  use crate::Observable;

  /// A value sent before the pipe is created must come out of the pipe as
  /// its first item.
  #[tokio::test]
  async fn test_pipes() -> Result<()> {
    let (sender, source) = FluxChannel::new_parts();
    sender.send("First".to_owned())?;

    let destination = FluxChannel::<String, String>::new();
    let mut piped = source.pipe(destination);

    assert_eq!(piped.next().await, Some(Ok("First".to_owned())));
    Ok(())
  }
}
78}