use crate::load_balancer;
use crate::placement;
use arrayvec::ArrayVec;
use fmt::{Debug, Formatter};
use lazy_static::*;
use once_cell::sync::Lazy;
use placement::CoreId;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::{Duration, Instant};
use std::{fmt, usize};
use tracing::{debug, error};
// NOTE(review): "TRESHOLD" is a misspelling of "THRESHOLD"; a rename has to touch every use site.
/// Minimum time that must have elapsed since the last refresh before
/// `LoadBalancer::update_load_mean` recomputes the mean load again.
const MEAN_UPDATE_TRESHOLD: Duration = Duration::from_millis(200);
/// Per-core load bookkeeping used by the load balancer.
///
/// The notes on each method describe what the `Stats` implementation in this file does.
pub trait SmpStats {
/// Stores `load` as the latest load value of the core slot `affinity`.
fn store_load(&self, affinity: usize, load: usize);
/// Returns `(core, load)` pairs ordered by load, highest load first.
fn get_sorted_load(&self) -> ArrayVec<(usize, usize), MAX_CORE>;
/// Returns the mean load computed by the most recent `update_mean` call.
fn mean(&self) -> usize;
/// Recomputes the mean load from the stored per-core loads.
fn update_mean(&self);
}
/// Process-wide load balancer, built on first access from the core ids reported by
/// `placement::get_core_ids`.
///
/// # Panics
///
/// The first access panics if the core ids cannot be retrieved.
static LOAD_BALANCER: Lazy<LoadBalancer> = Lazy::new(|| {
    // Initialisation cannot return an error from inside `Lazy`, so fail loudly with a
    // message that says what went wrong instead of a bare `unwrap` panic.
    let core_ids = placement::get_core_ids()
        .expect("load balancer initialisation failed: could not retrieve the core ids");
    let lb = LoadBalancer::new(core_ids);
    debug!("Instantiated load_balancer: {:?}", lb);
    lb
});
/// Keeps the set of cores work is balanced across, together with the time the
/// mean load was last refreshed.
pub struct LoadBalancer {
/// Number of cores; `LoadBalancer::new` sets it to `cores.len()`.
pub num_cores: usize,
/// The core ids handed to `LoadBalancer::new`.
pub cores: Vec<CoreId>,
/// When `update_load_mean` last refreshed the mean; starts at construction time.
mean_last_updated_at: RwLock<Instant>,
}
impl LoadBalancer {
    /// Builds a load balancer over `cores`.
    ///
    /// `num_cores` is taken from the length of `cores`, and the "mean last updated"
    /// timestamp starts at the moment of construction.
    pub fn new(cores: Vec<CoreId>) -> Self {
        // Read the length before `cores` is moved into the struct.
        let num_cores = cores.len();
        let mean_last_updated_at = RwLock::new(Instant::now());
        Self {
            num_cores,
            cores,
            mean_last_updated_at,
        }
    }
}
impl Debug for LoadBalancer {
    /// Formats the balancer as a `LoadBalancer { .. }` struct listing every field.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // Destructuring makes the compiler flag this impl when a field is added.
        let Self {
            num_cores,
            cores,
            mean_last_updated_at,
        } = self;

        let mut builder = f.debug_struct("LoadBalancer");
        builder.field("num_cores", num_cores);
        builder.field("cores", cores);
        builder.field("mean_last_updated_at", mean_last_updated_at);
        builder.finish()
    }
}
impl LoadBalancer {
pub fn update_load_mean(&self) {
if !self.should_update() {
return;
}
self.mean_last_updated_at
.write()
.map(|mut last_updated_at| {
*last_updated_at = Instant::now();
})
.unwrap_or_else(|e| error!("couldn't update mean timestamp - {}", e));
load_balancer::stats().update_mean();
}
fn should_update(&self) -> bool {
self.mean_last_updated_at
.try_read()
.map(|last_updated_at| last_updated_at.elapsed() > MEAN_UPDATE_TRESHOLD)
.unwrap_or(false)
}
}
pub fn update() {
LOAD_BALANCER.update_load_mean()
}
/// Number of per-core slots in `Stats::smp_load`, and the capacity of the `ArrayVec`
/// returned by `SmpStats::get_sorted_load`.
const MAX_CORE: usize = 256;
/// Lock-free per-core load table plus the cached mean load.
pub struct Stats {
/// One load value per core slot; `Stats::new` fills slots past the core count with `usize::MAX`.
smp_load: [AtomicUsize; MAX_CORE],
/// Mean load written by `update_mean` and read by `mean`.
mean_level: AtomicUsize,
/// Set while `update_mean` is running.
updating_mean: AtomicBool,
}
impl fmt::Debug for Stats {
    /// Formats the stats as a `Stats { .. }` struct; the load table is printed as a list.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // View the fixed-size array as a slice so it is printed as a plain list.
        let per_core: &[AtomicUsize] = &self.smp_load;

        let mut builder = f.debug_struct("Stats");
        builder.field("smp_load", &per_core);
        builder.field("mean_level", &self.mean_level);
        builder.field("updating_mean", &self.updating_mean);
        builder.finish()
    }
}
impl Stats {
    /// Creates the load table for `num_cores` cores.
    ///
    /// The first `num_cores` slots start at `0`; every remaining slot holds `usize::MAX`.
    /// The mean starts at `0` and no update is in progress.
    pub fn new(num_cores: usize) -> Stats {
        // SAFETY: an array of `MaybeUninit` is allowed to be uninitialised, so
        // "assuming init" on the outer array makes no claim about the elements.
        let mut slots: [MaybeUninit<AtomicUsize>; MAX_CORE] =
            unsafe { MaybeUninit::uninit().assume_init() };

        for (index, slot) in slots.iter_mut().enumerate() {
            let initial = if index < num_cores { 0 } else { usize::MAX };
            // Overwriting a `MaybeUninit` never drops the previous contents.
            *slot = MaybeUninit::new(AtomicUsize::new(initial));
        }

        // SAFETY: the loop above wrote every one of the `MAX_CORE` slots, and
        // `MaybeUninit<AtomicUsize>` has the same layout as `AtomicUsize`.
        let smp_load = unsafe { std::mem::transmute::<_, [AtomicUsize; MAX_CORE]>(slots) };

        Stats {
            smp_load,
            mean_level: AtomicUsize::new(0),
            updating_mean: AtomicBool::new(false),
        }
    }
}
unsafe impl Sync for Stats {}
unsafe impl Send for Stats {}
impl SmpStats for Stats {
    /// Stores `load` as the latest load of the core slot `affinity`.
    ///
    /// # Panics
    ///
    /// Panics if `affinity` is not smaller than `MAX_CORE`.
    fn store_load(&self, affinity: usize, load: usize) {
        self.smp_load[affinity].store(load, Ordering::SeqCst);
    }

    /// Returns `(core, load)` pairs ordered from the highest to the lowest load.
    ///
    /// Slots are read in index order and reading stops at the first slot holding
    /// `usize::MAX`, the value `Stats::new` writes into slots beyond the core count.
    /// The sort is stable, so cores with equal load keep their index order.
    fn get_sorted_load(&self) -> ArrayVec<(usize, usize), MAX_CORE> {
        let mut sorted_load = ArrayVec::new();
        for (core, slot) in self.smp_load.iter().enumerate() {
            let load = slot.load(Ordering::SeqCst);
            if load == usize::MAX {
                break;
            }
            // SAFETY: `smp_load` has exactly `MAX_CORE` slots and each one is pushed at
            // most once, so the length never exceeds the `MAX_CORE` capacity.
            unsafe { sorted_load.push_unchecked((core, load)) };
        }
        sorted_load.sort_by(|lhs, rhs| rhs.1.cmp(&lhs.1));
        sorted_load
    }

    /// Returns the mean load computed by the most recent `update_mean` call.
    fn mean(&self) -> usize {
        self.mean_level.load(Ordering::Acquire)
    }

    /// Recomputes the mean over the first `LOAD_BALANCER.num_cores` slots.
    ///
    /// Returns immediately when another update is already in progress. A load whose
    /// addition would overflow the running sum is skipped. With zero cores the mean is
    /// stored as `0` instead of dividing by zero.
    fn update_mean(&self) {
        // Claim the flag in one atomic step. A separate load followed by a store would
        // let two threads both observe `false` and both start updating.
        if self.updating_mean.swap(true, Ordering::AcqRel) {
            return;
        }

        let num_cores = LOAD_BALANCER.num_cores;
        let mut sum: usize = 0;
        for slot in self.smp_load.iter().take(num_cores) {
            if let Some(total) = sum.checked_add(slot.load(Ordering::Acquire)) {
                sum = total;
            }
        }

        // `checked_div` avoids a division-by-zero panic, which would also leave the
        // `updating_mean` flag set forever.
        let mean = sum.checked_div(num_cores).unwrap_or(0);
        self.mean_level.store(mean, Ordering::Release);
        self.updating_mean.store(false, Ordering::Release);
    }
}
/// Returns the process-wide `Stats` table.
///
/// The table is created on first use, sized by `core_count()`.
#[inline]
pub fn stats() -> &'static Stats {
    lazy_static! {
        static ref LOCKLESS_STATS: Stats = Stats::new(*core_count());
    }
    // Deref coercion turns the lazy wrapper into the `Stats` it guards.
    let shared: &'static Stats = &LOCKLESS_STATS;
    shared
}
#[inline]
pub fn core_count() -> &'static usize {
&LOAD_BALANCER.num_cores
}
/// Returns the core ids held by the global load balancer.
#[inline]
pub fn get_cores() -> &'static [CoreId] {
    LOAD_BALANCER.cores.as_slice()
}