use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use sysinfo::{Pid, ProcessesToUpdate, System};
use super::types::ThermalState;
pub mod pressure;
pub use pressure::MemoryPressure;
/// Selects how much resource telemetry is collected for a run.
///
/// `Off` is the default. Only `Summary` and `DebugLocal` carry a sampling
/// interval and therefore require a periodic sampler.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum ResourceTelemetryMode {
    /// Telemetry disabled.
    #[default]
    Off,
    /// Telemetry enabled without a periodic sampler (no interval).
    Boundary,
    /// Periodic sampling every `interval_ms` milliseconds.
    Summary { interval_ms: u32 },
    /// Periodic sampling every `interval_ms` milliseconds, labelled
    /// `"debug_local"`; the methods below otherwise treat it like `Summary`.
    DebugLocal { interval_ms: u32 },
}

impl ResourceTelemetryMode {
    /// Interval used by [`ResourceTelemetryMode::summary`].
    pub const DEFAULT_SUMMARY_INTERVAL_MS: u32 = 1000;
    /// Smallest interval that survives [`ResourceTelemetryMode::normalized`].
    pub const MIN_SAMPLE_INTERVAL_MS: u32 = 250;

    /// Builds a `Summary` mode with the default interval.
    pub fn summary() -> Self {
        let interval_ms = Self::DEFAULT_SUMMARY_INTERVAL_MS;
        Self::Summary { interval_ms }
    }

    /// Returns the mode with its interval raised to at least
    /// `MIN_SAMPLE_INTERVAL_MS`. Modes without an interval are returned as-is.
    pub fn normalized(self) -> Self {
        let floor = Self::MIN_SAMPLE_INTERVAL_MS;
        match self {
            Self::Off | Self::Boundary => self,
            Self::Summary { interval_ms } => Self::Summary {
                interval_ms: interval_ms.max(floor),
            },
            Self::DebugLocal { interval_ms } => Self::DebugLocal {
                interval_ms: interval_ms.max(floor),
            },
        }
    }

    /// `true` only for `Off`.
    pub fn is_off(&self) -> bool {
        *self == Self::Off
    }

    /// `true` for the modes that carry a sampling interval.
    pub fn needs_sampler(&self) -> bool {
        self.interval_ms().is_some()
    }

    /// The sampling interval in milliseconds, if this mode has one.
    pub fn interval_ms(&self) -> Option<u32> {
        match *self {
            Self::Summary { interval_ms } | Self::DebugLocal { interval_ms } => Some(interval_ms),
            Self::Off | Self::Boundary => None,
        }
    }

    /// Stable lowercase name of the mode.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::DebugLocal { .. } => "debug_local",
            Self::Summary { .. } => "summary",
            Self::Boundary => "boundary",
            Self::Off => "off",
        }
    }
}
/// A point-in-time reading of host and process resource state.
///
/// Every numeric reading is optional; `None` means the value was not
/// available when the snapshot was taken.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct ResourceSnapshot {
    /// CPU usage as a percentage.
    pub cpu_pct: Option<f32>,
    /// Memory used by the process, in MB.
    pub process_rss_mb: Option<u32>,
    /// Available system memory, in MB.
    pub available_mem_mb: Option<u32>,
    /// Total system memory, in MB.
    pub total_mem_mb: Option<u32>,
    /// Memory-pressure classification for this reading.
    pub memory_pressure: MemoryPressure,
    /// Thermal state for this reading.
    pub thermal_state: ThermalState,
    /// Battery charge as a percentage.
    pub battery_pct: Option<u8>,
    /// Capture time, as returned by `now_ms()`.
    pub captured_at_ms: u64,
}

impl ResourceSnapshot {
    /// Builds a snapshot with no readings: every optional field is `None`,
    /// memory pressure is `Unknown`, thermal state is `Normal`, and the
    /// timestamp is the current time.
    pub fn unknown() -> Self {
        let captured_at_ms = now_ms();
        Self {
            captured_at_ms,
            battery_pct: None,
            thermal_state: ThermalState::Normal,
            memory_pressure: MemoryPressure::Unknown,
            total_mem_mb: None,
            available_mem_mb: None,
            process_rss_mb: None,
            cpu_pct: None,
        }
    }
}

impl Default for ResourceSnapshot {
    /// Same as [`ResourceSnapshot::unknown`].
    fn default() -> Self {
        ResourceSnapshot::unknown()
    }
}
/// A source of [`ResourceSnapshot`]s that can be shared across threads.
pub trait ResourceSnapshotProvider: Send + Sync + std::fmt::Debug {
    /// Returns a snapshot for the given `max_age` bound.
    fn current_snapshot(&self, max_age: Duration) -> ResourceSnapshot;
}

impl ResourceSnapshotProvider for ResourceMonitor {
    fn current_snapshot(&self, max_age: Duration) -> ResourceSnapshot {
        // Type-qualified on purpose: this path resolves to the inherent
        // `ResourceMonitor::current_snapshot`, not back into this trait method.
        ResourceMonitor::current_snapshot(self, max_age)
    }
}

/// Lets an `Arc`-wrapped provider (including `Arc<dyn ResourceSnapshotProvider>`)
/// be used wherever a provider is expected.
impl<T> ResourceSnapshotProvider for Arc<T>
where
    T: ResourceSnapshotProvider + ?Sized,
{
    fn current_snapshot(&self, max_age: Duration) -> ResourceSnapshot {
        (**self).current_snapshot(max_age)
    }
}
/// Aggregated resource statistics for one run, as built by `Aggregator::finish`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ResourceUsageSummary {
/// Mean of the observed CPU readings; `None` when no snapshot carried one.
pub cpu_avg_pct: Option<f32>,
/// Highest observed CPU reading.
pub cpu_peak_pct: Option<f32>,
/// Highest observed process RSS.
pub process_rss_peak_mb: Option<u32>,
/// Lowest observed available-memory reading.
pub available_mem_min_mb: Option<u32>,
/// Worst memory pressure observed (per `MemoryPressure::worse_of`).
pub memory_pressure_peak: MemoryPressure,
/// Worst thermal state observed.
pub thermal_state_peak: ThermalState,
/// Battery percentage from the last snapshot that reported one.
pub battery_pct_end: Option<u8>,
/// Number of snapshots folded into this summary.
pub sample_count: u32,
/// Label of the telemetry mode (`ResourceTelemetryMode::label`).
pub sampling_mode: String,
/// Sampling interval of the mode, if it has one; omitted from serialized
/// output when `None` and defaulted when absent on deserialization.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sampling_interval_ms: Option<u32>,
}
/// Running accumulator that folds snapshots into a [`ResourceUsageSummary`].
#[derive(Debug)]
struct Aggregator {
    /// Sum of all CPU readings seen so far (kept in `f64` for the average).
    cpu_sum: f64,
    /// Number of snapshots that carried a CPU reading.
    cpu_samples: u32,
    /// Highest CPU reading seen.
    cpu_peak: Option<f32>,
    /// Highest process RSS seen.
    rss_peak: Option<u32>,
    /// Lowest available-memory reading seen.
    mem_avail_min: Option<u32>,
    /// Worst memory pressure seen.
    pressure_peak: MemoryPressure,
    /// Worst thermal state seen.
    thermal_peak: ThermalState,
    /// Battery percentage from the most recent snapshot that reported one.
    latest_battery: Option<u8>,
    /// Total number of snapshots observed.
    sample_count: u32,
}

impl Aggregator {
    /// Creates an empty accumulator.
    fn new() -> Self {
        Self {
            sample_count: 0,
            cpu_samples: 0,
            cpu_sum: 0.0,
            cpu_peak: None,
            rss_peak: None,
            mem_avail_min: None,
            latest_battery: None,
            pressure_peak: MemoryPressure::Unknown,
            thermal_peak: ThermalState::Normal,
        }
    }

    /// Folds one snapshot into the running statistics.
    ///
    /// Readings that are `None` leave the corresponding statistic untouched;
    /// the sample counter is bumped regardless.
    fn observe(&mut self, s: &ResourceSnapshot) {
        self.sample_count = self.sample_count.saturating_add(1);

        if let Some(cpu) = s.cpu_pct {
            self.cpu_sum += f64::from(cpu);
            self.cpu_samples = self.cpu_samples.saturating_add(1);
            // Explicit comparison (not `f32::max`) so the previous peak is
            // kept only when it is `>=` the new reading.
            let peak = self
                .cpu_peak
                .map_or(cpu, |prev| if prev >= cpu { prev } else { cpu });
            self.cpu_peak = Some(peak);
        }

        if let Some(rss) = s.process_rss_mb {
            self.rss_peak = Some(self.rss_peak.map_or(rss, |prev| prev.max(rss)));
        }

        if let Some(avail) = s.available_mem_mb {
            self.mem_avail_min = Some(self.mem_avail_min.map_or(avail, |prev| prev.min(avail)));
        }

        self.pressure_peak = self.pressure_peak.worse_of(s.memory_pressure);
        self.thermal_peak = thermal_worse_of(self.thermal_peak, s.thermal_state);

        if let Some(pct) = s.battery_pct {
            self.latest_battery = Some(pct);
        }
    }

    /// Consumes the accumulator and produces the summary for `mode`.
    ///
    /// The CPU average is `None` when no observed snapshot had a CPU reading.
    fn finish(self, mode: ResourceTelemetryMode) -> ResourceUsageSummary {
        let cpu_avg_pct = match self.cpu_samples {
            0 => None,
            n => Some((self.cpu_sum / f64::from(n)) as f32),
        };

        ResourceUsageSummary {
            sampling_interval_ms: mode.interval_ms(),
            sampling_mode: mode.label().to_string(),
            sample_count: self.sample_count,
            battery_pct_end: self.latest_battery,
            thermal_state_peak: self.thermal_peak,
            memory_pressure_peak: self.pressure_peak,
            available_mem_min_mb: self.mem_avail_min,
            process_rss_peak_mb: self.rss_peak,
            cpu_peak_pct: self.cpu_peak,
            cpu_avg_pct,
        }
    }
}
/// Returns the more severe of two thermal states.
///
/// Severity order is `Normal < Warm < Hot < Critical`; on a tie `a` is returned.
fn thermal_worse_of(a: ThermalState, b: ThermalState) -> ThermalState {
    let severity = |state: ThermalState| -> u8 {
        match state {
            ThermalState::Normal => 0,
            ThermalState::Warm => 1,
            ThermalState::Hot => 2,
            ThermalState::Critical => 3,
        }
    };

    if severity(a) >= severity(b) {
        a
    } else {
        b
    }
}
/// Handle for taking resource snapshots, with a cache of the most recent one.
///
/// Clones share the same `Inner` state through the `Arc<Mutex<_>>`.
#[derive(Debug, Clone)]
pub struct ResourceMonitor {
/// Shared mutable state: the `sysinfo` handle plus the snapshot cache.
inner: Arc<Mutex<Inner>>,
/// Id of the current process as resolved in `new`; `None` if the lookup failed.
pid: Option<Pid>,
}
/// Mutex-protected state behind a `ResourceMonitor`.
#[derive(Debug)]
struct Inner {
/// `sysinfo` handle refreshed whenever a new snapshot is produced.
system: System,
/// Most recently produced snapshot, if any.
cached: Option<ResourceSnapshot>,
/// Monotonic instant at which `cached` was stored; compared against `max_age`.
cached_at: Option<Instant>,
/// First non-`None` total-memory reading, reused for later snapshots.
total_mem_mb: Option<u32>,
}
impl ResourceMonitor {
pub fn new() -> Self {
let system = System::new();
let pid = sysinfo::get_current_pid().ok();
Self {
inner: Arc::new(Mutex::new(Inner {
system,
cached: None,
cached_at: None,
total_mem_mb: None,
})),
pid,
}
}
pub fn global() -> Arc<Self> {
static MONITOR: OnceLock<Arc<ResourceMonitor>> = OnceLock::new();
MONITOR.get_or_init(|| Arc::new(Self::new())).clone()
}
pub fn prewarm(&self) {
let _ = self.refresh_locked();
}
pub fn current_snapshot(&self, max_age: Duration) -> ResourceSnapshot {
{
let inner = match self.inner.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
if let (Some(snap), Some(at)) = (inner.cached, inner.cached_at) {
if at.elapsed() <= max_age {
return snap;
}
}
}
self.refresh_locked()
}
pub fn begin_run(&self, mode: ResourceTelemetryMode) -> RunGuard {
let mode = mode.normalized();
if mode.is_off() {
return RunGuard::disabled(mode);
}
let monitor = self.clone();
let start = monitor.current_snapshot(Duration::ZERO);
let mut aggregator = Aggregator::new();
aggregator.observe(&start);
let sampler = if mode.needs_sampler() {
sampler::start(monitor.clone(), mode)
} else {
None
};
RunGuard {
monitor: Some(monitor),
mode,
aggregator: Some(aggregator),
sampler,
}
}
fn refresh_locked(&self) -> ResourceSnapshot {
let mut inner = match self.inner.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
inner.system.refresh_memory();
inner.system.refresh_cpu_all();
if let Some(pid) = self.pid {
inner
.system
.refresh_processes(ProcessesToUpdate::Some(&[pid]), true);
}
let total_bytes = inner.system.total_memory();
let total_mb = bytes_to_mb(total_bytes);
if total_mb.is_some() && inner.total_mem_mb.is_none() {
inner.total_mem_mb = total_mb;
}
let available_mb = bytes_to_mb(inner.system.available_memory());
let cpu = inner.system.global_cpu_usage();
let cpu_pct = if cpu.is_finite() && cpu >= 0.0 {
Some(cpu.min(100.0))
} else {
None
};
let process_rss_mb = self.pid.and_then(|pid| {
inner
.system
.process(pid)
.map(|p| bytes_to_mb_u64(p.memory()))
.unwrap_or(None)
});
super::platform_state::refresh_native_platform_state();
let platform = super::platform_state::current_platform_state();
let snap = ResourceSnapshot {
cpu_pct,
process_rss_mb,
available_mem_mb: available_mb,
total_mem_mb: inner.total_mem_mb.or(total_mb),
memory_pressure: MemoryPressure::derive(available_mb, inner.total_mem_mb.or(total_mb)),
thermal_state: platform.thermal_state.unwrap_or(ThermalState::Normal),
battery_pct: platform.battery_pct,
captured_at_ms: now_ms(),
};
inner.cached = Some(snap);
inner.cached_at = Some(Instant::now());
snap
}
}
impl Default for ResourceMonitor {
fn default() -> Self {
Self::new()
}
}
/// Telemetry scope for a single run, created by `ResourceMonitor::begin_run`.
///
/// Call [`RunGuard::finish`] to obtain the summary. Dropping the guard without
/// finishing stops any background sampler and discards the collected data.
pub struct RunGuard {
    /// Monitor used for the final snapshot; `None` for a disabled guard.
    monitor: Option<ResourceMonitor>,
    /// Normalized telemetry mode of this run.
    mode: ResourceTelemetryMode,
    /// Running statistics; `None` for a disabled guard.
    aggregator: Option<Aggregator>,
    /// Background sampler handle, when one was started.
    sampler: Option<sampler::Handle>,
}

impl RunGuard {
    /// Builds a guard that collects nothing; its `finish` returns `None`.
    fn disabled(mode: ResourceTelemetryMode) -> Self {
        Self {
            mode,
            monitor: None,
            aggregator: None,
            sampler: None,
        }
    }

    /// Ends the run and returns its summary, or `None` for a disabled guard.
    ///
    /// Stops the sampler (if any), folds in the samples it collected, then
    /// records one last fresh snapshot before building the summary.
    pub fn finish(mut self) -> Option<ResourceUsageSummary> {
        let monitor = self.monitor.take()?;
        let mut aggregator = self.aggregator.take()?;

        let sampled = self
            .sampler
            .take()
            .map(sampler::Handle::stop)
            .unwrap_or_default();
        for snap in &sampled {
            aggregator.observe(snap);
        }

        aggregator.observe(&monitor.current_snapshot(Duration::ZERO));
        Some(aggregator.finish(self.mode))
    }
}

impl Drop for RunGuard {
    /// Stops a still-running sampler so its thread does not outlive the guard.
    fn drop(&mut self) {
        let Some(active) = self.sampler.take() else {
            return;
        };
        drop(active.stop());
    }
}
/// Background thread that periodically records snapshots for a run.
mod sampler {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::{Duration, Instant};

    use super::{ResourceMonitor, ResourceSnapshot, ResourceTelemetryMode};

    /// Length of one sleep slice between stop-flag checks, in milliseconds.
    const POLL_SLICE_MS: u64 = 25;

    /// Owner of a running sampler thread.
    pub(super) struct Handle {
        shared: Arc<Shared>,
        thread: Option<thread::JoinHandle<()>>,
    }

    /// State shared between the handle and the sampler thread.
    struct Shared {
        /// Set by `Handle::stop` to ask the thread to exit.
        stop: AtomicBool,
        /// Snapshots recorded so far.
        samples: Mutex<Vec<ResourceSnapshot>>,
    }

    impl Shared {
        fn stop_requested(&self) -> bool {
            self.stop.load(Ordering::Relaxed)
        }
    }

    /// Spawns the sampler thread for `mode`.
    ///
    /// Returns `None` when `mode` has no interval or the thread cannot be
    /// spawned.
    pub(super) fn start(monitor: ResourceMonitor, mode: ResourceTelemetryMode) -> Option<Handle> {
        let period = Duration::from_millis(u64::from(mode.interval_ms()?));
        let shared = Arc::new(Shared {
            stop: AtomicBool::new(false),
            samples: Mutex::new(Vec::new()),
        });
        let worker_shared = Arc::clone(&shared);

        let spawned = thread::Builder::new()
            .name("xybrid-resource-sampler".to_string())
            .spawn(move || sample_until_stopped(&monitor, &worker_shared, period));

        match spawned {
            Ok(worker) => Some(Handle {
                shared,
                thread: Some(worker),
            }),
            Err(_) => None,
        }
    }

    /// Thread body: wait one `period`, record a fresh snapshot, repeat.
    ///
    /// The wait is chopped into `POLL_SLICE_MS` sleeps so a stop request is
    /// noticed quickly; no snapshot is taken once stop has been requested.
    fn sample_until_stopped(monitor: &ResourceMonitor, shared: &Shared, period: Duration) {
        let poll_slice = Duration::from_millis(POLL_SLICE_MS);
        loop {
            if shared.stop_requested() {
                return;
            }

            let deadline = Instant::now() + period;
            while Instant::now() < deadline {
                if shared.stop_requested() {
                    return;
                }
                thread::sleep(poll_slice);
            }
            if shared.stop_requested() {
                return;
            }

            let snap = monitor.current_snapshot(Duration::ZERO);
            if let Ok(mut samples) = shared.samples.lock() {
                samples.push(snap);
            }
        }
    }

    impl Handle {
        /// Signals the thread to stop, waits for it to exit, and returns the
        /// snapshots it recorded.
        pub(super) fn stop(mut self) -> Vec<ResourceSnapshot> {
            self.shared.stop.store(true, Ordering::Relaxed);
            if let Some(worker) = self.thread.take() {
                let _ = worker.join();
            }

            let mut samples = self
                .shared
                .samples
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            std::mem::take(&mut *samples)
        }
    }
}
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Converts a byte count to whole mebibytes (rounded down).
///
/// Returns `None` for 0, which callers treat as "no reading". A non-zero
/// count below 1 MiB yields `Some(0)`. Counts whose MiB value does not fit in
/// a `u32` saturate at `u32::MAX` instead of silently wrapping.
fn bytes_to_mb(bytes: u64) -> Option<u32> {
    const BYTES_PER_MIB: u64 = 1024 * 1024;

    if bytes == 0 {
        return None;
    }
    let mib = bytes / BYTES_PER_MIB;
    // Clamp first so the narrowing cast below can never truncate.
    Some(mib.min(u64::from(u32::MAX)) as u32)
}
/// Alias for `bytes_to_mb`; forwards the byte count unchanged and returns its result.
fn bytes_to_mb_u64(bytes: u64) -> Option<u32> {
bytes_to_mb(bytes)
}
// Unit tests for mode normalization, aggregation, snapshot caching and the
// run guard / sampler lifecycle. Several tests touch the real `sysinfo`
// backend and rely on wall-clock sleeps.
#[cfg(test)]
mod tests {
use super::*;
// An interval below the floor is raised to MIN_SAMPLE_INTERVAL_MS; modes
// without an interval pass through unchanged.
#[test]
fn mode_normalizes_interval_floor() {
let m = ResourceTelemetryMode::Summary { interval_ms: 10 }.normalized();
assert_eq!(
m,
ResourceTelemetryMode::Summary {
interval_ms: ResourceTelemetryMode::MIN_SAMPLE_INTERVAL_MS
}
);
let boundary = ResourceTelemetryMode::Boundary.normalized();
assert_eq!(boundary, ResourceTelemetryMode::Boundary);
}
// Two hand-built snapshots: checks CPU peak/average, RSS peak, available
// memory minimum, worst pressure/thermal state and the last battery value.
#[test]
fn aggregator_tracks_peak_avg_min() {
let mut agg = Aggregator::new();
agg.observe(&ResourceSnapshot {
cpu_pct: Some(10.0),
process_rss_mb: Some(100),
available_mem_mb: Some(8000),
total_mem_mb: Some(16000),
memory_pressure: MemoryPressure::Normal,
thermal_state: ThermalState::Normal,
battery_pct: Some(80),
captured_at_ms: 0,
});
agg.observe(&ResourceSnapshot {
cpu_pct: Some(90.0),
process_rss_mb: Some(500),
available_mem_mb: Some(1000),
total_mem_mb: Some(16000),
memory_pressure: MemoryPressure::Warn,
thermal_state: ThermalState::Hot,
battery_pct: Some(78),
captured_at_ms: 1_000,
});
let summary = agg.finish(ResourceTelemetryMode::Boundary);
assert_eq!(summary.cpu_peak_pct, Some(90.0));
assert!((summary.cpu_avg_pct.unwrap() - 50.0).abs() < 0.01);
assert_eq!(summary.process_rss_peak_mb, Some(500));
assert_eq!(summary.available_mem_min_mb, Some(1000));
assert_eq!(summary.memory_pressure_peak, MemoryPressure::Warn);
assert_eq!(summary.thermal_state_peak, ThermalState::Hot);
assert_eq!(summary.battery_pct_end, Some(78));
assert_eq!(summary.sample_count, 2);
}
// Snapshots without CPU readings still count as samples but yield no CPU
// average or peak.
#[test]
fn aggregator_handles_all_missing_cpu() {
let mut agg = Aggregator::new();
agg.observe(&ResourceSnapshot::unknown());
agg.observe(&ResourceSnapshot::unknown());
let summary = agg.finish(ResourceTelemetryMode::Boundary);
assert_eq!(summary.cpu_avg_pct, None);
assert_eq!(summary.cpu_peak_pct, None);
assert_eq!(summary.sample_count, 2);
}
// A second call with a generous max_age must return the cached snapshot,
// identified here by an identical capture timestamp.
#[test]
fn monitor_current_snapshot_is_cached_within_max_age() {
let monitor = ResourceMonitor::new();
let first = monitor.current_snapshot(Duration::ZERO);
let second = monitor.current_snapshot(Duration::from_secs(10));
assert_eq!(first.captured_at_ms, second.captured_at_ms);
}
// With a zero max_age after a short sleep, the second snapshot's timestamp
// must not be older than the first.
#[test]
fn monitor_current_snapshot_refreshes_when_ttl_expires() {
let monitor = ResourceMonitor::new();
let first = monitor.current_snapshot(Duration::ZERO);
std::thread::sleep(Duration::from_millis(2));
let second = monitor.current_snapshot(Duration::ZERO);
assert!(second.captured_at_ms >= first.captured_at_ms);
}
// Off mode yields a disabled guard whose finish() returns None.
#[test]
fn begin_run_off_returns_no_summary() {
let monitor = ResourceMonitor::new();
let guard = monitor.begin_run(ResourceTelemetryMode::Off);
assert!(guard.finish().is_none());
}
// Boundary mode records exactly the start and end snapshots and reports no
// sampling interval.
#[test]
fn begin_run_boundary_produces_two_sample_summary() {
let monitor = ResourceMonitor::new();
let guard = monitor.begin_run(ResourceTelemetryMode::Boundary);
let summary = guard.finish().expect("Boundary mode produces a summary");
assert_eq!(summary.sample_count, 2);
assert_eq!(summary.sampling_mode, "boundary");
assert_eq!(summary.sampling_interval_ms, None);
}
// A 700 ms run at a 250 ms interval must include at least one sampler tick
// in addition to the start and end snapshots.
#[test]
fn begin_run_summary_mode_collects_interval_samples() {
let monitor = ResourceMonitor::new();
let guard = monitor.begin_run(ResourceTelemetryMode::Summary { interval_ms: 250 });
std::thread::sleep(Duration::from_millis(700));
let summary = guard.finish().expect("Summary mode produces a summary");
assert!(
summary.sample_count >= 3,
"expected at least start + 1 mid + end samples, got {}",
summary.sample_count
);
assert_eq!(summary.sampling_mode, "summary");
assert_eq!(summary.sampling_interval_ms, Some(250));
}
// Dropping a guard without finish() must leave the monitor usable.
#[test]
fn dropping_guard_without_finish_does_not_emit() {
let monitor = ResourceMonitor::new();
{
let _guard = monitor.begin_run(ResourceTelemetryMode::Boundary);
}
let snap = monitor.current_snapshot(Duration::ZERO);
assert!(snap.captured_at_ms > 0);
}
// global() hands out the same Arc on every call.
#[test]
fn global_monitor_is_shared() {
let a = ResourceMonitor::global();
let b = ResourceMonitor::global();
assert!(Arc::ptr_eq(&a, &b));
}
// Finishing immediately, before the first sampler tick, still yields a
// summary containing at least the start and end snapshots.
#[test]
fn summary_run_shorter_than_sampler_tick_still_produces_summary() {
let monitor = ResourceMonitor::new();
monitor.prewarm();
let guard = monitor.begin_run(ResourceTelemetryMode::Summary { interval_ms: 1000 });
let summary = guard
.finish()
.expect("Summary mode should produce a summary even without sampler ticks");
assert!(summary.sample_count >= 2);
assert_eq!(summary.sampling_mode, "summary");
assert_eq!(summary.sampling_interval_ms, Some(1000));
}
// Starts and immediately drops 64 sampled runs; passes if every drop
// returns (each drop joins its sampler thread).
#[test]
fn run_guard_drop_without_finish_stops_sampler_cleanly() {
let monitor = ResourceMonitor::new();
monitor.prewarm();
for _ in 0..64 {
let guard = monitor.begin_run(ResourceTelemetryMode::Summary { interval_ms: 250 });
drop(guard);
}
}
// Eight threads each take 32 snapshots from the global monitor with a
// 100 ms max_age; every snapshot must carry a non-zero timestamp.
#[test]
fn concurrent_snapshots_share_one_system() {
let monitor = ResourceMonitor::global();
let handles: Vec<_> = (0..8)
.map(|_| {
let m = monitor.clone();
std::thread::spawn(move || {
for _ in 0..32 {
let s = m.current_snapshot(Duration::from_millis(100));
assert!(s.captured_at_ms > 0);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
}
}