1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
use std::ptr;
use std::fmt::Debug;

use log::debug;
use futures_lite::Stream;
use futures_lite::stream::StreamExt;
use pin_utils::unsafe_pinned;
use pin_utils::unsafe_unpinned;

use fluvio_future::task::spawn;

use crate::sys::napi_value;
use crate::val::JsEnv;
use crate::NjError;
use crate::TryIntoJs;

/// Extension trait that adds [`js_then`](NjStream::js_then) to every [`Stream`].
pub trait NjStream: Stream {
    /// Pairs this stream with `fut`, a callback that receives each item the
    /// stream yields.
    ///
    /// Nothing is polled here; the returned [`JsThen`] only stores both parts.
    fn js_then<F>(self, fut: F) -> JsThen<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item),
    {
        JsThen::new(self, fut)
    }
}

// Blanket implementation: anything that is a `Stream` (sized or not) is an `NjStream`.
impl<T> NjStream for T where T: Stream + ?Sized {}

/// A stream bundled with the callback that should be run for each of its items.
pub struct JsThen<St, F> {
    stream: St,
    f: F,
}

// `JsThen` is `Unpin` whenever the wrapped stream is; the callback type places
// no requirement.
impl<St, F> Unpin for JsThen<St, F> where St: Unpin {}

impl<St, F> JsThen<St, F>
where
    St: Stream,
    F: FnMut(St::Item),
{
    // Accessors generated by `pin_utils`: a pinned projection for `stream`
    // and an unpinned one for `f`.
    unsafe_pinned!(stream: St);
    unsafe_unpinned!(f: F);

    /// Builds a `JsThen` from a stream and the callback to run on its items.
    pub fn new(stream: St, f: F) -> Self {
        JsThen { stream, f }
    }
}

impl<St, F> TryIntoJs for JsThen<St, F>
where
    St: Stream + Send + 'static,
    St::Item: Debug,
    F: FnMut(St::Item) + Send + 'static,
{
    /// Spawns a task that drains the stream, logging each item at debug level
    /// and passing it to the callback, then returns immediately.
    ///
    /// The environment argument is not used, the spawned task's handle is
    /// discarded, and the value returned is always a null `napi_value`.
    fn try_to_js(self, _js_env: &JsEnv) -> Result<napi_value, NjError> {
        let JsThen {
            stream,
            f: mut callback,
        } = self;

        // Heap-pin the stream so it can be polled inside the spawned task
        // without requiring `St: Unpin`.
        let items = Box::pin(stream);

        spawn(async move {
            items
                .for_each(move |item| {
                    debug!("got item: {:#?}, invoking Js callback", item);
                    callback(item);
                })
                .await;
        });

        Ok(ptr::null_mut())
    }
}