napi/
threadsafe_function.rs

1#![allow(clippy::single_component_path_imports)]
2
3use std::convert::Into;
4use std::ffi::CString;
5use std::marker::PhantomData;
6use std::os::raw::c_void;
7use std::ptr::{self, null_mut};
8use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
9use std::sync::{Arc, RwLock, RwLockWriteGuard, Weak};
10
11use crate::bindgen_runtime::{
12  FromNapiValue, JsValuesTupleIntoVec, ToNapiValue, TypeName, ValidateNapiValue,
13};
14use crate::{check_status, sys, Env, JsError, JsUnknown, Result, Status};
15
16/// ThreadSafeFunction Context object
17/// the `value` is the value passed to `call` method
18pub struct ThreadSafeCallContext<T: 'static> {
19  pub env: Env,
20  pub value: T,
21}
22
23#[repr(u8)]
24#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25pub enum ThreadsafeFunctionCallMode {
26  NonBlocking,
27  Blocking,
28}
29
30impl From<ThreadsafeFunctionCallMode> for sys::napi_threadsafe_function_call_mode {
31  fn from(value: ThreadsafeFunctionCallMode) -> Self {
32    match value {
33      ThreadsafeFunctionCallMode::Blocking => sys::ThreadsafeFunctionCallMode::blocking,
34      ThreadsafeFunctionCallMode::NonBlocking => sys::ThreadsafeFunctionCallMode::nonblocking,
35    }
36  }
37}
38
39type_level_enum! {
40  /// Type-level `enum` to express how to feed [`ThreadsafeFunction`] errors to
41  /// the inner [`JsFunction`].
42  ///
43  /// ### Context
44  ///
45  /// For callbacks that expect a `Result`-like kind of input, the convention is
46  /// to have the callback take an `error` parameter as its first parameter.
47  ///
48  /// This way receiving a `Result<Args…>` can be modelled as follows:
49  ///
50  ///   - In case of `Err(error)`, feed that `error` entity as the first parameter
51  ///     of the callback;
52  ///
53  ///   - Otherwise (in case of `Ok(_)`), feed `null` instead.
54  ///
55  /// In pseudo-code:
56  ///
57  /// ```rust,ignore
58  /// match result_args {
59  ///     Ok(args) => {
60  ///         let js_null = /* … */;
61  ///         callback.call(
62  ///             // this
63  ///             None,
64  ///             // args…
65  ///             &iter::once(js_null).chain(args).collect::<Vec<_>>(),
66  ///         )
67  ///     },
68  ///     Err(err) => callback.call(None, &[JsError::from(err)]),
69  /// }
70  /// ```
71  ///
72  /// **Note that the `Err` case can stem from a failed conversion from native
73  /// values to js values when calling the callback!**
74  ///
75  /// That's why:
76  ///
77  /// > **[This][`ErrorStrategy::CalleeHandled`] is the default error strategy**.
78  ///
79  /// In order to opt-out of it, [`ThreadsafeFunction`] has an optional second
80  /// generic parameter (of "kind" [`ErrorStrategy::T`]) that defines whether
81  /// this behavior ([`ErrorStrategy::CalleeHandled`]) or a non-`Result` one
82  /// ([`ErrorStrategy::Fatal`]) is desired.
83  pub enum ErrorStrategy {
84    /// Input errors (including conversion errors) are left for the callee to
85    /// handle:
86    ///
87    /// The callee receives an extra `error` parameter (the first one), which is
88    /// `null` if no error occurred, and the error payload otherwise.
89    CalleeHandled,
90
91    /// Input errors (including conversion errors) are deemed fatal:
92    ///
93    /// they can thus cause a `panic!` or abort the process.
94    ///
95    /// The callee thus is not expected to have to deal with [that extra `error`
96    /// parameter][CalleeHandled], which is thus not added.
97    Fatal,
98  }
99}
100
101struct ThreadsafeFunctionHandle {
102  raw: AtomicPtr<sys::napi_threadsafe_function__>,
103  aborted: RwLock<bool>,
104  referred: AtomicBool,
105}
106
107impl ThreadsafeFunctionHandle {
108  /// create a Arc to hold the `ThreadsafeFunctionHandle`
109  fn new(raw: sys::napi_threadsafe_function) -> Arc<Self> {
110    Arc::new(Self {
111      raw: AtomicPtr::new(raw),
112      aborted: RwLock::new(false),
113      referred: AtomicBool::new(true),
114    })
115  }
116
117  /// Lock `aborted` with read access, call `f` with the value of `aborted`, then unlock it
118  fn with_read_aborted<RT, F>(&self, f: F) -> RT
119  where
120    F: FnOnce(bool) -> RT,
121  {
122    let aborted_guard = self
123      .aborted
124      .read()
125      .expect("Threadsafe Function aborted lock failed");
126    f(*aborted_guard)
127  }
128
129  /// Lock `aborted` with write access, call `f` with the `RwLockWriteGuard`, then unlock it
130  fn with_write_aborted<RT, F>(&self, f: F) -> RT
131  where
132    F: FnOnce(RwLockWriteGuard<bool>) -> RT,
133  {
134    let aborted_guard = self
135      .aborted
136      .write()
137      .expect("Threadsafe Function aborted lock failed");
138    f(aborted_guard)
139  }
140
141  #[allow(clippy::arc_with_non_send_sync)]
142  fn null() -> Arc<Self> {
143    Self::new(null_mut())
144  }
145
146  fn get_raw(&self) -> sys::napi_threadsafe_function {
147    self.raw.load(Ordering::SeqCst)
148  }
149
150  fn set_raw(&self, raw: sys::napi_threadsafe_function) {
151    self.raw.store(raw, Ordering::SeqCst)
152  }
153}
154
155impl Drop for ThreadsafeFunctionHandle {
156  fn drop(&mut self) {
157    self.with_read_aborted(|aborted| {
158      if !aborted {
159        let release_status = unsafe {
160          sys::napi_release_threadsafe_function(
161            self.get_raw(),
162            sys::ThreadsafeFunctionReleaseMode::release,
163          )
164        };
165        assert!(
166          release_status == sys::Status::napi_ok,
167          "Threadsafe Function release failed {}",
168          Status::from(release_status)
169        );
170      }
171    })
172  }
173}
174
175#[repr(u8)]
176enum ThreadsafeFunctionCallVariant {
177  Direct,
178  WithCallback,
179}
180
181struct ThreadsafeFunctionCallJsBackData<T> {
182  data: T,
183  call_variant: ThreadsafeFunctionCallVariant,
184  callback: Box<dyn FnOnce(Result<JsUnknown>) -> Result<()>>,
185}
186
187/// Communicate with the addon's main thread by invoking a JavaScript function from other threads.
188///
189/// ## Example
190/// An example of using `ThreadsafeFunction`:
191///
192/// ```rust
193/// #[macro_use]
194/// extern crate napi_derive;
195///
196/// use std::thread;
197///
198/// use napi::{
199///     threadsafe_function::{
200///         ThreadSafeCallContext, ThreadsafeFunctionCallMode, ThreadsafeFunctionReleaseMode,
201///     },
202///     CallContext, Error, JsFunction, JsNumber, JsUndefined, Result, Status,
203/// };
204///
205/// #[js_function(1)]
206/// pub fn test_threadsafe_function(ctx: CallContext) -> Result<JsUndefined> {
207///   let func = ctx.get::<JsFunction>(0)?;
208///
209///   let tsfn =
210///       ctx
211///           .env
212///           .create_threadsafe_function(&func, 0, |ctx: ThreadSafeCallContext<Vec<u32>>| {
213///             ctx.value
214///                 .iter()
215///                 .map(|v| ctx.env.create_uint32(*v))
216///                 .collect::<Result<Vec<JsNumber>>>()
217///           })?;
218///
219///   let tsfn_cloned = tsfn.clone();
220///
221///   thread::spawn(move || {
222///       let output: Vec<u32> = vec![0, 1, 2, 3];
223///       // It's okay to call a threadsafe function multiple times.
224///       tsfn.call(Ok(output.clone()), ThreadsafeFunctionCallMode::Blocking);
225///   });
226///
227///   thread::spawn(move || {
228///       let output: Vec<u32> = vec![3, 2, 1, 0];
229///       // It's okay to call a threadsafe function multiple times.
230///       tsfn_cloned.call(Ok(output.clone()), ThreadsafeFunctionCallMode::NonBlocking);
231///   });
232///
233///   ctx.env.get_undefined()
234/// }
235/// ```
236pub struct ThreadsafeFunction<T: 'static, ES: ErrorStrategy::T = ErrorStrategy::CalleeHandled> {
237  handle: Arc<ThreadsafeFunctionHandle>,
238  _phantom: PhantomData<(T, ES)>,
239}
240
241unsafe impl<T: 'static, ES: ErrorStrategy::T> Send for ThreadsafeFunction<T, ES> {}
242unsafe impl<T: 'static, ES: ErrorStrategy::T> Sync for ThreadsafeFunction<T, ES> {}
243
244impl<T: 'static, ES: ErrorStrategy::T> Clone for ThreadsafeFunction<T, ES> {
245  fn clone(&self) -> Self {
246    self.handle.with_read_aborted(|aborted| {
247      if aborted {
248        panic!("ThreadsafeFunction was aborted, can not clone it");
249      };
250
251      Self {
252        handle: self.handle.clone(),
253        _phantom: PhantomData,
254      }
255    })
256  }
257}
258
259impl<T: ToNapiValue> JsValuesTupleIntoVec for T {
260  #[allow(clippy::not_unsafe_ptr_arg_deref)]
261  fn into_vec(self, env: sys::napi_env) -> Result<Vec<sys::napi_value>> {
262    Ok(vec![unsafe {
263      <T as ToNapiValue>::to_napi_value(env, self)?
264    }])
265  }
266}
267
268macro_rules! impl_js_value_tuple_to_vec {
269  ($($ident:ident),*) => {
270    impl<$($ident: ToNapiValue),*> JsValuesTupleIntoVec for ($($ident,)*) {
271      #[allow(clippy::not_unsafe_ptr_arg_deref)]
272      fn into_vec(self, env: sys::napi_env) -> Result<Vec<sys::napi_value>> {
273        #[allow(non_snake_case)]
274        let ($($ident,)*) = self;
275        Ok(vec![$(unsafe { <$ident as ToNapiValue>::to_napi_value(env, $ident)? }),*])
276      }
277    }
278  };
279}
280
281impl_js_value_tuple_to_vec!(A);
282impl_js_value_tuple_to_vec!(A, B);
283impl_js_value_tuple_to_vec!(A, B, C);
284impl_js_value_tuple_to_vec!(A, B, C, D);
285impl_js_value_tuple_to_vec!(A, B, C, D, E);
286impl_js_value_tuple_to_vec!(A, B, C, D, E, F);
287impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G);
288impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H);
289impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I);
290impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J);
291impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K);
292impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L);
293impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M);
294impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N);
295impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O);
296impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P);
297impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q);
298impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R);
299impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S);
300impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T);
301impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U);
302impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V);
303impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W);
304impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X);
305impl_js_value_tuple_to_vec!(
306  A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y
307);
308impl_js_value_tuple_to_vec!(
309  A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z
310);
311
312impl<T: JsValuesTupleIntoVec + 'static, ES: ErrorStrategy::T> FromNapiValue
313  for ThreadsafeFunction<T, ES>
314{
315  unsafe fn from_napi_value(env: sys::napi_env, napi_val: sys::napi_value) -> Result<Self> {
316    Self::create(env, napi_val, 0, |ctx| ctx.value.into_vec(ctx.env.0))
317  }
318}
319
320impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> {
321  /// See [napi_create_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_create_threadsafe_function)
322  /// for more information.
323  pub(crate) fn create<
324    V: ToNapiValue,
325    R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
326  >(
327    env: sys::napi_env,
328    func: sys::napi_value,
329    max_queue_size: usize,
330    callback: R,
331  ) -> Result<Self> {
332    let mut async_resource_name = ptr::null_mut();
333    let s = "napi_rs_threadsafe_function";
334    let len = s.len();
335    let s = CString::new(s)?;
336    check_status!(unsafe {
337      sys::napi_create_string_utf8(env, s.as_ptr(), len, &mut async_resource_name)
338    })?;
339
340    let mut raw_tsfn = ptr::null_mut();
341    let callback_ptr = Box::into_raw(Box::new(callback));
342    let handle = ThreadsafeFunctionHandle::null();
343    check_status!(unsafe {
344      sys::napi_create_threadsafe_function(
345        env,
346        func,
347        ptr::null_mut(),
348        async_resource_name,
349        max_queue_size,
350        1,
351        Arc::downgrade(&handle).into_raw() as *mut c_void, // pass handler to thread_finalize_cb
352        Some(thread_finalize_cb::<T, V, R>),
353        callback_ptr.cast(),
354        Some(call_js_cb::<T, V, R, ES>),
355        &mut raw_tsfn,
356      )
357    })?;
358    handle.set_raw(raw_tsfn);
359
360    Ok(ThreadsafeFunction {
361      handle,
362      _phantom: PhantomData,
363    })
364  }
365
366  /// See [napi_ref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_ref_threadsafe_function)
367  /// for more information.
368  ///
369  /// "ref" is a keyword so that we use "refer" here.
370  pub fn refer(&mut self, env: &Env) -> Result<()> {
371    self.handle.with_read_aborted(|aborted| {
372      if !aborted && !self.handle.referred.load(Ordering::Relaxed) {
373        check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.handle.get_raw()) })?;
374        self.handle.referred.store(true, Ordering::Relaxed);
375      }
376      Ok(())
377    })
378  }
379
380  /// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function)
381  /// for more information.
382  pub fn unref(&mut self, env: &Env) -> Result<()> {
383    self.handle.with_read_aborted(|aborted| {
384      if !aborted && self.handle.referred.load(Ordering::Relaxed) {
385        check_status!(unsafe {
386          sys::napi_unref_threadsafe_function(env.0, self.handle.get_raw())
387        })?;
388        self.handle.referred.store(false, Ordering::Relaxed);
389      }
390      Ok(())
391    })
392  }
393
394  pub fn aborted(&self) -> bool {
395    self.handle.with_read_aborted(|aborted| aborted)
396  }
397
398  pub fn abort(self) -> Result<()> {
399    self.handle.with_write_aborted(|mut aborted_guard| {
400      if !*aborted_guard {
401        check_status!(unsafe {
402          sys::napi_release_threadsafe_function(
403            self.handle.get_raw(),
404            sys::ThreadsafeFunctionReleaseMode::abort,
405          )
406        })?;
407        *aborted_guard = true;
408      }
409      Ok(())
410    })
411  }
412
413  /// Get the raw `ThreadSafeFunction` pointer
414  pub fn raw(&self) -> sys::napi_threadsafe_function {
415    self.handle.get_raw()
416  }
417}
418
419impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::CalleeHandled> {
420  /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function)
421  /// for more information.
422  pub fn call(&self, value: Result<T>, mode: ThreadsafeFunctionCallMode) -> Status {
423    self.handle.with_read_aborted(|aborted| {
424      if aborted {
425        return Status::Closing;
426      }
427
428      unsafe {
429        sys::napi_call_threadsafe_function(
430          self.handle.get_raw(),
431          Box::into_raw(Box::new(value.map(|data| {
432            ThreadsafeFunctionCallJsBackData {
433              data,
434              call_variant: ThreadsafeFunctionCallVariant::Direct,
435              callback: Box::new(|_d: Result<JsUnknown>| Ok(())),
436            }
437          })))
438          .cast(),
439          mode.into(),
440        )
441      }
442      .into()
443    })
444  }
445
446  pub fn call_with_return_value<D: FromNapiValue, F: 'static + FnOnce(D) -> Result<()>>(
447    &self,
448    value: Result<T>,
449    mode: ThreadsafeFunctionCallMode,
450    cb: F,
451  ) -> Status {
452    self.handle.with_read_aborted(|aborted| {
453      if aborted {
454        return Status::Closing;
455      }
456
457      unsafe {
458        sys::napi_call_threadsafe_function(
459          self.handle.get_raw(),
460          Box::into_raw(Box::new(value.map(|data| {
461            ThreadsafeFunctionCallJsBackData {
462              data,
463              call_variant: ThreadsafeFunctionCallVariant::WithCallback,
464              callback: Box::new(move |d: Result<JsUnknown>| {
465                d.and_then(|d| D::from_napi_value(d.0.env, d.0.value).and_then(cb))
466              }),
467            }
468          })))
469          .cast(),
470          mode.into(),
471        )
472      }
473      .into()
474    })
475  }
476
477  #[cfg(feature = "tokio_rt")]
478  pub async fn call_async<D: 'static + FromNapiValue>(&self, value: Result<T>) -> Result<D> {
479    let (sender, receiver) = tokio::sync::oneshot::channel::<Result<D>>();
480
481    self.handle.with_read_aborted(|aborted| {
482      if aborted {
483        return Err(crate::Error::from_status(Status::Closing));
484      }
485
486      check_status!(
487        unsafe {
488          sys::napi_call_threadsafe_function(
489            self.handle.get_raw(),
490            Box::into_raw(Box::new(value.map(|data| {
491              ThreadsafeFunctionCallJsBackData {
492                data,
493                call_variant: ThreadsafeFunctionCallVariant::WithCallback,
494                callback: Box::new(move |d: Result<JsUnknown>| {
495                  sender
496                    .send(d.and_then(|d| D::from_napi_value(d.0.env, d.0.value)))
497                    // The only reason for send to return Err is if the receiver isn't listening
498                    // Not hiding the error would result in a napi_fatal_error call, it's safe to ignore it instead.
499                    .or(Ok(()))
500                }),
501              }
502            })))
503            .cast(),
504            ThreadsafeFunctionCallMode::NonBlocking.into(),
505          )
506        },
507        "Threadsafe function call_async failed"
508      )
509    })?;
510    receiver
511      .await
512      .map_err(|_| {
513        crate::Error::new(
514          Status::GenericFailure,
515          "Receive value from threadsafe function sender failed",
516        )
517      })
518      .and_then(|ret| ret)
519  }
520}
521
522impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::Fatal> {
523  /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function)
524  /// for more information.
525  pub fn call(&self, value: T, mode: ThreadsafeFunctionCallMode) -> Status {
526    self.handle.with_read_aborted(|aborted| {
527      if aborted {
528        return Status::Closing;
529      }
530
531      unsafe {
532        sys::napi_call_threadsafe_function(
533          self.handle.get_raw(),
534          Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
535            data: value,
536            call_variant: ThreadsafeFunctionCallVariant::Direct,
537            callback: Box::new(|_d: Result<JsUnknown>| Ok(())),
538          }))
539          .cast(),
540          mode.into(),
541        )
542      }
543      .into()
544    })
545  }
546
547  pub fn call_with_return_value<D: FromNapiValue, F: 'static + FnOnce(D) -> Result<()>>(
548    &self,
549    value: T,
550    mode: ThreadsafeFunctionCallMode,
551    cb: F,
552  ) -> Status {
553    self.handle.with_read_aborted(|aborted| {
554      if aborted {
555        return Status::Closing;
556      }
557
558      unsafe {
559        sys::napi_call_threadsafe_function(
560          self.handle.get_raw(),
561          Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
562            data: value,
563            call_variant: ThreadsafeFunctionCallVariant::WithCallback,
564            callback: Box::new(move |d: Result<JsUnknown>| {
565              d.and_then(|d| D::from_napi_value(d.0.env, d.0.value).and_then(cb))
566            }),
567          }))
568          .cast(),
569          mode.into(),
570        )
571      }
572      .into()
573    })
574  }
575
576  #[cfg(feature = "tokio_rt")]
577  pub async fn call_async<D: 'static + FromNapiValue>(&self, value: T) -> Result<D> {
578    let (sender, receiver) = tokio::sync::oneshot::channel::<D>();
579
580    self.handle.with_read_aborted(|aborted| {
581      if aborted {
582        return Err(crate::Error::from_status(Status::Closing));
583      }
584
585      check_status!(unsafe {
586        sys::napi_call_threadsafe_function(
587          self.handle.get_raw(),
588          Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
589            data: value,
590            call_variant: ThreadsafeFunctionCallVariant::WithCallback,
591            callback: Box::new(move |d: Result<JsUnknown>| {
592              d.and_then(|d| {
593                D::from_napi_value(d.0.env, d.0.value).and_then(move |d| {
594                  sender
595                    .send(d)
596                    // The only reason for send to return Err is if the receiver isn't listening
597                    // Not hiding the error would result in a napi_fatal_error call, it's safe to ignore it instead.
598                    .or(Ok(()))
599                })
600              })
601            }),
602          }))
603          .cast(),
604          ThreadsafeFunctionCallMode::NonBlocking.into(),
605        )
606      })
607    })?;
608
609    receiver
610      .await
611      .map_err(|err| crate::Error::new(Status::GenericFailure, format!("{}", err)))
612  }
613}
614
615#[allow(unused_variables)]
616unsafe extern "C" fn thread_finalize_cb<T: 'static, V: ToNapiValue, R>(
617  env: sys::napi_env,
618  finalize_data: *mut c_void,
619  finalize_hint: *mut c_void,
620) where
621  R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
622{
623  let handle_option =
624    unsafe { Weak::from_raw(finalize_data.cast::<ThreadsafeFunctionHandle>()).upgrade() };
625
626  if let Some(handle) = handle_option {
627    handle.with_write_aborted(|mut aborted_guard| {
628      if !*aborted_guard {
629        *aborted_guard = true;
630      }
631    });
632  }
633
634  // cleanup
635  drop(unsafe { Box::<R>::from_raw(finalize_hint.cast()) });
636}
637
638unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
639  raw_env: sys::napi_env,
640  js_callback: sys::napi_value,
641  context: *mut c_void,
642  data: *mut c_void,
643) where
644  R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
645  ES: ErrorStrategy::T,
646{
647  // env and/or callback can be null when shutting down
648  if raw_env.is_null() || js_callback.is_null() {
649    return;
650  }
651
652  let ctx: &mut R = unsafe { Box::leak(Box::from_raw(context.cast())) };
653  let val = unsafe {
654    match ES::VALUE {
655      ErrorStrategy::CalleeHandled::VALUE => {
656        *Box::<Result<ThreadsafeFunctionCallJsBackData<T>>>::from_raw(data.cast())
657      }
658      ErrorStrategy::Fatal::VALUE => Ok(*Box::<ThreadsafeFunctionCallJsBackData<T>>::from_raw(
659        data.cast(),
660      )),
661    }
662  };
663
664  let mut recv = ptr::null_mut();
665  unsafe { sys::napi_get_undefined(raw_env, &mut recv) };
666
667  let ret = val.and_then(|v| {
668    (ctx)(ThreadSafeCallContext {
669      env: unsafe { Env::from_raw(raw_env) },
670      value: v.data,
671    })
672    .map(|ret| (ret, v.call_variant, v.callback))
673  });
674
675  // Follow async callback conventions: https://nodejs.org/en/knowledge/errors/what-are-the-error-conventions/
676  // Check if the Result is okay, if so, pass a null as the first (error) argument automatically.
677  // If the Result is an error, pass that as the first argument.
678  let status = match ret {
679    Ok((values, call_variant, callback)) => {
680      let values = values
681        .into_iter()
682        .map(|v| unsafe { ToNapiValue::to_napi_value(raw_env, v) });
683      let args: Result<Vec<sys::napi_value>> = if ES::VALUE == ErrorStrategy::CalleeHandled::VALUE {
684        let mut js_null = ptr::null_mut();
685        unsafe { sys::napi_get_null(raw_env, &mut js_null) };
686        ::core::iter::once(Ok(js_null)).chain(values).collect()
687      } else {
688        values.collect()
689      };
690      let mut return_value = ptr::null_mut();
691      let mut status = match args {
692        Ok(args) => unsafe {
693          sys::napi_call_function(
694            raw_env,
695            recv,
696            js_callback,
697            args.len(),
698            args.as_ptr(),
699            &mut return_value,
700          )
701        },
702        Err(e) => match ES::VALUE {
703          ErrorStrategy::Fatal::VALUE => unsafe {
704            sys::napi_fatal_exception(raw_env, JsError::from(e).into_value(raw_env))
705          },
706          ErrorStrategy::CalleeHandled::VALUE => unsafe {
707            sys::napi_call_function(
708              raw_env,
709              recv,
710              js_callback,
711              1,
712              [JsError::from(e).into_value(raw_env)].as_mut_ptr(),
713              &mut return_value,
714            )
715          },
716        },
717      };
718      if let ThreadsafeFunctionCallVariant::WithCallback = call_variant {
719        // throw Error in JavaScript callback
720        let callback_arg = if status == sys::Status::napi_pending_exception {
721          let mut exception = ptr::null_mut();
722          status = unsafe { sys::napi_get_and_clear_last_exception(raw_env, &mut exception) };
723          Err(
724            JsUnknown(crate::Value {
725              env: raw_env,
726              value: exception,
727              value_type: crate::ValueType::Unknown,
728            })
729            .into(),
730          )
731        } else {
732          Ok(JsUnknown(crate::Value {
733            env: raw_env,
734            value: return_value,
735            value_type: crate::ValueType::Unknown,
736          }))
737        };
738        if let Err(err) = callback(callback_arg) {
739          let message = format!(
740            "Failed to convert return value in ThreadsafeFunction callback into Rust value: {}",
741            err
742          );
743          let message_length = message.len();
744          let c_message = CString::new(message).unwrap();
745          unsafe {
746            sys::napi_fatal_error(
747              "threadsafe_function.rs:749\0".as_ptr().cast(),
748              26,
749              c_message.as_ptr(),
750              message_length,
751            )
752          };
753        }
754      }
755      status
756    }
757    Err(e) if ES::VALUE == ErrorStrategy::Fatal::VALUE => unsafe {
758      sys::napi_fatal_exception(raw_env, JsError::from(e).into_value(raw_env))
759    },
760    Err(e) => unsafe {
761      sys::napi_call_function(
762        raw_env,
763        recv,
764        js_callback,
765        1,
766        [JsError::from(e).into_value(raw_env)].as_mut_ptr(),
767        ptr::null_mut(),
768      )
769    },
770  };
771  handle_call_js_cb_status(status, raw_env)
772}
773
774fn handle_call_js_cb_status(status: sys::napi_status, raw_env: sys::napi_env) {
775  if status == sys::Status::napi_ok {
776    return;
777  }
778  if status == sys::Status::napi_pending_exception {
779    let mut error_result = ptr::null_mut();
780    assert_eq!(
781      unsafe { sys::napi_get_and_clear_last_exception(raw_env, &mut error_result) },
782      sys::Status::napi_ok
783    );
784
785    // When shutting down, napi_fatal_exception sometimes returns another exception
786    let stat = unsafe { sys::napi_fatal_exception(raw_env, error_result) };
787    assert!(stat == sys::Status::napi_ok || stat == sys::Status::napi_pending_exception);
788  } else {
789    let error_code: Status = status.into();
790    let error_code_string = format!("{:?}", error_code);
791    let mut error_code_value = ptr::null_mut();
792    assert_eq!(
793      unsafe {
794        sys::napi_create_string_utf8(
795          raw_env,
796          error_code_string.as_ptr() as *const _,
797          error_code_string.len(),
798          &mut error_code_value,
799        )
800      },
801      sys::Status::napi_ok,
802    );
803    let error_msg = "Call JavaScript callback failed in threadsafe function";
804    let mut error_msg_value = ptr::null_mut();
805    assert_eq!(
806      unsafe {
807        sys::napi_create_string_utf8(
808          raw_env,
809          error_msg.as_ptr() as *const _,
810          error_msg.len(),
811          &mut error_msg_value,
812        )
813      },
814      sys::Status::napi_ok,
815    );
816    let mut error_value = ptr::null_mut();
817    assert_eq!(
818      unsafe {
819        sys::napi_create_error(raw_env, error_code_value, error_msg_value, &mut error_value)
820      },
821      sys::Status::napi_ok,
822    );
823    assert_eq!(
824      unsafe { sys::napi_fatal_exception(raw_env, error_value) },
825      sys::Status::napi_ok
826    );
827  }
828}
829
830/// Helper
831macro_rules! type_level_enum {(
832  $( #[doc = $doc:tt] )*
833  $pub:vis
834  enum $EnumName:ident {
835    $(
836      $( #[doc = $doc_variant:tt] )*
837      $Variant:ident
838    ),* $(,)?
839  }
840) => (type_level_enum! { // This requires the macro to be in scope when called.
841  with_docs! {
842    $( #[doc = $doc] )*
843    ///
844    /// ### Type-level `enum`
845    ///
846    /// Until `const_generics` can handle custom `enum`s, this pattern must be
847    /// implemented at the type level.
848    ///
849    /// We thus end up with:
850    ///
851    /// ```rust,ignore
852    /// #[type_level_enum]
853    #[doc = ::core::concat!(
854      " enum ", ::core::stringify!($EnumName), " {",
855    )]
856    $(
857      #[doc = ::core::concat!(
858        "     ", ::core::stringify!($Variant), ",",
859      )]
860    )*
861    #[doc = " }"]
862    /// ```
863    ///
864    #[doc = ::core::concat!(
865      "With [`", ::core::stringify!($EnumName), "::T`](#reexports) \
866      being the type-level \"enum type\":",
867    )]
868    ///
869    /// ```rust,ignore
870    #[doc = ::core::concat!(
871      "<Param: ", ::core::stringify!($EnumName), "::T>"
872    )]
873    /// ```
874  }
875  #[allow(warnings)]
876  $pub mod $EnumName {
877    #[doc(no_inline)]
878    pub use $EnumName as T;
879
880    super::type_level_enum! {
881      with_docs! {
882        #[doc = ::core::concat!(
883          "See [`", ::core::stringify!($EnumName), "`]\
884          [super::", ::core::stringify!($EnumName), "]"
885        )]
886      }
887      pub trait $EnumName : __sealed::$EnumName + ::core::marker::Sized + 'static {
888        const VALUE: __value::$EnumName;
889      }
890    }
891
892    mod __sealed { pub trait $EnumName {} }
893
894    mod __value {
895      #[derive(Debug, PartialEq, Eq)]
896      pub enum $EnumName { $( $Variant ),* }
897    }
898
899    $(
900      $( #[doc = $doc_variant] )*
901      pub enum $Variant {}
902      impl __sealed::$EnumName for $Variant {}
903      impl $EnumName for $Variant {
904        const VALUE: __value::$EnumName = __value::$EnumName::$Variant;
905      }
906      impl $Variant {
907        pub const VALUE: __value::$EnumName = __value::$EnumName::$Variant;
908      }
909    )*
910  }
911});(
912  with_docs! {
913    $( #[doc = $doc:expr] )*
914  }
915  $item:item
916) => (
917  $( #[doc = $doc] )*
918  $item
919)}
920
921use type_level_enum;
922
923pub struct UnknownReturnValue;
924
925impl TypeName for UnknownReturnValue {
926  fn type_name() -> &'static str {
927    "UnknownReturnValue"
928  }
929
930  fn value_type() -> crate::ValueType {
931    crate::ValueType::Unknown
932  }
933}
934
935impl ValidateNapiValue for UnknownReturnValue {}
936
937impl FromNapiValue for UnknownReturnValue {
938  unsafe fn from_napi_value(_env: sys::napi_env, _napi_val: sys::napi_value) -> Result<Self> {
939    Ok(UnknownReturnValue)
940  }
941}