use kanade_shared::subject;
use kanade_shared::wire::{EffectiveConfig, Heartbeat};
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
use tokio::sync::watch;
use tracing::{info, warn};
/// Best-effort host name lookup from the process environment.
///
/// Checks `COMPUTERNAME` first and then `HOSTNAME`, returning the first one
/// that holds a usable value. A variable that is unset, not valid Unicode, or
/// set to an empty / whitespace-only string is skipped, so a blank
/// `COMPUTERNAME` cannot shadow a usable `HOSTNAME`.
///
/// Returns `None` when neither variable yields a usable name. The value is
/// returned exactly as stored in the environment (it is not trimmed).
fn hostname() -> Option<String> {
    ["COMPUTERNAME", "HOSTNAME"]
        .iter()
        // The iterator is lazy: `HOSTNAME` is only read when `COMPUTERNAME`
        // did not produce a usable value.
        .filter_map(|key| std::env::var(key).ok())
        .find(|name| !name.trim().is_empty())
}
/// Resource-usage readings for the agent's own process, as collected by
/// `refresh_self_perf` and copied into the `agent_*` fields of each heartbeat.
///
/// Every field is optional. The `Default` value (all `None`) is what
/// `refresh_self_perf` returns when the current process cannot be looked up.
#[derive(Default)]
struct SelfPerf {
/// `Process::cpu_usage()` widened to `f64`; `None` on the first tick.
/// NOTE(review): presumably a percentage as reported by sysinfo — confirm
/// the scale (per-core vs. whole machine) against the pinned sysinfo version.
cpu_pct: Option<f64>,
/// `Process::memory()` clamped to `i64::MAX`.
/// NOTE(review): the name implies resident set size in bytes — confirm.
rss_bytes: Option<i64>,
/// `disk_usage().total_read_bytes` clamped to `i64::MAX`.
disk_read_bytes: Option<i64>,
/// `disk_usage().total_written_bytes` clamped to `i64::MAX`.
disk_written_bytes: Option<i64>,
}
/// Samples the resource usage of the current process through `sysinfo`.
///
/// Refreshes only this process's entry in `sys` (CPU, memory and disk usage)
/// and packs the readings into a [`SelfPerf`].
///
/// * `sys` - the `System` handle to refresh and read from.
/// * `is_first_tick` - when `true`, `cpu_pct` is reported as `None` instead of
///   the value from `cpu_usage()`. NOTE(review): presumably because sysinfo
///   needs a previous refresh to compute a meaningful CPU figure — confirm.
///
/// Returns `SelfPerf::default()` (all fields `None`) if the process entry is
/// not available after the refresh.
fn refresh_self_perf(sys: &mut System, is_first_tick: bool) -> SelfPerf {
    /// Converts a `u64` counter to `i64`, clamping at `i64::MAX`.
    fn clamp_to_i64(raw: u64) -> i64 {
        if raw > i64::MAX as u64 {
            i64::MAX
        } else {
            raw as i64
        }
    }

    let own_pid = Pid::from_u32(std::process::id());
    let wanted = ProcessRefreshKind::nothing()
        .with_cpu()
        .with_memory()
        .with_disk_usage();
    sys.refresh_processes_specifics(ProcessesToUpdate::Some(&[own_pid]), true, wanted);

    match sys.process(own_pid) {
        None => SelfPerf::default(),
        Some(me) => {
            let io = me.disk_usage();
            SelfPerf {
                cpu_pct: (!is_first_tick).then(|| f64::from(me.cpu_usage())),
                rss_bytes: Some(clamp_to_i64(me.memory())),
                disk_read_bytes: Some(clamp_to_i64(io.total_read_bytes)),
                disk_written_bytes: Some(clamp_to_i64(io.total_written_bytes)),
            }
        }
    }
}
/// Publishes a [`Heartbeat`] for this agent on every tick, forever.
///
/// Each tick samples the agent's own resource usage via `refresh_self_perf`,
/// serialises a `Heartbeat` as JSON and publishes it on
/// `subject::heartbeat(&pc_id)`. Serialisation and publish failures are logged
/// with `warn!` and the loop carries on; nothing is retried.
///
/// The tick period comes from `EffectiveConfig::heartbeat_duration()`. When a
/// new config arrives on `cfg_rx` with a different duration, the ticker is
/// replaced by a fresh `tokio::time::interval`, whose first tick completes
/// immediately, so a heartbeat is sent right after the change.
///
/// If the config sender is dropped, the config branch is disabled and the
/// loop keeps ticking at the last known period. Without that, `changed()`
/// would resolve to `Err` on every poll and the task would busy-spin.
///
/// * `client` - NATS client used for publishing.
/// * `pc_id` - identifier placed in the heartbeat and used to build the subject.
/// * `agent_version` - version string placed in the heartbeat.
/// * `cfg_rx` - watch channel carrying the effective configuration.
///
/// This function never returns.
///
/// NOTE(review): `tokio::time::interval` panics on a zero period; this assumes
/// `heartbeat_duration()` never returns zero — confirm against its definition.
pub async fn heartbeat_loop(
    client: async_nats::Client,
    pc_id: String,
    agent_version: String,
    mut cfg_rx: watch::Receiver<EffectiveConfig>,
) {
    let mut current_dur = cfg_rx.borrow().heartbeat_duration();
    let mut ticker = tokio::time::interval(current_dur);
    // Host facts are read once; they are cloned into every heartbeat.
    let hostname = hostname();
    let os_family = Some(std::env::consts::OS.to_string());
    let mut sys = System::new();
    let mut is_first_tick = true;
    // Cleared once the config sender is gone so that `select!` stops polling
    // a channel that can only ever answer `Err`.
    let mut cfg_open = true;
    info!(
        ?current_dur,
        ?hostname,
        ?os_family,
        "heartbeat loop scheduled",
    );
    loop {
        tokio::select! {
            _ = ticker.tick() => {
                let perf = refresh_self_perf(&mut sys, is_first_tick);
                is_first_tick = false;
                let hb = Heartbeat {
                    pc_id: pc_id.clone(),
                    at: chrono::Utc::now(),
                    agent_version: agent_version.clone(),
                    hostname: hostname.clone(),
                    os_family: os_family.clone(),
                    agent_cpu_pct: perf.cpu_pct,
                    agent_rss_bytes: perf.rss_bytes,
                    agent_disk_read_bytes: perf.disk_read_bytes,
                    agent_disk_written_bytes: perf.disk_written_bytes,
                };
                let payload = match serde_json::to_vec(&hb) {
                    Ok(b) => b,
                    Err(e) => {
                        warn!(error = %e, "serialize heartbeat");
                        continue;
                    }
                };
                if let Err(e) = client
                    .publish(subject::heartbeat(&pc_id), payload.into())
                    .await
                {
                    warn!(error = %e, "publish heartbeat");
                }
            }
            res = cfg_rx.changed(), if cfg_open => {
                if res.is_err() {
                    // Sender dropped: no further config updates can arrive.
                    // Disable this branch instead of re-polling it in a hot loop.
                    cfg_open = false;
                    warn!(
                        ?current_dur,
                        "config channel closed; keeping current heartbeat interval",
                    );
                    continue;
                }
                let new_dur = cfg_rx.borrow().heartbeat_duration();
                if new_dur != current_dur {
                    info!(
                        old = ?current_dur,
                        new = ?new_dur,
                        "heartbeat interval changed; rescheduling ticker",
                    );
                    current_dur = new_dur;
                    ticker = tokio::time::interval(current_dur);
                }
            }
        }
    }
}