use std::cell::UnsafeCell;
use std::iter;
use std::thread;
use std::time::Duration;
use crossbeam_deque::{Injector, Stealer, Worker};
use once_cell::sync::Lazy;
use crate::task::executor::Sleepers;
use crate::task::Runnable;
use crate::utils::{abort_on_panic, random};
/// Shared state of the task executor.
struct Pool {
/// Queue shared by all threads; `schedule` pushes here when the calling thread has no
/// local queue.
injector: Injector<Runnable>,
/// One stealer handle per worker thread's local queue, used to take work from those queues.
stealers: Vec<Stealer<Runnable>>,
/// Idle worker threads wait on this; `schedule` notifies one waiter after queueing a task.
sleepers: Sleepers,
}
/// Process-wide executor state, built lazily on first access.
///
/// Initialization starts one worker thread per CPU reported by `num_cpus` (at least one).
/// Each thread runs `main_loop` on its own FIFO queue, and the stealer handle of every such
/// queue is kept in the pool.
///
/// # Panics
///
/// Initialization panics if a worker thread cannot be spawned.
static POOL: Lazy<Pool> = Lazy::new(|| {
    let worker_count = num_cpus::get().max(1);

    // Start the workers one at a time, keeping the stealing side of each local queue.
    let stealers: Vec<_> = (0..worker_count)
        .map(|_| {
            let worker = Worker::new_fifo();
            let stealer = worker.stealer();

            thread::Builder::new()
                .name(String::from("async-std/executor"))
                .spawn(|| abort_on_panic(|| main_loop(worker)))
                .expect("cannot start a thread driving tasks");

            stealer
        })
        .collect();

    Pool {
        injector: Injector::new(),
        stealers,
        sleepers: Sleepers::new(),
    }
});
thread_local! {
/// Local task queue of the current thread.
///
/// Starts out as `None`; `main_loop` stores the worker's queue here, so in the code visible
/// in this file only worker threads ever hold `Some`.
static QUEUE: UnsafeCell<Option<Worker<Runnable>>> = UnsafeCell::new(None);
}
/// Queues `task` for execution and wakes one sleeping worker.
///
/// When the calling thread has a local queue installed, the task goes onto that queue;
/// otherwise it is pushed onto the pool's shared injector.
pub(crate) fn schedule(task: Runnable) {
    QUEUE.with(|queue| {
        // SAFETY: `QUEUE` is thread-local, so no other thread can reach this cell. Within the
        // code visible in this file the only write is the assignment at the top of
        // `main_loop`, which completes before that thread runs any task.
        let slot = unsafe { &*queue.get() };

        if let Some(worker) = slot {
            worker.push(task);
        } else {
            POOL.injector.push(task);
        }
    });

    // New work is available, so let one idle worker know.
    POOL.sleepers.notify_one();
}
/// Body of a worker thread: installs `local` as this thread's queue, then runs tasks forever.
///
/// When no task is found the thread backs off in stages: it yields for the first three
/// consecutive misses, sleeps for 10 microseconds on the fourth, and on the fifth waits on
/// the pool's sleepers, after which the miss counter starts over.
fn main_loop(local: Worker<Runnable>) {
    // SAFETY: `QUEUE` is thread-local and nothing on this thread holds a reference into the
    // cell yet, so writing through the raw pointer cannot alias a live borrow.
    QUEUE.with(|queue| unsafe { *queue.get() = Some(local) });

    // Consecutive attempts that came back without a task.
    let mut misses = 0;

    loop {
        if let Some(task) = find_runnable() {
            task.run();
            misses = 0;
            continue;
        }

        if misses < 3 {
            thread::yield_now();
            misses += 1;
        } else if misses == 3 {
            thread::sleep(Duration::from_micros(10));
            misses += 1;
        } else {
            POOL.sleepers.wait();
            misses = 0;
        }
    }
}
/// Looks for the next task the calling worker thread should run.
///
/// The thread's own queue is tried first. If it is empty, a batch is stolen from the pool's
/// shared injector; when that yields no task, the workers' queues are tried in rotation,
/// beginning at an index obtained from `random`. The whole steal attempt is repeated for as
/// long as its outcome reports that a retry is needed.
///
/// Returns the task that was found, or `None` if the final attempt did not succeed.
///
/// # Panics
///
/// Panics if the calling thread has no local queue installed in `QUEUE`.
fn find_runnable() -> Option<Runnable> {
    let pool = &*POOL;

    QUEUE.with(|queue| {
        // SAFETY: `QUEUE` is thread-local, so no other thread can reach this cell. Within the
        // code visible in this file the only write is the assignment at the top of
        // `main_loop`, which has already completed on any thread that gets here.
        let local = unsafe { (*queue.get()).as_ref().unwrap() };

        // One full steal attempt: the shared injector first, then the per-worker queues.
        let steal_once = || {
            pool.injector.steal_batch_and_pop(local).or_else(|| {
                // Rotate the stealer list so the scan does not always begin at index 0.
                let pivot = random(pool.stealers.len() as u32) as usize;
                let (before_pivot, from_pivot) = pool.stealers.split_at(pivot);

                from_pivot
                    .iter()
                    .chain(before_pivot.iter())
                    .map(|stealer| stealer.steal_batch_and_pop(local))
                    .collect()
            })
        };

        match local.pop() {
            Some(task) => Some(task),
            // Keep attempting until an outcome no longer asks for a retry.
            None => iter::repeat_with(steal_once)
                .find(|attempt| !attempt.is_retry())
                .and_then(|attempt| attempt.success()),
        }
    })
}