use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::time::{Duration, Instant};
/// Non-idle fraction of an interval that a core must strictly exceed to be counted as busy.
const BUSY_THRESHOLD: f64 = 0.5;
/// Target time between two consecutive samples taken by the sampling thread.
const SAMPLE_PERIOD: Duration = Duration::from_secs(1);
/// Summary of resource usage collected over one sampling run.
///
/// Every field defaults to zero, which is also what a run reports when no
/// sample could be taken.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Telemetry {
    /// Number of per-core `cpuN` entries found in the CPU statistics snapshot.
    pub n_cores: u32,
    /// Arithmetic mean of the aggregate CPU utilisation (percent) over all samples.
    pub cpu_pct_avg: f64,
    /// Highest aggregate CPU utilisation (percent) seen in any single sample.
    pub cpu_pct_max: f64,
    /// Arithmetic mean of the per-sample busy-core count.
    pub cores_busy_avg: f64,
    /// Largest busy-core count seen in any single sample.
    pub cores_busy_max: u32,
    /// Largest peak resident set size observed for this process, in MiB (kB / 1024).
    pub mem_peak_mb: f64,
    /// Highest system memory usage (percent of total that is not available) observed.
    pub mem_pct_max: f64,
    /// Wall-clock duration of the run in milliseconds.
    pub elapsed_ms: f64,
    /// Number of intervals that produced a utilisation reading.
    pub samples: u32,
}
/// One reading of the cumulative CPU counters.
///
/// Each pair is `(idle, total)`, with both values being running totals, so
/// utilisation is derived from the difference between two snapshots.
struct StatSnapshot {
    /// Counters from the aggregate `cpu` line.
    agg: (u64, u64),
    /// Counters from the per-core `cpuN` lines, in the order they were read.
    per_core: Vec<(u64, u64)>,
}
/// Values the sampling thread publishes while it is still running.
///
/// Both counters start at zero and are accessed with relaxed atomics.
#[derive(Default)]
struct Live {
    /// Busy-core count from the most recent sample.
    cores_busy_now: AtomicU32,
    /// Number of cores found in the CPU statistics snapshot.
    n_cores: AtomicU32,
}
/// Handle to a background thread that samples CPU and memory usage.
pub struct Sampler {
    /// Set to `true` to ask the sampling thread to finish.
    stop: Arc<AtomicBool>,
    /// Live values shared with the sampling thread.
    live: Arc<Live>,
    /// Join handle of the sampling thread; `None` if the thread could not be spawned.
    handle: Option<std::thread::JoinHandle<Telemetry>>,
    /// Moment the sampler was created; used to compute the elapsed time.
    start: Instant,
}
impl Sampler {
    /// Starts sampling on a background thread named `bench-telemetry`.
    ///
    /// Spawning is best-effort: if the thread cannot be created, the sampler
    /// is still returned and [`Sampler::stop`] reports a default (all-zero)
    /// [`Telemetry`] apart from the elapsed time.
    pub fn start() -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let live = Arc::new(Live::default());
        let start = Instant::now();
        let handle = {
            let stop = Arc::clone(&stop);
            let live = Arc::clone(&live);
            std::thread::Builder::new()
                .name("bench-telemetry".into())
                .spawn(move || sample_loop(stop, live, start))
                .ok()
        };
        Sampler { stop, live, handle, start }
    }

    /// Busy-core count from the most recent sample (0 until one was taken).
    pub fn cores_busy_now(&self) -> u32 {
        self.live.cores_busy_now.load(Ordering::Relaxed)
    }

    /// Number of cores the sampling thread detected (0 until it reported one).
    pub fn n_cores(&self) -> u32 {
        self.live.n_cores.load(Ordering::Relaxed)
    }

    /// Stops sampling, waits for the thread and returns the collected summary.
    ///
    /// `elapsed_ms` is measured from [`Sampler::start`] to the moment this
    /// method is called, i.e. before waiting for the thread to finish. If the
    /// thread was never spawned or panicked, every other field is zero.
    pub fn stop(mut self) -> Telemetry {
        self.stop.store(true, Ordering::Relaxed);
        let elapsed_ms = self.start.elapsed().as_secs_f64() * 1000.0;
        let mut t = self
            .handle
            .take()
            .and_then(|h| h.join().ok())
            .unwrap_or_default();
        t.elapsed_ms = elapsed_ms;
        t
    }
}

impl Drop for Sampler {
    /// Asks the sampling thread to finish when the sampler goes away.
    ///
    /// Without this, a `Sampler` dropped without calling [`Sampler::stop`]
    /// would leave its thread sampling for the rest of the process. The thread
    /// is only signalled, not joined, so dropping never blocks.
    fn drop(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
    }
}
fn sample_loop(stop: Arc<AtomicBool>, live: Arc<Live>, _start: Instant) -> Telemetry {
let mut acc = Telemetry::default();
let mut prev = read_proc_stat();
if let Some(p) = &prev {
let n = p.per_core.len() as u32;
acc.n_cores = n;
live.n_cores.store(n, Ordering::Relaxed);
}
let mut cpu_sum = 0.0_f64;
let mut busy_sum = 0.0_f64;
loop {
let mut waited = Duration::ZERO;
while waited < SAMPLE_PERIOD {
if stop.load(Ordering::Relaxed) {
break;
}
let slice = Duration::from_millis(50);
std::thread::sleep(slice);
waited += slice;
}
let stopping = stop.load(Ordering::Relaxed);
if let (Some(p), Some(cur)) = (prev.as_ref(), read_proc_stat()) {
if let Some((cpu_pct, cores_busy)) = interval_utilisation(p, &cur) {
acc.samples += 1;
cpu_sum += cpu_pct;
busy_sum += cores_busy as f64;
acc.cpu_pct_max = acc.cpu_pct_max.max(cpu_pct);
acc.cores_busy_max = acc.cores_busy_max.max(cores_busy);
live.cores_busy_now.store(cores_busy, Ordering::Relaxed);
}
prev = Some(cur);
}
if let Some(rss_mb) = read_self_peak_rss_mb() {
acc.mem_peak_mb = acc.mem_peak_mb.max(rss_mb);
}
if let Some(mem_pct) = read_system_mem_pct() {
acc.mem_pct_max = acc.mem_pct_max.max(mem_pct);
}
if stopping {
break;
}
}
if acc.samples > 0 {
acc.cpu_pct_avg = cpu_sum / acc.samples as f64;
acc.cores_busy_avg = busy_sum / acc.samples as f64;
}
acc
}
/// Computes utilisation over the interval between two counter snapshots.
///
/// Returns `(cpu_pct, busy_cores)`: the aggregate non-idle share of the
/// interval as a percentage clamped to `0.0..=100.0`, and the number of cores
/// whose own non-idle share strictly exceeds [`BUSY_THRESHOLD`]. Returns
/// `None` when the aggregate total counter did not advance.
///
/// Cores are paired by position, and only as many as both snapshots contain
/// are considered. A core whose total counter did not advance is skipped.
/// Counters that appear to go backwards are treated as a zero delta.
fn interval_utilisation(prev: &StatSnapshot, cur: &StatSnapshot) -> Option<(f64, u32)> {
    let (prev_idle, prev_total) = prev.agg;
    let (cur_idle, cur_total) = cur.agg;

    let total_delta = cur_total.saturating_sub(prev_total);
    if total_delta == 0 {
        return None;
    }
    let idle_delta = cur_idle.saturating_sub(prev_idle);
    let cpu_pct =
        ((1.0 - (idle_delta as f64 / total_delta as f64)) * 100.0).clamp(0.0, 100.0);

    let busy_cores = prev
        .per_core
        .iter()
        .zip(cur.per_core.iter())
        .filter(|&(&(was_idle, was_total), &(now_idle, now_total))| {
            let core_total = now_total.saturating_sub(was_total);
            if core_total == 0 {
                return false;
            }
            let core_idle = now_idle.saturating_sub(was_idle);
            1.0 - (core_idle as f64 / core_total as f64) > BUSY_THRESHOLD
        })
        .count() as u32;

    Some((cpu_pct, busy_cores))
}
/// Reads `/proc/stat` and extracts the cumulative CPU counters.
///
/// Returns `None` if the file cannot be read or contains no usable aggregate
/// `cpu` line.
fn read_proc_stat() -> Option<StatSnapshot> {
    let text = std::fs::read_to_string("/proc/stat").ok()?;
    parse_proc_stat(&text)
}

/// Parses the `cpu` / `cpuN` lines from the text of `/proc/stat`.
///
/// Each line becomes an `(idle, total)` pair, where `idle` is the sum of the
/// idle and iowait columns and `total` is the sum of the first eight columns
/// (user, nice, system, idle, iowait, irq, softirq, steal). The trailing
/// `guest` and `guest_nice` columns are deliberately left out of `total`: the
/// kernel already includes guest time in `user` and `nice`, so adding them
/// again would count that time twice and skew utilisation on hosts running
/// virtual machines.
///
/// Lines with fewer than five numeric columns are skipped. Scanning stops at
/// the first non-`cpu` line after the aggregate line has been seen.
fn parse_proc_stat(text: &str) -> Option<StatSnapshot> {
    // user, nice, system, idle, iowait, irq, softirq, steal.
    const ACCOUNTED_COLUMNS: usize = 8;

    let mut agg = None;
    let mut per_core = Vec::new();
    for line in text.lines() {
        if !line.starts_with("cpu") {
            if agg.is_some() {
                break;
            }
            continue;
        }
        let mut columns = line.split_whitespace();
        let label = columns.next()?;
        let nums: Vec<u64> = columns.filter_map(|f| f.parse::<u64>().ok()).collect();
        if nums.len() < 5 {
            continue;
        }
        let total: u64 = nums.iter().take(ACCOUNTED_COLUMNS).sum();
        let idle = nums[3] + nums[4];
        if label == "cpu" {
            agg = Some((idle, total));
        } else {
            per_core.push((idle, total));
        }
    }
    let agg = agg?;
    Some(StatSnapshot { agg, per_core })
}
/// Reads the `VmHWM:` value from `/proc/self/status` and converts it to MiB.
///
/// The value on that line is divided by 1024. Returns `None` if the file
/// cannot be read, has no `VmHWM:` line, or the first token on that line is
/// not a number.
fn read_self_peak_rss_mb() -> Option<f64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    let value_part = status
        .lines()
        .find_map(|row| row.strip_prefix("VmHWM:"))?;
    let kb = value_part.split_whitespace().next()?.parse::<f64>().ok()?;
    Some(kb / 1024.0)
}
/// Reads `/proc/meminfo` and returns the share of memory that is not available.
///
/// The result is `(MemTotal - MemAvailable) / MemTotal * 100`, clamped to
/// `0.0..=100.0`. Returns `None` if the file cannot be read, either line is
/// missing or unparsable, or the total is not positive.
fn read_system_mem_pct() -> Option<f64> {
    let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;
    // First whitespace-separated token after the label, parsed as a number.
    let leading_number =
        |rest: &str| -> Option<f64> { rest.split_whitespace().next()?.parse::<f64>().ok() };

    let mut mem_total = None;
    let mut mem_available = None;
    for row in meminfo.lines() {
        match (row.strip_prefix("MemTotal:"), row.strip_prefix("MemAvailable:")) {
            (Some(rest), _) => mem_total = leading_number(rest),
            (None, Some(rest)) => mem_available = leading_number(rest),
            (None, None) => continue,
        }
        // Both values found: the rest of the file is irrelevant.
        if mem_total.and(mem_available).is_some() {
            break;
        }
    }

    match (mem_total?, mem_available?) {
        (total_kb, _) if total_kb <= 0.0 => None,
        (total_kb, avail_kb) => {
            Some(((total_kb - avail_kb) / total_kb * 100.0).clamp(0.0, 100.0))
        }
    }
}
/// Prefix put in front of every telemetry key written to, or looked up in, a metrics map.
pub const TELEM_PREFIX: &str = "telem_";
/// Writes the telemetry summary into `metrics` under [`TELEM_PREFIX`]-prefixed keys.
///
/// Floating-point values are rounded to two decimals; `cores_busy_max` and
/// `n_cores` are stored as integers. Existing entries with the same keys are
/// overwritten. `samples` is not written.
pub fn inject_into_metrics(
    metrics: &mut serde_json::Map<String, serde_json::Value>,
    t: &Telemetry,
) {
    let rounded = |value: f64| serde_json::json!(round2(value));
    // Kept in the original insertion order.
    let entries = [
        ("cpu_pct_avg", rounded(t.cpu_pct_avg)),
        ("cpu_pct_max", rounded(t.cpu_pct_max)),
        ("cores_busy_avg", rounded(t.cores_busy_avg)),
        ("cores_busy_max", serde_json::json!(t.cores_busy_max)),
        ("n_cores", serde_json::json!(t.n_cores)),
        ("mem_peak_mb", rounded(t.mem_peak_mb)),
        ("mem_pct_max", rounded(t.mem_pct_max)),
        ("elapsed_ms", rounded(t.elapsed_ms)),
    ];
    for (suffix, value) in entries {
        metrics.insert(format!("{TELEM_PREFIX}{suffix}"), value);
    }
}
/// Rebuilds a [`Telemetry`] from the [`TELEM_PREFIX`]-prefixed keys in `metrics`.
///
/// Returns `None` when no key carries the prefix. Otherwise every field whose
/// key is missing or not numeric is set to zero. `samples` is always zero.
pub fn from_metrics(metrics: &serde_json::Map<String, serde_json::Value>) -> Option<Telemetry> {
    let has_telemetry = metrics.keys().any(|key| key.starts_with(TELEM_PREFIX));
    if !has_telemetry {
        return None;
    }

    // Numeric value stored under the prefixed key, or 0.0 when absent/non-numeric.
    let number = |suffix: &str| -> f64 {
        metrics
            .get(&format!("{TELEM_PREFIX}{suffix}"))
            .and_then(serde_json::Value::as_f64)
            .unwrap_or(0.0)
    };
    let count = |suffix: &str| number(suffix) as u32;

    Some(Telemetry {
        n_cores: count("n_cores"),
        cpu_pct_avg: number("cpu_pct_avg"),
        cpu_pct_max: number("cpu_pct_max"),
        cores_busy_avg: number("cores_busy_avg"),
        cores_busy_max: count("cores_busy_max"),
        mem_peak_mb: number("mem_peak_mb"),
        mem_pct_max: number("mem_pct_max"),
        elapsed_ms: number("elapsed_ms"),
        samples: 0,
    })
}
/// Rounds `f` to two decimal places (scale by 100, round to nearest, scale back).
fn round2(f: f64) -> f64 {
    let hundredths = (f * 100.0).round();
    hundredths / 100.0
}
// Unit tests. Tests that depend on /proc only assert when it is readable.
#[cfg(test)]
mod tests {
use super::*;
// A readable /proc/stat must yield a non-zero aggregate total and at least one core.
#[test]
fn proc_stat_parses_aggregate_and_cores() {
if let Some(snap) = read_proc_stat() {
assert!(snap.agg.1 > 0, "total jiffies should be > 0");
assert!(!snap.per_core.is_empty(), "expected at least one cpuN line");
}
}
// Aggregate: idle +100 of total +200 => 50%. Core 0 had no idle time (busy),
// core 1 was idle for the whole interval (not busy).
#[test]
fn interval_utilisation_counts_busy_cores() {
let prev = StatSnapshot {
agg: (100, 200),
per_core: vec![(50, 100), (50, 100)],
};
let cur = StatSnapshot {
agg: (200, 400),
per_core: vec![(50, 200), (150, 200)],
};
let (cpu_pct, busy) = interval_utilisation(&prev, &cur).unwrap();
assert!((cpu_pct - 50.0).abs() < 0.01, "cpu_pct={cpu_pct}");
assert_eq!(busy, 1, "exactly one core should be >50% busy");
}
// Two identical snapshots have a zero total delta, so no reading is produced.
#[test]
fn zero_interval_is_ignored() {
let snap = StatSnapshot { agg: (10, 20), per_core: vec![(5, 10)] };
let same = StatSnapshot { agg: (10, 20), per_core: vec![(5, 10)] };
assert!(interval_utilisation(&snap, &same).is_none());
}
// Values written by inject_into_metrics must be recoverable by from_metrics;
// an empty map has no telemetry keys and yields None.
#[test]
fn metrics_roundtrip() {
let t = Telemetry {
n_cores: 8,
cpu_pct_avg: 73.25,
cpu_pct_max: 99.5,
cores_busy_avg: 5.5,
cores_busy_max: 8,
mem_peak_mb: 1234.5,
mem_pct_max: 42.0,
elapsed_ms: 1500.0,
samples: 3,
};
let mut m = serde_json::Map::new();
inject_into_metrics(&mut m, &t);
let back = from_metrics(&m).expect("telemetry keys present");
assert_eq!(back.n_cores, 8);
assert_eq!(back.cores_busy_max, 8);
assert!((back.cpu_pct_avg - 73.25).abs() < 0.01);
assert!((back.mem_peak_mb - 1234.5).abs() < 0.01);
assert!(from_metrics(&serde_json::Map::new()).is_none());
}
// End-to-end: spin on at least two threads for ~2.2 s while the sampler runs,
// then check that the summary reflects the load.
#[test]
fn sampler_runs_and_reports_on_a_busy_workload() {
let sampler = Sampler::start();
let stop = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();
let n = std::thread::available_parallelism().map(|x| x.get()).unwrap_or(2).max(2);
for _ in 0..n {
let s = Arc::clone(&stop);
handles.push(std::thread::spawn(move || {
let mut x = 0u64;
while !s.load(Ordering::Relaxed) {
x = x.wrapping_mul(2654435761).wrapping_add(1);
// black_box keeps the optimiser from removing the spin work.
std::hint::black_box(x);
}
}));
}
// Long enough to span at least two sample periods.
std::thread::sleep(Duration::from_millis(2200));
stop.store(true, Ordering::Relaxed);
for h in handles {
let _ = h.join();
}
let t = sampler.stop();
// The /proc-derived assertions only apply where /proc/stat is readable.
if read_proc_stat().is_some() {
assert!(t.n_cores >= 1, "n_cores={}", t.n_cores);
assert!(t.samples >= 1, "expected ≥1 interval, got {}", t.samples);
assert!(t.cpu_pct_max > 0.0, "cpu_pct_max should be >0 under load");
assert!(
t.cores_busy_max >= 1,
"at least one core should be >50% busy under an N-thread spin, got {}",
t.cores_busy_max
);
assert!(t.mem_peak_mb > 0.0, "peak RSS should be >0");
}
// elapsed_ms comes from the wall clock, so it holds regardless of /proc.
assert!(t.elapsed_ms >= 2000.0, "elapsed_ms={}", t.elapsed_ms);
}
}