#![forbid(unsafe_code)]
#![deny(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
use std::{future::Future, pin::Pin, sync::OnceLock};
use thiserror::Error;
pub type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
pub type PinnedLocalFuture<T> = Pin<Box<dyn Future<Output = T>>>;
type SpawnFn = fn(PinnedFuture<()>);
type SpawnLocalFn = fn(PinnedLocalFuture<()>);
type PollLocalFn = fn();
#[derive(Clone, Copy)]
struct ExecutorFns {
spawn: SpawnFn,
spawn_local: SpawnLocalFn,
poll_local: PollLocalFn,
}
static EXECUTOR_FNS: OnceLock<ExecutorFns> = OnceLock::new();
#[cfg(any(feature = "tokio", feature = "wasm-bindgen", feature = "glib"))]
#[cold]
#[inline(never)]
fn no_op_poll() {}
#[cfg(all(not(feature = "wasm-bindgen"), not(debug_assertions)))]
#[cold]
#[inline(never)]
fn no_op_spawn(_: PinnedFuture<()>) {
#[cfg(debug_assertions)]
eprintln!(
"Warning: Executor::spawn called, but no global 'spawn' function is \
configured (perhaps only spawn_local is supported, e.g., on wasm \
without threading?)."
);
}
#[cfg(feature = "wasm-bindgen")]
#[cold]
#[inline(never)]
fn no_op_spawn(_: PinnedFuture<()>) {
panic!(
"Executor::spawn called, but no global 'spawn' function is configured."
);
}
#[cfg(not(debug_assertions))]
#[cold]
#[inline(never)]
fn no_op_spawn_local(_: PinnedLocalFuture<()>) {
panic!(
"Executor::spawn_local called, but no global 'spawn_local' function \
is configured."
);
}
#[derive(Error, Debug)]
pub enum ExecutorError {
#[error("Global executor has already been set.")]
AlreadySet,
}
pub struct Executor;
impl Executor {
#[inline(always)]
#[track_caller]
pub fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
let pinned_fut = Box::pin(fut);
if let Some(fns) = EXECUTOR_FNS.get() {
(fns.spawn)(pinned_fut)
} else {
handle_uninitialized_spawn(pinned_fut);
}
}
#[inline(always)]
#[track_caller]
pub fn spawn_local(fut: impl Future<Output = ()> + 'static) {
let pinned_fut = Box::pin(fut);
if let Some(fns) = EXECUTOR_FNS.get() {
(fns.spawn_local)(pinned_fut)
} else {
handle_uninitialized_spawn_local(pinned_fut);
}
}
#[inline(always)]
pub async fn tick() {
let (tx, rx) = futures::channel::oneshot::channel();
#[cfg(not(all(feature = "wasm-bindgen", target_family = "wasm")))]
Executor::spawn(async move {
_ = tx.send(());
});
#[cfg(all(feature = "wasm-bindgen", target_family = "wasm"))]
Executor::spawn_local(async move {
_ = tx.send(());
});
_ = rx.await;
}
#[inline(always)]
pub fn poll_local() {
if let Some(fns) = EXECUTOR_FNS.get() {
(fns.poll_local)()
}
}
}
impl Executor {
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub fn init_tokio() -> Result<(), ExecutorError> {
let executor_impl = ExecutorFns {
spawn: |fut| {
tokio::spawn(fut);
},
spawn_local: |fut| {
tokio::task::spawn_local(fut);
},
poll_local: no_op_poll,
};
EXECUTOR_FNS
.set(executor_impl)
.map_err(|_| ExecutorError::AlreadySet)
}
#[cfg(feature = "wasm-bindgen")]
#[cfg_attr(docsrs, doc(cfg(feature = "wasm-bindgen")))]
pub fn init_wasm_bindgen() -> Result<(), ExecutorError> {
let executor_impl = ExecutorFns {
spawn: no_op_spawn,
spawn_local: |fut| {
wasm_bindgen_futures::spawn_local(fut);
},
poll_local: no_op_poll,
};
EXECUTOR_FNS
.set(executor_impl)
.map_err(|_| ExecutorError::AlreadySet)
}
#[cfg(feature = "glib")]
#[cfg_attr(docsrs, doc(cfg(feature = "glib")))]
pub fn init_glib() -> Result<(), ExecutorError> {
let executor_impl = ExecutorFns {
spawn: |fut| {
let main_context = glib::MainContext::default();
main_context.spawn(fut);
},
spawn_local: |fut| {
let main_context = glib::MainContext::default();
main_context.spawn_local(fut);
},
poll_local: no_op_poll,
};
EXECUTOR_FNS
.set(executor_impl)
.map_err(|_| ExecutorError::AlreadySet)
}
#[cfg(feature = "futures-executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-executor")))]
pub fn init_futures_executor() -> Result<(), ExecutorError> {
use futures::{
executor::{LocalPool, LocalSpawner, ThreadPool},
task::{LocalSpawnExt, SpawnExt},
};
use std::cell::RefCell;
static THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
thread_local! {
static LOCAL_POOL: RefCell<LocalPool> = RefCell::new(LocalPool::new());
static SPAWNER: LocalSpawner = LOCAL_POOL.with(|pool| pool.borrow().spawner());
}
fn get_thread_pool() -> &'static ThreadPool {
THREAD_POOL.get_or_init(|| {
ThreadPool::new()
.expect("could not create futures executor ThreadPool")
})
}
let executor_impl = ExecutorFns {
spawn: |fut| {
get_thread_pool()
.spawn(fut)
.expect("failed to spawn future on ThreadPool");
},
spawn_local: |fut| {
SPAWNER.with(|spawner| {
spawner
.spawn_local(fut)
.expect("failed to spawn local future");
});
},
poll_local: || {
LOCAL_POOL.with(|pool| {
if let Ok(mut pool) = pool.try_borrow_mut() {
pool.run_until_stalled();
}
});
},
};
EXECUTOR_FNS
.set(executor_impl)
.map_err(|_| ExecutorError::AlreadySet)
}
#[cfg(feature = "async-executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "async-executor")))]
pub fn init_async_executor() -> Result<(), ExecutorError> {
use async_executor::{Executor as AsyncExecutor, LocalExecutor};
static ASYNC_EXECUTOR: OnceLock<AsyncExecutor<'static>> =
OnceLock::new();
thread_local! {
static LOCAL_EXECUTOR_POOL: LocalExecutor<'static> = const { LocalExecutor::new() };
}
fn get_async_executor() -> &'static AsyncExecutor<'static> {
ASYNC_EXECUTOR.get_or_init(AsyncExecutor::new)
}
let executor_impl = ExecutorFns {
spawn: |fut| {
get_async_executor().spawn(fut).detach();
},
spawn_local: |fut| {
LOCAL_EXECUTOR_POOL.with(|pool| pool.spawn(fut).detach());
},
poll_local: || {
LOCAL_EXECUTOR_POOL.with(|pool| {
pool.try_tick();
});
},
};
EXECUTOR_FNS
.set(executor_impl)
.map_err(|_| ExecutorError::AlreadySet)
}
pub fn init_custom_executor(
custom_executor: impl CustomExecutor + Send + Sync + 'static,
) -> Result<(), ExecutorError> {
static CUSTOM_EXECUTOR_INSTANCE: OnceLock<
Box<dyn CustomExecutor + Send + Sync>,
> = OnceLock::new();
CUSTOM_EXECUTOR_INSTANCE
.set(Box::new(custom_executor))
.map_err(|_| ExecutorError::AlreadySet)?;
let executor_impl = ExecutorFns {
spawn: |fut| {
CUSTOM_EXECUTOR_INSTANCE.get().unwrap().spawn(fut);
},
spawn_local: |fut| {
CUSTOM_EXECUTOR_INSTANCE.get().unwrap().spawn_local(fut);
},
poll_local: || {
CUSTOM_EXECUTOR_INSTANCE.get().unwrap().poll_local();
},
};
EXECUTOR_FNS
.set(executor_impl)
.map_err(|_| ExecutorError::AlreadySet)
}
pub fn init_local_custom_executor(
custom_executor: impl CustomExecutor + 'static,
) -> Result<(), ExecutorError> {
thread_local! {
static CUSTOM_EXECUTOR_INSTANCE: OnceLock<
Box<dyn CustomExecutor>,
> = OnceLock::new();
};
CUSTOM_EXECUTOR_INSTANCE.with(|this| {
this.set(Box::new(custom_executor))
.map_err(|_| ExecutorError::AlreadySet)
})?;
let executor_impl = ExecutorFns {
spawn: |fut| {
CUSTOM_EXECUTOR_INSTANCE
.with(|this| this.get().unwrap().spawn(fut));
},
spawn_local: |fut| {
CUSTOM_EXECUTOR_INSTANCE
.with(|this| this.get().unwrap().spawn_local(fut));
},
poll_local: || {
CUSTOM_EXECUTOR_INSTANCE
.with(|this| this.get().unwrap().poll_local());
},
};
EXECUTOR_FNS
.set(executor_impl)
.map_err(|_| ExecutorError::AlreadySet)
}
}
pub trait CustomExecutor {
fn spawn(&self, fut: PinnedFuture<()>);
fn spawn_local(&self, fut: PinnedLocalFuture<()>);
fn poll_local(&self);
}
#[allow(dead_code)]
fn test_object_safety(_: Box<dyn CustomExecutor + Send + Sync>) {}
#[cold] #[inline(never)]
#[track_caller]
fn handle_uninitialized_spawn(_fut: PinnedFuture<()>) {
let caller = std::panic::Location::caller();
#[cfg(all(debug_assertions, feature = "tracing"))]
{
tracing::error!(
target: "any_spawner",
spawn_caller=%caller,
"Executor::spawn called before a global executor was initialized. Task dropped."
);
drop(_fut);
}
#[cfg(all(debug_assertions, not(feature = "tracing")))]
{
panic!(
"At {caller}, tried to spawn a Future with Executor::spawn() \
before a global executor was initialized."
);
}
#[cfg(not(debug_assertions))]
{
no_op_spawn(_fut);
}
}
#[cold] #[inline(never)]
#[track_caller]
fn handle_uninitialized_spawn_local(_fut: PinnedLocalFuture<()>) {
let caller = std::panic::Location::caller();
#[cfg(all(debug_assertions, feature = "tracing"))]
{
tracing::error!(
target: "any_spawner",
spawn_caller=%caller,
"Executor::spawn_local called before a global executor was initialized. \
Task likely dropped or panicked."
);
}
#[cfg(all(debug_assertions, not(feature = "tracing")))]
{
panic!(
"At {caller}, tried to spawn a Future with \
Executor::spawn_local() before a global executor was initialized."
);
}
#[cfg(not(debug_assertions))]
{
no_op_spawn_local(_fut);
}
}