use crate::Task;
use async_channel::{Receiver, Sender};
use async_lock::Mutex;
use futures_lite::future;
use once_cell::sync::OnceCell;
use std::{io, thread};
/// Number of executor threads currently tracked as alive.
///
/// Incremented for every thread started by `spawn_more_threads` and decremented by
/// `stop_current_executor_thread` only after the stopping thread has acknowledged its shutdown,
/// so it still counts threads that are in the middle of shutting down.
static GLOBAL_EXECUTOR_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
/// Number of executor threads expected once all pending shutdowns have completed.
///
/// Incremented together with `GLOBAL_EXECUTOR_THREADS_NUMBER` on spawn, but decremented as soon
/// as a shutdown request has been sent, before the acknowledgement is received.
static GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
thread_local! {
// Per-thread shutdown handles, set once by `thread_main_loop`: the `Sender` is used to request
// shutdown of this thread, the `Receiver` yields the acknowledgement sent when it has finished.
static THREAD_SHUTDOWN: OnceCell<(Sender<()>, Receiver<()>)> = OnceCell::new();
}
/// Starts up to `count` additional executor threads.
///
/// The configuration is read from `crate::config::GLOBAL_EXECUTOR_CONFIG`; if it has not been
/// set yet, `crate::init()` is called first. The request is capped so that the tracked number
/// of threads (which still includes threads that are shutting down) does not go past
/// `config.max_threads`.
///
/// Returns the capped number of threads that were started.
///
/// # Errors
///
/// Returns the `io::Error` reported by `thread::Builder::spawn` as soon as one thread fails to
/// start. Threads started before the failure keep running and stay counted.
pub async fn spawn_more_threads(count: usize) -> io::Result<usize> {
    let config = match crate::config::GLOBAL_EXECUTOR_CONFIG.get() {
        Some(config) => config,
        None => {
            // Not configured yet: initialize, after which the config must be present.
            crate::init();
            crate::config::GLOBAL_EXECUTOR_CONFIG.get().unwrap()
        }
    };

    // Both counters stay locked for the whole spawn loop so they are updated together.
    let mut live_threads = GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await;
    let mut target_threads = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;

    // How many more threads may exist before hitting the configured maximum.
    let headroom = config.max_threads - *live_threads;
    let count = count.min(headroom);

    for _ in 0..count {
        let builder = thread::Builder::new().name((config.thread_name_fn)());
        builder.spawn(thread_main_loop)?;
        *live_threads += 1;
        *target_threads += 1;
    }

    Ok(count)
}
/// Schedules the thread-shutdown routine through `crate::spawn`.
///
/// The returned task resolves to `true` when the thread that ran the routine was asked to shut
/// down and acknowledged it, or to `false` when the expected thread count is not above the
/// configured `min_threads`. Which thread ends up running the routine is decided by
/// `crate::spawn`.
pub fn stop_thread() -> Task<bool> {
    let shutdown = stop_current_executor_thread();
    crate::spawn(shutdown)
}
/// Schedules the thread-shutdown routine through `crate::spawn_local`.
///
/// The returned task resolves to `true` when the thread that ran the routine was asked to shut
/// down and acknowledged it, or to `false` when the expected thread count is not above the
/// configured `min_threads`.
pub fn stop_current_thread() -> Task<bool> {
    let shutdown = stop_current_executor_thread();
    crate::spawn_local(shutdown)
}
/// Entry point of every executor thread started by `spawn_more_threads`.
///
/// Registers this thread's shutdown channels in `THREAD_SHUTDOWN`, then drives the local and
/// global executors until a shutdown request arrives. A panic escaping the executors is caught
/// and the executors are simply run again. Once shutdown is requested, the remaining local
/// tasks are drained and an acknowledgement is sent back.
fn thread_main_loop() {
    // Channel used to ask this thread to shut down.
    let (shutdown_tx, shutdown_rx) = async_channel::bounded(1);
    // Channel used to report that the shutdown has completed.
    let (ack_tx, ack_rx) = async_channel::bounded(1);
    THREAD_SHUTDOWN.with(|slot| drop(slot.set((shutdown_tx, ack_rx))));

    loop {
        let exited_cleanly = std::panic::catch_unwind(|| {
            crate::executor::LOCAL_EXECUTOR.with(|executor| {
                // The local side finishes as soon as a shutdown request is received.
                let local = executor.run(async {
                    let _ = shutdown_rx.recv().await;
                });
                // The global side never finishes on its own.
                let global = crate::executor::GLOBAL_EXECUTOR.run(future::pending::<()>());
                crate::reactor::block_on(future::or(local, global));
            });
        })
        .is_ok();

        // Only a normal return means shutdown was requested; after a panic, run again.
        if exited_cleanly {
            break;
        }
    }

    wait_for_local_executor_completion();

    crate::reactor::block_on(async {
        let _ = ack_tx.send(()).await;
    });
}
/// Ticks this thread's local executor until it reports being empty.
///
/// A panic raised while ticking is caught and the draining starts over, so the function only
/// returns after one pass has completed without panicking.
fn wait_for_local_executor_completion() {
    loop {
        let drained = std::panic::catch_unwind(|| {
            crate::executor::LOCAL_EXECUTOR.with(|executor| {
                crate::reactor::block_on(async {
                    loop {
                        if executor.is_empty() {
                            break;
                        }
                        executor.tick().await;
                    }
                });
            });
        })
        .is_ok();

        if drained {
            return;
        }
    }
}
/// Requests shutdown of the thread this future is being run on and waits for it to finish.
///
/// Returns `false` without doing anything when the expected thread count is not above the
/// configured `min_threads`; otherwise returns `true` once the shutdown has been acknowledged
/// and the live-thread counter has been decremented.
///
/// # Panics
///
/// Panics if `crate::config::GLOBAL_EXECUTOR_CONFIG` has not been set, or if `THREAD_SHUTDOWN`
/// is not set on the current thread (it is set by `thread_main_loop`).
async fn stop_current_executor_thread() -> bool {
    let mut expected = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
    let min_threads = crate::config::GLOBAL_EXECUTOR_CONFIG
        .get()
        .unwrap()
        .min_threads;

    // Never go below the configured minimum.
    if *expected <= min_threads {
        return false;
    }

    let (shutdown_tx, ack_rx) = THREAD_SHUTDOWN.with(|slot| slot.get().unwrap().clone());
    let _ = shutdown_tx.send(()).await;

    // One thread fewer is expected from now on; release the lock before waiting for the ack.
    *expected -= 1;
    drop(expected);

    let _ = ack_rx.recv().await;

    // The thread has finished shutting down, so it no longer counts as alive.
    *GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await -= 1;
    true
}