Skip to main content

napi/
tokio_runtime.rs

1#[cfg(not(feature = "noop"))]
2use std::sync::{LazyLock, OnceLock, RwLock};
3use std::{future::Future, marker::PhantomData};
4
5use tokio::runtime::Runtime;
6
7use crate::{bindgen_runtime::ToNapiValue, sys, Env, Error, Result};
8#[cfg(not(feature = "noop"))]
9use crate::{JsDeferred, SendableResolver, Unknown};
10
11#[cfg(not(feature = "noop"))]
12fn create_runtime() -> Runtime {
13  // Check if we're supposed to use a user-defined runtime
14  if IS_USER_DEFINED_RT.get().copied().unwrap_or(false) {
15    // Try to take the user-defined runtime if it's still available
16    if let Some(user_defined_rt) = USER_DEFINED_RT
17      .get()
18      .and_then(|rt| rt.write().ok().and_then(|mut rt| rt.take()))
19    {
20      return user_defined_rt;
21    }
22    // If the user-defined runtime was already taken, fall back to creating a default runtime
23    // This handles the case where the runtime was shutdown and needs to be restarted
24  }
25
26  #[cfg(any(
27    all(target_family = "wasm", tokio_unstable),
28    not(target_family = "wasm")
29  ))]
30  {
31    tokio::runtime::Builder::new_multi_thread()
32      .enable_all()
33      .build()
34      .expect("Create tokio runtime failed")
35  }
36  #[cfg(all(target_family = "wasm", not(tokio_unstable)))]
37  {
38    tokio::runtime::Builder::new_current_thread()
39      .enable_all()
40      .build()
41      .expect("Create tokio runtime failed")
42  }
43}
44
45#[cfg(not(feature = "noop"))]
46static RT: LazyLock<RwLock<Option<Runtime>>> =
47  LazyLock::new(|| RwLock::new(Some(create_runtime())));
48
49#[cfg(not(feature = "noop"))]
50static USER_DEFINED_RT: OnceLock<RwLock<Option<Runtime>>> = OnceLock::new();
51
52#[cfg(not(feature = "noop"))]
53static IS_USER_DEFINED_RT: OnceLock<bool> = OnceLock::new();
54
55#[cfg(not(feature = "noop"))]
56/// Create a custom Tokio runtime used by the NAPI-RS.
57/// You can control the tokio runtime configuration by yourself.
58/// ### Example
59/// ```no_run
60/// use tokio::runtime::Builder;
61/// use napi::create_custom_tokio_runtime;
62///
63/// #[napi_derive::module_init]
64/// fn init() {
65///    let rt = Builder::new_multi_thread().enable_all().thread_stack_size(32 * 1024 * 1024).build().unwrap();
66///    create_custom_tokio_runtime(rt);
67/// }
68pub fn create_custom_tokio_runtime(rt: Runtime) {
69  USER_DEFINED_RT.get_or_init(move || RwLock::new(Some(rt)));
70  IS_USER_DEFINED_RT.get_or_init(|| true);
71}
72
#[cfg(feature = "noop")]
/// No-op counterpart of `create_custom_tokio_runtime`: the given runtime is
/// simply dropped because the `noop` feature is enabled.
pub fn create_custom_tokio_runtime(_rt: Runtime) {}
75
76#[cfg(not(feature = "noop"))]
77/// Start the async runtime (Currently is tokio).
78///
79/// In Node.js native targets the async runtime will be dropped when Node env exits.
80/// But in Electron renderer process, the Node env will exits and recreate when the window reloads.
81/// So we need to ensure that the async runtime is initialized when the Node env is created.
82///
83/// In wasm targets, the async runtime will not been shutdown automatically due to the limitation of the wasm runtime.
84/// So, you need to call `shutdown_async_runtime` function to manually shutdown the async runtime.
85/// In some scenarios, you may want to start the async runtime again like in tests.
86pub fn start_async_runtime() {
87  if let Ok(mut rt) = RT.write() {
88    if rt.is_none() {
89      *rt = Some(create_runtime());
90    }
91  }
92}
93
94#[cfg(not(feature = "noop"))]
95pub fn shutdown_async_runtime() {
96  if let Some(rt) = RT.write().ok().and_then(|mut rt| rt.take()) {
97    rt.shutdown_background();
98  }
99}
100
101#[cfg(not(feature = "noop"))]
102/// Spawns a future onto the Tokio runtime.
103///
104/// Depending on where you use it, you should await or abort the future in your drop function.
105/// To avoid undefined behavior and memory corruptions.
106pub fn spawn<F>(fut: F) -> tokio::task::JoinHandle<F::Output>
107where
108  F: 'static + Send + Future<Output = ()>,
109{
110  RT.read()
111    .ok()
112    .and_then(|rt| rt.as_ref().map(|rt| rt.spawn(fut)))
113    .expect("Access tokio runtime failed in spawn")
114}
115
116#[cfg(not(feature = "noop"))]
117/// Runs a future to completion
118/// This is blocking, meaning that it pauses other execution until the future is complete,
119/// only use it when it is absolutely necessary, in other places use async functions instead.
120pub fn block_on<F: Future>(fut: F) -> F::Output {
121  RT.read()
122    .ok()
123    .and_then(|rt| rt.as_ref().map(|rt| rt.block_on(fut)))
124    .expect("Access tokio runtime failed in block_on")
125}
126
#[cfg(feature = "noop")]
/// Runs a future to completion.
///
/// This call blocks: it pauses other execution until the future completes.
/// Use it only when absolutely necessary; prefer async functions elsewhere.
///
/// # Panics
///
/// Always panics: with the `noop` feature enabled there is no runtime to drive the future.
pub fn block_on<F: Future>(_fut: F) -> F::Output {
  unreachable!("noop feature is enabled, block_on is not available")
}
134
135#[cfg(not(feature = "noop"))]
136/// spawn_blocking on the current Tokio runtime.
137pub fn spawn_blocking<F, R>(func: F) -> tokio::task::JoinHandle<R>
138where
139  F: FnOnce() -> R + Send + 'static,
140  R: Send + 'static,
141{
142  RT.read()
143    .ok()
144    .and_then(|rt| rt.as_ref().map(|rt| rt.spawn_blocking(func)))
145    .expect("Access tokio runtime failed in spawn_blocking")
146}
147
148#[cfg(not(feature = "noop"))]
149// This function's signature must be kept in sync with the one in lib.rs, otherwise napi
150// will fail to compile with the `tokio_rt` feature.
151#[cfg(not(feature = "noop"))]
152/// If the feature `tokio_rt` has been enabled this will enter the runtime context and
153/// then call the provided closure. Otherwise it will just call the provided closure.
154pub fn within_runtime_if_available<F: FnOnce() -> T, T>(f: F) -> T {
155  RT.read()
156    .ok()
157    .and_then(|rt| {
158      rt.as_ref().map(|rt| {
159        let rt_guard = rt.enter();
160        let ret = f();
161        drop(rt_guard);
162        ret
163      })
164    })
165    .expect("Access tokio runtime failed in within_runtime_if_available")
166}
167
#[cfg(feature = "noop")]
/// No-op variant: there is no runtime context to enter, so the closure is
/// called directly and its result returned.
pub fn within_runtime_if_available<F: FnOnce() -> T, T>(f: F) -> T {
  f()
}
172
#[cfg(feature = "noop")]
#[allow(unused)]
/// No-op variant of `execute_tokio_future`.
///
/// The future is never polled and the resolver is never called; a null
/// `napi_value` is returned.
pub fn execute_tokio_future<
  Data: 'static + Send,
  Fut: 'static + Send + Future<Output = std::result::Result<Data, impl Into<Error>>>,
  Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
>(
  env: sys::napi_env,
  fut: Fut,
  resolver: Resolver,
) -> Result<sys::napi_value> {
  let null_value: sys::napi_value = std::ptr::null_mut();
  Ok(null_value)
}
186
187#[cfg(not(feature = "noop"))]
188#[allow(clippy::not_unsafe_ptr_arg_deref)]
189pub fn execute_tokio_future<
190  Data: 'static + Send,
191  Fut: 'static + Send + Future<Output = std::result::Result<Data, impl Into<Error>>>,
192  Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
193>(
194  env: sys::napi_env,
195  fut: Fut,
196  resolver: Resolver,
197) -> Result<sys::napi_value> {
198  let env = Env::from_raw(env);
199  let (deferred, promise) = JsDeferred::new(&env)?;
200  #[cfg(any(
201    all(target_family = "wasm", tokio_unstable),
202    not(target_family = "wasm")
203  ))]
204  let deferred_for_panic = deferred.clone();
205  let sendable_resolver = SendableResolver::new(resolver);
206
207  let inner = async move {
208    match fut.await {
209      Ok(v) => deferred.resolve(move |env| {
210        sendable_resolver
211          .resolve(env.raw(), v)
212          .map(|v| unsafe { Unknown::from_raw_unchecked(env.raw(), v) })
213      }),
214      Err(e) => deferred.reject(e.into()),
215    }
216  };
217
218  #[cfg(any(
219    all(target_family = "wasm", tokio_unstable),
220    not(target_family = "wasm")
221  ))]
222  let jh = spawn(inner);
223
224  #[cfg(any(
225    all(target_family = "wasm", tokio_unstable),
226    not(target_family = "wasm")
227  ))]
228  spawn(async move {
229    if let Err(err) = jh.await {
230      if let Ok(reason) = err.try_into_panic() {
231        if let Some(s) = reason.downcast_ref::<&str>() {
232          deferred_for_panic.reject(Error::new(crate::Status::GenericFailure, s));
233        } else {
234          deferred_for_panic.reject(Error::new(
235            crate::Status::GenericFailure,
236            "Panic in async function",
237          ));
238        }
239      }
240    }
241  });
242
243  #[cfg(all(target_family = "wasm", not(tokio_unstable)))]
244  {
245    std::thread::spawn(|| {
246      block_on(inner);
247    });
248  }
249
250  Ok(promise.0.value)
251}
252
253#[doc(hidden)]
254#[cfg(not(feature = "noop"))]
255#[allow(clippy::not_unsafe_ptr_arg_deref)]
256pub fn execute_tokio_future_with_finalize_callback<
257  Data: 'static + Send,
258  Fut: 'static + Send + Future<Output = std::result::Result<Data, impl Into<Error>>>,
259  Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
260>(
261  env: sys::napi_env,
262  fut: Fut,
263  resolver: Resolver,
264  finalize_callback: Option<Box<dyn FnOnce(sys::napi_env)>>,
265) -> Result<sys::napi_value> {
266  let env = Env::from_raw(env);
267  let (mut deferred, promise) = JsDeferred::new(&env)?;
268  deferred.set_finalize_callback(finalize_callback);
269  #[cfg(any(
270    all(target_family = "wasm", tokio_unstable),
271    not(target_family = "wasm")
272  ))]
273  let deferred_for_panic = deferred.clone();
274  let sendable_resolver = SendableResolver::new(resolver);
275
276  let inner = async move {
277    match fut.await {
278      Ok(v) => deferred.resolve(move |env| {
279        sendable_resolver
280          .resolve(env.raw(), v)
281          .map(|v| unsafe { Unknown::from_raw_unchecked(env.raw(), v) })
282      }),
283      Err(e) => deferred.reject(e.into()),
284    }
285  };
286
287  #[cfg(any(
288    all(target_family = "wasm", tokio_unstable),
289    not(target_family = "wasm")
290  ))]
291  let jh = spawn(inner);
292
293  #[cfg(any(
294    all(target_family = "wasm", tokio_unstable),
295    not(target_family = "wasm")
296  ))]
297  spawn(async move {
298    if let Err(err) = jh.await {
299      if let Ok(reason) = err.try_into_panic() {
300        if let Some(s) = reason.downcast_ref::<&str>() {
301          deferred_for_panic.reject(Error::new(crate::Status::GenericFailure, s));
302        } else {
303          deferred_for_panic.reject(Error::new(
304            crate::Status::GenericFailure,
305            "Panic in async function",
306          ));
307        }
308      }
309    }
310  });
311
312  #[cfg(all(target_family = "wasm", not(tokio_unstable)))]
313  {
314    std::thread::spawn(|| {
315      block_on(inner);
316    });
317  }
318
319  Ok(promise.0.value)
320}
321
#[cfg(feature = "noop")]
#[doc(hidden)]
/// No-op variant of `execute_tokio_future_with_finalize_callback`.
///
/// The future is never polled and neither the resolver nor the finalize
/// callback is called; a null `napi_value` is returned.
pub fn execute_tokio_future_with_finalize_callback<
  Data: 'static + Send,
  Fut: 'static + Send + Future<Output = std::result::Result<Data, impl Into<Error>>>,
  Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
>(
  _env: sys::napi_env,
  _fut: Fut,
  _resolver: Resolver,
  _finalize_callback: Option<Box<dyn FnOnce(sys::napi_env)>>,
) -> Result<sys::napi_value> {
  let null_value: sys::napi_value = std::ptr::null_mut();
  Ok(null_value)
}
336
337pub struct AsyncBlockBuilder<
338  V: Send + 'static,
339  F: Future<Output = Result<V>> + Send + 'static,
340  Dispose: FnOnce(Env) -> Result<()> + 'static = fn(Env) -> Result<()>,
341> {
342  inner: F,
343  dispose: Option<Dispose>,
344}
345
346impl<V: ToNapiValue + Send + 'static, F: Future<Output = Result<V>> + Send + 'static>
347  AsyncBlockBuilder<V, F>
348{
349  /// Create a new `AsyncBlockBuilder` with the given future, without dispose
350  pub fn new(inner: F) -> Self {
351    Self {
352      inner,
353      dispose: None,
354    }
355  }
356}
357
358impl<
359    V: ToNapiValue + Send + 'static,
360    F: Future<Output = Result<V>> + Send + 'static,
361    Dispose: FnOnce(Env) -> Result<()> + 'static,
362  > AsyncBlockBuilder<V, F, Dispose>
363{
364  pub fn with(inner: F) -> Self {
365    Self {
366      inner,
367      dispose: None,
368    }
369  }
370
371  pub fn with_dispose(mut self, dispose: Dispose) -> Self {
372    self.dispose = Some(dispose);
373    self
374  }
375
376  pub fn build(self, env: &Env) -> Result<AsyncBlock<V>> {
377    Ok(AsyncBlock {
378      inner: execute_tokio_future(env.0, self.inner, |env, v| unsafe {
379        if let Some(dispose) = self.dispose {
380          let env = Env::from_raw(env);
381          dispose(env)?;
382        }
383        V::to_napi_value(env, v)
384      })?,
385      _phantom: PhantomData,
386    })
387  }
388}
389
390impl<V: Send + 'static, F: Future<Output = Result<V>> + Send + 'static> AsyncBlockBuilder<V, F> {
391  /// Create a new `AsyncBlockBuilder` with the given future, without dispose
392  pub fn build_with_map<T: ToNapiValue, Map: FnOnce(Env, V) -> Result<T> + 'static>(
393    env: &Env,
394    inner: F,
395    map: Map,
396  ) -> Result<AsyncBlock<T>> {
397    Ok(AsyncBlock {
398      inner: execute_tokio_future(env.0, inner, |env, v| unsafe {
399        let v = map(Env::from_raw(env), v)?;
400        T::to_napi_value(env, v)
401      })?,
402      _phantom: PhantomData,
403    })
404  }
405}
406
407pub struct AsyncBlock<T: ToNapiValue + 'static> {
408  inner: sys::napi_value,
409  _phantom: PhantomData<T>,
410}
411
412impl<T: ToNapiValue + 'static> ToNapiValue for AsyncBlock<T> {
413  unsafe fn to_napi_value(_: napi_sys::napi_env, val: Self) -> Result<napi_sys::napi_value> {
414    Ok(val.inner)
415  }
416}