nj_core/
stream.rs

1use std::ptr;
2use std::fmt::Debug;
3
4use tracing::debug;
5
6use futures_lite::Stream;
7use futures_lite::stream::StreamExt;
8use pin_utils::unsafe_pinned;
9use pin_utils::unsafe_unpinned;
10
11use fluvio_future::task::spawn;
12
13use crate::sys::napi_value;
14use crate::val::JsEnv;
15use crate::NjError;
16use crate::TryIntoJs;
17
18pub trait NjStream: Stream {
19    fn js_then<F>(self, fut: F) -> JsThen<Self, F>
20    where
21        F: FnMut(Self::Item),
22        Self: Sized,
23    {
24        JsThen::new(self, fut)
25    }
26}
27
28impl<T: ?Sized> NjStream for T where T: Stream {}
29
/// A stream bundled with the callback that should be run on each of its items.
///
/// Holding a `JsThen` does nothing by itself; it is just the two values side by side.
pub struct JsThen<St, F> {
    /// Source of the items.
    stream: St,
    /// Callback that is handed each item.
    f: F,
}

// Only `St` has to be `Unpin` for the bundle to be `Unpin`; `F` is unconstrained.
impl<St, F> Unpin for JsThen<St, F> where St: Unpin {}
36
37impl<St, F> JsThen<St, F>
38where
39    St: Stream,
40    F: FnMut(St::Item),
41{
42    unsafe_pinned!(stream: St);
43    unsafe_unpinned!(f: F);
44
45    pub fn new(stream: St, f: F) -> JsThen<St, F> {
46        Self { stream, f }
47    }
48}
49
50impl<St, F> TryIntoJs for JsThen<St, F>
51where
52    St: Stream + Send + 'static,
53    F: FnMut(St::Item) + Send + 'static,
54    St::Item: Debug,
55{
56    fn try_to_js(self, _js_env: &JsEnv) -> Result<napi_value, NjError> {
57        let mut stream = Box::pin(self.stream);
58        let mut cb = self.f;
59
60        spawn(async move {
61            while let Some(item) = stream.next().await {
62                debug!("got item: {:#?}, invoking Js callback", item);
63                cb(item);
64            }
65        });
66
67        Ok(ptr::null_mut())
68    }
69}