use std::collections::{HashMap, HashSet};
use std::time::Instant;
use kanade_shared::subject;
use kanade_shared::wire::{EffectiveConfig, ProcessPerf, ProcessSnapshot};
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
use tokio::sync::watch;
use tracing::{debug, info, warn};
/// Cumulative disk I/O counters last observed for a single PID.
///
/// Stored between ticks so the next sample can turn the ever-growing byte
/// totals from `sysinfo`'s `disk_usage()` into per-second rates.
#[derive(Copy, Clone)]
struct PidIo {
    /// Total bytes read by the process as of the previous sample.
    cumulative_read: u64,
    /// Total bytes written by the process as of the previous sample.
    cumulative_written: u64,
}
/// Periodically samples per-process CPU, memory and disk I/O and publishes the
/// top consumers to NATS.
///
/// Never returns; it runs until the task is dropped. The tick cadence is
/// `host_perf_duration()`, re-read whenever `cfg_rx` reports a change. The
/// activation window and top-N limit are read fresh on every tick. Ticks where
/// `process_perf_active_at(now)` is false publish nothing and discard the I/O
/// baseline.
///
/// Each published [`ProcessPerf`] holds at most `process_perf_top_n` (minimum
/// 1) processes, ordered by descending CPU usage. It is sent as JSON on
/// `subject::process_perf(&pc_id)`. Disk rates are `None` in these cases:
/// - the first published tick after start-up, reactivation or a cadence change;
/// - PIDs that were not part of the previous snapshot.
///
/// Serialization and publish failures are logged and the loop carries on. If
/// the config sender is dropped, sampling continues with the last received
/// configuration.
pub async fn process_perf_loop(
    client: async_nats::Client,
    pc_id: String,
    mut cfg_rx: watch::Receiver<EffectiveConfig>,
) {
    let mut current_dur = cfg_rx.borrow().host_perf_duration();
    let mut ticker = perf_ticker(current_dur);
    let mut sys = System::new();
    // Cumulative I/O counters from the previous snapshot, keyed by PID. Only
    // PIDs that were published on the last tick are kept.
    let mut pid_io: HashMap<u32, PidIo> = HashMap::new();
    let mut last_tick_at: Option<Instant> = None;
    // While set, the next snapshot publishes no I/O rates because the stored
    // counters (if any) do not span a valid sampling window.
    let mut needs_baseline_reset = true;
    // Cleared once the config sender is dropped; disables the `changed()` arm.
    let mut cfg_open = true;
    info!(
        ?current_dur,
        "process_perf loop scheduled (publish gated on agent_config)"
    );
    loop {
        tokio::select! {
            _ = ticker.tick() => {
                let now_utc = chrono::Utc::now();
                // Read only what this tick needs and drop the watch guard right
                // away; it must not be held across the publish `.await` below.
                let (active, top_n) = {
                    let cfg = cfg_rx.borrow();
                    (
                        cfg.process_perf_active_at(now_utc),
                        cfg.process_perf_top_n.max(1) as usize,
                    )
                };
                if !active {
                    if !pid_io.is_empty() {
                        debug!("process_perf inactive — clearing PID baseline");
                        pid_io.clear();
                    }
                    last_tick_at = None;
                    needs_baseline_reset = true;
                    continue;
                }

                let now = Instant::now();
                // Seconds since the previous active tick (0.0 on the first one).
                let elapsed = last_tick_at
                    .map_or(0.0, |prev| now.duration_since(prev).as_secs_f64());
                last_tick_at = Some(now);
                // Rates need a baseline and a non-degenerate window to divide by.
                let rates_valid = !needs_baseline_reset && elapsed >= 0.001;

                // NOTE(review): synchronous refresh on the async runtime; consider
                // `spawn_blocking` if it proves slow on hosts with many processes.
                sys.refresh_processes_specifics(
                    ProcessesToUpdate::All,
                    true,
                    ProcessRefreshKind::nothing()
                        .with_cpu()
                        .with_memory()
                        .with_disk_usage(),
                );
                // (pid, cpu %, memory bytes, cumulative read, cumulative written)
                let mut all: Vec<(Pid, f32, u64, u64, u64)> = sys
                    .processes()
                    .iter()
                    .map(|(pid, p)| {
                        let d = p.disk_usage();
                        (*pid, p.cpu_usage(), p.memory(), d.total_read_bytes, d.total_written_bytes)
                    })
                    .collect();
                // Highest CPU first. `total_cmp` is a total order even with NaN;
                // mapping incomparable pairs to `Equal` is not, and the std sorts
                // may panic on a non-total comparator.
                all.sort_unstable_by(|a, b| b.1.total_cmp(&a.1));
                all.truncate(top_n);

                let live_pids: HashSet<u32> = all.iter().map(|(pid, ..)| pid.as_u32()).collect();
                let mut processes: Vec<ProcessSnapshot> = Vec::with_capacity(all.len());
                for &(pid, cpu, mem, total_read, total_written) in &all {
                    let pid_u32 = pid.as_u32();
                    let name = sys
                        .process(pid)
                        .map(|p| p.name().to_string_lossy().into_owned())
                        .unwrap_or_default();
                    let (read_rate, write_rate) = if rates_valid {
                        io_rates(pid_io.get(&pid_u32), total_read, total_written, elapsed)
                    } else {
                        (None, None)
                    };
                    pid_io.insert(
                        pid_u32,
                        PidIo {
                            cumulative_read: total_read,
                            cumulative_written: total_written,
                        },
                    );
                    processes.push(ProcessSnapshot {
                        pid: pid_u32,
                        name,
                        cpu_pct: f64::from(cpu),
                        rss_bytes: i64::try_from(mem).unwrap_or(i64::MAX),
                        disk_read_bytes_per_sec: read_rate,
                        disk_written_bytes_per_sec: write_rate,
                    });
                }
                // Forget PIDs that fell out of the top-N or exited.
                pid_io.retain(|pid, _| live_pids.contains(pid));
                needs_baseline_reset = false;

                let snapshot = ProcessPerf {
                    pc_id: pc_id.clone(),
                    at: now_utc,
                    processes,
                };
                let payload = match serde_json::to_vec(&snapshot) {
                    Ok(b) => b,
                    Err(e) => {
                        warn!(error = %e, "serialize ProcessPerf");
                        continue;
                    }
                };
                if let Err(e) = client
                    .publish(subject::process_perf(&pc_id), payload.into())
                    .await
                {
                    warn!(error = %e, "publish process_perf");
                }
            }
            res = cfg_rx.changed(), if cfg_open => {
                if res.is_err() {
                    // The sender is gone, so `changed()` would resolve with `Err`
                    // on every poll and turn this loop into a busy spin. Stop
                    // polling it; `borrow()` still yields the last value, so
                    // sampling carries on with that configuration.
                    warn!("agent_config channel closed; process_perf keeps last config");
                    cfg_open = false;
                    continue;
                }
                let new_dur = cfg_rx.borrow().host_perf_duration();
                if new_dur != current_dur {
                    info!(
                        old = ?current_dur,
                        new = ?new_dur,
                        "process_perf cadence changed (host_perf_interval); rescheduling ticker",
                    );
                    current_dur = new_dur;
                    // A fresh interval fires its first tick immediately.
                    ticker = perf_ticker(current_dur);
                    needs_baseline_reset = true;
                    last_tick_at = None;
                }
            }
        }
    }
}

/// Builds the sampling ticker for `period`.
///
/// `MissedTickBehavior::Delay` keeps at least `period` between samples after a
/// stall. The default `Burst` would fire the missed ticks back-to-back and
/// produce near-zero sampling windows for the CPU and I/O rates.
fn perf_ticker(period: std::time::Duration) -> tokio::time::Interval {
    let mut ticker = tokio::time::interval(period);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    ticker
}

/// Converts the growth of cumulative I/O counters since `prev` into
/// bytes-per-second rates over `elapsed_secs`.
///
/// Returns `(None, None)` when there is no previous sample for the PID. A
/// counter that went backwards (presumably PID reuse) saturates to a zero delta
/// instead of underflowing.
fn io_rates(
    prev: Option<&PidIo>,
    total_read: u64,
    total_written: u64,
    elapsed_secs: f64,
) -> (Option<f64>, Option<f64>) {
    match prev {
        Some(prev) => {
            let d_read = total_read.saturating_sub(prev.cumulative_read);
            let d_written = total_written.saturating_sub(prev.cumulative_written);
            (
                Some(d_read as f64 / elapsed_secs),
                Some(d_written as f64 / elapsed_secs),
            )
        }
        None => (None, None),
    }
}