1use std::ptr;
2
3use tracing::error;
4use tracing::trace;
5use tracing::debug;
6use futures_lite::Future;
7
8use fluvio_future::task::spawn;
9
10use crate::sys::napi_deferred;
11use crate::sys::napi_value;
12use crate::val::JsEnv;
13use crate::NjError;
14use crate::sys::napi_env;
15use crate::TryIntoJs;
16use crate::assert_napi;
17use crate::ThreadSafeFunction;
18
19pub struct JsPromiseFuture<F> {
20 future: F,
21 name: String,
22}
23
24impl<F> JsPromiseFuture<F>
25where
26 F: Future,
27 F::Output: TryIntoJs,
28{
29 pub fn new<S>(future: F, name: S) -> Self
30 where
31 S: Into<String>,
32 {
33 Self {
34 future,
35 name: name.into(),
36 }
37 }
38}
39
40impl<F> TryIntoJs for JsPromiseFuture<F>
41where
42 F: Future + 'static + Send,
43 F::Output: TryIntoJs,
44{
45 fn try_to_js(self, js_env: &JsEnv) -> Result<napi_value, NjError> {
46 create_promise(js_env, &self.name, self.future)
47 }
48}
49
50struct JsDeferred(napi_deferred);
51unsafe impl Send for JsDeferred {}
52
53pub struct WorkerResult<O> {
54 deferred: JsDeferred,
55 result: O,
56}
57
58pub fn create_promise<F, O>(js_env: &JsEnv, name: &str, future: F) -> Result<napi_value, NjError>
61where
62 F: Future<Output = O> + 'static + Send,
63 O: TryIntoJs,
64{
65 let (promise, deferred) = js_env.create_promise()?;
66 let function_name = format!("async_worker_th_{name}");
67 let ts_fn =
68 js_env.create_thread_safe_function(&function_name, None, Some(promise_complete::<O>))?;
69 let js_deferred = JsDeferred(deferred);
70
71 spawn(async move {
72 let result = future.await;
73 finish_worker(ts_fn, result, js_deferred);
74 });
75
76 Ok(promise)
77}
78
79extern "C" fn promise_complete<O>(
80 env: napi_env,
81 _js_cb: napi_value,
82 _context: *mut ::std::os::raw::c_void,
83 data: *mut ::std::os::raw::c_void,
84) where
85 O: TryIntoJs,
86{
87 if !env.is_null() {
88 trace!("promise complete");
89 let js_env = JsEnv::new(env);
90
91 let worker_result: Box<WorkerResult<O>> =
92 unsafe { Box::from_raw(data as *mut WorkerResult<O>) };
93
94 let result: Result<(), NjError> = match worker_result.result.try_to_js(&js_env) {
95 Ok(val) => {
96 trace!("trying to resolve to deferred");
97 js_env.resolve_deferred(worker_result.deferred.0, val)
98 }
99 Err(js_err) => {
100 trace!("trying to resolve to deferred");
101 js_env.reject_deferred(worker_result.deferred.0, js_err.as_js(&js_env))
102 }
103 };
104 assert_napi!(result)
105 }
106}
107
108fn finish_worker<O>(ts_fn: ThreadSafeFunction, result: O, deferred: JsDeferred)
109where
110 O: TryIntoJs,
111{
112 let boxed_worker = Box::new(WorkerResult { deferred, result });
113 let ptr = Box::into_raw(boxed_worker);
114 if let Err(err) = ts_fn.call(Some(ptr as *mut core::ffi::c_void)) {
115 error!("error finishing worker: {}", err);
116 }
117}
118
119pub trait NjFutureExt: Future {
120 fn try_to_js(self, js_env: &JsEnv) -> Result<napi_value, NjError>
121 where
122 Self: Sized + Send + 'static,
123 Self::Output: TryIntoJs,
124 {
125 extern "C" fn promise_complete2<O>(
126 env: napi_env,
127 _js_cb: napi_value,
128 _context: *mut ::std::os::raw::c_void,
129 data: *mut ::std::os::raw::c_void,
130 ) {
131 if !env.is_null() {
132 trace!("promise complete");
133 let _ = JsEnv::new(env);
134
135 let _: Box<O> = unsafe { Box::from_raw(data as *mut O) };
136 }
137 }
138
139 let function_name = "stream_example_1".to_string();
140 let _ = js_env.create_thread_safe_function(
141 &function_name,
142 None,
143 Some(promise_complete2::<Self::Output>),
144 )?;
145
146 debug!("spawning task");
147 spawn(async move {
148 let _ = self.await;
149 debug!("task completed");
150 });
151
152 Ok(ptr::null_mut())
153 }
154}
155
156impl<T: ?Sized> NjFutureExt for T where T: Future {}