napi/
threadsafe_function.rs

1#![allow(clippy::single_component_path_imports)]
2
3#[cfg(feature = "tokio_rt")]
4use std::convert::identity;
5use std::marker::PhantomData;
6use std::os::raw::c_void;
7use std::ptr::{self, null_mut};
8use std::sync::{
9  self,
10  atomic::{AtomicBool, AtomicPtr, Ordering},
11  Arc, RwLock, RwLockWriteGuard,
12};
13
14use crate::bindgen_runtime::{
15  FromNapiValue, JsValuesTupleIntoVec, TypeName, Unknown, ValidateNapiValue,
16};
17use crate::{
18  check_status, get_error_message_and_stack_trace, sys, Env, Error, JsError, Result, Status,
19};
20
21#[deprecated(since = "2.17.0", note = "Please use `ThreadsafeFunction` instead")]
22pub type ThreadSafeCallContext<T> = ThreadsafeCallContext<T>;
23
24/// ThreadSafeFunction Context object
25/// the `value` is the value passed to `call` method
26pub struct ThreadsafeCallContext<T: 'static> {
27  pub env: Env,
28  pub value: T,
29}
30
31#[repr(u8)]
32#[derive(Clone, Copy, Debug, Eq, PartialEq)]
33pub enum ThreadsafeFunctionCallMode {
34  NonBlocking,
35  Blocking,
36}
37
38impl From<ThreadsafeFunctionCallMode> for sys::napi_threadsafe_function_call_mode {
39  fn from(value: ThreadsafeFunctionCallMode) -> Self {
40    match value {
41      ThreadsafeFunctionCallMode::Blocking => sys::ThreadsafeFunctionCallMode::blocking,
42      ThreadsafeFunctionCallMode::NonBlocking => sys::ThreadsafeFunctionCallMode::nonblocking,
43    }
44  }
45}
46
47struct ThreadsafeFunctionHandle {
48  raw: AtomicPtr<sys::napi_threadsafe_function__>,
49  aborted: RwLock<bool>,
50  referred: AtomicBool,
51}
52
53impl ThreadsafeFunctionHandle {
54  /// create a Arc to hold the `ThreadsafeFunctionHandle`
55  fn new(raw: sys::napi_threadsafe_function) -> Arc<Self> {
56    Arc::new(Self {
57      raw: AtomicPtr::new(raw),
58      aborted: RwLock::new(false),
59      referred: AtomicBool::new(true),
60    })
61  }
62
63  /// Lock `aborted` with read access, call `f` with the value of `aborted`, then unlock it
64  fn with_read_aborted<RT, F>(&self, f: F) -> RT
65  where
66    F: FnOnce(bool) -> RT,
67  {
68    let aborted_guard = self
69      .aborted
70      .read()
71      .expect("Threadsafe Function aborted lock failed");
72    f(*aborted_guard)
73  }
74
75  /// Lock `aborted` with write access, call `f` with the `RwLockWriteGuard`, then unlock it
76  fn with_write_aborted<RT, F>(&self, f: F) -> RT
77  where
78    F: FnOnce(RwLockWriteGuard<bool>) -> RT,
79  {
80    let aborted_guard = self
81      .aborted
82      .write()
83      .expect("Threadsafe Function aborted lock failed");
84    f(aborted_guard)
85  }
86
87  #[allow(clippy::arc_with_non_send_sync)]
88  fn null() -> Arc<Self> {
89    Self::new(null_mut())
90  }
91
92  fn get_raw(&self) -> sys::napi_threadsafe_function {
93    self.raw.load(Ordering::SeqCst)
94  }
95
96  fn set_raw(&self, raw: sys::napi_threadsafe_function) {
97    self.raw.store(raw, Ordering::SeqCst)
98  }
99}
100
101impl Drop for ThreadsafeFunctionHandle {
102  fn drop(&mut self) {
103    self.with_read_aborted(|aborted| {
104      if !aborted {
105        let raw = self.get_raw();
106        // if ThreadsafeFunction::create failed, the raw will be null and we don't need to release it
107        if !raw.is_null() {
108          let release_status = unsafe {
109            sys::napi_release_threadsafe_function(
110              self.get_raw(),
111              sys::ThreadsafeFunctionReleaseMode::release,
112            )
113          };
114          assert!(
115            release_status == sys::Status::napi_ok,
116            "Threadsafe Function release failed {}",
117            Status::from(release_status)
118          );
119        }
120      }
121    })
122  }
123}
124
125#[repr(u8)]
126enum ThreadsafeFunctionCallVariant {
127  Direct,
128  WithCallback,
129}
130
131struct ThreadsafeFunctionCallJsBackData<T, Return = Unknown<'static>> {
132  data: T,
133  call_variant: ThreadsafeFunctionCallVariant,
134  callback: Box<dyn FnOnce(Result<Return>, Env) -> Result<()>>,
135}
136
137/// Communicate with the addon's main thread by invoking a JavaScript function from other threads.
138///
139/// ## Example
140/// An example of using `ThreadsafeFunction`:
141///
142/// ```rust
143/// use std::thread;
144/// use std::sync::Arc;
145///
146/// use napi::{
147///     threadsafe_function::{
148///         ThreadSafeCallContext, ThreadsafeFunctionCallMode, ThreadsafeFunctionReleaseMode,
149///     },
150/// };
151/// use napi_derive::napi;
152///
153/// #[napi]
154/// pub fn call_threadsafe_function(callback: Arc<ThreadsafeFunction<(u32, bool, String), ()>>) {
155///   let tsfn_cloned = tsfn.clone();
156///
157///   thread::spawn(move || {
158///       let output: Vec<u32> = vec![0, 1, 2, 3];
159///       // It's okay to call a threadsafe function multiple times.
160///       tsfn.call(Ok((1, false, "NAPI-RS".into())), ThreadsafeFunctionCallMode::Blocking);
161///       tsfn.call(Ok((2, true, "NAPI-RS".into())), ThreadsafeFunctionCallMode::NonBlocking);
162///   });
163///
164///   thread::spawn(move || {
165///       tsfn_cloned.call((3, false, "NAPI-RS".into())), ThreadsafeFunctionCallMode::NonBlocking);
166///   });
167/// }
168/// ```
169pub struct ThreadsafeFunction<
170  T: 'static,
171  Return: 'static + FromNapiValue = Unknown<'static>,
172  CallJsBackArgs: 'static + JsValuesTupleIntoVec = T,
173  ErrorStatus: AsRef<str> + From<Status> = Status,
174  const CalleeHandled: bool = true,
175  const Weak: bool = false,
176  const MaxQueueSize: usize = 0,
177> {
178  handle: Arc<ThreadsafeFunctionHandle>,
179  _phantom: PhantomData<(T, CallJsBackArgs, Return, ErrorStatus)>,
180}
181
182unsafe impl<
183    T: 'static,
184    Return: FromNapiValue,
185    CallJsBackArgs: 'static + JsValuesTupleIntoVec,
186    ErrorStatus: AsRef<str> + From<Status>,
187    const CalleeHandled: bool,
188    const Weak: bool,
189    const MaxQueueSize: usize,
190  > Send
191  for ThreadsafeFunction<
192    T,
193    Return,
194    CallJsBackArgs,
195    ErrorStatus,
196    { CalleeHandled },
197    { Weak },
198    { MaxQueueSize },
199  >
200{
201}
202
203unsafe impl<
204    T: 'static,
205    Return: FromNapiValue,
206    CallJsBackArgs: 'static + JsValuesTupleIntoVec,
207    ErrorStatus: AsRef<str> + From<Status>,
208    const CalleeHandled: bool,
209    const Weak: bool,
210    const MaxQueueSize: usize,
211  > Sync
212  for ThreadsafeFunction<
213    T,
214    Return,
215    CallJsBackArgs,
216    ErrorStatus,
217    { CalleeHandled },
218    { Weak },
219    { MaxQueueSize },
220  >
221{
222}
223
224impl<
225    T: 'static + JsValuesTupleIntoVec,
226    Return: FromNapiValue,
227    ErrorStatus: AsRef<str> + From<Status>,
228    const CalleeHandled: bool,
229    const Weak: bool,
230    const MaxQueueSize: usize,
231  > FromNapiValue
232  for ThreadsafeFunction<T, Return, T, ErrorStatus, { CalleeHandled }, { Weak }, { MaxQueueSize }>
233{
234  unsafe fn from_napi_value(env: sys::napi_env, napi_val: sys::napi_value) -> Result<Self> {
235    Self::create(env, napi_val, |ctx| Ok(ctx.value))
236  }
237}
238
239impl<
240    T: 'static + JsValuesTupleIntoVec,
241    Return: FromNapiValue,
242    ErrorStatus: AsRef<str> + From<Status>,
243    const CalleeHandled: bool,
244    const Weak: bool,
245    const MaxQueueSize: usize,
246  > TypeName
247  for ThreadsafeFunction<T, Return, T, ErrorStatus, { CalleeHandled }, { Weak }, { MaxQueueSize }>
248{
249  fn type_name() -> &'static str {
250    "ThreadsafeFunction"
251  }
252
253  fn value_type() -> crate::ValueType {
254    crate::ValueType::Function
255  }
256}
257
258impl<
259    T: 'static + JsValuesTupleIntoVec,
260    Return: FromNapiValue,
261    ErrorStatus: AsRef<str> + From<Status>,
262    const CalleeHandled: bool,
263    const Weak: bool,
264    const MaxQueueSize: usize,
265  > ValidateNapiValue
266  for ThreadsafeFunction<T, Return, T, ErrorStatus, { CalleeHandled }, { Weak }, { MaxQueueSize }>
267{
268}
269
270impl<
271    T: 'static,
272    Return: FromNapiValue,
273    CallJsBackArgs: 'static + JsValuesTupleIntoVec,
274    ErrorStatus: AsRef<str> + From<Status>,
275    const CalleeHandled: bool,
276    const Weak: bool,
277    const MaxQueueSize: usize,
278  >
279  ThreadsafeFunction<
280    T,
281    Return,
282    CallJsBackArgs,
283    ErrorStatus,
284    { CalleeHandled },
285    { Weak },
286    { MaxQueueSize },
287  >
288{
289  // See [napi_create_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_create_threadsafe_function)
290  // for more information.
291  pub(crate) fn create<
292    NewArgs: 'static + JsValuesTupleIntoVec,
293    R: 'static + FnMut(ThreadsafeCallContext<T>) -> Result<NewArgs>,
294  >(
295    env: sys::napi_env,
296    func: sys::napi_value,
297    callback: R,
298  ) -> Result<
299    ThreadsafeFunction<
300      T,
301      Return,
302      NewArgs,
303      ErrorStatus,
304      { CalleeHandled },
305      { Weak },
306      { MaxQueueSize },
307    >,
308  > {
309    let mut async_resource_name = ptr::null_mut();
310    static THREAD_SAFE_FUNCTION_ASYNC_RESOURCE_NAME: &str = "napi_rs_threadsafe_function";
311
312    #[cfg(feature = "napi10")]
313    {
314      let mut copied = false;
315      check_status!(
316        unsafe {
317          sys::node_api_create_external_string_latin1(
318            env,
319            THREAD_SAFE_FUNCTION_ASYNC_RESOURCE_NAME.as_ptr().cast(),
320            27,
321            None,
322            ptr::null_mut(),
323            &mut async_resource_name,
324            &mut copied,
325          )
326        },
327        "Create external string latin1 in ThreadsafeFunction::create failed"
328      )?;
329    }
330
331    #[cfg(not(feature = "napi10"))]
332    {
333      check_status!(
334        unsafe {
335          sys::napi_create_string_utf8(
336            env,
337            THREAD_SAFE_FUNCTION_ASYNC_RESOURCE_NAME.as_ptr().cast(),
338            27,
339            &mut async_resource_name,
340          )
341        },
342        "Create string utf8 in ThreadsafeFunction::create failed"
343      )?;
344    }
345
346    let mut raw_tsfn = ptr::null_mut();
347    let callback_ptr = Box::into_raw(Box::new(callback));
348    let handle = ThreadsafeFunctionHandle::null();
349    check_status!(
350      unsafe {
351        sys::napi_create_threadsafe_function(
352          env,
353          func,
354          ptr::null_mut(),
355          async_resource_name,
356          MaxQueueSize,
357          1,
358          Arc::downgrade(&handle).into_raw().cast_mut().cast(), // pass handler to thread_finalize_cb
359          Some(thread_finalize_cb::<T, NewArgs, R>),
360          callback_ptr.cast(),
361          Some(call_js_cb::<T, Return, NewArgs, ErrorStatus, R, CalleeHandled>),
362          &mut raw_tsfn,
363        )
364      },
365      "Create threadsafe function in ThreadsafeFunction::create failed"
366    )?;
367    handle.set_raw(raw_tsfn);
368
369    // Weak ThreadsafeFunction will not prevent the event loop from exiting
370    if Weak {
371      check_status!(
372        unsafe { sys::napi_unref_threadsafe_function(env, raw_tsfn) },
373        "Unref threadsafe function failed in Weak mode"
374      )?;
375    }
376
377    Ok(ThreadsafeFunction {
378      handle,
379      _phantom: PhantomData,
380    })
381  }
382
383  #[deprecated(
384    since = "2.17.0",
385    note = "Please use `ThreadsafeFunction::clone` instead of manually increasing the reference count"
386  )]
387  /// See [napi_ref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_ref_threadsafe_function)
388  /// for more information.
389  ///
390  /// "ref" is a keyword so that we use "refer" here.
391  pub fn refer(&mut self, env: &Env) -> Result<()> {
392    self.handle.with_read_aborted(|aborted| {
393      if !aborted && !self.handle.referred.load(Ordering::Relaxed) {
394        check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.handle.get_raw()) })?;
395        self.handle.referred.store(true, Ordering::Relaxed);
396      }
397      Ok(())
398    })
399  }
400
401  #[deprecated(
402    since = "2.17.0",
403    note = "Please use `ThreadsafeFunction::clone` instead of manually decreasing the reference count"
404  )]
405  /// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function)
406  /// for more information.
407  pub fn unref(&mut self, env: &Env) -> Result<()> {
408    self.handle.with_read_aborted(|aborted| {
409      if !aborted && self.handle.referred.load(Ordering::Relaxed) {
410        check_status!(unsafe {
411          sys::napi_unref_threadsafe_function(env.0, self.handle.get_raw())
412        })?;
413        self.handle.referred.store(false, Ordering::Relaxed);
414      }
415      Ok(())
416    })
417  }
418
419  pub fn aborted(&self) -> bool {
420    self.handle.with_read_aborted(|aborted| aborted)
421  }
422
423  #[deprecated(
424    since = "2.17.0",
425    note = "Drop all references to the ThreadsafeFunction will automatically release it"
426  )]
427  pub fn abort(self) -> Result<()> {
428    self.handle.with_write_aborted(|mut aborted_guard| {
429      if !*aborted_guard {
430        check_status!(unsafe {
431          sys::napi_release_threadsafe_function(
432            self.handle.get_raw(),
433            sys::ThreadsafeFunctionReleaseMode::abort,
434          )
435        })?;
436        *aborted_guard = true;
437      }
438      Ok(())
439    })
440  }
441
442  /// Get the raw `ThreadSafeFunction` pointer
443  pub fn raw(&self) -> sys::napi_threadsafe_function {
444    self.handle.get_raw()
445  }
446}
447
448impl<
449    T: 'static,
450    Return: FromNapiValue,
451    CallJsBackArgs: 'static + JsValuesTupleIntoVec,
452    ErrorStatus: AsRef<str> + From<Status>,
453    const Weak: bool,
454    const MaxQueueSize: usize,
455  > ThreadsafeFunction<T, Return, CallJsBackArgs, ErrorStatus, true, { Weak }, { MaxQueueSize }>
456{
457  /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function)
458  /// for more information.
459  pub fn call(&self, value: Result<T, ErrorStatus>, mode: ThreadsafeFunctionCallMode) -> Status {
460    self.handle.with_read_aborted(|aborted| {
461      if aborted {
462        return Status::Closing;
463      }
464
465      unsafe {
466        sys::napi_call_threadsafe_function(
467          self.handle.get_raw(),
468          Box::into_raw(Box::new(value.map(|data| {
469            ThreadsafeFunctionCallJsBackData {
470              data,
471              call_variant: ThreadsafeFunctionCallVariant::Direct,
472              callback: Box::new(|_d: Result<Return>, _| Ok(())),
473            }
474          })))
475          .cast(),
476          mode.into(),
477        )
478      }
479      .into()
480    })
481  }
482
483  /// Call the ThreadsafeFunction, and handle the return value with a callback
484  pub fn call_with_return_value<F: 'static + FnOnce(Result<Return>, Env) -> Result<()>>(
485    &self,
486    value: Result<T, ErrorStatus>,
487    mode: ThreadsafeFunctionCallMode,
488    cb: F,
489  ) -> Status {
490    self.handle.with_read_aborted(|aborted| {
491      if aborted {
492        return Status::Closing;
493      }
494
495      unsafe {
496        sys::napi_call_threadsafe_function(
497          self.handle.get_raw(),
498          Box::into_raw(Box::new(value.map(|data| {
499            ThreadsafeFunctionCallJsBackData {
500              data,
501              call_variant: ThreadsafeFunctionCallVariant::WithCallback,
502              callback: Box::new(move |d: Result<Return>, env: Env| cb(d, env)),
503            }
504          })))
505          .cast(),
506          mode.into(),
507        )
508      }
509      .into()
510    })
511  }
512
513  #[cfg(feature = "tokio_rt")]
514  /// Call the ThreadsafeFunction, and handle the return value with in `async` way
515  pub async fn call_async(&self, value: Result<T, ErrorStatus>) -> Result<Return> {
516    let (sender, receiver) = tokio::sync::oneshot::channel::<Result<Return>>();
517
518    self.handle.with_read_aborted(|aborted| {
519      if aborted {
520        return Err(crate::Error::from_status(Status::Closing));
521      }
522
523      check_status!(
524        unsafe {
525          sys::napi_call_threadsafe_function(
526            self.handle.get_raw(),
527            Box::into_raw(Box::new(value.map(|data| {
528              ThreadsafeFunctionCallJsBackData {
529                data,
530                call_variant: ThreadsafeFunctionCallVariant::WithCallback,
531                callback: Box::new(move |d: Result<Return>, _| {
532                  sender
533                    .send(d)
534                    // The only reason for send to return Err is if the receiver isn't listening
535                    // Not hiding the error would result in a napi_fatal_error call, it's safe to ignore it instead.
536                    .or(Ok(()))
537                }),
538              }
539            })))
540            .cast(),
541            ThreadsafeFunctionCallMode::NonBlocking.into(),
542          )
543        },
544        "Threadsafe function call_async failed"
545      )
546    })?;
547    receiver
548      .await
549      .map_err(|_| {
550        crate::Error::new(
551          Status::GenericFailure,
552          "Receive value from threadsafe function sender failed",
553        )
554      })
555      .and_then(identity)
556  }
557}
558
559impl<
560    T: 'static,
561    Return: FromNapiValue,
562    CallJsBackArgs: 'static + JsValuesTupleIntoVec,
563    ErrorStatus: AsRef<str> + From<Status>,
564    const Weak: bool,
565    const MaxQueueSize: usize,
566  > ThreadsafeFunction<T, Return, CallJsBackArgs, ErrorStatus, false, { Weak }, { MaxQueueSize }>
567{
568  /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function)
569  /// for more information.
570  pub fn call(&self, value: T, mode: ThreadsafeFunctionCallMode) -> Status {
571    self.handle.with_read_aborted(|aborted| {
572      if aborted {
573        return Status::Closing;
574      }
575
576      unsafe {
577        sys::napi_call_threadsafe_function(
578          self.handle.get_raw(),
579          Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
580            data: value,
581            call_variant: ThreadsafeFunctionCallVariant::Direct,
582            callback: Box::new(|_d: Result<Return>, _: Env| Ok(())),
583          }))
584          .cast(),
585          mode.into(),
586        )
587      }
588      .into()
589    })
590  }
591
592  /// Call the ThreadsafeFunction, and handle the return value with a callback
593  pub fn call_with_return_value<F: 'static + FnOnce(Result<Return>, Env) -> Result<()>>(
594    &self,
595    value: T,
596    mode: ThreadsafeFunctionCallMode,
597    cb: F,
598  ) -> Status {
599    self.handle.with_read_aborted(|aborted| {
600      if aborted {
601        return Status::Closing;
602      }
603
604      unsafe {
605        sys::napi_call_threadsafe_function(
606          self.handle.get_raw(),
607          Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
608            data: value,
609            call_variant: ThreadsafeFunctionCallVariant::WithCallback,
610            callback: Box::new(cb),
611          }))
612          .cast(),
613          mode.into(),
614        )
615      }
616      .into()
617    })
618  }
619
620  #[cfg(feature = "tokio_rt")]
621  /// Call the ThreadsafeFunction, and handle the return value with in `async` way
622  pub async fn call_async(&self, value: T) -> Result<Return> {
623    let (sender, receiver) = tokio::sync::oneshot::channel::<Return>();
624
625    self.handle.with_read_aborted(|aborted| {
626      if aborted {
627        return Err(crate::Error::from_status(Status::Closing));
628      }
629
630      check_status!(unsafe {
631        sys::napi_call_threadsafe_function(
632          self.handle.get_raw(),
633          Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData {
634            data: value,
635            call_variant: ThreadsafeFunctionCallVariant::WithCallback,
636            callback: Box::new(move |d, _| {
637              d.and_then(|d| {
638                sender
639                  .send(d)
640                  // The only reason for send to return Err is if the receiver isn't listening
641                  // Not hiding the error would result in a napi_fatal_error call, it's safe to ignore it instead.
642                  .or(Ok(()))
643              })
644            }),
645          }))
646          .cast(),
647          ThreadsafeFunctionCallMode::NonBlocking.into(),
648        )
649      })
650    })?;
651
652    receiver
653      .await
654      .map_err(|err| crate::Error::new(Status::GenericFailure, format!("{err}")))
655  }
656}
657
658unsafe extern "C" fn thread_finalize_cb<T: 'static, V: 'static + JsValuesTupleIntoVec, R>(
659  #[allow(unused_variables)] env: sys::napi_env,
660  finalize_data: *mut c_void,
661  finalize_hint: *mut c_void,
662) where
663  R: 'static + FnMut(ThreadsafeCallContext<T>) -> Result<V>,
664{
665  let handle_option: Option<Arc<ThreadsafeFunctionHandle>> =
666    unsafe { sync::Weak::from_raw(finalize_data.cast()).upgrade() };
667
668  if let Some(handle) = handle_option {
669    handle.with_write_aborted(|mut aborted_guard| {
670      if !*aborted_guard {
671        *aborted_guard = true;
672      }
673    });
674  }
675
676  // cleanup
677  drop(unsafe { Box::<R>::from_raw(finalize_hint.cast()) });
678}
679
680unsafe extern "C" fn call_js_cb<
681  T: 'static,
682  Return: FromNapiValue,
683  V: 'static + JsValuesTupleIntoVec,
684  ErrorStatus: AsRef<str> + From<Status>,
685  R,
686  const CalleeHandled: bool,
687>(
688  raw_env: sys::napi_env,
689  js_callback: sys::napi_value,
690  context: *mut c_void,
691  data: *mut c_void,
692) where
693  R: 'static + FnMut(ThreadsafeCallContext<T>) -> Result<V>,
694{
695  // env and/or callback can be null when shutting down
696  if raw_env.is_null() || js_callback.is_null() {
697    return;
698  }
699
700  let callback: &mut R = unsafe { Box::leak(Box::from_raw(context.cast())) };
701  let val = unsafe {
702    if CalleeHandled {
703      *Box::<Result<ThreadsafeFunctionCallJsBackData<T, Return>, ErrorStatus>>::from_raw(
704        data.cast(),
705      )
706    } else {
707      Ok(*Box::<ThreadsafeFunctionCallJsBackData<T, Return>>::from_raw(data.cast()))
708    }
709  };
710
711  let mut recv = ptr::null_mut();
712  unsafe { sys::napi_get_undefined(raw_env, &mut recv) };
713
714  let ret = val.and_then(|v| {
715    (callback)(ThreadsafeCallContext {
716      env: Env::from_raw(raw_env),
717      value: v.data,
718    })
719    .and_then(|ret| Ok((ret.into_vec(raw_env)?, v.call_variant, v.callback)))
720    .map_err(|err| Error::new(err.status.into(), err.reason.clone()))
721  });
722
723  // Follow async callback conventions: https://nodejs.org/en/knowledge/errors/what-are-the-error-conventions/
724  // Check if the Result is okay, if so, pass a null as the first (error) argument automatically.
725  // If the Result is an error, pass that as the first argument.
726  let status = match ret {
727    Ok((values, call_variant, callback)) => {
728      let args: Vec<sys::napi_value> = if CalleeHandled {
729        let mut js_null = ptr::null_mut();
730        unsafe { sys::napi_get_null(raw_env, &mut js_null) };
731        core::iter::once(js_null).chain(values).collect()
732      } else {
733        values
734      };
735      let mut return_value = ptr::null_mut();
736      let mut status = sys::napi_call_function(
737        raw_env,
738        recv,
739        js_callback,
740        args.len(),
741        args.as_ptr(),
742        &mut return_value,
743      );
744      if let ThreadsafeFunctionCallVariant::WithCallback = call_variant {
745        // throw Error in JavaScript callback
746        let callback_arg = if status == sys::Status::napi_pending_exception {
747          let mut exception = ptr::null_mut();
748          unsafe { sys::napi_get_and_clear_last_exception(raw_env, &mut exception) };
749          let mut error_reference = ptr::null_mut();
750          let raw_status = status;
751          status =
752            unsafe { sys::napi_create_reference(raw_env, exception, 1, &mut error_reference) };
753
754          get_error_message_and_stack_trace(raw_env, exception).and_then(|reason| {
755            Err(Error {
756              maybe_raw: error_reference,
757              maybe_env: raw_env,
758              cause: None,
759              status: Status::from(raw_status),
760              reason,
761            })
762          })
763        } else {
764          unsafe { Return::from_napi_value(raw_env, return_value) }
765        };
766        if let Err(err) = callback(callback_arg, Env::from_raw(raw_env)) {
767          unsafe { sys::napi_fatal_exception(raw_env, JsError::from(err).into_value(raw_env)) };
768        }
769      }
770      status
771    }
772    Err(e) if !CalleeHandled => unsafe {
773      sys::napi_fatal_exception(raw_env, JsError::from(e).into_value(raw_env))
774    },
775    Err(e) => unsafe {
776      sys::napi_call_function(
777        raw_env,
778        recv,
779        js_callback,
780        1,
781        [JsError::from(e).into_value(raw_env)].as_mut_ptr(),
782        ptr::null_mut(),
783      )
784    },
785  };
786  handle_call_js_cb_status(status, raw_env)
787}
788
789fn handle_call_js_cb_status(status: sys::napi_status, raw_env: sys::napi_env) {
790  if status == sys::Status::napi_ok {
791    return;
792  }
793  if status == sys::Status::napi_pending_exception {
794    let mut error_result = ptr::null_mut();
795    assert_eq!(
796      unsafe { sys::napi_get_and_clear_last_exception(raw_env, &mut error_result) },
797      sys::Status::napi_ok
798    );
799
800    // When shutting down, napi_fatal_exception sometimes returns another exception
801    let stat = unsafe { sys::napi_fatal_exception(raw_env, error_result) };
802    assert!(stat == sys::Status::napi_ok || stat == sys::Status::napi_pending_exception);
803  } else {
804    let error_code: Status = status.into();
805    let mut error_code_value = ptr::null_mut();
806    assert_eq!(
807      unsafe {
808        sys::napi_create_string_utf8(
809          raw_env,
810          error_code.as_ref().as_ptr().cast(),
811          error_code.as_ref().len() as isize,
812          &mut error_code_value,
813        )
814      },
815      sys::Status::napi_ok,
816    );
817    const ERROR_MSG: &str = "Call JavaScript callback failed in threadsafe function";
818    let mut error_msg_value = ptr::null_mut();
819    assert_eq!(
820      unsafe {
821        sys::napi_create_string_utf8(
822          raw_env,
823          ERROR_MSG.as_ptr().cast(),
824          ERROR_MSG.len() as isize,
825          &mut error_msg_value,
826        )
827      },
828      sys::Status::napi_ok,
829    );
830    let mut error_value = ptr::null_mut();
831    assert_eq!(
832      unsafe {
833        sys::napi_create_error(raw_env, error_code_value, error_msg_value, &mut error_value)
834      },
835      sys::Status::napi_ok,
836    );
837    assert_eq!(
838      unsafe { sys::napi_fatal_exception(raw_env, error_value) },
839      sys::Status::napi_ok
840    );
841  }
842}
843
844/// This is a placeholder type that is used to indicate that the return value of a threadsafe function is unknown.
845/// Use this type when you don't care about the return value of a threadsafe function.
846///
847/// And you can't get the value of it as well because it's just a placeholder.
848pub struct UnknownReturnValue;
849
850impl TypeName for UnknownReturnValue {
851  fn type_name() -> &'static str {
852    "UnknownReturnValue"
853  }
854
855  fn value_type() -> crate::ValueType {
856    crate::ValueType::Unknown
857  }
858}
859
860impl ValidateNapiValue for UnknownReturnValue {}
861
862impl FromNapiValue for UnknownReturnValue {
863  unsafe fn from_napi_value(_env: sys::napi_env, _napi_val: sys::napi_value) -> Result<Self> {
864    Ok(UnknownReturnValue)
865  }
866}