use std::future::Future;
use std::hint::unreachable_unchecked;
use std::mem::transmute;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use crossbeam_utils::Backoff;
use log::trace;
use once_cell::sync::Lazy;
use crate::thread_pool;
use crate::utils::abort_on_panic;
use crate::utils::monotonic_ms as now_ms;
/// How long an unpinned processor may go without a `tick` before the system
/// monitor (`Executor::sysmon_main`) treats it as blocked and hands it to a new
/// machine. The monitor also polls at half this interval.
const BLOCKING_THRESHOLD: Duration = Duration::from_millis(100);
/// Sentinel meaning "no id": initial value of `Processor::machine_id` and of
/// `TaskTag::schedule_hint`.
const INVALID_ID: usize = usize::MAX;
/// Metadata attached to every spawned task.
struct TaskTag {
    /// Value taken from `TASK_ID_COUNTER`; in this file it is only used for
    /// trace output.
    id: usize,
    /// Id of the processor that last ran the task, or `INVALID_ID` before the
    /// first run. `Executor::push` uses it to reschedule onto that processor.
    schedule_hint: AtomicUsize,
}
/// Runnable task handle from `async_task`, tagged with a [`TaskTag`].
type Task = async_task::Task<TaskTag>;
/// Global scheduler state; the single instance lives in `EXECUTOR`.
struct Executor {
    /// One processor per CPU; the vector index equals `Processor::id`
    /// (asserted at the start of `sysmon_main`).
    processors: Vec<Processor>,
    /// Round-robin cursor used by `push` for tasks without a usable hint.
    processor_push_index_hint: AtomicUsize,
    /// Machine currently driving the processor at the same index.
    /// `sysmon_main` replaces entries in place when a processor stalls.
    machines: Vec<Arc<Machine>>,
    /// Machine index at which the next `steal` starts scanning.
    machine_steal_index_hint: AtomicUsize,
    /// Sending half of the capacity-1 wake-up channel (see `Processor::push`).
    wake_up: Sender<()>,
    /// Receiving half of the wake-up channel (see `Processor::sleep`).
    wake_up_notif: Receiver<()>,
}
/// A per-CPU scheduling slot with its own injector queue. It is driven by
/// the machine whose id is stored in `machine_id`.
struct Processor {
    /// Index of this processor in `Executor::processors`.
    id: usize,
    /// Id of the machine most recently assigned by
    /// `Machine::create_with_processor`. A machine that sees this change after
    /// running a task exits its loop.
    machine_id: AtomicUsize,
    /// `now_ms()` timestamp written by `tick`.
    last_seen: AtomicU64,
    /// While `true`, `sysmon_main` will not consider this processor blocked.
    pinned: AtomicBool,
    /// Tasks pushed to this processor.
    injector: Injector<Task>,
}
/// A worker loop (run on `thread_pool`) that drives one processor.
struct Machine {
    /// Value taken from `MACHINE_ID_COUNTER`.
    id: usize,
    /// Stealer half of this machine's local FIFO run queue.
    stealer: Stealer<Task>,
    /// Queue this machine took over. For a replacement this is the stalled
    /// machine's local queue; for the initial machines it is an empty queue.
    inherit: Stealer<Task>,
}
static EXECUTOR: Lazy<Executor> = Lazy::new(|| {
let num_cpus = std::cmp::max(1, num_cpus::get());
let mut processors = Vec::with_capacity(num_cpus);
for id in 0..num_cpus {
let p = Processor {
id,
machine_id: AtomicUsize::new(INVALID_ID),
last_seen: AtomicU64::new(0),
pinned: AtomicBool::new(true),
injector: Injector::new(),
};
trace!("{:?} is created", p);
processors.push(p);
}
let empty_worker = Worker::new_fifo();
let mut machines = Vec::with_capacity(num_cpus);
for index in 0..num_cpus {
machines.push(Machine::create_with_processor(
&processors[index],
empty_worker.stealer(),
));
}
thread::spawn(move || abort_on_panic(move || EXECUTOR.sysmon_main()));
let (wake_up, wake_up_notif) = bounded(1);
Executor {
processors,
processor_push_index_hint: AtomicUsize::new(0),
machines,
machine_steal_index_hint: AtomicUsize::new(0),
wake_up,
wake_up_notif,
}
});
/// Source of `TaskTag::id` values.
static TASK_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
/// Source of `Machine::id` values.
static MACHINE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
impl TaskTag {
    /// Builds the tag for a newly spawned task.
    ///
    /// The id comes from `TASK_ID_COUNTER`. The schedule hint starts as
    /// `INVALID_ID`, so `Executor::push` falls back to round-robin placement
    /// until the task has run somewhere.
    fn new() -> TaskTag {
        let id = TASK_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
        let schedule_hint = AtomicUsize::new(INVALID_ID);
        let tag = Self { id, schedule_hint };
        trace!("{} is created", Self::string_rep(id));
        tag
    }

    /// Short form of a task id used in trace messages, e.g. `T(7)`.
    fn string_rep(id: usize) -> String {
        format!("T({})", id)
    }
}
impl Drop for TaskTag {
    /// Logs the end of the task's lifetime at trace level.
    fn drop(&mut self) {
        trace!("{} is destroyed", Self::string_rep(self.id));
    }
}
impl Executor {
fn sysmon_main(&self) {
for index in 0..self.processors.len() {
let p = &self.processors[index];
assert_eq!(index, p.id);
}
thread::sleep(BLOCKING_THRESHOLD);
loop {
thread::sleep(BLOCKING_THRESHOLD / 2);
let must_seen_at = now_ms() - BLOCKING_THRESHOLD.as_millis() as u64;
for index in 0..self.processors.len() {
let p = &self.processors[index];
if p.is_pinned() || must_seen_at <= p.get_last_seen() {
continue;
}
let current: &Arc<Machine> = &self.machines[index];
let new: &Arc<Machine> = &Machine::create_with_processor(p, current.stealer.clone());
trace!(
"{:?} is not responding while running on {:?}, replacing with {:?}",
p,
current,
new
);
unsafe {
if false {
transmute::<*mut (), Arc<Machine>>(std::ptr::null_mut());
}
let current = transmute::<&Arc<Machine>, &AtomicPtr<()>>(current);
let new = transmute::<&Arc<Machine>, &AtomicPtr<()>>(&new);
let old = current.swap(new.load(Ordering::SeqCst), Ordering::SeqCst);
new.store(old, Ordering::SeqCst);
}
}
}
}
fn push(&self, t: Task) {
let mut index = t.tag().schedule_hint.load(Ordering::Relaxed);
if index > self.processors.len() {
index = self.processor_push_index_hint.load(Ordering::Relaxed);
self
.processor_push_index_hint
.store((index + 1) % self.processors.len(), Ordering::Relaxed);
}
self.processors[index].push(t);
}
fn pop(&self, index: usize, dest: &Worker<Task>) -> Option<Task> {
let (l, r) = self.processors.split_at(index);
r.iter()
.chain(l.iter())
.map(|p| p.pop(dest))
.filter(|s| matches!(s, Some(_)))
.nth(0)
.flatten()
}
fn steal(&self, dest: &Worker<Task>) -> Option<Task> {
let m = self.machine_steal_index_hint.load(Ordering::Relaxed);
let (l, r) = self.machines.split_at(m);
(1..)
.zip(r.iter().chain(l.iter()))
.map(|(hint_add, m)| {
(
hint_add,
std::iter::repeat_with(|| m.stealer.steal_batch_and_pop(dest))
.filter(|s| !matches!(s, Steal::Retry))
.map(|s| match s {
Steal::Success(task) => Some(task),
Steal::Empty => None,
Steal::Retry => unsafe { unreachable_unchecked() },
})
.nth(0)
.unwrap(),
)
})
.filter(|(_, s)| matches!(s, Some(_)))
.nth(0)
.map(|(hint_add, s)| {
self
.machine_steal_index_hint
.store((m + hint_add) % self.machines.len(), Ordering::Relaxed);
s
})
.flatten()
}
}
impl Processor {
    /// Returns `true` while `Executor::sysmon_main` must skip this processor.
    fn is_pinned(&self) -> bool {
        self.pinned.load(Ordering::Relaxed)
    }

    /// Sets or clears the pinned flag.
    fn set_pinned(&self, b: bool) {
        self.pinned.store(b, Ordering::Relaxed)
    }

    /// Blocks the calling machine until a wake-up token arrives on
    /// `EXECUTOR.wake_up_notif`.
    ///
    /// The processor is pinned while sleeping, so the monitor does not treat
    /// an idle processor as a blocked one; the `defer!` guard unpins it on
    /// return. The channel is first polled with `Backoff::snooze`. Only when
    /// the backoff is exhausted does it fall back to a blocking `recv`.
    fn sleep(&self) {
        self.set_pinned(true);
        defer! {
            self.set_pinned(false);
        }
        let backoff = Backoff::new();
        loop {
            match EXECUTOR.wake_up_notif.try_recv() {
                Ok(()) => return,
                Err(_) => {
                    if backoff.is_completed() {
                        {
                            trace!("{:?} entering sleep", self);
                            defer! {
                                trace!("{:?} leaving sleep", self);
                            }
                            // The sender lives in the static `EXECUTOR`, so the
                            // channel is never disconnected.
                            EXECUTOR.wake_up_notif.recv().unwrap();
                        }
                        return;
                    } else {
                        backoff.snooze();
                    }
                }
            }
        }
    }

    /// Timestamp (from `now_ms`) of the most recent `tick`.
    fn get_last_seen(&self) -> u64 {
        self.last_seen.load(Ordering::Relaxed)
    }

    /// Records that this processor is making progress right now.
    fn tick(&self) {
        self.last_seen.store(now_ms(), Ordering::Relaxed);
    }

    /// Queues `t` on this processor and posts a wake-up token.
    ///
    /// The wake-up channel has capacity 1, so if a token is already pending
    /// the send fails and is deliberately ignored.
    fn push(&self, t: Task) {
        self.injector.push(t);
        let _ = EXECUTOR.wake_up.try_send(());
    }

    /// Moves a batch of tasks from this processor's injector into `dest` and
    /// returns one of them, or `None` if the injector is empty.
    ///
    /// `Steal::Retry` (contention) is retried until the result is definitive.
    fn pop(&self, dest: &Worker<Task>) -> Option<Task> {
        loop {
            match self.injector.steal_batch_and_pop(dest) {
                Steal::Success(task) => return Some(task),
                Steal::Empty => return None,
                Steal::Retry => {}
            }
        }
    }
}
impl std::fmt::Debug for Processor {
    /// Formats as `P(<id>)`, the short name used in trace logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Write directly into the formatter; no intermediate `String`.
        write!(f, "P({})", self.id)
    }
}
impl Machine {
    /// Creates a machine, assigns it to processor `p`, and starts its
    /// scheduling loop (`main`) on `thread_pool`.
    ///
    /// * `p` – processor to drive. Its `machine_id` is overwritten with the new
    ///   id. Any machine previously driving `p` then returns from `main` after
    ///   finishing its current task.
    /// * `inherit` – stealer whose tasks the new machine drains first.
    ///
    /// Returns the shared handle; the spawned loop keeps its own clone.
    fn create_with_processor(p: &Processor, inherit: Stealer<Task>) -> Arc<Machine> {
        let id = MACHINE_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
        p.machine_id.store(id, Ordering::Relaxed);
        // Local run queue. Its stealer is published so other machines can
        // steal from it, and `sysmon_main` can pass it on to a replacement.
        let worker = Worker::new_fifo();
        let stealer = worker.stealer();
        let machine = Arc::new(Machine {
            id,
            stealer: stealer,
            inherit,
        });
        trace!("{:?} is created", machine);
        {
            let machine = machine.clone();
            // The spawned closure needs `'static`. This is only sound if `p`
            // outlives the machine's thread. Callers in this file pass
            // processors from the `processors` Vec that ends up inside the
            // static `EXECUTOR`; that Vec is never resized afterwards.
            // NOTE(review): any new caller must uphold the same guarantee.
            let processor: &'static Processor = unsafe { transmute(p) };
            thread_pool::spawn_box(Box::new(move || {
                abort_on_panic(move || machine.main(worker, processor))
            }));
        }
        machine
    }

    /// Scheduling loop for this machine while it drives `processor`.
    ///
    /// Returns only when, after running a task, `processor.machine_id` no
    /// longer equals `self.id`, i.e. the processor was reassigned to another
    /// machine while this one was running that task.
    fn main(&self, worker: Worker<Task>, processor: &Processor) {
        trace!("{:?} is running on {:?}", processor, self);
        processor.tick();
        // While pinned, `sysmon_main` skips this processor. The pin is held
        // while the inherited queue is moved into the local worker.
        processor.set_pinned(true);
        loop {
            match self.inherit.steal_batch(&worker) {
                Steal::Retry => continue,
                _ => break,
            }
        }
        processor.set_pinned(false);
        // Once more than this many tasks have run since the last injector
        // check, the injectors are checked first. Presumably this keeps
        // injected tasks from starving behind local work.
        const MAX_RUNS: u64 = 64;
        let mut run_counter = 0;
        'main: loop {
            // Runs one task: records this processor as its schedule hint,
            // ticks, runs it, exits if the processor was reassigned, and
            // otherwise restarts the search from the top of 'main.
            macro_rules! run_task {
                ($task:ident) => {{
                    $task
                        .tag()
                        .schedule_hint
                        .store(processor.id, Ordering::Relaxed);
                    let task_id = $task.tag().id;
                    trace!(
                        "{} is running on {:?}",
                        TaskTag::string_rep(task_id),
                        processor
                    );
                    processor.tick();
                    $task.run();
                    if processor.machine_id.load(Ordering::Relaxed) != self.id {
                        trace!(
                            "{:?} is no longer holding {:?}, blocking when executing {}",
                            self,
                            processor,
                            TaskTag::string_rep(task_id),
                        );
                        return;
                    }
                    run_counter += 1;
                    continue 'main;
                }};
            }
            // Resets the run counter and tries the processor injectors,
            // starting at this machine's own processor.
            macro_rules! get_tasks {
                () => {{
                    run_counter = 0;
                    match EXECUTOR.pop(processor.id, &worker) {
                        Some(task) => run_task!(task),
                        None => {}
                    }
                }};
            }
            // Search order: injectors (only if overdue), local worker,
            // inherited queue, injectors, other machines, then sleep and
            // re-check the injectors.
            if run_counter > MAX_RUNS {
                get_tasks!();
            }
            if let Some(task) = worker.pop() {
                run_task!(task);
            }
            match self.inherit.steal_batch_and_pop(&worker) {
                Steal::Success(task) => run_task!(task),
                _ => {}
            }
            get_tasks!();
            match EXECUTOR.steal(&worker) {
                Some(task) => run_task!(task),
                None => {}
            }
            processor.sleep();
            get_tasks!();
        }
    }
}
impl std::fmt::Debug for Machine {
    /// Formats as `M(<id>)`, the short name used in trace logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Write directly into the formatter; no intermediate `String`.
        write!(f, "M({})", self.id)
    }
}
impl Drop for Machine {
    /// Logs at trace level when the last `Arc<Machine>` handle is released.
    fn drop(&mut self) {
        trace!("{:?} is destroyed", self);
    }
}
pub fn spawn<F: Future<Output = ()> + Send + 'static>(f: F) {
let (task, _) = async_task::spawn(f, |t| EXECUTOR.push(t), TaskTag::new());
task.schedule();
}