1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use std::ffi::CStr;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use std::rc::Rc;
use std::sync::atomic::{AtomicU8, Ordering};

use crate::{
  bindgen_runtime::ToNapiValue, check_status, js_values::NapiValue, sys, Env, JsError, JsObject,
  Result, Task,
};

struct AsyncWork<T: Task> {
  inner_task: T,
  deferred: sys::napi_deferred,
  value: Result<mem::MaybeUninit<T::Output>>,
  napi_async_work: sys::napi_async_work,
  status: Rc<AtomicU8>,
}

pub struct AsyncWorkPromise {
  pub(crate) napi_async_work: sys::napi_async_work,
  raw_promise: sys::napi_value,
  pub(crate) deferred: sys::napi_deferred,
  env: sys::napi_env,
  /// share with AsyncWork
  /// 0: not started
  /// 1: completed
  /// 2: canceled
  pub(crate) status: Rc<AtomicU8>,
}

impl AsyncWorkPromise {
  pub fn promise_object(&self) -> JsObject {
    unsafe { JsObject::from_raw_unchecked(self.env, self.raw_promise) }
  }

  pub fn cancel(&self) -> Result<()> {
    // must be happened in the main thread, relaxed is enough
    self.status.store(2, Ordering::Relaxed);
    check_status!(unsafe { sys::napi_cancel_async_work(self.env, self.napi_async_work) })
  }
}

pub fn run<T: Task>(
  env: sys::napi_env,
  task: T,
  abort_status: Option<Rc<AtomicU8>>,
) -> Result<AsyncWorkPromise> {
  let mut raw_resource = ptr::null_mut();
  check_status!(unsafe { sys::napi_create_object(env, &mut raw_resource) })?;
  let mut raw_promise = ptr::null_mut();
  let mut deferred = ptr::null_mut();
  check_status!(unsafe { sys::napi_create_promise(env, &mut deferred, &mut raw_promise) })?;
  let task_status = abort_status.unwrap_or_else(|| Rc::new(AtomicU8::new(0)));
  let result = Box::leak(Box::new(AsyncWork {
    inner_task: task,
    deferred,
    value: Ok(mem::MaybeUninit::zeroed()),
    napi_async_work: ptr::null_mut(),
    status: task_status.clone(),
  }));
  let async_work_name = unsafe { CStr::from_bytes_with_nul_unchecked(b"napi_rs_async_work\0") };
  check_status!(unsafe {
    sys::napi_create_async_work(
      env,
      raw_resource,
      async_work_name.as_ptr() as *mut _,
      Some(execute::<T> as unsafe extern "C" fn(env: sys::napi_env, data: *mut c_void)),
      Some(
        complete::<T>
          as unsafe extern "C" fn(env: sys::napi_env, status: sys::napi_status, data: *mut c_void),
      ),
      result as *mut _ as *mut c_void,
      &mut result.napi_async_work,
    )
  })?;
  check_status!(unsafe { sys::napi_queue_async_work(env, result.napi_async_work) })?;
  Ok(AsyncWorkPromise {
    napi_async_work: result.napi_async_work,
    raw_promise,
    deferred,
    env,
    status: task_status,
  })
}

#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<T: Task> Send for AsyncWork<T> {}

unsafe impl<T: Task> Sync for AsyncWork<T> {}

/// env here is the same with the one in `CallContext`.
/// So it actually could do nothing here, because `execute` function is called in the other thread mostly.
unsafe extern "C" fn execute<T: Task>(_env: sys::napi_env, data: *mut c_void) {
  let mut work = unsafe { Box::from_raw(data as *mut AsyncWork<T>) };
  let _ = mem::replace(
    &mut work.value,
    work.inner_task.compute().map(mem::MaybeUninit::new),
  );
  Box::leak(work);
}

unsafe extern "C" fn complete<T: Task>(
  env: sys::napi_env,
  status: sys::napi_status,
  data: *mut c_void,
) {
  let mut work = unsafe { Box::from_raw(data as *mut AsyncWork<T>) };
  let value_ptr = mem::replace(&mut work.value, Ok(mem::MaybeUninit::zeroed()));
  let deferred = mem::replace(&mut work.deferred, ptr::null_mut());
  let napi_async_work = mem::replace(&mut work.napi_async_work, ptr::null_mut());
  let value = match value_ptr {
    Ok(v) => {
      let output = unsafe { v.assume_init() };
      work
        .inner_task
        .resolve(unsafe { Env::from_raw(env) }, output)
    }
    Err(e) => work.inner_task.reject(unsafe { Env::from_raw(env) }, e),
  };
  if status != sys::Status::napi_cancelled && work.status.load(Ordering::Relaxed) != 2 {
    match check_status!(status)
      .and_then(move |_| value)
      .and_then(|v| unsafe { ToNapiValue::to_napi_value(env, v) })
    {
      Ok(v) => {
        let status = unsafe { sys::napi_resolve_deferred(env, deferred, v) };
        debug_assert!(status == sys::Status::napi_ok, "Resolve promise failed");
      }
      Err(e) => {
        let status =
          unsafe { sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)) };
        debug_assert!(status == sys::Status::napi_ok, "Reject promise failed");
      }
    };
  }
  if let Err(e) = work.inner_task.finally(unsafe { Env::from_raw(env) }) {
    debug_assert!(false, "Panic in Task finally fn: {:?}", e);
  }
  let delete_status = unsafe { sys::napi_delete_async_work(env, napi_async_work) };
  debug_assert!(
    delete_status == sys::Status::napi_ok,
    "Delete async work failed"
  );
  work.status.store(1, Ordering::Relaxed);
}