1use std::io::Write;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use std::task::Poll;
6
7use futures::AsyncRead;
8use futures::{Future, FutureExt, Stream};
9use wasmrs_runtime::unbounded_channel;
10use wasmrs_runtime::BoxFuture;
11use wasmrs_runtime::ConditionallySendSync;
12use wasmrs_runtime::UnboundedSender;
13
14use crate::Error;
15
16mod ops;
17pub use ops::*;
18mod receiver;
19pub use receiver::*;
20mod signal;
21pub use signal::*;
22mod observer;
23pub use observer::*;
24mod observable;
25pub use observable::*;
26
27type FutureResult<Item, Err> = BoxFuture<Result<Option<Result<Item, Err>>, Error>>;
28
29pub type FluxBox<Item, Err> = Pin<Box<dyn Observable<Item, Err>>>;
31
32pub trait MonoFuture<Item, Err>: Future<Output = Result<Item, Err>> + ConditionallySendSync {}
34
35#[allow(missing_debug_implementations)]
36#[must_use]
37pub struct Mono<Item, Err>
39where
40 Item: ConditionallySendSync,
41 Err: ConditionallySendSync + Sync,
42{
43 inner: Option<Pin<Box<dyn MonoFuture<Item, Err>>>>,
44 done: AtomicBool,
45}
46
47impl<Item, Err> Mono<Item, Err>
48where
49 Item: ConditionallySendSync,
50 Err: ConditionallySendSync + Sync,
51{
52 pub fn new() -> Self {
54 Self {
55 inner: None,
56 done: AtomicBool::new(false),
57 }
58 }
59
60 #[must_use]
62 pub fn boxed(self) -> Pin<Box<Self>> {
63 Box::pin(self)
64 }
65
66 pub fn from_future<Fut>(fut: Fut) -> Self
68 where
69 Fut: MonoFuture<Item, Err>,
70 {
71 Self {
72 inner: Some(Box::pin(fut)),
73 done: AtomicBool::new(false),
74 }
75 }
76
77 pub fn new_error(err: Err) -> Self {
79 Self {
80 inner: Some(Box::pin(futures::future::ready(Err(err)))),
81 done: AtomicBool::new(false),
82 }
83 }
84
85 pub fn new_success(ok: Item) -> Self {
87 Self {
88 inner: Some(Box::pin(futures::future::ready(Ok(ok)))),
89 done: AtomicBool::new(false),
90 }
91 }
92
93 pub fn success(&mut self, ok: Item) {
95 assert!(self.inner.is_none(), "Can not push more than one value to a Mono");
96 self.inner = Some(Box::pin(futures::future::ready(Ok(ok))));
97 }
98
99 pub fn error(&mut self, error: Err) {
101 assert!(self.inner.is_none(), "Can not push more than one value to a Mono");
102 self.inner = Some(Box::pin(futures::future::ready(Err(error))));
103 }
104}
105
106impl<Item, Err> Default for Mono<Item, Err>
107where
108 Item: ConditionallySendSync,
109 Err: ConditionallySendSync + Sync,
110{
111 fn default() -> Self {
112 Self::new()
113 }
114}
115
116impl<Item, Err> Stream for Mono<Item, Err>
117where
118 Item: ConditionallySendSync,
119 Err: ConditionallySendSync + Sync,
120{
121 type Item = Result<Item, Err>;
122
123 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
124 if self.done.load(Ordering::SeqCst) {
125 return Poll::Ready(None);
126 }
127 let s = self.get_mut();
128 match s.inner.as_mut() {
129 Some(inner_future) => match inner_future.poll_unpin(cx) {
130 Poll::Ready(v) => {
131 s.done.store(true, Ordering::SeqCst);
132 Poll::Ready(Some(v))
133 }
134 Poll::Pending => Poll::Pending,
135 },
136 None => Poll::Pending,
137 }
138 }
139}
140
141impl<Item, Err, T> MonoFuture<Item, Err> for T
142where
143 T: Future<Output = Result<Item, Err>> + ConditionallySendSync,
144 Item: ConditionallySendSync,
145 Err: ConditionallySendSync,
146{
147}
148
149impl<Item, Err> Future for Mono<Item, Err>
150where
151 Item: ConditionallySendSync,
152 Err: ConditionallySendSync + Sync,
153{
154 type Output = Result<Item, Err>;
155
156 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
157 self
158 .get_mut()
159 .inner
160 .as_mut()
161 .map_or_else(|| Poll::Pending, |inner_future| inner_future.poll_unpin(cx))
162 }
163}
164
165#[must_use]
166#[allow(missing_debug_implementations)]
167pub struct FluxChannel<Item, Err>
169where
170 Item: ConditionallySendSync,
171 Err: ConditionallySendSync,
172{
173 tx: UnboundedSender<Signal<Item, Err>>,
174 rx: FluxReceiver<Item, Err>,
175}
176
177impl<Item, Err> FluxChannel<Item, Err>
178where
179 Item: ConditionallySendSync,
180 Err: ConditionallySendSync,
181{
182 pub fn new() -> Self {
184 let (tx, rx) = unbounded_channel();
185
186 Self {
187 tx,
188 rx: FluxReceiver::new(rx),
189 }
190 }
191
192 #[must_use]
194 pub fn boxed(self) -> Pin<Box<Self>> {
195 Box::pin(self)
196 }
197
198 pub fn new_parts() -> (Self, FluxReceiver<Item, Err>) {
200 let (tx, rx) = unbounded_channel();
201
202 (
203 Self {
204 tx,
205 rx: FluxReceiver::none(),
206 },
207 FluxReceiver::new(rx),
208 )
209 }
210
211 #[must_use]
212 pub fn is_closed(&self) -> bool {
214 self.tx.is_closed()
215 }
216
217 #[must_use]
218 pub fn recv(&self) -> FutureResult<Item, Err>
220 where
221 Err: 'static + std::fmt::Debug,
222 Item: 'static + std::fmt::Debug,
223 {
224 let val = self.rx.recv();
225 Box::pin(async move { val.await })
226 }
227
228 pub fn take_rx(&self) -> Result<FluxReceiver<Item, Err>, Error> {
230 self.rx.eject().ok_or(Error::ReceiverAlreadyGone)
231 }
232}
233
234impl<Item, Err> TryFrom<FluxChannel<Item, Err>> for FluxReceiver<Item, Err>
235where
236 Item: ConditionallySendSync,
237 Err: ConditionallySendSync,
238{
239 type Error = Error;
240
241 fn try_from(value: FluxChannel<Item, Err>) -> Result<Self, Self::Error> {
242 value.take_rx()
243 }
244}
245
246impl<Item, Err> Clone for FluxChannel<Item, Err>
247where
248 Item: ConditionallySendSync,
249 Err: ConditionallySendSync,
250{
251 fn clone(&self) -> Self {
252 Self {
253 tx: self.tx.clone(),
254 rx: self.rx.clone(),
255 }
256 }
257}
258
259impl<Err> AsyncRead for FluxChannel<Vec<u8>, Err>
260where
261 Err: ConditionallySendSync,
262{
263 fn poll_read(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
264 match Pin::new(&mut self.get_mut().rx).poll_next(cx) {
265 Poll::Ready(Some(Ok(item))) => {
266 let len = item.len();
267 let mut buf = std::io::Cursor::new(buf);
268 buf.write_all(&item).unwrap();
269 Poll::Ready(Ok(len))
270 }
271 Poll::Ready(Some(Err(_err))) => Poll::Ready(Err(std::io::Error::new(
272 std::io::ErrorKind::Other,
273 crate::Error::RecvFailed(98),
274 ))),
275 Poll::Ready(None) => Poll::Ready(Ok(0)),
276 Poll::Pending => Poll::Pending,
277 }
278 }
279}
280
281impl<Item, Err> Observable<Item, Err> for FluxChannel<Item, Err>
282where
283 Item: ConditionallySendSync,
284 Err: ConditionallySendSync,
285{
286}
287
288impl<Item, Err> Observer<Item, Err> for FluxChannel<Item, Err>
289where
290 Item: ConditionallySendSync,
291 Err: ConditionallySendSync,
292{
293 fn send_signal(&self, signal: Signal<Item, Err>) -> Result<(), Error> {
294 Ok(self.tx.send(signal)?)
295 }
296
297 fn is_complete(&self) -> bool {
298 self.tx.is_closed()
299 }
300
301 fn complete(&self) {
302 let _ = self.send_signal(Signal::Complete);
303 }
304}
305
306impl<Item, Err> Default for FluxChannel<Item, Err>
307where
308 Item: ConditionallySendSync,
309 Err: ConditionallySendSync,
310{
311 fn default() -> Self {
312 Self::new()
313 }
314}
315
316impl<Item, Err> Stream for FluxChannel<Item, Err>
317where
318 Item: ConditionallySendSync,
319 Err: ConditionallySendSync,
320{
321 type Item = Result<Item, Err>;
322
323 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
324 self.rx.poll_recv(cx)
325 }
326}
327
328impl<Item, Err> From<Vec<Result<Item, Err>>> for FluxChannel<Item, Err>
329where
330 Item: ConditionallySendSync,
331 Err: ConditionallySendSync,
332{
333 fn from(value: Vec<Result<Item, Err>>) -> Self {
334 Self::from_iter(value.into_iter())
335 }
336}
337
338impl<Item, Err, const N: usize> From<[Result<Item, Err>; N]> for FluxChannel<Item, Err>
339where
340 Item: ConditionallySendSync,
341 Err: ConditionallySendSync,
342{
343 fn from(value: [Result<Item, Err>; N]) -> Self {
344 Self::from_iter(value.into_iter())
345 }
346}
347
348impl<Item, Err> FromIterator<Result<Item, Err>> for FluxChannel<Item, Err>
349where
350 Item: ConditionallySendSync,
351 Err: ConditionallySendSync,
352{
353 fn from_iter<T: IntoIterator<Item = Result<Item, Err>>>(iter: T) -> Self {
354 let (tx, _) = Self::new_parts();
355 for item in iter {
356 let _ = tx.send_result(item);
357 }
358 tx.complete();
359 tx
360 }
361}
362
363fn signal_into_result<Item, Err>(signal: Option<Signal<Item, Err>>) -> Option<Result<Item, Err>>
364where
365 Item: ConditionallySendSync,
366 Err: ConditionallySendSync,
367{
368 match signal {
369 Some(Signal::Complete) => None,
370 Some(Signal::Ok(v)) => Some(Ok(v)),
371 Some(Signal::Err(e)) => Some(Err(e)),
372 None => None,
373 }
374}
375
376#[cfg(all(test, not(target_family = "wasm")))]
377mod test {
378
379 use anyhow::Result;
380 use futures::StreamExt;
381
382 use super::*;
383
384 #[tokio::test]
385 async fn test_flux() -> Result<()> {
386 let mut flux = FluxChannel::<u32, u32>::new();
387 flux.send(1)?;
388 let value = flux.next().await;
389 assert_eq!(value, Some(Ok(1)));
390 let stream = flux.take_rx().unwrap();
391
392 flux.send(2)?;
393 let value = stream.recv().await?;
394 assert_eq!(value, Some(Ok(2)));
395 let stream = flux.take_rx();
396 assert!(stream.is_err());
397 Ok(())
398 }
399
400 #[tokio::test]
401 async fn test_mono() -> Result<()> {
402 let mut mono = Mono::<String, String>::new();
403 mono.success("Hello".to_owned());
404 let value = mono.await;
405 assert_eq!(value, Ok("Hello".to_owned()));
406 Ok(())
407 }
408}