use crate::load_balancer;
use crate::placement;
use arrayvec::ArrayVec;
use lazy_static::*;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
use std::{fmt, usize};
pub trait SmpStats {
fn store_load(&self, affinity: usize, load: usize);
fn get_sorted_load(&self) -> ArrayVec<[(usize, usize); MAX_CORE]>;
fn mean(&self) -> usize;
fn update_mean(&self);
}
#[derive(Debug)]
pub struct LoadBalancer;
impl LoadBalancer {
pub fn amql_generation() {
thread::Builder::new()
.name("bastion-load-balancer-thread".to_string())
.spawn(move || {
loop {
load_balancer::stats().update_mean();
thread::sleep(Duration::from_millis(245));
thread::yield_now();
}
})
.expect("load-balancer couldn't start");
}
}
const MAX_CORE: usize = 256;
pub struct Stats {
smp_load: [AtomicUsize; MAX_CORE],
mean_level: AtomicUsize,
}
impl fmt::Debug for Stats {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Stats")
.field("smp_load", &&self.smp_load[..])
.field("mean_level", &self.mean_level)
.finish()
}
}
impl Stats {
pub fn new(num_cores: usize) -> Stats {
let smp_load: [AtomicUsize; MAX_CORE] = {
let mut data: [MaybeUninit<AtomicUsize>; MAX_CORE] =
unsafe { MaybeUninit::uninit().assume_init() };
let mut i = 0;
while i < MAX_CORE {
if i < num_cores {
unsafe {
std::ptr::write(data[i].as_mut_ptr(), AtomicUsize::new(0));
}
i += 1;
continue;
}
unsafe {
std::ptr::write(data[i].as_mut_ptr(), AtomicUsize::new(usize::MAX));
}
i += 1;
}
unsafe { std::mem::transmute::<_, [AtomicUsize; MAX_CORE]>(data) }
};
Stats {
smp_load,
mean_level: AtomicUsize::new(0),
}
}
}
unsafe impl Sync for Stats {}
unsafe impl Send for Stats {}
impl SmpStats for Stats {
fn store_load(&self, affinity: usize, load: usize) {
self.smp_load[affinity].store(load, Ordering::SeqCst);
}
fn get_sorted_load(&self) -> ArrayVec<[(usize, usize); MAX_CORE]> {
let mut sorted_load = ArrayVec::<[(usize, usize); MAX_CORE]>::new();
for (i, item) in self.smp_load.iter().enumerate() {
let load = item.load(Ordering::SeqCst);
if load == usize::MAX {
break;
}
unsafe { sorted_load.push_unchecked((i, load)) };
}
sorted_load.sort_by(|x, y| y.1.cmp(&x.1));
sorted_load
}
fn mean(&self) -> usize {
self.mean_level.load(Ordering::SeqCst)
}
fn update_mean(&self) {
let mut sum: usize = 0;
for item in self.smp_load.iter() {
let load = item.load(Ordering::SeqCst);
if let Some(tmp) = sum.checked_add(load) {
sum = tmp;
continue;
}
break;
}
self.mean_level.store(
sum.wrapping_div(placement::get_core_ids().unwrap().len()),
Ordering::SeqCst,
);
}
}
#[inline]
pub fn stats() -> &'static Stats {
lazy_static! {
static ref LOCKLESS_STATS: Stats = Stats::new(*core_retrieval());
}
&*LOCKLESS_STATS
}
#[inline]
pub fn core_retrieval() -> &'static usize {
lazy_static! {
static ref CORE_COUNT: usize = placement::get_core_ids().unwrap().len();
}
&*CORE_COUNT
}