1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
use std::ptr;
use std::fmt::Debug;

use log::debug;

use futures::Stream;
use futures::stream::StreamExt;
use pin_utils::unsafe_pinned;
use pin_utils::unsafe_unpinned;

use flv_future_aio::task::spawn;


use crate::sys::napi_value;
use crate::val::JsEnv;
use crate::NjError;
use crate::TryIntoJs;


/// Extension trait that adds [`js_then`](NjStream::js_then) to every [`Stream`].
pub trait NjStream: Stream {
    /// Pairs this stream with a per-item callback.
    ///
    /// `fut` is a closure taking one stream item (despite the name it is not a
    /// future). The stream and closure are only stored in the returned
    /// [`JsThen`]; this method does not poll the stream or call the closure.
    fn js_then<F>(self, fut: F) -> JsThen<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item),
    {
        JsThen::new(self, fut)
    }
}

/// Blanket implementation: every `Stream`, sized or not, is an `NjStream`.
impl<T: Stream + ?Sized> NjStream for T {}


/// A stream bundled with a callback that is to be invoked on each of its items.
///
/// Built by `NjStream::js_then` or `JsThen::new`. Holding a `JsThen` does no
/// work by itself; the items are consumed when it is converted through its
/// `TryIntoJs` implementation.
pub struct JsThen<St, F> {
    /// Source of the items.
    stream: St,
    /// Callback that receives each item produced by `stream`.
    f: F,
}

// `JsThen` is `Unpin` exactly when the wrapped stream is; no bound is placed
// on the callback type `F`.
impl<St, F> Unpin for JsThen<St, F> where St: Unpin {}


impl<St, F> JsThen<St, F>
    where St: Stream,
          F: FnMut(St::Item)
{
    unsafe_pinned!(stream: St);
    unsafe_unpinned!(f: F);

    pub fn new(stream: St, f: F) -> JsThen<St, F> {
        Self { stream, f }
    }
}


impl<St, F> TryIntoJs for JsThen<St, F>
    where St: Stream + Send + 'static,
          F: FnMut(St::Item) + Send + 'static,
          St::Item: Debug
{


    fn try_to_js(self, _js_env: &JsEnv) -> Result<napi_value,NjError> {

        let mut stream = Box::pin(self.stream);
        let mut cb = self.f;

        spawn(async move {
            while let Some(item) = stream.next().await {
                debug!("got item: {:#?}, invoking Js callback",item);
                cb(item);
            }
           
        });

        Ok(ptr::null_mut())
        
    }
}