#![cfg_attr(docsrs, feature(doc_cfg))]
use std::{
sync::{
Arc,
atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering},
},
time::Duration,
};
use compio::time;
pub use iter::CpuLoadIter;
use sysinfo::{CpuRefreshKind, RefreshKind, System};
mod iter;
const INIT_DELAY_MS: u64 = 100;
const DEFAULT_INTERVAL: Duration = Duration::from_secs(1);
/// Shared, lock-free snapshot of CPU load, refreshed by a background
/// task spawned in [`CpuLoad::init`] and read concurrently via atomics.
pub struct CpuLoad {
    // Last sampled overall usage, clamped to 0–100.
    global: AtomicU8,
    // Last sampled per-core usage, clamped to 0–100; one slot per CPU.
    cores: Box<[AtomicU8]>,
    // Delay between consecutive samples taken by the background task.
    interval: Duration,
    // Core ids ordered least→most busy; rebuilt lazily by `idlest`.
    rank: Box<[AtomicUsize]>,
    // Round-robin position into `rank`, advanced by each `idlest` call.
    cursor: AtomicUsize,
    // True while one caller is rebuilding `rank`; others keep reading.
    sorting: AtomicBool,
}
impl CpuLoad {
fn sample(sys: &mut System, this: &Self) {
sys.refresh_cpu_all();
let g = sys.global_cpu_usage().clamp(0.0, 100.0) as u8;
this.global.store(g, Ordering::Relaxed);
let cpus = sys.cpus();
let len = cpus.len().min(this.cores.len());
for i in 0..len {
let usage = unsafe { cpus.get_unchecked(i) }
.cpu_usage()
.clamp(0.0, 100.0) as u8;
unsafe { this.cores.get_unchecked(i) }.store(usage, Ordering::Relaxed);
}
}
#[inline]
pub fn new() -> Arc<Self> {
Self::init(DEFAULT_INTERVAL)
}
pub fn init(interval: Duration) -> Arc<Self> {
let mut sys =
System::new_with_specifics(RefreshKind::nothing().with_cpu(CpuRefreshKind::nothing()));
sys.refresh_cpu_all();
let n = sys.cpus().len().max(1);
let cores: Box<[AtomicU8]> = (0..n).map(|_| AtomicU8::new(0)).collect();
let rank: Box<[AtomicUsize]> = (0..n).map(AtomicUsize::new).collect();
let inst = Arc::new(Self {
global: AtomicU8::new(0),
cores,
interval,
rank,
cursor: AtomicUsize::new(0),
sorting: AtomicBool::new(false),
});
let weak = Arc::downgrade(&inst);
compio::runtime::spawn(async move {
let mut sys =
System::new_with_specifics(RefreshKind::nothing().with_cpu(CpuRefreshKind::everything()));
if let Some(r) = weak.upgrade() {
time::sleep(Duration::from_millis(INIT_DELAY_MS)).await;
Self::sample(&mut sys, &r);
}
while let Some(r) = weak.upgrade() {
time::sleep(r.interval).await;
Self::sample(&mut sys, &r);
}
})
.detach();
inst
}
pub fn idlest(&self) -> usize {
let n = self.rank.len();
if n == 0 {
return 0;
}
if self.sorting.load(Ordering::Acquire) {
let idx = self.cursor.fetch_add(1, Ordering::Relaxed) % n;
return unsafe { self.rank.get_unchecked(idx) }.load(Ordering::Relaxed);
}
let cur = self.cursor.fetch_add(1, Ordering::Relaxed);
if cur >= n
&& self
.sorting
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
let mut idx: Vec<usize> = (0..n).collect();
idx.sort_unstable_by_key(|&i| unsafe { self.cores.get_unchecked(i) }.load(Ordering::Relaxed));
for (i, id) in idx.into_iter().enumerate() {
unsafe { self.rank.get_unchecked(i) }.store(id, Ordering::Relaxed);
}
self.cursor.store(0, Ordering::Relaxed);
self.sorting.store(false, Ordering::Release);
return unsafe { self.rank.get_unchecked(0) }.load(Ordering::Relaxed);
}
unsafe { self.rank.get_unchecked(cur % n) }.load(Ordering::Relaxed)
}
#[inline]
pub fn global(&self) -> u8 {
self.global.load(Ordering::Relaxed)
}
#[inline]
pub fn core(&self, idx: usize) -> Option<u8> {
self.cores.get(idx).map(|v| v.load(Ordering::Relaxed))
}
#[inline]
pub fn len(&self) -> usize {
self.cores.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.cores.is_empty()
}
}
impl<'a> IntoIterator for &'a CpuLoad {
type Item = u8;
type IntoIter = CpuLoadIter<'a>;
#[inline]
fn into_iter(self) -> Self::IntoIter {
CpuLoadIter::new(&self.cores)
}
}
impl Default for CpuLoad {
#[inline]
fn default() -> Self {
Arc::try_unwrap(Self::new()).unwrap_or_else(|arc| {
Self {
global: AtomicU8::new(arc.global.load(Ordering::Relaxed)),
cores: arc
.cores
.iter()
.map(|a| AtomicU8::new(a.load(Ordering::Relaxed)))
.collect(),
interval: arc.interval,
rank: arc
.rank
.iter()
.map(|a| AtomicUsize::new(a.load(Ordering::Relaxed)))
.collect(),
cursor: AtomicUsize::new(arc.cursor.load(Ordering::Relaxed)),
sorting: AtomicBool::new(arc.sorting.load(Ordering::Relaxed)),
}
})
}
}