use std::cell::Cell;
use std::iter;
use std::thread;
use std::time::Duration;
use crossbeam_deque::{Injector, Stealer, Worker};
use once_cell::sync::Lazy;
use once_cell::unsync::OnceCell;
use crate::task::executor::Sleepers;
use crate::task::Runnable;
use crate::utils::{abort_on_panic, random};
/// Shared state of the executor: the global queue, the stealing handles for
/// every worker's local queue, and the mechanism used to park idle workers.
struct Pool {
/// Global task queue. `schedule` pushes here when the calling thread has no
/// `Processor`; workers steal batches from it in `find_runnable`.
injector: Injector<Runnable>,
/// One stealing handle per worker thread's local queue, collected while the
/// worker threads are being started.
stealers: Vec<Stealer<Runnable>>,
/// Idle workers block on this in `main_loop`; `schedule` notifies one
/// sleeper whenever it pushes a task into a queue.
sleepers: Sleepers,
}
static POOL: Lazy<Pool> = Lazy::new(|| {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
let proc = Processor {
worker,
slot: Cell::new(None),
slot_runs: Cell::new(0),
};
thread::Builder::new()
.name("async-std/executor".to_string())
.spawn(|| {
let _ = PROCESSOR.with(|p| p.set(proc));
abort_on_panic(main_loop);
})
.expect("cannot start a thread driving tasks");
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
});
/// Per-worker-thread scheduling state, stored in the `PROCESSOR` thread-local.
struct Processor {
/// This worker's local task queue; other workers take from it through the
/// matching handle in `POOL.stealers`.
worker: Worker<Runnable>,
/// Holds the task most recently scheduled from this thread. `find_runnable`
/// checks it before any queue; a task displaced from it by `schedule` is
/// pushed onto `worker`.
slot: Cell<Option<Runnable>>,
/// Number of tasks taken from `slot` in a row. `find_runnable` resets it to
/// zero whenever it falls through to the queues.
slot_runs: Cell<u32>,
}
thread_local! {
/// State of the current thread if it is a worker thread of `POOL`.
///
/// Each worker thread sets this once right after it starts. On any thread
/// where it was never set, `schedule` finds it empty and routes tasks to the
/// global queue instead.
static PROCESSOR: OnceCell<Processor> = OnceCell::new();
}
/// Schedules a runnable task for execution.
///
/// When called from a worker thread, the task is placed in that worker's slot
/// so it is picked up next; any task already in the slot is moved to the
/// worker's local queue. When called from any other thread, the task goes to
/// the global queue. One sleeping worker is notified whenever a task is pushed
/// into a queue.
pub(crate) fn schedule(task: Runnable) {
    PROCESSOR.with(|cell| {
        let local = match cell.get() {
            Some(local) => local,
            None => {
                // Not a worker thread: hand the task to the global queue.
                POOL.injector.push(task);
                POOL.sleepers.notify_one();
                return;
            }
        };

        // The new task takes over the slot. Nobody is notified for a task that
        // stays in the slot, since only this thread ever reads it.
        if let Some(displaced) = local.slot.replace(Some(task)) {
            local.worker.push(displaced);
            POOL.sleepers.notify_one();
        }
    })
}
/// Body of a worker thread: runs tasks forever and never returns.
///
/// When no task is found, the thread backs off in stages: it yields for the
/// first `YIELDS` consecutive misses, sleeps briefly for the next `SLEEPS`
/// misses, and after that blocks on `POOL.sleepers` until notified.
fn main_loop() {
    /// Consecutive misses that are answered with a plain thread yield.
    const YIELDS: u32 = 3;
    /// Further consecutive misses that are answered with a short sleep.
    const SLEEPS: u32 = 1;

    // How many times in a row no runnable task was found.
    let mut misses: u32 = 0;

    loop {
        if let Some(task) = find_runnable() {
            misses = 0;
            task.run();
            continue;
        }

        misses += 1;
        match misses {
            n if n <= YIELDS => thread::yield_now(),
            n if n <= YIELDS + SLEEPS => thread::sleep(Duration::from_micros(10)),
            _ => {
                POOL.sleepers.wait();
                // Start the back-off sequence over after being woken up.
                misses = 0;
            }
        }
    }
}
/// Finds the next task for the calling worker thread, or `None` if there is
/// nothing to run right now.
///
/// Sources are tried in this order: the worker's slot (at most `SLOT_LIMIT`
/// times in a row), its local queue, the global queue, and finally the local
/// queues of all workers starting from a randomly chosen one.
///
/// # Panics
///
/// Panics if `PROCESSOR` has not been set on the calling thread. The only
/// caller visible here is `main_loop`, which runs after the worker thread has
/// installed its `Processor`.
fn find_runnable() -> Option<Runnable> {
    /// Maximum number of tasks taken from the slot in a row before the queues
    /// get a turn.
    const SLOT_LIMIT: u32 = 16;

    PROCESSOR.with(|cell| {
        let local = cell.get().unwrap();

        // Fast path: the slot, unless it has been used too many times in a row.
        let streak = local.slot_runs.get();
        let slotted = if streak < SLOT_LIMIT {
            local.slot.take()
        } else {
            None
        };
        if let Some(task) = slotted {
            local.slot_runs.set(streak + 1);
            return Some(task);
        }
        // Falling through to the queues ends the streak. When the limit was
        // hit, a task may still be sitting in the slot; it is picked up on a
        // later call.
        local.slot_runs.set(0);

        if let Some(task) = local.worker.pop() {
            return Some(task);
        }

        // One full stealing attempt: the global queue first, then every
        // worker's queue.
        let steal_once = || {
            POOL.injector
                .steal_batch_and_pop(&local.worker)
                .or_else(|| {
                    let count = POOL.stealers.len();
                    // NOTE(review): assumes `random(n)` yields a value in `0..n`;
                    // `split_at` panics if the offset exceeds `count` — confirm.
                    let offset = random(count as u32) as usize;

                    // Visit every stealer exactly once, starting at `offset`
                    // and wrapping around.
                    let (head, tail) = POOL.stealers.split_at(offset);
                    tail.iter()
                        .chain(head.iter())
                        .map(|stealer| stealer.steal_batch_and_pop(&local.worker))
                        .collect()
                })
        };

        // Repeat the attempt for as long as the outcome asks for a retry, then
        // unwrap the stolen task if there is one.
        iter::repeat_with(steal_once)
            .find(|outcome| !outcome.is_retry())
            .and_then(|outcome| outcome.success())
    })
}