use std::time::Instant;
use kanade_shared::subject;
use kanade_shared::wire::{EffectiveConfig, HostPerf};
use sysinfo::{Disks, Networks, System};
use tokio::sync::watch;
use tracing::{info, warn};
/// Periodically samples host performance counters and publishes them as a
/// JSON-encoded [`HostPerf`] snapshot on the `subject::host_perf(&pc_id)` subject.
///
/// The sampling interval is read from `cfg_rx` via `host_perf_duration()` and is
/// re-read whenever the watched config changes; a different duration rebuilds
/// the ticker and restarts the "first tick" warm-up.
///
/// On a first tick (initial start or right after a reschedule) the CPU
/// percentage and all disk/network rates are published as `None`; that tick
/// only establishes the baseline the next one is measured against.
///
/// This function never returns. Serialization and publish failures are logged
/// and the loop carries on. If the config sender is dropped, the loop keeps
/// sampling at the last known interval.
pub async fn host_perf_loop(
    client: async_nats::Client,
    pc_id: String,
    mut cfg_rx: watch::Receiver<EffectiveConfig>,
) {
    let mut current_dur = cfg_rx.borrow().host_perf_duration();
    // NOTE(review): `tokio::time::interval` panics on a zero period — confirm
    // that `host_perf_duration()` can never yield `Duration::ZERO`.
    let mut ticker = tokio::time::interval(current_dur);
    let mut sys = System::new();
    let mut disks = Disks::new_with_refreshed_list();
    let mut networks = Networks::new_with_refreshed_list();
    let mut last_tick_at = Instant::now();
    let mut is_first_tick = true;
    // Cleared once the config sender is dropped. A closed watch channel makes
    // `changed()` resolve to `Err` immediately on every call, so the branch
    // must be disabled or the `select!` loop would spin without ever parking.
    let mut cfg_open = true;
    info!(?current_dur, "host_perf loop scheduled");
    loop {
        tokio::select! {
            _ = ticker.tick() => {
                let now = Instant::now();
                let elapsed = now.duration_since(last_tick_at).as_secs_f64();
                last_tick_at = now;

                sys.refresh_cpu_all();
                sys.refresh_memory();
                disks.refresh(false);
                networks.refresh(false);

                // No CPU figure on a first tick: presumably the usage value is
                // only meaningful once a previous sample exists to diff against.
                let cpu_pct = if is_first_tick {
                    None
                } else {
                    Some(f64::from(sys.global_cpu_usage()))
                };
                // Values that do not fit the wire integer types become `None`.
                let cpu_count = u32::try_from(sys.cpus().len()).ok();
                let mem_used = i64::try_from(sys.used_memory()).ok();
                let mem_total = i64::try_from(sys.total_memory()).ok();
                let swap_used = i64::try_from(sys.used_swap()).ok();
                let swap_total = i64::try_from(sys.total_swap()).ok();

                // Rates need a baseline and a non-degenerate time window;
                // dividing by a sub-millisecond interval would give nonsense.
                let rates_valid = !is_first_tick && elapsed >= 0.001;

                let (disk_read_rate, disk_write_rate) = if rates_valid {
                    let mut read_bytes: u64 = 0;
                    let mut write_bytes: u64 = 0;
                    for disk in disks.iter() {
                        let usage = disk.usage();
                        read_bytes = read_bytes.saturating_add(usage.read_bytes);
                        write_bytes = write_bytes.saturating_add(usage.written_bytes);
                    }
                    // Precision loss in the u64 -> f64 cast is acceptable for a rate.
                    (
                        Some(read_bytes as f64 / elapsed),
                        Some(write_bytes as f64 / elapsed),
                    )
                } else {
                    (None, None)
                };

                let (net_rx_rate, net_tx_rate) = if rates_valid {
                    let mut rx_bytes: u64 = 0;
                    let mut tx_bytes: u64 = 0;
                    for (_name, data) in networks.iter() {
                        rx_bytes = rx_bytes.saturating_add(data.received());
                        tx_bytes = tx_bytes.saturating_add(data.transmitted());
                    }
                    (
                        Some(rx_bytes as f64 / elapsed),
                        Some(tx_bytes as f64 / elapsed),
                    )
                } else {
                    (None, None)
                };

                is_first_tick = false;

                let snapshot = HostPerf {
                    pc_id: pc_id.clone(),
                    at: chrono::Utc::now(),
                    cpu_pct,
                    cpu_count,
                    mem_used_bytes: mem_used,
                    mem_total_bytes: mem_total,
                    swap_used_bytes: swap_used,
                    swap_total_bytes: swap_total,
                    disk_read_bytes_per_sec: disk_read_rate,
                    disk_written_bytes_per_sec: disk_write_rate,
                    net_rx_bytes_per_sec: net_rx_rate,
                    net_tx_bytes_per_sec: net_tx_rate,
                };
                let payload = match serde_json::to_vec(&snapshot) {
                    Ok(b) => b,
                    Err(e) => {
                        warn!(error = %e, "serialize HostPerf");
                        continue;
                    }
                };
                if let Err(e) = client
                    .publish(subject::host_perf(&pc_id), payload.into())
                    .await
                {
                    warn!(error = %e, "publish host_perf");
                }
            }
            // The precondition disables this branch after the sender is gone;
            // the ticker branch is always enabled, so `select!` never runs out
            // of live branches.
            res = cfg_rx.changed(), if cfg_open => {
                if res.is_err() {
                    warn!("host_perf config channel closed; keeping current interval");
                    cfg_open = false;
                    continue;
                }
                let new_dur = cfg_rx.borrow().host_perf_duration();
                if new_dur != current_dur {
                    info!(
                        old = ?current_dur,
                        new = ?new_dur,
                        "host_perf interval changed; rescheduling ticker",
                    );
                    current_dur = new_dur;
                    ticker = tokio::time::interval(current_dur);
                    // Treat the next tick as a warm-up again so it does not
                    // publish rates against the old cadence.
                    is_first_tick = true;
                    last_tick_at = Instant::now();
                }
            }
        }
    }
}