use sysinfo::{ProcessesToUpdate, System};
use tokio::task::JoinHandle;
#[must_use]
pub fn spawn_system_metrics_task(
interval_secs: u64,
shutdown_rx: tokio::sync::watch::Receiver<bool>,
) -> Option<JoinHandle<()>> {
if interval_secs == 0 {
return None;
}
let interval = std::time::Duration::from_secs(interval_secs.max(1));
Some(tokio::spawn(async move {
let mut sys = System::new();
let pid = sysinfo::get_current_pid().ok();
let mut shutdown = shutdown_rx;
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = ticker.tick() => {}
_ = shutdown.changed() => break,
}
let (rss_bytes, cpu_percent, thread_count, fd_count) = match pid {
Some(pid) => {
sys.refresh_processes(ProcessesToUpdate::Some(&[pid]), true);
match sys.process(pid) {
Some(proc_info) => {
let rss = proc_info.memory();
let cpu = proc_info.cpu_usage();
let threads = get_thread_count(proc_info);
let fds = get_fd_count(pid);
(rss, cpu, threads, fds)
}
None => (0, 0.0, 0, 0),
}
}
None => (0, 0.0, 0, 0),
};
tracing::info!(
target: "system.metrics",
rss_bytes,
cpu_percent,
thread_count,
fd_count,
);
}
tracing::debug!("system metrics task shutting down");
}))
}
/// Returns the number of tasks `sysinfo` lists for the given process.
///
/// Yields `0` when `sysinfo` exposes no task set for the process.
#[cfg(target_os = "linux")]
fn get_thread_count(proc_info: &sysinfo::Process) -> u64 {
    match proc_info.tasks() {
        Some(task_ids) => task_ids.len() as u64,
        None => 0,
    }
}
/// Fallback for non-Linux targets: no thread count is collected, so this always returns `0`.
#[cfg(not(target_os = "linux"))]
fn get_thread_count(_proc_info: &sysinfo::Process) -> u64 {
    const UNSUPPORTED_PLATFORM_COUNT: u64 = 0;
    UNSUPPORTED_PLATFORM_COUNT
}
/// Counts the entries of `/proc/<pid>/fd` for the given process.
///
/// Returns `0` when the directory cannot be opened. Every item yielded by the directory
/// iterator is counted, including entries that fail to read.
#[cfg(target_os = "linux")]
fn get_fd_count(pid: sysinfo::Pid) -> u64 {
    let fd_dir = format!("/proc/{}/fd", pid.as_u32());
    match std::fs::read_dir(fd_dir) {
        Ok(dir_entries) => dir_entries.count() as u64,
        Err(_) => 0,
    }
}
/// Fallback for non-Linux targets: no descriptor count is collected, so this always returns `0`.
#[cfg(not(target_os = "linux"))]
fn get_fd_count(_pid: sysinfo::Pid) -> u64 {
    const UNSUPPORTED_PLATFORM_COUNT: u64 = 0;
    UNSUPPORTED_PLATFORM_COUNT
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::sync::watch;

    /// A zero interval disables the task, so nothing is spawned and no runtime is needed.
    #[test]
    fn interval_zero_returns_none() {
        let (_tx, rx) = watch::channel(false);
        let spawned = spawn_system_metrics_task(0, rx);
        assert!(spawned.is_none(), "interval_secs=0 must return None");
    }

    /// A non-zero interval yields a join handle; the task is aborted so it does not linger.
    #[tokio::test]
    async fn interval_nonzero_returns_some_handle() {
        let (_tx, rx) = watch::channel(false);
        let spawned = spawn_system_metrics_task(1, rx);
        assert!(
            spawned.is_some(),
            "interval_secs=1 must return Some(JoinHandle)"
        );
        if let Some(task) = spawned {
            task.abort();
        }
    }

    /// Sending on the watch channel makes the task finish promptly and without panicking.
    #[tokio::test]
    async fn shutdown_via_watch_channel_terminates_task() {
        let (tx, rx) = watch::channel(false);
        let task = spawn_system_metrics_task(60, rx).expect("interval=60 must return Some");
        tx.send(true).expect("shutdown send must succeed");

        let Ok(join_result) = tokio::time::timeout(Duration::from_secs(5), task).await else {
            panic!("task must exit within 5s after shutdown signal");
        };
        assert!(join_result.is_ok(), "task must not panic on shutdown");
    }
}