use std::io;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Once};
use std::thread;
use std::time::Duration;
use crate::config::config;
use crate::coroutine_impl::{run_coroutine, CoroutineImpl};
use crate::io::{EventLoop, Selector};
use crate::pool::CoroutinePool;
use crate::std::sync::AtomicOption;
use crate::timeout_list;
use crate::yield_now::set_co_para;
use crossbeam::deque;
use crossbeam::utils::Backoff;
#[cfg(nightly)]
use std::intrinsics::likely;
#[cfg(not(nightly))]
#[inline]
fn likely(e: bool) -> bool {
e
}
#[cfg(nightly)]
#[thread_local]
#[no_mangle]
pub static WORKER_ID: AtomicUsize = AtomicUsize::new(!1);
#[cfg(not(nightly))]
thread_local! { pub static WORKER_ID: AtomicUsize = AtomicUsize::new(!1); }
type TimerData = Arc<AtomicOption<CoroutineImpl>>;
type TimerThread = timeout_list::TimerThread<TimerData>;
fn filter_cancel_panic() {
use generator::Error;
use std::panic;
if thread::panicking() {
return;
}
let old = panic::take_hook();
::std::panic::set_hook(Box::new(move |info| {
if let Some(&Error::Cancel) = info.payload().downcast_ref::<Error>() {
return;
}
old(info);
}));
}
static mut SCHED: *const Scheduler = std::ptr::null();
pub struct ParkStatus {
pub parked: AtomicU64,
workers: u64,
}
impl ParkStatus {
fn new(workers: u64) -> Self {
let parked = AtomicU64::new(0);
ParkStatus { parked, workers }
}
#[inline]
fn wake_one(&self, scheduler: &Scheduler) {
let parked = self.parked.load(Ordering::Relaxed);
let rms = parked & !parked.wrapping_sub(1);
let first_thread = rms.trailing_zeros() as u64;
if first_thread < self.workers {
let mask = self.workers + first_thread;
self.parked.fetch_and(!mask, Ordering::Relaxed);
scheduler.get_selector().wakeup(first_thread as usize);
}
}
}
#[inline(never)]
fn init_scheduler() {
let workers = config().get_workers();
let b: Box<Scheduler> = Scheduler::new(workers);
unsafe {
SCHED = Box::into_raw(b);
}
filter_cancel_panic();
thread::spawn(move || {
let s = unsafe { &*SCHED };
let timer_event_handler = |co: Arc<AtomicOption<CoroutineImpl>>| {
if let Some(mut c) = co.take() {
set_co_para(&mut c, io::Error::new(io::ErrorKind::TimedOut, "timeout"));
run_coroutine(c);
}
};
s.timer_thread.run(&timer_event_handler);
});
for id in 0..workers {
thread::spawn(move || {
let s = unsafe { &*SCHED };
s.event_loop.run(id as usize).unwrap_or_else(|e| {
panic!("event_loop failed running, err={}", e);
});
});
}
}
#[inline]
pub fn get_scheduler() -> &'static Scheduler {
unsafe {
if likely(!SCHED.is_null()) {
return &*SCHED;
}
}
static ONCE: Once = Once::new();
ONCE.call_once(init_scheduler);
unsafe { &*SCHED }
}
#[inline]
fn steal_global<T>(global: &deque::Injector<T>, local: &deque::Worker<T>) -> Option<T> {
static GLOBABLE_LOCK: AtomicUsize = AtomicUsize::new(0);
if GLOBABLE_LOCK
.compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
return None;
}
let backoff = Backoff::new();
let ret = loop {
match global.steal_batch_and_pop(local) {
deque::Steal::Success(t) => break Some(t),
deque::Steal::Empty => break None,
deque::Steal::Retry => backoff.snooze(),
}
};
GLOBABLE_LOCK.store(0, Ordering::Relaxed);
ret
}
#[inline]
fn steal_local<T>(stealer: &deque::Stealer<T>, local: &deque::Worker<T>) -> Option<T> {
let backoff = Backoff::new();
loop {
match stealer.steal_batch_and_pop(local) {
deque::Steal::Success(t) => return Some(t),
deque::Steal::Empty => return None,
deque::Steal::Retry => backoff.snooze(),
}
}
}
#[repr(align(128))]
pub struct Scheduler {
pub pool: CoroutinePool,
event_loop: EventLoop,
global_queue: deque::Injector<CoroutineImpl>,
local_queues: Vec<deque::Worker<CoroutineImpl>>,
pub(crate) workers: ParkStatus,
timer_thread: TimerThread,
stealers: Vec<Vec<(usize, deque::Stealer<CoroutineImpl>)>>,
workers_len: usize,
}
impl Scheduler {
pub fn new(workers: usize) -> Box<Self> {
let mut local_queues = Vec::with_capacity(workers);
(0..workers).for_each(|_| local_queues.push(deque::Worker::new_fifo()));
let mut stealers = Vec::with_capacity(workers);
for id in 0..workers {
let mut stealers_l = Vec::with_capacity(workers);
for (i, worker) in local_queues.iter().enumerate() {
if i != id {
stealers_l.push((i, worker.stealer()));
}
}
stealers_l.rotate_left(id);
stealers.push(stealers_l);
}
Box::new(Scheduler {
pool: CoroutinePool::new(),
event_loop: EventLoop::new(workers).expect("can't create event_loop"),
global_queue: deque::Injector::new(),
local_queues,
timer_thread: TimerThread::new(),
workers: ParkStatus::new(workers as u64),
stealers,
workers_len: workers,
})
}
pub fn run_queued_tasks(&self, id: usize) {
let local = unsafe { self.local_queues.get_unchecked(id) };
let stealers = unsafe { self.stealers.get_unchecked(id) };
loop {
let co = local.pop().or_else(|| {
let parked_threads = self.workers.parked.load(Ordering::Relaxed);
stealers
.iter()
.map(|s| {
if parked_threads & (self.workers_len + s.0) as u64 != 0 {
return None;
}
steal_local(&s.1, local)
})
.find_map(|r| r)
.or_else(|| {
if self.global_queue.is_empty() {
None
} else {
steal_global(&self.global_queue, local)
}
})
});
if let Some(co) = co {
run_coroutine(co);
} else {
if self.global_queue.is_empty() {
break;
}
}
}
}
#[inline]
pub fn schedule(&self, co: CoroutineImpl) {
#[cfg(nightly)]
let id = WORKER_ID.load(Ordering::Relaxed);
#[cfg(not(nightly))]
let id = WORKER_ID.with(|id| id.load(Ordering::Relaxed));
if id == !1 {
self.schedule_global(co);
} else {
unsafe { self.local_queues.get_unchecked(id) }.push(co);
}
}
#[inline]
pub fn schedule_global(&self, co: CoroutineImpl) {
self.global_queue.push(co);
self.workers.wake_one(self);
}
#[inline]
pub fn add_timer(
&self,
dur: Duration,
co: Arc<AtomicOption<CoroutineImpl>>,
) -> timeout_list::TimeoutHandle<TimerData> {
self.timer_thread.add_timer(dur, co)
}
#[inline]
pub fn del_timer(&self, handle: timeout_list::TimeoutHandle<TimerData>) {
self.timer_thread.del_timer(handle);
}
#[inline]
pub fn get_selector(&self) -> &Selector {
self.event_loop.get_selector()
}
}