nj_core/
worker.rs

1use std::ptr;
2
3use tracing::error;
4use tracing::trace;
5use tracing::debug;
6use futures_lite::Future;
7
8use fluvio_future::task::spawn;
9
10use crate::sys::napi_deferred;
11use crate::sys::napi_value;
12use crate::val::JsEnv;
13use crate::NjError;
14use crate::sys::napi_env;
15use crate::TryIntoJs;
16use crate::assert_napi;
17use crate::ThreadSafeFunction;
18
19pub struct JsPromiseFuture<F> {
20    future: F,
21    name: String,
22}
23
24impl<F> JsPromiseFuture<F>
25where
26    F: Future,
27    F::Output: TryIntoJs,
28{
29    pub fn new<S>(future: F, name: S) -> Self
30    where
31        S: Into<String>,
32    {
33        Self {
34            future,
35            name: name.into(),
36        }
37    }
38}
39
40impl<F> TryIntoJs for JsPromiseFuture<F>
41where
42    F: Future + 'static + Send,
43    F::Output: TryIntoJs,
44{
45    fn try_to_js(self, js_env: &JsEnv) -> Result<napi_value, NjError> {
46        create_promise(js_env, &self.name, self.future)
47    }
48}
49
50struct JsDeferred(napi_deferred);
51unsafe impl Send for JsDeferred {}
52
53pub struct WorkerResult<O> {
54    deferred: JsDeferred,
55    result: O,
56}
57
58/// create promise and schedule work
59/// when this is finished it will return result in the main thread
60pub fn create_promise<F, O>(js_env: &JsEnv, name: &str, future: F) -> Result<napi_value, NjError>
61where
62    F: Future<Output = O> + 'static + Send,
63    O: TryIntoJs,
64{
65    let (promise, deferred) = js_env.create_promise()?;
66    let function_name = format!("async_worker_th_{name}");
67    let ts_fn =
68        js_env.create_thread_safe_function(&function_name, None, Some(promise_complete::<O>))?;
69    let js_deferred = JsDeferred(deferred);
70
71    spawn(async move {
72        let result = future.await;
73        finish_worker(ts_fn, result, js_deferred);
74    });
75
76    Ok(promise)
77}
78
79extern "C" fn promise_complete<O>(
80    env: napi_env,
81    _js_cb: napi_value,
82    _context: *mut ::std::os::raw::c_void,
83    data: *mut ::std::os::raw::c_void,
84) where
85    O: TryIntoJs,
86{
87    if !env.is_null() {
88        trace!("promise complete");
89        let js_env = JsEnv::new(env);
90
91        let worker_result: Box<WorkerResult<O>> =
92            unsafe { Box::from_raw(data as *mut WorkerResult<O>) };
93
94        let result: Result<(), NjError> = match worker_result.result.try_to_js(&js_env) {
95            Ok(val) => {
96                trace!("trying to resolve to deferred");
97                js_env.resolve_deferred(worker_result.deferred.0, val)
98            }
99            Err(js_err) => {
100                trace!("trying to resolve to deferred");
101                js_env.reject_deferred(worker_result.deferred.0, js_err.as_js(&js_env))
102            }
103        };
104        assert_napi!(result)
105    }
106}
107
108fn finish_worker<O>(ts_fn: ThreadSafeFunction, result: O, deferred: JsDeferred)
109where
110    O: TryIntoJs,
111{
112    let boxed_worker = Box::new(WorkerResult { deferred, result });
113    let ptr = Box::into_raw(boxed_worker);
114    if let Err(err) = ts_fn.call(Some(ptr as *mut core::ffi::c_void)) {
115        error!("error finishing worker: {}", err);
116    }
117}
118
119pub trait NjFutureExt: Future {
120    fn try_to_js(self, js_env: &JsEnv) -> Result<napi_value, NjError>
121    where
122        Self: Sized + Send + 'static,
123        Self::Output: TryIntoJs,
124    {
125        extern "C" fn promise_complete2<O>(
126            env: napi_env,
127            _js_cb: napi_value,
128            _context: *mut ::std::os::raw::c_void,
129            data: *mut ::std::os::raw::c_void,
130        ) {
131            if !env.is_null() {
132                trace!("promise complete");
133                let _ = JsEnv::new(env);
134
135                let _: Box<O> = unsafe { Box::from_raw(data as *mut O) };
136            }
137        }
138
139        let function_name = "stream_example_1".to_string();
140        let _ = js_env.create_thread_safe_function(
141            &function_name,
142            None,
143            Some(promise_complete2::<Self::Output>),
144        )?;
145
146        debug!("spawning task");
147        spawn(async move {
148            let _ = self.await;
149            debug!("task completed");
150        });
151
152        Ok(ptr::null_mut())
153    }
154}
155
156impl<T: ?Sized> NjFutureExt for T where T: Future {}