1use std::ptr;
2use std::fmt::Debug;
3
4use tracing::debug;
5
6use futures_lite::Stream;
7use futures_lite::stream::StreamExt;
8use pin_utils::unsafe_pinned;
9use pin_utils::unsafe_unpinned;
10
11use fluvio_future::task::spawn;
12
13use crate::sys::napi_value;
14use crate::val::JsEnv;
15use crate::NjError;
16use crate::TryIntoJs;
17
18pub trait NjStream: Stream {
19 fn js_then<F>(self, fut: F) -> JsThen<Self, F>
20 where
21 F: FnMut(Self::Item),
22 Self: Sized,
23 {
24 JsThen::new(self, fut)
25 }
26}
27
28impl<T: ?Sized> NjStream for T where T: Stream {}
29
30pub struct JsThen<St, F> {
31 stream: St,
32 f: F,
33}
34
35impl<St: Unpin, F> Unpin for JsThen<St, F> {}
36
37impl<St, F> JsThen<St, F>
38where
39 St: Stream,
40 F: FnMut(St::Item),
41{
42 unsafe_pinned!(stream: St);
43 unsafe_unpinned!(f: F);
44
45 pub fn new(stream: St, f: F) -> JsThen<St, F> {
46 Self { stream, f }
47 }
48}
49
50impl<St, F> TryIntoJs for JsThen<St, F>
51where
52 St: Stream + Send + 'static,
53 F: FnMut(St::Item) + Send + 'static,
54 St::Item: Debug,
55{
56 fn try_to_js(self, _js_env: &JsEnv) -> Result<napi_value, NjError> {
57 let mut stream = Box::pin(self.stream);
58 let mut cb = self.f;
59
60 spawn(async move {
61 while let Some(item) = stream.next().await {
62 debug!("got item: {:#?}, invoking Js callback", item);
63 cb(item);
64 }
65 });
66
67 Ok(ptr::null_mut())
68 }
69}