wasmrs_rx/flux/
receiver.rs1use std::task::Poll;
2use std::{io::Write, pin::Pin};
3
4use futures::Stream;
5
6use super::{signal_into_result, FutureResult, Signal};
7use crate::{Error, FluxChannel, Observable, Observer};
8use wasmrs_runtime::{ConditionallySendSync, OptionalMut, UnboundedReceiver};
9
10#[must_use]
11#[allow(missing_debug_implementations)]
12pub struct FluxReceiver<Item, Err>
14where
15 Item: ConditionallySendSync,
16 Err: ConditionallySendSync,
17{
18 rx: OptionalMut<UnboundedReceiver<Signal<Item, Err>>>,
19}
20
21impl<Item, Err> FluxReceiver<Item, Err>
22where
23 Item: ConditionallySendSync,
24 Err: ConditionallySendSync,
25{
26 pub fn new(rx: UnboundedReceiver<Signal<Item, Err>>) -> Self {
28 Self {
29 rx: OptionalMut::new(rx),
30 }
31 }
32
33 #[must_use]
35 pub fn boxed(self) -> Pin<Box<Self>> {
36 Box::pin(self)
37 }
38
39 pub fn none() -> Self {
41 Self {
42 rx: OptionalMut::none(),
43 }
44 }
45
46 pub fn one<I, E>(item: Result<I, E>) -> FluxReceiver<I, E>
48 where
49 I: ConditionallySendSync,
50 E: ConditionallySendSync,
51 {
52 let (tx, rx) = FluxChannel::new_parts();
53 tx.send_result(item).unwrap();
54 rx
55 }
56}
57
58impl<Err> futures::io::AsyncRead for FluxReceiver<Vec<u8>, Err>
59where
60 Err: ConditionallySendSync,
61{
62 fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
63 match Pin::new(&mut self.get_mut()).poll_next(cx) {
64 Poll::Ready(Some(Ok(item))) => {
65 let len = item.len();
66 let mut buf = std::io::Cursor::new(buf);
67 buf.write_all(&item).unwrap();
68 Poll::Ready(Ok(len))
69 }
70 Poll::Ready(Some(Err(_err))) => Poll::Ready(Err(std::io::Error::new(
71 std::io::ErrorKind::Other,
72 crate::Error::RecvFailed(99),
73 ))),
74 Poll::Ready(None) => Poll::Ready(Ok(0)),
75 Poll::Pending => Poll::Pending,
76 }
77 }
78}
79
80impl<Err> futures::io::AsyncRead for FluxReceiver<bytes::Bytes, Err>
81where
82 Err: ConditionallySendSync,
83{
84 fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
85 match Pin::new(&mut self.get_mut()).poll_next(cx) {
86 Poll::Ready(Some(Ok(item))) => {
87 let len = item.len();
88 let mut buf = std::io::Cursor::new(buf);
89 buf.write_all(&item).unwrap();
90 Poll::Ready(Ok(len))
91 }
92 Poll::Ready(Some(Err(_err))) => Poll::Ready(Err(std::io::Error::new(
93 std::io::ErrorKind::Other,
94 crate::Error::RecvFailed(99),
95 ))),
96 Poll::Ready(None) => Poll::Ready(Ok(0)),
97 Poll::Pending => Poll::Pending,
98 }
99 }
100}
101
102impl<Item, Err> Clone for FluxReceiver<Item, Err>
103where
104 Item: ConditionallySendSync,
105 Err: ConditionallySendSync,
106{
107 fn clone(&self) -> Self {
108 Self { rx: self.rx.clone() }
109 }
110}
111
112impl<Item, Err> Observable<Item, Err> for FluxReceiver<Item, Err>
113where
114 Item: ConditionallySendSync,
115 Err: ConditionallySendSync,
116{
117}
118
119impl<Item, Err> FluxReceiver<Item, Err>
120where
121 Item: ConditionallySendSync,
122 Err: ConditionallySendSync,
123{
124 #[must_use]
125 pub fn recv(&self) -> FutureResult<Item, Err>
127 where
128 Err: ConditionallySendSync,
129 Item: ConditionallySendSync,
130 {
131 let root_rx = self.rx.clone();
132 let opt = root_rx.take();
133 Box::pin(async move {
134 match opt {
135 Some(mut rx) => {
136 let signal = rx.recv().await;
137 root_rx.insert(rx);
138 Ok(signal_into_result(signal))
139 }
140 None => Err(Error::RecvFailed(0)),
141 }
142 })
143 }
144
145 pub fn poll_recv(&self, cx: &mut std::task::Context<'_>) -> Poll<Option<Result<Item, Err>>> {
147 let opt = self.rx.take();
148 opt.map_or(std::task::Poll::Ready(None), |mut rx| {
149 let poll = rx.poll_recv(cx);
150 match poll {
151 Poll::Ready(Some(Signal::Complete)) => Poll::Ready(None),
152 Poll::Ready(Some(Signal::Ok(v))) => {
153 self.rx.insert(rx);
154 Poll::Ready(Some(Ok(v)))
155 }
156 Poll::Ready(Some(Signal::Err(e))) => {
157 self.rx.insert(rx);
158 Poll::Ready(Some(Err(e)))
159 }
160 Poll::Ready(None) => Poll::Ready(None),
161 Poll::Pending => {
162 self.rx.insert(rx);
163 Poll::Pending
164 }
165 }
166 })
167 }
168
169 #[must_use]
170 pub fn eject(&self) -> Option<Self> {
172 self.rx.take().map(|inner| Self {
173 rx: OptionalMut::new(inner),
174 })
175 }
176}
177
178impl<Item, Err> Stream for FluxReceiver<Item, Err>
179where
180 Item: ConditionallySendSync,
181 Err: ConditionallySendSync,
182{
183 type Item = Result<Item, Err>;
184
185 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
186 self.poll_recv(cx)
187 }
188}