use crate::load_balancer;
use crate::pool::{self, Pool};
use crate::run_queue::{Steal, Worker};
use lightproc::prelude::*;
use load_balancer::SmpStats;
use std::cell::{Cell, UnsafeCell};
use std::{iter, ptr};
/// Returns a clone of the [`ProcStack`] belonging to the proc that is
/// currently running on this thread.
///
/// # Panics
///
/// Panics when no proc stack is installed on the calling thread, i.e. when
/// `get_proc_stack` yields `None`.
pub fn current() -> ProcStack {
    let active = get_proc_stack(|stack| stack.clone());
    active.expect("`proc::current()` called outside the context of the proc")
}
thread_local! {
    /// Per-thread pointer to the stack of the proc currently being run.
    ///
    /// Starts out null; `set_stack` writes it and `get_proc_stack` reads it.
    static STACK: Cell<*const ProcStack> = Cell::new(ptr::null());
}
pub(crate) fn set_stack<F, R>(stack: *const ProcStack, f: F) -> R
where
F: FnOnce() -> R,
{
struct ResetStack<'a>(&'a Cell<*const ProcStack>);
impl Drop for ResetStack<'_> {
fn drop(&mut self) {
self.0.set(ptr::null());
}
}
STACK.with(|st| {
st.set(stack);
let _guard = ResetStack(st);
f()
})
}
/// Calls `f` with the proc stack currently installed on this thread.
///
/// Returns `Some` with the result of `f`, or `None` when the `STACK` slot
/// holds a null pointer or the thread-local can no longer be accessed
/// (`try_with` reported an error).
pub(crate) fn get_proc_stack<F, R>(f: F) -> Option<R>
where
    F: FnOnce(&ProcStack) -> R,
{
    STACK
        .try_with(|slot| {
            let raw = slot.get();
            if raw.is_null() {
                None
            } else {
                // SAFETY: non-null values are only written by `set_stack`,
                // which removes them again before it returns. This relies on
                // `set_stack`'s callers passing a pointer that is valid for
                // the duration of the closure they run.
                Some(f(unsafe { &*raw }))
            }
        })
        .unwrap_or(None)
}
thread_local! {
    /// Per-thread slot for this thread's local run queue.
    ///
    /// Starts out as `None`; `main_loop` stores the worker's queue here.
    /// Access goes through raw pointers obtained from the `UnsafeCell`.
    static QUEUE: UnsafeCell<Option<Worker<LightProc>>> = UnsafeCell::new(Option::None);
}
/// Queues `proc` for execution and wakes one sleeping worker.
///
/// If the calling thread has a local run queue installed the proc is pushed
/// there; otherwise it goes to the pool's shared injector queue.
pub(crate) fn schedule(proc: LightProc) {
    QUEUE.with(|slot| {
        // SAFETY: the slot is thread-local and only a shared reference is
        // formed here. The only write visible in this file happens in
        // `main_loop` before any reads on that thread.
        let installed = unsafe { &*slot.get() };
        if let Some(worker) = installed {
            worker.push(proc)
        } else {
            pool::get().injector.push(proc)
        }
    });
    pool::get().sleepers.notify_one();
}
/// Fetches the next proc for the worker identified by `affinity`.
///
/// The thread's local run queue is tried first; if it is empty the work is
/// delegated to `affine_steal`.
///
/// # Panics
///
/// Panics if the calling thread has no local run queue installed in `QUEUE`.
pub fn fetch_proc(affinity: usize) -> Option<LightProc> {
    let pool = pool::get();
    QUEUE.with(|slot| {
        // SAFETY: the slot is thread-local and only a shared reference is
        // formed here. The only write visible in this file happens in
        // `main_loop` before any reads on that thread.
        let worker = unsafe { &*slot.get() }.as_ref().unwrap();
        let popped = worker.pop();
        if popped.is_some() {
            popped
        } else {
            affine_steal(pool, worker, affinity)
        }
    })
}
/// Finds work for the worker identified by `affinity` when it has nothing
/// obvious to run.
///
/// Order of attempts:
/// 1. pop from `local`;
/// 2. steal a batch from the pool's injector into `local` and pop one;
/// 3. walk the cores in the order returned by `get_sorted_load()` and steal
///    from each core's stealer, using the load mean as the batch amount when
///    it is positive.
///
/// Steps 2 and 3 are repeated until an attempt yields something other than
/// `Steal::Retry`. Returns the stolen proc, or `None` if the final attempt
/// did not succeed.
///
/// # Panics
///
/// Panics if a core index from the load snapshot has no entry in
/// `pool.stealers`.
fn affine_steal(pool: &Pool, local: &Worker<LightProc>, affinity: usize) -> Option<LightProc> {
    // Sampled once per call; every retry below uses the same mean.
    let load_mean = load_balancer::stats().mean();
    local.pop().or_else(|| {
        iter::repeat_with(|| {
            pool.injector.steal_batch_and_pop(local).or_else(|| {
                // The load snapshot is only needed when the injector attempt
                // did not succeed, so it is taken here rather than up front
                // on every retry.
                let core_vec = load_balancer::stats().get_sorted_load();
                match core_vec.get(0) {
                    // This worker is first in the load ordering: do not steal
                    // from other cores, ask the outer loop to retry instead.
                    // NOTE(review): with an empty local queue and an unchanged
                    // snapshot this spins until the injector yields work.
                    Some((core, _)) if *core != affinity => core_vec
                        .iter()
                        .map(|entry| {
                            let stealer = pool.stealers.get(entry.0).unwrap();
                            if load_mean > 0 {
                                stealer.steal_batch_and_pop_with_amount(local, load_mean)
                            } else {
                                stealer.steal_batch_and_pop(local)
                            }
                        })
                        // Folds the per-core attempts into a single `Steal`
                        // via its `FromIterator` impl.
                        .collect(),
                    // Empty snapshot, or this worker is first in the ordering.
                    _ => Steal::Retry,
                }
            })
        })
        .find(|s| !s.is_retry())
        .and_then(|s| s.success())
    })
}
/// Records the current size of `local`'s run queue in the load balancer
/// statistics under the core index `affinity`.
pub(crate) fn stats_generator(affinity: usize, local: &Worker<LightProc>) {
    let stats = load_balancer::stats();
    let queue_size = local.worker_run_queue_size();
    stats.store_load(affinity, queue_size);
}
/// Worker thread entry point; never returns.
///
/// Installs `local` as this thread's run queue, then loops forever:
/// publishes the queue size to the load balancer, fetches a proc and runs it
/// with its stack installed as the thread's current proc stack, or waits on
/// the pool's sleepers when no proc was found.
pub(crate) fn main_loop(affinity: usize, local: Worker<LightProc>) {
    QUEUE.with(|slot| {
        // SAFETY: the slot is thread-local and no reference into it is held
        // while it is overwritten here.
        unsafe { *slot.get() = Some(local) }
    });

    loop {
        QUEUE.with(|slot| {
            // SAFETY: only a shared reference is formed; the slot was filled
            // above, so the `unwrap` cannot fail on this thread.
            let worker = unsafe { &*slot.get() }.as_ref().unwrap();
            stats_generator(affinity, worker);
        });

        if let Some(proc) = fetch_proc(affinity) {
            // Expose the proc's stack through the thread-local while it runs.
            set_stack(proc.stack(), || proc.run());
        } else {
            pool::get().sleepers.wait();
        }
    }
}