1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use std::ptr;
use std::fmt::Debug;

use log::debug;

use futures_lite::Stream;
use futures_lite::stream::StreamExt;
use pin_utils::unsafe_pinned;
use pin_utils::unsafe_unpinned;

use fluvio_future::task::spawn;

use crate::sys::napi_value;
use crate::val::JsEnv;
use crate::NjError;
use crate::TryIntoJs;

pub trait NjStream: Stream {
    fn js_then<F>(self, fut: F) -> JsThen<Self, F>
    where
        F: FnMut(Self::Item),
        Self: Sized,
    {
        JsThen::new(self, fut)
    }
}

// Blanket implementation: every `Stream`, sized or not, is also an `NjStream`.
impl<T> NjStream for T where T: Stream + ?Sized {}

/// Pairs a stream with a callback that is run on each item the stream yields.
///
/// Values are created with [`JsThen::new`] or [`NjStream::js_then`].
pub struct JsThen<St, F> {
    /// Source stream whose items are handed to `f`.
    stream: St,
    /// Callback invoked once per item, receiving the item by value.
    f: F,
}

// `JsThen` is `Unpin` whenever the wrapped stream is: `stream` is the only field
// declared with a pinned projection, so `F` places no requirement here.
impl<St, F> Unpin for JsThen<St, F> where St: Unpin {}

impl<St, F> JsThen<St, F>
where
    St: Stream,
    F: FnMut(St::Item),
{
    unsafe_pinned!(stream: St);
    unsafe_unpinned!(f: F);

    pub fn new(stream: St, f: F) -> JsThen<St, F> {
        Self { stream, f }
    }
}

/// Converting a `JsThen` to a JS value starts a task that drains the stream.
impl<St, F> TryIntoJs for JsThen<St, F>
where
    St: Stream + Send + 'static,
    F: FnMut(St::Item) + Send + 'static,
    St::Item: Debug,
{
    /// Spawns a task that feeds every stream item to the callback, then returns at once.
    ///
    /// The JS environment argument is unused. The handle returned by `spawn` is not
    /// kept, and the value handed back is always `Ok` wrapping a null `napi_value`.
    fn try_to_js(self, _js_env: &JsEnv) -> Result<napi_value, NjError> {
        let Self { stream, f } = self;
        // Heap-pin the stream so `next()` can be called without requiring `St: Unpin`.
        let mut items = Box::pin(stream);
        let mut callback = f;

        spawn(async move {
            loop {
                match items.next().await {
                    Some(item) => {
                        debug!("got item: {:#?}, invoking Js callback", item);
                        callback(item);
                    }
                    // Stream exhausted: the task ends here.
                    None => break,
                }
            }
        });

        Ok(ptr::null_mut())
    }
}