#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#[cfg(doctest)]
doc_comment::doctest!("../README.md");
use async_executor::{Executor, LocalExecutor};
use futures_lite::future;
use once_cell::sync::Lazy;
use std::{
future::Future,
sync::atomic::{AtomicBool, Ordering},
thread,
};
pub use async_executor::Task;
mod reactor {
pub(crate) fn block_on<F: std::future::Future<Output = T>, T>(future: F) -> T {
#[cfg(feature = "async-io")]
use async_io::block_on;
#[cfg(not(feature = "async-io"))]
use future::block_on;
#[cfg(feature = "tokio02")]
let future = async_compat::Compat::new(future);
block_on(future)
}
}
static GLOBAL_EXECUTOR_INIT: AtomicBool = AtomicBool::new(false);
static GLOBAL_EXECUTOR_THREADS: Lazy<()> = Lazy::new(init);
static GLOBAL_EXECUTOR: Executor<'_> = Executor::new();
thread_local! {
static LOCAL_EXECUTOR: LocalExecutor<'static> = LocalExecutor::new();
}
#[derive(Default, Debug)]
pub struct GlobalExecutorConfig {
pub env_var: Option<&'static str>,
pub default_threads: Option<usize>,
pub thread_name: Option<String>,
pub thread_name_prefix: Option<&'static str>,
}
impl GlobalExecutorConfig {
pub fn with_env_var(mut self, env_var: &'static str) -> Self {
self.env_var = Some(env_var);
self
}
pub fn with_default_threads(mut self, default_threads: usize) -> Self {
self.default_threads = Some(default_threads);
self
}
pub fn with_thread_name(mut self, thread_name: String) -> Self {
self.thread_name = Some(thread_name);
self
}
pub fn with_thread_name_prefix(mut self, thread_name_prefix: &'static str) -> Self {
self.thread_name_prefix = Some(thread_name_prefix);
self
}
}
pub fn init_with_config(config: GlobalExecutorConfig) {
if !GLOBAL_EXECUTOR_INIT.compare_and_swap(false, true, Ordering::AcqRel) {
let num_cpus = std::env::var(config.env_var.unwrap_or("ASYNC_GLOBAL_EXECUTOR_THREADS"))
.ok()
.and_then(|threads| threads.parse().ok())
.or(config.default_threads)
.unwrap_or_else(num_cpus::get)
.max(1);
for n in 1..=num_cpus {
thread::Builder::new()
.name(config.thread_name.clone().unwrap_or_else(|| {
format!(
"{}{}",
config
.thread_name_prefix
.unwrap_or("async-global-executor-"),
n
)
}))
.spawn(|| loop {
let _ = std::panic::catch_unwind(|| {
LOCAL_EXECUTOR.with(|executor| {
let local = executor.run(future::pending::<()>());
let global = GLOBAL_EXECUTOR.run(future::pending::<()>());
reactor::block_on(future::or(local, global))
})
});
})
.expect("cannot spawn executor thread");
}
}
}
pub fn init() {
init_with_config(GlobalExecutorConfig::default());
}
pub fn block_on<F: Future<Output = T>, T>(future: F) -> T {
LOCAL_EXECUTOR.with(|executor| reactor::block_on(executor.run(future)))
}
pub fn spawn<F: Future<Output = T> + Send + 'static, T: Send + 'static>(future: F) -> Task<T> {
Lazy::force(&GLOBAL_EXECUTOR_THREADS);
GLOBAL_EXECUTOR.spawn(future)
}
pub fn spawn_local<F: Future<Output = T> + 'static, T: 'static>(future: F) -> Task<T> {
LOCAL_EXECUTOR.with(|executor| executor.spawn(future))
}
#[cfg(all(test, feature = "tokio02"))]
mod test_tokio02 {
use super::*;
async fn compute() -> u8 {
tokio::spawn(async { 1 + 2 }).await.unwrap()
}
#[test]
fn spawn_tokio() {
block_on(async {
assert_eq!(
spawn(compute()).await
+ spawn_local(compute()).await
+ tokio::spawn(compute()).await.unwrap(),
9
);
});
}
}