#[cfg(not(feature = "noop"))]
use std::sync::{LazyLock, OnceLock, RwLock};
use std::{future::Future, marker::PhantomData};
use tokio::runtime::Runtime;
use crate::{bindgen_runtime::ToNapiValue, sys, Env, Error, Result};
#[cfg(not(feature = "noop"))]
use crate::{JsDeferred, SendableResolver, Unknown};
#[cfg(not(feature = "noop"))]
fn create_runtime() -> Runtime {
if IS_USER_DEFINED_RT.get().copied().unwrap_or(false) {
if let Some(user_defined_rt) = USER_DEFINED_RT
.get()
.and_then(|rt| rt.write().ok().and_then(|mut rt| rt.take()))
{
return user_defined_rt;
}
}
#[cfg(any(
all(target_family = "wasm", tokio_unstable),
not(target_family = "wasm")
))]
{
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Create tokio runtime failed")
}
#[cfg(all(target_family = "wasm", not(tokio_unstable)))]
{
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Create tokio runtime failed")
}
}
#[cfg(not(feature = "noop"))]
static RT: LazyLock<RwLock<Option<Runtime>>> =
LazyLock::new(|| RwLock::new(Some(create_runtime())));
#[cfg(not(feature = "noop"))]
static USER_DEFINED_RT: OnceLock<RwLock<Option<Runtime>>> = OnceLock::new();
#[cfg(not(feature = "noop"))]
static IS_USER_DEFINED_RT: OnceLock<bool> = OnceLock::new();
#[cfg(not(feature = "noop"))]
pub fn create_custom_tokio_runtime(rt: Runtime) {
USER_DEFINED_RT.get_or_init(move || RwLock::new(Some(rt)));
IS_USER_DEFINED_RT.get_or_init(|| true);
}
#[cfg(feature = "noop")]
pub fn create_custom_tokio_runtime(_: Runtime) {}
#[cfg(not(feature = "noop"))]
pub fn start_async_runtime() {
if let Ok(mut rt) = RT.write() {
if rt.is_none() {
*rt = Some(create_runtime());
}
}
}
#[cfg(not(feature = "noop"))]
pub fn shutdown_async_runtime() {
if let Some(rt) = RT.write().ok().and_then(|mut rt| rt.take()) {
rt.shutdown_background();
}
}
#[cfg(not(feature = "noop"))]
pub fn spawn<F>(fut: F) -> tokio::task::JoinHandle<F::Output>
where
F: 'static + Send + Future<Output = ()>,
{
RT.read()
.ok()
.and_then(|rt| rt.as_ref().map(|rt| rt.spawn(fut)))
.expect("Access tokio runtime failed in spawn")
}
#[cfg(not(feature = "noop"))]
pub fn block_on<F: Future>(fut: F) -> F::Output {
RT.read()
.ok()
.and_then(|rt| rt.as_ref().map(|rt| rt.block_on(fut)))
.expect("Access tokio runtime failed in block_on")
}
#[cfg(feature = "noop")]
pub fn block_on<F: Future>(_: F) -> F::Output {
unreachable!("noop feature is enabled, block_on is not available")
}
#[cfg(not(feature = "noop"))]
pub fn spawn_blocking<F, R>(func: F) -> tokio::task::JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
RT.read()
.ok()
.and_then(|rt| rt.as_ref().map(|rt| rt.spawn_blocking(func)))
.expect("Access tokio runtime failed in spawn_blocking")
}
#[cfg(not(feature = "noop"))]
#[cfg(not(feature = "noop"))]
pub fn within_runtime_if_available<F: FnOnce() -> T, T>(f: F) -> T {
RT.read()
.ok()
.and_then(|rt| {
rt.as_ref().map(|rt| {
let rt_guard = rt.enter();
let ret = f();
drop(rt_guard);
ret
})
})
.expect("Access tokio runtime failed in within_runtime_if_available")
}
#[cfg(feature = "noop")]
pub fn within_runtime_if_available<F: FnOnce() -> T, T>(f: F) -> T {
f()
}
#[cfg(feature = "noop")]
#[allow(unused)]
pub fn execute_tokio_future<
Data: 'static + Send,
Fut: 'static + Send + Future<Output = std::result::Result<Data, impl Into<Error>>>,
Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
>(
env: sys::napi_env,
fut: Fut,
resolver: Resolver,
) -> Result<sys::napi_value> {
Ok(std::ptr::null_mut())
}
#[cfg(not(feature = "noop"))]
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn execute_tokio_future<
Data: 'static + Send,
Fut: 'static + Send + Future<Output = std::result::Result<Data, impl Into<Error>>>,
Resolver: 'static + FnOnce(sys::napi_env, Data) -> Result<sys::napi_value>,
>(
env: sys::napi_env,
fut: Fut,
resolver: Resolver,
) -> Result<sys::napi_value> {
let env = Env::from_raw(env);
let (deferred, promise) = JsDeferred::new(&env)?;
#[cfg(any(
all(target_family = "wasm", tokio_unstable),
not(target_family = "wasm")
))]
let deferred_for_panic = deferred.clone();
let sendable_resolver = SendableResolver::new(resolver);
let inner = async move {
match fut.await {
Ok(v) => deferred.resolve(move |env| {
sendable_resolver
.resolve(env.raw(), v)
.map(|v| unsafe { Unknown::from_raw_unchecked(env.raw(), v) })
}),
Err(e) => deferred.reject(e.into()),
}
};
#[cfg(any(
all(target_family = "wasm", tokio_unstable),
not(target_family = "wasm")
))]
let jh = spawn(inner);
#[cfg(any(
all(target_family = "wasm", tokio_unstable),
not(target_family = "wasm")
))]
spawn(async move {
if let Err(err) = jh.await {
if let Ok(reason) = err.try_into_panic() {
if let Some(s) = reason.downcast_ref::<&str>() {
deferred_for_panic.reject(Error::new(crate::Status::GenericFailure, s));
} else {
deferred_for_panic.reject(Error::new(
crate::Status::GenericFailure,
"Panic in async function",
));
}
}
}
});
#[cfg(all(target_family = "wasm", not(tokio_unstable)))]
{
std::thread::spawn(|| {
block_on(inner);
});
}
Ok(promise.0.value)
}
pub struct AsyncBlockBuilder<
V: Send + 'static,
F: Future<Output = Result<V>> + Send + 'static,
Dispose: FnOnce(Env) -> Result<()> + 'static = fn(Env) -> Result<()>,
> {
inner: F,
dispose: Option<Dispose>,
}
impl<V: ToNapiValue + Send + 'static, F: Future<Output = Result<V>> + Send + 'static>
AsyncBlockBuilder<V, F>
{
pub fn new(inner: F) -> Self {
Self {
inner,
dispose: None,
}
}
}
impl<
V: ToNapiValue + Send + 'static,
F: Future<Output = Result<V>> + Send + 'static,
Dispose: FnOnce(Env) -> Result<()> + 'static,
> AsyncBlockBuilder<V, F, Dispose>
{
pub fn with(inner: F) -> Self {
Self {
inner,
dispose: None,
}
}
pub fn with_dispose(mut self, dispose: Dispose) -> Self {
self.dispose = Some(dispose);
self
}
pub fn build(self, env: &Env) -> Result<AsyncBlock<V>> {
Ok(AsyncBlock {
inner: execute_tokio_future(env.0, self.inner, |env, v| unsafe {
if let Some(dispose) = self.dispose {
let env = Env::from_raw(env);
dispose(env)?;
}
V::to_napi_value(env, v)
})?,
_phantom: PhantomData,
})
}
}
impl<V: Send + 'static, F: Future<Output = Result<V>> + Send + 'static> AsyncBlockBuilder<V, F> {
pub fn build_with_map<T: ToNapiValue, Map: FnOnce(Env, V) -> Result<T> + 'static>(
env: &Env,
inner: F,
map: Map,
) -> Result<AsyncBlock<T>> {
Ok(AsyncBlock {
inner: execute_tokio_future(env.0, inner, |env, v| unsafe {
let v = map(Env::from_raw(env), v)?;
T::to_napi_value(env, v)
})?,
_phantom: PhantomData,
})
}
}
pub struct AsyncBlock<T: ToNapiValue + 'static> {
inner: sys::napi_value,
_phantom: PhantomData<T>,
}
impl<T: ToNapiValue + 'static> ToNapiValue for AsyncBlock<T> {
unsafe fn to_napi_value(_: napi_sys::napi_env, val: Self) -> Result<napi_sys::napi_value> {
Ok(val.inner)
}
}