wasmrs_rx/flux/ops/
pipe.rs1use std::pin::Pin;
2use std::task::Poll;
3
4use futures::{Stream, TryStreamExt};
5use pin_project_lite::pin_project;
6
7use crate::flux::FluxChannel;
8use wasmrs_runtime::ConditionallySendSync;
9
10pin_project! {
11pub struct FluxPipe<Item, Err, From>
13where
14 Item: ConditionallySendSync,
15 Err: ConditionallySendSync,
16{
17 #[pin]
18 from: From,
19 to: FluxChannel<Item, Err>,
20}
21}
22
23impl<Item, Err, From> FluxPipe<Item, Err, From>
24where
25 Item: ConditionallySendSync,
26 Err: ConditionallySendSync,
27{
28 pub fn new(from: From, to: FluxChannel<Item, Err>) -> Self {
30 Self { from, to }
31 }
32}
33
34impl<Item, Err, From> FluxPipe<Item, Err, From>
35where
36 Item: ConditionallySendSync,
37 Err: ConditionallySendSync,
38 From: Stream<Item = Result<Item, Err>>,
39{
40}
41
42impl<Item, Err, From> Stream for FluxPipe<Item, Err, From>
43where
44 Item: ConditionallySendSync,
45 Err: ConditionallySendSync,
46 From: Stream<Item = Result<Item, Err>> + Unpin,
47{
48 type Item = Result<Item, Err>;
49
50 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
51 self.get_mut().from.try_poll_next_unpin(cx)
52 }
53}
#[cfg(all(test, not(target_family = "wasm")))]
mod test {

  use anyhow::Result;
  use futures::StreamExt;

  use super::*;
  use crate::flux::Observer;
  use crate::Observable;

  /// A value sent into the first channel must be the first item yielded by
  /// the pipe built from that channel's observer half.
  #[tokio::test]
  async fn test_pipes() -> Result<()> {
    let (upstream_flux, upstream_observer) = FluxChannel::new_parts();
    upstream_flux.send("First".to_owned())?;

    let downstream_flux = FluxChannel::<String, String>::new();
    let mut piped = upstream_observer.pipe(downstream_flux);

    assert_eq!(piped.next().await, Some(Ok("First".to_owned())));
    Ok(())
  }
}