#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#[cfg(doctest)]
doc_comment::doctest!("../README.md");
use async_executor::{Executor, LocalExecutor};
use futures_lite::future;
use once_cell::sync::Lazy;
use std::{
future::Future,
sync::atomic::{AtomicBool, Ordering},
thread,
};
pub use async_executor::Task;
static GLOBAL_EXECUTOR_INIT: AtomicBool = AtomicBool::new(false);
static GLOBAL_EXECUTOR_THREADS: Lazy<()> = Lazy::new(init);
static GLOBAL_EXECUTOR: Executor = Executor::new();
thread_local! {
static LOCAL_EXECUTOR: LocalExecutor = LocalExecutor::new();
}
#[derive(Default, Debug)]
pub struct GlobalExecutorConfig {
pub env_var: Option<&'static str>,
pub default_threads: Option<usize>,
pub thread_name: Option<String>,
pub thread_name_prefix: Option<&'static str>,
}
impl GlobalExecutorConfig {
pub fn with_env_var(mut self, env_var: &'static str) -> Self {
self.env_var = Some(env_var);
self
}
pub fn with_default_threads(mut self, default_threads: usize) -> Self {
self.default_threads = Some(default_threads);
self
}
pub fn with_thread_name(mut self, thread_name: String) -> Self {
self.thread_name = Some(thread_name);
self
}
pub fn with_thread_name_prefix(mut self, thread_name_prefix: &'static str) -> Self {
self.thread_name_prefix = Some(thread_name_prefix);
self
}
}
pub fn init_with_config(config: GlobalExecutorConfig) {
if !GLOBAL_EXECUTOR_INIT.compare_and_swap(false, true, Ordering::AcqRel) {
let num_cpus = std::env::var(config.env_var.unwrap_or("ASYNC_GLOBAL_EXECUTOR_THREADS"))
.ok()
.and_then(|threads| threads.parse().ok())
.unwrap_or(config.default_threads.unwrap_or_else(num_cpus::get))
.max(1);
for n in 1..=num_cpus {
thread::Builder::new()
.name(config.thread_name.clone().unwrap_or_else(|| {
format!(
"{}{}",
config
.thread_name_prefix
.unwrap_or("async-global-executor-"),
n
)
}))
.spawn(|| loop {
let _ = std::panic::catch_unwind(|| block_on(future::pending::<()>()));
})
.expect("cannot spawn executor thread");
}
}
}
pub fn init() {
init_with_config(GlobalExecutorConfig::default());
}
pub fn block_on<F: Future<Output = T>, T>(future: F) -> T {
Lazy::force(&GLOBAL_EXECUTOR_THREADS);
LOCAL_EXECUTOR.with(|executor| {
let (s, r) = async_channel::bounded::<()>(1);
let future = async move {
let _s = s;
future.await
};
let global = {
let r = r.clone();
GLOBAL_EXECUTOR.run(async move { r.recv().await })
};
let local = executor.run(r.recv());
let executors = future::zip(global, local);
let all = future::zip(executors, future);
async_io::block_on(all).1
})
}
pub fn spawn<F: Future<Output = T> + Send + 'static, T: Send + 'static>(future: F) -> Task<T> {
Lazy::force(&GLOBAL_EXECUTOR_THREADS);
GLOBAL_EXECUTOR.spawn(future)
}
pub fn spawn_local<F: Future<Output = T> + 'static, T: 'static>(future: F) -> Task<T> {
Lazy::force(&GLOBAL_EXECUTOR_THREADS);
LOCAL_EXECUTOR.with(|executor| executor.spawn(future))
}