1#[cfg(not(feature = "noop"))]
2use std::sync::{LazyLock, OnceLock, RwLock};
3use std::{future::Future, marker::PhantomData};
4
5use tokio::runtime::Runtime;
6
7use crate::{bindgen_runtime::ToNapiValue, sys, Env, Error, Result};
8#[cfg(not(feature = "noop"))]
9use crate::{JsDeferred, SendableResolver, Unknown};
10
11#[cfg(not(feature = "noop"))]
12fn create_runtime() -> Runtime {
13 if IS_USER_DEFINED_RT.get().copied().unwrap_or(false) {
15 if let Some(user_defined_rt) = USER_DEFINED_RT
17 .get()
18 .and_then(|rt| rt.write().ok().and_then(|mut rt| rt.take()))
19 {
20 return user_defined_rt;
21 }
22 }
25
26 #[cfg(any(
27 all(target_family = "wasm", tokio_unstable),
28 not(target_family = "wasm")
29 ))]
30 {
31 tokio::runtime::Builder::new_multi_thread()
32 .enable_all()
33 .build()
34 .expect("Create tokio runtime failed")
35 }
36 #[cfg(all(target_family = "wasm", not(tokio_unstable)))]
37 {
38 tokio::runtime::Builder::new_current_thread()
39 .enable_all()
40 .build()
41 .expect("Create tokio runtime failed")
42 }
43}
44
/// Global runtime slot used by `spawn`, `block_on`, `spawn_blocking` and
/// `within_runtime_if_available`.
///
/// Initialised on first access with `create_runtime()`. The slot becomes `None` once
/// `shutdown_async_runtime` takes the runtime, and is refilled by `start_async_runtime`.
#[cfg(not(feature = "noop"))]
static RT: LazyLock<RwLock<Option<Runtime>>> =
  LazyLock::new(|| RwLock::new(Some(create_runtime())));

/// Runtime supplied through `create_custom_tokio_runtime`.
///
/// `create_runtime` takes the value out of this slot, so it can be handed out once.
#[cfg(not(feature = "noop"))]
static USER_DEFINED_RT: OnceLock<RwLock<Option<Runtime>>> = OnceLock::new();

/// Set to `true` by `create_custom_tokio_runtime`; `create_runtime` reads it to decide
/// whether to look at `USER_DEFINED_RT` first.
#[cfg(not(feature = "noop"))]
static IS_USER_DEFINED_RT: OnceLock<bool> = OnceLock::new();
54
55#[cfg(not(feature = "noop"))]
56pub fn create_custom_tokio_runtime(rt: Runtime) {
69 USER_DEFINED_RT.get_or_init(move || RwLock::new(Some(rt)));
70 IS_USER_DEFINED_RT.get_or_init(|| true);
71}
72
/// `noop` build of `create_custom_tokio_runtime`: the runtime is accepted and
/// discarded without being stored anywhere.
#[cfg(feature = "noop")]
pub fn create_custom_tokio_runtime(_rt: Runtime) {
  // Intentionally empty.
}
75
76#[cfg(not(feature = "noop"))]
77pub fn start_async_runtime() {
87 if let Ok(mut rt) = RT.write() {
88 if rt.is_none() {
89 *rt = Some(create_runtime());
90 }
91 }
92}
93
94#[cfg(not(feature = "noop"))]
95pub fn shutdown_async_runtime() {
96 if let Some(rt) = RT.write().ok().and_then(|mut rt| rt.take()) {
97 rt.shutdown_background();
98 }
99}
100
101#[cfg(not(feature = "noop"))]
102pub fn spawn<F>(fut: F) -> tokio::task::JoinHandle<F::Output>
107where
108 F: 'static + Send + Future<Output = ()>,
109{
110 RT.read()
111 .ok()
112 .and_then(|rt| rt.as_ref().map(|rt| rt.spawn(fut)))
113 .expect("Access tokio runtime failed in spawn")
114}
115
116#[cfg(not(feature = "noop"))]
117pub fn block_on<F: Future>(fut: F) -> F::Output {
121 RT.read()
122 .ok()
123 .and_then(|rt| rt.as_ref().map(|rt| rt.block_on(fut)))
124 .expect("Access tokio runtime failed in block_on")
125}
126
/// `noop` build of `block_on`: there is no runtime, so calling it always panics.
///
/// # Panics
///
/// Always panics via `unreachable!`.
#[cfg(feature = "noop")]
pub fn block_on<F>(_fut: F) -> F::Output
where
  F: Future,
{
  unreachable!("noop feature is enabled, block_on is not available")
}
134
135#[cfg(not(feature = "noop"))]
136pub fn spawn_blocking<F, R>(func: F) -> tokio::task::JoinHandle<R>
138where
139 F: FnOnce() -> R + Send + 'static,
140 R: Send + 'static,
141{
142 RT.read()
143 .ok()
144 .and_then(|rt| rt.as_ref().map(|rt| rt.spawn_blocking(func)))
145 .expect("Access tokio runtime failed in spawn_blocking")
146}
147
148#[cfg(not(feature = "noop"))]
149#[cfg(not(feature = "noop"))]
152pub fn within_runtime_if_available<F: FnOnce() -> T, T>(f: F) -> T {
155 RT.read()
156 .ok()
157 .and_then(|rt| {
158 rt.as_ref().map(|rt| {
159 let rt_guard = rt.enter();
160 let ret = f();
161 drop(rt_guard);
162 ret
163 })
164 })
165 .expect("Access tokio runtime failed in within_runtime_if_available")
166}
167
/// `noop` build of `within_runtime_if_available`: there is no runtime to enter, so
/// `f` is simply called and its result returned.
#[cfg(feature = "noop")]
pub fn within_runtime_if_available<F, T>(f: F) -> T
where
  F: FnOnce() -> T,
{
  f()
}
172
/// `noop` build of `execute_tokio_future`: nothing is scheduled and every argument
/// is ignored.
///
/// Always returns `Ok` with a null `napi_value`.
#[cfg(feature = "noop")]
#[allow(unused)]
pub fn execute_tokio_future<
  Data: 'static + Send,
  Fut: 'static + Send + Future<Output = std::result::Result<Data, impl Into<Error>>>,
  Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
>(
  env: sys::napi_env,
  fut: Fut,
  resolver: Resolver,
) -> Result<sys::napi_value> {
  let null_handle: sys::napi_value = std::ptr::null_mut();
  Ok(null_handle)
}
186
187#[cfg(not(feature = "noop"))]
188#[allow(clippy::not_unsafe_ptr_arg_deref)]
189pub fn execute_tokio_future<
190 Data: 'static + Send,
191 Fut: 'static + Send + Future<Output = std::result::Result<Data, impl Into<Error>>>,
192 Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
193>(
194 env: sys::napi_env,
195 fut: Fut,
196 resolver: Resolver,
197) -> Result<sys::napi_value> {
198 let env = Env::from_raw(env);
199 let (deferred, promise) = JsDeferred::new(&env)?;
200 #[cfg(any(
201 all(target_family = "wasm", tokio_unstable),
202 not(target_family = "wasm")
203 ))]
204 let deferred_for_panic = deferred.clone();
205 let sendable_resolver = SendableResolver::new(resolver);
206
207 let inner = async move {
208 match fut.await {
209 Ok(v) => deferred.resolve(move |env| {
210 sendable_resolver
211 .resolve(env.raw(), v)
212 .map(|v| unsafe { Unknown::from_raw_unchecked(env.raw(), v) })
213 }),
214 Err(e) => deferred.reject(e.into()),
215 }
216 };
217
218 #[cfg(any(
219 all(target_family = "wasm", tokio_unstable),
220 not(target_family = "wasm")
221 ))]
222 let jh = spawn(inner);
223
224 #[cfg(any(
225 all(target_family = "wasm", tokio_unstable),
226 not(target_family = "wasm")
227 ))]
228 spawn(async move {
229 if let Err(err) = jh.await {
230 if let Ok(reason) = err.try_into_panic() {
231 if let Some(s) = reason.downcast_ref::<&str>() {
232 deferred_for_panic.reject(Error::new(crate::Status::GenericFailure, s));
233 } else {
234 deferred_for_panic.reject(Error::new(
235 crate::Status::GenericFailure,
236 "Panic in async function",
237 ));
238 }
239 }
240 }
241 });
242
243 #[cfg(all(target_family = "wasm", not(tokio_unstable)))]
244 {
245 std::thread::spawn(|| {
246 block_on(inner);
247 });
248 }
249
250 Ok(promise.0.value)
251}
252
253#[doc(hidden)]
254#[cfg(not(feature = "noop"))]
255#[allow(clippy::not_unsafe_ptr_arg_deref)]
256pub fn execute_tokio_future_with_finalize_callback<
257 Data: 'static + Send,
258 Fut: 'static + Send + Future<Output = std::result::Result<Data, impl Into<Error>>>,
259 Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
260>(
261 env: sys::napi_env,
262 fut: Fut,
263 resolver: Resolver,
264 finalize_callback: Option<Box<dyn FnOnce(sys::napi_env)>>,
265) -> Result<sys::napi_value> {
266 let env = Env::from_raw(env);
267 let (mut deferred, promise) = JsDeferred::new(&env)?;
268 deferred.set_finalize_callback(finalize_callback);
269 #[cfg(any(
270 all(target_family = "wasm", tokio_unstable),
271 not(target_family = "wasm")
272 ))]
273 let deferred_for_panic = deferred.clone();
274 let sendable_resolver = SendableResolver::new(resolver);
275
276 let inner = async move {
277 match fut.await {
278 Ok(v) => deferred.resolve(move |env| {
279 sendable_resolver
280 .resolve(env.raw(), v)
281 .map(|v| unsafe { Unknown::from_raw_unchecked(env.raw(), v) })
282 }),
283 Err(e) => deferred.reject(e.into()),
284 }
285 };
286
287 #[cfg(any(
288 all(target_family = "wasm", tokio_unstable),
289 not(target_family = "wasm")
290 ))]
291 let jh = spawn(inner);
292
293 #[cfg(any(
294 all(target_family = "wasm", tokio_unstable),
295 not(target_family = "wasm")
296 ))]
297 spawn(async move {
298 if let Err(err) = jh.await {
299 if let Ok(reason) = err.try_into_panic() {
300 if let Some(s) = reason.downcast_ref::<&str>() {
301 deferred_for_panic.reject(Error::new(crate::Status::GenericFailure, s));
302 } else {
303 deferred_for_panic.reject(Error::new(
304 crate::Status::GenericFailure,
305 "Panic in async function",
306 ));
307 }
308 }
309 }
310 });
311
312 #[cfg(all(target_family = "wasm", not(tokio_unstable)))]
313 {
314 std::thread::spawn(|| {
315 block_on(inner);
316 });
317 }
318
319 Ok(promise.0.value)
320}
321
/// `noop` build of `execute_tokio_future_with_finalize_callback`: nothing is
/// scheduled and every argument is ignored.
///
/// Always returns `Ok` with a null `napi_value`.
#[cfg(feature = "noop")]
#[doc(hidden)]
pub fn execute_tokio_future_with_finalize_callback<
  Data: 'static + Send,
  Fut: 'static + Send + Future<Output = std::result::Result<Data, impl Into<Error>>>,
  Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
>(
  _env: sys::napi_env,
  _fut: Fut,
  _resolver: Resolver,
  _finalize_callback: Option<Box<dyn FnOnce(sys::napi_env)>>,
) -> Result<sys::napi_value> {
  let null_handle: sys::napi_value = std::ptr::null_mut();
  Ok(null_handle)
}
336
337pub struct AsyncBlockBuilder<
338 V: Send + 'static,
339 F: Future<Output = Result<V>> + Send + 'static,
340 Dispose: FnOnce(Env) -> Result<()> + 'static = fn(Env) -> Result<()>,
341> {
342 inner: F,
343 dispose: Option<Dispose>,
344}
345
346impl<V: ToNapiValue + Send + 'static, F: Future<Output = Result<V>> + Send + 'static>
347 AsyncBlockBuilder<V, F>
348{
349 pub fn new(inner: F) -> Self {
351 Self {
352 inner,
353 dispose: None,
354 }
355 }
356}
357
358impl<
359 V: ToNapiValue + Send + 'static,
360 F: Future<Output = Result<V>> + Send + 'static,
361 Dispose: FnOnce(Env) -> Result<()> + 'static,
362 > AsyncBlockBuilder<V, F, Dispose>
363{
364 pub fn with(inner: F) -> Self {
365 Self {
366 inner,
367 dispose: None,
368 }
369 }
370
371 pub fn with_dispose(mut self, dispose: Dispose) -> Self {
372 self.dispose = Some(dispose);
373 self
374 }
375
376 pub fn build(self, env: &Env) -> Result<AsyncBlock<V>> {
377 Ok(AsyncBlock {
378 inner: execute_tokio_future(env.0, self.inner, |env, v| unsafe {
379 if let Some(dispose) = self.dispose {
380 let env = Env::from_raw(env);
381 dispose(env)?;
382 }
383 V::to_napi_value(env, v)
384 })?,
385 _phantom: PhantomData,
386 })
387 }
388}
389
390impl<V: Send + 'static, F: Future<Output = Result<V>> + Send + 'static> AsyncBlockBuilder<V, F> {
391 pub fn build_with_map<T: ToNapiValue, Map: FnOnce(Env, V) -> Result<T> + 'static>(
393 env: &Env,
394 inner: F,
395 map: Map,
396 ) -> Result<AsyncBlock<T>> {
397 Ok(AsyncBlock {
398 inner: execute_tokio_future(env.0, inner, |env, v| unsafe {
399 let v = map(Env::from_raw(env), v)?;
400 T::to_napi_value(env, v)
401 })?,
402 _phantom: PhantomData,
403 })
404 }
405}
406
407pub struct AsyncBlock<T: ToNapiValue + 'static> {
408 inner: sys::napi_value,
409 _phantom: PhantomData<T>,
410}
411
412impl<T: ToNapiValue + 'static> ToNapiValue for AsyncBlock<T> {
413 unsafe fn to_napi_value(_: napi_sys::napi_env, val: Self) -> Result<napi_sys::napi_value> {
414 Ok(val.inner)
415 }
416}