#[cfg(any(not(target_os = "linux"), test))]
#[allow(unused_imports)]
use crate::core::constants::delays;
use crate::core::constants::system;
use crate::monitor::{
AggregatedMetrics, ChildProcessMetrics, Metrics, ProcessMetadata, ProcessTreeMetrics, Summary,
};
use std::fs::File;
use std::io::{self, BufRead, BufReader};
use std::path::Path;
use std::process::{Child, Command, Stdio};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use sysinfo::{self, Pid, ProcessRefreshKind, ProcessesToUpdate, System};
/// Refresh settings shared by every sysinfo process refresh in this module:
/// everything sysinfo can collect, with task refreshing switched off.
fn process_refresh_kind() -> ProcessRefreshKind {
    let kind = ProcessRefreshKind::everything();
    kind.without_tasks()
}
/// Thread count reported by `get_thread_count` when the real count cannot be determined.
const DEFAULT_THREAD_COUNT: usize = 1;
/// Root of the procfs tree that the Linux helpers in this module read from.
#[cfg(any(target_os = "linux", test))]
const LINUX_PROC_DIR: &str = "/proc";
/// Per-process procfs subdirectory whose entries are counted as the process's threads.
#[cfg(any(target_os = "linux", test))]
const LINUX_TASK_SUBDIR: &str = "/task";
/// Returns the number of threads of `pid`.
///
/// Falls back to [`DEFAULT_THREAD_COUNT`] when the platform lookup is unavailable
/// (non-Linux) or fails (for example because the process has already exited).
pub(crate) fn get_thread_count(pid: usize) -> usize {
    match get_linux_thread_count(pid) {
        Some(count) => count,
        None => DEFAULT_THREAD_COUNT,
    }
}
/// Counts the entries of `/proc/<pid>/task`, one per thread of `pid`.
///
/// Returns `None` when the directory cannot be read (e.g. the process is gone).
#[cfg(target_os = "linux")]
fn get_linux_thread_count(pid: usize) -> Option<usize> {
    let task_dir = format!("{LINUX_PROC_DIR}/{pid}{LINUX_TASK_SUBDIR}");
    match std::fs::read_dir(task_dir) {
        Ok(entries) => Some(entries.count()),
        Err(_) => None,
    }
}

/// Thread counting relies on procfs, so it is unavailable off Linux.
#[cfg(not(target_os = "linux"))]
fn get_linux_thread_count(pid: usize) -> Option<usize> {
    let _ = pid;
    None
}
/// Reads the `rchar` / `wchar` counters of `pid` from `/proc/<pid>/io`.
///
/// These count bytes passed through read/write-style syscalls. The pair is returned as
/// `(rchar, wchar)`. Returns `None` if the file cannot be read, or if either counter is
/// missing or not an integer. When a key appears more than once, the last occurrence wins.
#[cfg(target_os = "linux")]
fn read_syscall_io_bytes(pid: usize) -> Option<(u64, u64)> {
    let io_stats = std::fs::read_to_string(format!("{LINUX_PROC_DIR}/{pid}/io")).ok()?;
    let (mut read_chars, mut written_chars) = (None, None);
    for entry in io_stats.lines() {
        match (entry.strip_prefix("rchar: "), entry.strip_prefix("wchar: ")) {
            (Some(value), _) => read_chars = value.trim().parse::<u64>().ok(),
            (None, Some(value)) => written_chars = value.trim().parse::<u64>().ok(),
            (None, None) => {}
        }
    }
    read_chars.zip(written_chars)
}

/// Syscall I/O counters come from procfs, so they are unavailable off Linux.
#[cfg(not(target_os = "linux"))]
fn read_syscall_io_bytes(pid: usize) -> Option<(u64, u64)> {
    let _ = pid;
    None
}
/// Reads the minor and major page-fault counters of `pid` from `/proc/<pid>/stat`,
/// returned as `(minflt, majflt)`.
///
/// The `comm` field may contain spaces or parentheses, so parsing starts after the last
/// `)`. Returns `None` if the file cannot be read or either field is missing or not an
/// integer.
#[cfg(target_os = "linux")]
fn read_page_faults(pid: usize) -> Option<(u64, u64)> {
    let stat = std::fs::read_to_string(format!("{LINUX_PROC_DIR}/{pid}/stat")).ok()?;
    let (_, after_comm) = stat.rsplit_once(')')?;
    // After `comm`, index 7 is proc(5) field 10 (minflt) and index 9 is field 12 (majflt).
    let mut fields = after_comm.split_whitespace().skip(7);
    let minor = fields.next()?.parse::<u64>().ok()?;
    let major = fields.nth(1)?.parse::<u64>().ok()?;
    Some((minor, major))
}

/// Page-fault counters come from procfs, so they are unavailable off Linux.
#[cfg(not(target_os = "linux"))]
fn read_page_faults(pid: usize) -> Option<(u64, u64)> {
    let _ = pid;
    None
}
/// Subtracts `base` from `cur`, saturating at zero.
///
/// Returns `None` if either operand is missing.
#[inline]
fn option_sub(cur: Option<u64>, base: Option<u64>) -> Option<u64> {
    cur.zip(base).map(|(current, baseline)| current.saturating_sub(baseline))
}
/// Builds a [`Summary`] from a newline-delimited JSON metrics file.
///
/// Each non-blank line is tried, in order, as:
/// 1. an [`AggregatedMetrics`];
/// 2. a [`ProcessTreeMetrics`] (only its `aggregated` part is kept);
/// 3. a plain [`Metrics`].
///
/// Lines matching none of these are skipped. The summary is built from the aggregated
/// samples if there are any, otherwise from the plain samples. A file with neither yields
/// `Summary::default()`.
///
/// The elapsed time (seconds) is the span between the earliest and the latest timestamp
/// seen. Out-of-order lines therefore cannot cause an integer underflow.
///
/// # Errors
/// Returns any I/O error from opening or reading `path`.
pub fn summary_from_json_file<P: AsRef<Path>>(path: P) -> io::Result<Summary> {
    /// Widens the `(earliest, latest)` timestamp span so it includes `ts_ms`.
    fn widen(span: &mut Option<(u64, u64)>, ts_ms: u64) {
        *span = Some(match *span {
            Some((earliest, latest)) => (earliest.min(ts_ms), latest.max(ts_ms)),
            None => (ts_ms, ts_ms),
        });
    }

    let reader = BufReader::new(File::open(path)?);
    let mut metrics_vec: Vec<AggregatedMetrics> = Vec::new();
    let mut regular_metrics: Vec<Metrics> = Vec::new();
    let mut span: Option<(u64, u64)> = None;

    for line in reader.lines() {
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }
        if let Ok(agg_metric) = serde_json::from_str::<AggregatedMetrics>(&line) {
            widen(&mut span, agg_metric.ts_ms);
            metrics_vec.push(agg_metric);
        } else if let Ok(tree_metrics) = serde_json::from_str::<ProcessTreeMetrics>(&line) {
            if let Some(agg) = tree_metrics.aggregated {
                widen(&mut span, agg.ts_ms);
                metrics_vec.push(agg);
            }
        } else if let Ok(metric) = serde_json::from_str::<Metrics>(&line) {
            widen(&mut span, metric.ts_ms);
            regular_metrics.push(metric);
        }
    }

    // `earliest <= latest` by construction, so the subtraction cannot underflow.
    let elapsed_time = span.map_or(0.0, |(earliest, latest)| {
        (latest - earliest) as f64 / 1000.0
    });

    if !metrics_vec.is_empty() {
        Ok(Summary::from_aggregated_metrics(&metrics_vec, elapsed_time))
    } else if !regular_metrics.is_empty() {
        Ok(Summary::from_metrics(&regular_metrics, elapsed_time))
    } else {
        Ok(Summary::default())
    }
}
/// Cumulative I/O and page-fault counters of the monitored process.
///
/// `ProcessMonitor::sample_metrics` captures these at the first sample when the monitor
/// was created with `since_process_start = false`. Later samples are then reported
/// relative to these values.
#[derive(Debug, Clone)]
pub struct IoBaseline {
    /// Bytes read from disk (sysinfo `disk_usage().total_read_bytes`).
    pub disk_read_bytes: u64,
    /// Bytes written to disk (sysinfo `disk_usage().total_written_bytes`).
    pub disk_write_bytes: u64,
    /// `rchar` from `/proc/<pid>/io`; `None` when unavailable (always on non-Linux).
    pub syscall_read_bytes: Option<u64>,
    /// `wchar` from `/proc/<pid>/io`; `None` when unavailable.
    pub syscall_write_bytes: Option<u64>,
    /// Minor page faults (`minflt` from `/proc/<pid>/stat`); `None` when unavailable.
    pub page_faults_cached: Option<u64>,
    /// Major page faults (`majflt` from `/proc/<pid>/stat`); `None` when unavailable.
    pub page_faults_disk: Option<u64>,
    /// Received bytes summed over non-loopback interfaces of `/proc/<pid>/net/dev`
    /// (or `/proc/net/dev`); 0 on non-Linux.
    pub sys_net_rx_bytes: u64,
    /// Transmitted bytes, from the same source as `sys_net_rx_bytes`.
    pub sys_net_tx_bytes: u64,
}
/// Per-child counterpart of [`IoBaseline`].
///
/// `ProcessMonitor::sample_tree_metrics` records this the first time a descendant PID is
/// sampled, when the monitor was created with `since_process_start = false`.
#[derive(Debug, Clone)]
pub struct ChildIoBaseline {
    /// PID of the descendant process this baseline belongs to.
    pub pid: usize,
    /// Bytes read from disk (sysinfo `disk_usage().total_read_bytes`).
    pub disk_read_bytes: u64,
    /// Bytes written to disk (sysinfo `disk_usage().total_written_bytes`).
    pub disk_write_bytes: u64,
    /// `rchar` from `/proc/<pid>/io`; `None` when unavailable.
    pub syscall_read_bytes: Option<u64>,
    /// `wchar` from `/proc/<pid>/io`; `None` when unavailable.
    pub syscall_write_bytes: Option<u64>,
    /// Minor page faults (`minflt`); `None` when unavailable.
    pub page_faults_cached: Option<u64>,
    /// Major page faults (`majflt`); `None` when unavailable.
    pub page_faults_disk: Option<u64>,
    /// Received network bytes; the monitor records 0 for children.
    pub sys_net_rx_bytes: u64,
    /// Transmitted network bytes; the monitor records 0 for children.
    pub sys_net_tx_bytes: u64,
}
/// Samples resource usage of one target process, and optionally its descendants.
///
/// The target is either spawned by the monitor (`new*`) or attached by PID (`from_pid*`).
#[derive(Debug)]
pub struct ProcessMonitor {
    /// The spawned child when created via `new*`; `None` when attached via `from_pid*`.
    child: Option<Child>,
    /// PID of the monitored (root) process.
    pid: usize,
    /// sysinfo state reused across refreshes.
    sys: System,
    /// Sampling interval used right after start (see `adaptive_interval`).
    base_interval: Duration,
    /// Sampling interval the adaptive schedule ramps up to.
    max_interval: Duration,
    /// Monotonic construction time; drives `adaptive_interval`.
    start_time: Instant,
    /// Wall-clock construction time in milliseconds since the Unix epoch.
    t0_ms: u64,
    /// Counters captured at the first sample when reporting relative I/O.
    io_baseline: Option<IoBaseline>,
    /// First-seen counters per descendant PID when reporting relative I/O.
    child_io_baselines: std::collections::HashMap<usize, ChildIoBaseline>,
    /// Report cumulative counters (`true`) instead of deltas from the first sample.
    since_process_start: bool,
    /// Whether callers asked for descendants to be included.
    include_children: bool,
    /// Whether eBPF tracing was successfully enabled.
    enable_ebpf: bool,
    /// Emit verbose `DEBUG:` output on stdout.
    debug_mode: bool,
    /// Syscall tracker, present once eBPF has been enabled.
    #[cfg(feature = "ebpf")]
    ebpf_tracker: Option<crate::ebpf::SyscallTracker>,
    /// Off-CPU profiler, present if it could be started alongside the tracker.
    #[cfg(feature = "ebpf")]
    offcpu_profiler: Option<crate::ebpf::OffCpuProfiler>,
    /// Time of the most recent `sample_metrics` call.
    last_refresh_time: Instant,
    /// Linux CPU-usage sampler used instead of sysinfo's per-process CPU percentage.
    #[cfg(target_os = "linux")]
    cpu_sampler: crate::cpu_sampler::CpuSampler,
    /// GPU metrics source.
    #[cfg(feature = "gpu")]
    gpu_monitor: crate::gpu::GpuMonitor,
}
/// Result type returned by the `ProcessMonitor` constructors; errors are [`std::io::Error`].
pub type ProcessResult<T> = std::result::Result<T, std::io::Error>;
impl ProcessMonitor {
pub fn new(
cmd: Vec<String>,
base_interval: Duration,
max_interval: Duration,
) -> ProcessResult<Self> {
Self::new_with_options(cmd, base_interval, max_interval, false)
}
pub fn new_with_options(
cmd: Vec<String>,
base_interval: Duration,
max_interval: Duration,
since_process_start: bool,
) -> ProcessResult<Self> {
if cmd.is_empty() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Command cannot be empty",
));
}
let child = Command::new(&cmd[0])
.args(&cmd[1..])
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()?;
let pid = child.id();
let mut sys = System::new();
sys.refresh_cpu_all();
let now = Instant::now();
Ok(Self {
child: Some(child),
pid: pid.try_into().unwrap(),
sys,
base_interval,
max_interval,
start_time: now,
t0_ms: SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64,
include_children: true,
debug_mode: false,
io_baseline: None,
child_io_baselines: std::collections::HashMap::new(),
since_process_start,
enable_ebpf: false,
#[cfg(feature = "ebpf")]
ebpf_tracker: None,
#[cfg(feature = "ebpf")]
offcpu_profiler: None,
last_refresh_time: now,
#[cfg(target_os = "linux")]
cpu_sampler: crate::cpu_sampler::CpuSampler::new(),
#[cfg(feature = "gpu")]
gpu_monitor: crate::gpu::GpuMonitor::new(),
})
}
pub fn from_pid(
pid: usize,
base_interval: Duration,
max_interval: Duration,
) -> ProcessResult<Self> {
Self::from_pid_with_options(pid, base_interval, max_interval, false)
}
pub fn from_pid_with_options(
pid: usize,
base_interval: Duration,
max_interval: Duration,
since_process_start: bool,
) -> ProcessResult<Self> {
let mut sys = System::new();
sys.refresh_cpu_all();
let pid_sys = Pid::from_u32(pid as u32);
let mut retries = 3;
let mut process_found = false;
while retries > 0 && !process_found {
sys.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid_sys]),
true,
process_refresh_kind(),
);
if sys.process(pid_sys).is_some() {
process_found = true;
} else {
retries -= 1;
std::thread::sleep(system::PROCESS_DETECTION);
}
}
if !process_found {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Process with PID {pid} not found"),
));
}
let now = Instant::now();
Ok(Self {
child: None,
pid,
sys,
base_interval,
max_interval,
start_time: now,
t0_ms: SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64,
include_children: true,
debug_mode: false,
io_baseline: None,
child_io_baselines: std::collections::HashMap::new(),
since_process_start,
enable_ebpf: false,
#[cfg(feature = "ebpf")]
ebpf_tracker: None,
#[cfg(feature = "ebpf")]
offcpu_profiler: None,
last_refresh_time: now,
#[cfg(target_os = "linux")]
cpu_sampler: crate::cpu_sampler::CpuSampler::new(),
#[cfg(feature = "gpu")]
gpu_monitor: crate::gpu::GpuMonitor::new(),
})
}
pub fn set_debug_mode(&mut self, debug: bool) {
self.debug_mode = debug;
#[cfg(feature = "ebpf")]
unsafe {
crate::ebpf::debug::set_debug_mode(debug);
}
if debug {
log::info!("Debug mode enabled - verbose output will be shown");
}
}
#[cfg(feature = "ebpf")]
pub fn enable_ebpf(&mut self) -> crate::error::Result<()> {
if !self.enable_ebpf {
log::info!("Attempting to enable eBPF profiling");
if self.debug_mode {
println!("DEBUG: Attempting to enable eBPF profiling");
println!(
"DEBUG: Process monitor running with PID: {}",
std::process::id()
);
println!("DEBUG: Monitoring target PID: {}", self.pid);
println!("DEBUG: eBPF feature is enabled at compile time");
}
let mut pids = vec![self.pid as u32];
self.sys.refresh_processes_specifics(
ProcessesToUpdate::All,
true,
process_refresh_kind(),
);
if let Some(_parent_proc) = self.sys.process(Pid::from_u32(self.pid as u32)) {
for child_pid in self.sys.processes().keys() {
if let Some(child_proc) = self.sys.process(*child_pid) {
if let Some(parent_pid) = child_proc.parent() {
if parent_pid == Pid::from_u32(self.pid as u32) {
pids.push(child_pid.as_u32());
}
}
}
}
}
if self.debug_mode {
println!(
"DEBUG: Collected {} PIDs to monitor: {:?}",
pids.len(),
pids
);
}
log::info!("Collected {} PIDs to monitor", pids.len());
if self.debug_mode {
let readiness_check = std::process::Command::new("sh")
.arg("-c")
.arg("echo 'Checking eBPF prerequisites from process_monitor:'; \
echo -n 'Kernel version: '; uname -r; \
echo -n 'Debugfs mounted: '; mount | grep -q debugfs && echo 'YES' || echo 'NO'; \
echo -n 'Tracefs accessible: '; [ -d /sys/kernel/debug/tracing ] && echo 'YES' || echo 'NO';")
.output();
if let Ok(output) = readiness_check {
let report = String::from_utf8_lossy(&output.stdout);
println!("DEBUG: {}", report);
log::info!("eBPF readiness: {}", report);
}
}
match crate::ebpf::SyscallTracker::new(pids.clone()) {
Ok(tracker) => {
self.ebpf_tracker = Some(tracker);
self.enable_ebpf = true;
log::info!("✅ eBPF profiling successfully enabled");
if self.debug_mode {
println!("DEBUG: eBPF profiling successfully enabled");
}
}
Err(e) => {
log::warn!("Failed to enable eBPF: {}", e);
if self.debug_mode {
println!("DEBUG: Failed to enable eBPF: {}", e);
if let Ok(output) = std::process::Command::new("sh")
.arg("-c")
.arg("dmesg | grep -i bpf | tail -5")
.output()
{
let kernel_logs = String::from_utf8_lossy(&output.stdout);
if !kernel_logs.trim().is_empty() {
println!("DEBUG: Recent kernel BPF logs:\n{}", kernel_logs);
log::warn!("Recent kernel BPF logs:\n{}", kernel_logs);
}
}
}
return Err(e);
}
}
match crate::ebpf::OffCpuProfiler::new(pids) {
Ok(mut profiler) => {
if self.debug_mode {
profiler.enable_debug_mode();
}
self.offcpu_profiler = Some(profiler);
log::info!("Off-CPU profiler enabled");
}
Err(e) => {
log::warn!("Failed to enable off-CPU profiler: {}", e);
}
}
Ok(())
} else {
Ok(())
}
}
#[cfg(not(feature = "ebpf"))]
pub fn enable_ebpf(&mut self) -> crate::error::Result<()> {
log::warn!("eBPF feature not enabled at compile time");
if self.debug_mode {
println!(
"DEBUG: eBPF feature not enabled at compile time. Cannot enable eBPF profiling."
);
println!("DEBUG: To enable eBPF support, rebuild with: cargo build --features ebpf");
}
self.enable_ebpf = false;
Err(crate::error::DenetError::EbpfNotSupported(
"eBPF feature not enabled. Build with --features ebpf".to_string(),
))
}
pub fn adaptive_interval(&self) -> Duration {
let elapsed = self.start_time.elapsed().as_secs_f64();
let interval_secs = if elapsed < 1.0 {
self.base_interval.as_secs_f64()
} else if elapsed < 10.0 {
let t = (elapsed - 1.0) / 9.0; let base = self.base_interval.as_secs_f64();
let max = self.max_interval.as_secs_f64();
base + (max - base) * t
} else {
self.max_interval.as_secs_f64()
};
Duration::from_secs_f64(interval_secs)
}
pub fn sample_metrics(&mut self) -> Option<Metrics> {
let now = Instant::now();
self.last_refresh_time = now;
let pid = Pid::from_u32(self.pid as u32);
self.sys.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
false,
process_refresh_kind(),
);
let process = self.sys.process(pid)?;
let mem_rss_kb = process.memory() / 1024;
let mem_vms_kb = process.virtual_memory() / 1024;
let current_disk_read = process.disk_usage().total_read_bytes;
let current_disk_write = process.disk_usage().total_written_bytes;
let current_syscall_io = read_syscall_io_bytes(self.pid);
let current_faults = read_page_faults(self.pid);
let thread_count = get_thread_count(self.pid);
let uptime_secs = process.run_time();
#[cfg(target_os = "linux")]
let cpu_usage = self.cpu_sampler.get_cpu_usage(self.pid).unwrap_or(0.0);
#[cfg(not(target_os = "linux"))]
let cpu_usage = {
let old_cpu_usage = process.cpu_usage();
let time_since_last_refresh = now.duration_since(self.last_refresh_time);
let _pid_copy = process.pid();
let _ = process; self.sys.refresh_cpu_all();
if time_since_last_refresh < delays::CPU_MEASUREMENT {
std::thread::sleep(delays::CPU_MEASUREMENT);
self.sys.refresh_cpu_all();
self.sys.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
false,
process_refresh_kind(),
);
if let Some(updated_proc) = self.sys.process(pid) {
updated_proc.cpu_usage()
} else {
old_cpu_usage
}
} else {
old_cpu_usage
}
};
let current_net_rx = self.get_process_sys_net_rx_bytes();
let current_net_tx = self.get_process_sys_net_tx_bytes();
let cur_rchar = current_syscall_io.map(|(r, _)| r);
let cur_wchar = current_syscall_io.map(|(_, w)| w);
let cur_minflt = current_faults.map(|(m, _)| m);
let cur_majflt = current_faults.map(|(_, m)| m);
let (
disk_read_bytes,
disk_write_bytes,
syscall_read_bytes,
syscall_write_bytes,
page_faults_cached,
page_faults_disk,
sys_net_rx_bytes,
sys_net_tx_bytes,
) = if self.since_process_start {
(
current_disk_read,
current_disk_write,
cur_rchar,
cur_wchar,
cur_minflt,
cur_majflt,
current_net_rx,
current_net_tx,
)
} else if let Some(baseline) = self.io_baseline.as_ref() {
(
current_disk_read.saturating_sub(baseline.disk_read_bytes),
current_disk_write.saturating_sub(baseline.disk_write_bytes),
option_sub(cur_rchar, baseline.syscall_read_bytes),
option_sub(cur_wchar, baseline.syscall_write_bytes),
option_sub(cur_minflt, baseline.page_faults_cached),
option_sub(cur_majflt, baseline.page_faults_disk),
current_net_rx.saturating_sub(baseline.sys_net_rx_bytes),
current_net_tx.saturating_sub(baseline.sys_net_tx_bytes),
)
} else {
self.io_baseline = Some(IoBaseline {
disk_read_bytes: current_disk_read,
disk_write_bytes: current_disk_write,
syscall_read_bytes: cur_rchar,
syscall_write_bytes: cur_wchar,
page_faults_cached: cur_minflt,
page_faults_disk: cur_majflt,
sys_net_rx_bytes: current_net_rx,
sys_net_tx_bytes: current_net_tx,
});
(
0,
0,
cur_rchar.map(|_| 0),
cur_wchar.map(|_| 0),
cur_minflt.map(|_| 0),
cur_majflt.map(|_| 0),
0,
0,
)
};
let ts_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
#[cfg(feature = "gpu")]
let gpu_metrics = if self.gpu_monitor.is_enabled() {
Some(self.gpu_monitor.sample_metrics(&[self.pid as u32]))
} else {
None
};
#[cfg(not(feature = "gpu"))]
let gpu_metrics = None;
Some(Metrics {
ts_ms,
cpu_usage,
mem_rss_kb,
mem_vms_kb,
disk_read_bytes,
disk_write_bytes,
syscall_read_bytes,
syscall_write_bytes,
page_faults_cached,
page_faults_disk,
sys_net_rx_bytes,
sys_net_tx_bytes,
thread_count,
uptime_secs,
cpu_core: Self::get_process_cpu_core(self.pid),
gpu: gpu_metrics,
})
}
pub fn is_running(&mut self) -> bool {
if let Some(child) = &mut self.child {
match child.try_wait() {
Ok(Some(_)) => false,
Ok(None) => true,
Err(_) => false,
}
} else {
let pid = Pid::from_u32(self.pid as u32);
self.sys.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
false,
process_refresh_kind(),
);
if self.sys.process(pid).is_none() {
self.sys.refresh_processes_specifics(
ProcessesToUpdate::All,
true,
process_refresh_kind(),
);
std::thread::sleep(system::PROCESS_DETECTION);
}
self.sys.process(pid).is_some()
}
}
pub fn get_pid(&self) -> usize {
self.pid
}
pub fn set_include_children(&mut self, include_children: bool) -> &mut Self {
self.include_children = include_children;
self
}
pub fn get_include_children(&self) -> bool {
self.include_children
}
pub fn get_metadata(&mut self) -> Option<ProcessMetadata> {
let pid = Pid::from_u32(self.pid as u32);
self.sys.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
false,
process_refresh_kind(),
);
if let Some(proc) = self.sys.process(pid) {
let cmd: Vec<String> = proc
.cmd()
.iter()
.map(|os_str| os_str.to_string_lossy().to_string())
.collect();
let executable = proc
.exe()
.map(|path| path.to_string_lossy().to_string())
.unwrap_or_default();
Some(ProcessMetadata {
pid: self.pid,
cmd,
executable,
t0_ms: self.t0_ms,
})
} else {
None
}
}
pub fn get_child_pids(&mut self) -> Vec<usize> {
self.sys
.refresh_processes_specifics(ProcessesToUpdate::All, true, process_refresh_kind());
let mut children = Vec::new();
self.find_children_recursive(self.pid, &mut children);
children
}
fn find_children_recursive(&self, parent_pid: usize, children: &mut Vec<usize>) {
let parent_pid_sys = Pid::from_u32(parent_pid as u32);
for (pid, process) in self.sys.processes() {
if let Some(ppid) = process.parent() {
if ppid == parent_pid_sys {
let child_pid = pid.as_u32() as usize;
children.push(child_pid);
self.find_children_recursive(child_pid, children);
}
}
}
}
pub fn sample_tree_metrics(&mut self) -> ProcessTreeMetrics {
let tree_ts_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
let parent_metrics = self.sample_metrics();
let child_pids = self.get_child_pids();
let mut child_metrics = Vec::new();
for child_pid in child_pids.iter() {
let pid = Pid::from_u32(*child_pid as u32);
self.sys.refresh_processes_specifics(
ProcessesToUpdate::Some(&[pid]),
false,
process_refresh_kind(),
);
if let Some(proc) = self.sys.process(pid) {
let command = proc.name().to_string_lossy().to_string();
let current_disk_read = proc.disk_usage().total_read_bytes;
let current_disk_write = proc.disk_usage().total_written_bytes;
let current_syscall_io = read_syscall_io_bytes(*child_pid);
let current_faults = read_page_faults(*child_pid);
let cur_rchar = current_syscall_io.map(|(r, _)| r);
let cur_wchar = current_syscall_io.map(|(_, w)| w);
let cur_minflt = current_faults.map(|(m, _)| m);
let cur_majflt = current_faults.map(|(_, m)| m);
let current_net_rx = 0; let current_net_tx = 0;
let (
disk_read_bytes,
disk_write_bytes,
syscall_read_bytes,
syscall_write_bytes,
page_faults_cached,
page_faults_disk,
sys_net_rx_bytes,
sys_net_tx_bytes,
) = if self.since_process_start {
(
current_disk_read,
current_disk_write,
cur_rchar,
cur_wchar,
cur_minflt,
cur_majflt,
current_net_rx,
current_net_tx,
)
} else {
match self.child_io_baselines.entry(*child_pid) {
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(ChildIoBaseline {
pid: *child_pid,
disk_read_bytes: current_disk_read,
disk_write_bytes: current_disk_write,
syscall_read_bytes: cur_rchar,
syscall_write_bytes: cur_wchar,
page_faults_cached: cur_minflt,
page_faults_disk: cur_majflt,
sys_net_rx_bytes: current_net_rx,
sys_net_tx_bytes: current_net_tx,
});
(
0,
0,
cur_rchar.map(|_| 0),
cur_wchar.map(|_| 0),
cur_minflt.map(|_| 0),
cur_majflt.map(|_| 0),
0,
0,
)
}
std::collections::hash_map::Entry::Occupied(e) => {
let baseline = e.get();
(
current_disk_read.saturating_sub(baseline.disk_read_bytes),
current_disk_write.saturating_sub(baseline.disk_write_bytes),
option_sub(cur_rchar, baseline.syscall_read_bytes),
option_sub(cur_wchar, baseline.syscall_write_bytes),
option_sub(cur_minflt, baseline.page_faults_cached),
option_sub(cur_majflt, baseline.page_faults_disk),
current_net_rx.saturating_sub(baseline.sys_net_rx_bytes),
current_net_tx.saturating_sub(baseline.sys_net_tx_bytes),
)
}
}
};
let child_ts_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
#[cfg(target_os = "linux")]
let cpu_usage = self.cpu_sampler.get_cpu_usage(*child_pid).unwrap_or(0.0);
#[cfg(not(target_os = "linux"))]
let cpu_usage = proc.cpu_usage();
let metrics = Metrics {
ts_ms: child_ts_ms,
cpu_usage,
mem_rss_kb: proc.memory() / 1024,
mem_vms_kb: proc.virtual_memory() / 1024,
disk_read_bytes,
disk_write_bytes,
syscall_read_bytes,
syscall_write_bytes,
page_faults_cached,
page_faults_disk,
sys_net_rx_bytes,
sys_net_tx_bytes,
thread_count: get_thread_count(*child_pid),
uptime_secs: proc.run_time(),
cpu_core: Self::get_process_cpu_core(*child_pid),
gpu: None, };
child_metrics.push(ChildProcessMetrics {
pid: *child_pid,
command,
metrics,
});
}
}
#[cfg(target_os = "linux")]
{
let all_pids = std::iter::once(self.pid)
.chain(child_pids.iter().copied())
.collect::<Vec<_>>();
self.cpu_sampler.cleanup_stale_entries(&all_pids);
}
let aggregated = if let Some(ref parent) = parent_metrics {
let mut agg = AggregatedMetrics {
ts_ms: tree_ts_ms,
cpu_usage: parent.cpu_usage,
mem_rss_kb: parent.mem_rss_kb,
mem_vms_kb: parent.mem_vms_kb,
disk_read_bytes: parent.disk_read_bytes,
disk_write_bytes: parent.disk_write_bytes,
syscall_read_bytes: parent.syscall_read_bytes,
syscall_write_bytes: parent.syscall_write_bytes,
page_faults_cached: parent.page_faults_cached,
page_faults_disk: parent.page_faults_disk,
sys_net_rx_bytes: parent.sys_net_rx_bytes,
sys_net_tx_bytes: parent.sys_net_tx_bytes,
thread_count: parent.thread_count,
process_count: 1, uptime_secs: parent.uptime_secs,
ebpf: None, gpu: None, };
for child in &child_metrics {
agg.cpu_usage += child.metrics.cpu_usage;
agg.mem_rss_kb += child.metrics.mem_rss_kb;
agg.mem_vms_kb += child.metrics.mem_vms_kb;
agg.disk_read_bytes += child.metrics.disk_read_bytes;
agg.disk_write_bytes += child.metrics.disk_write_bytes;
if let Some(v) = child.metrics.syscall_read_bytes {
agg.syscall_read_bytes = Some(agg.syscall_read_bytes.unwrap_or(0) + v);
}
if let Some(v) = child.metrics.syscall_write_bytes {
agg.syscall_write_bytes = Some(agg.syscall_write_bytes.unwrap_or(0) + v);
}
if let Some(v) = child.metrics.page_faults_cached {
agg.page_faults_cached = Some(agg.page_faults_cached.unwrap_or(0) + v);
}
if let Some(v) = child.metrics.page_faults_disk {
agg.page_faults_disk = Some(agg.page_faults_disk.unwrap_or(0) + v);
}
agg.sys_net_rx_bytes += child.metrics.sys_net_rx_bytes;
agg.sys_net_tx_bytes += child.metrics.sys_net_tx_bytes;
agg.thread_count += child.metrics.thread_count;
agg.process_count += 1;
}
#[cfg(feature = "ebpf")]
if self.enable_ebpf {
if let Some(ref mut tracker) = self.ebpf_tracker {
let all_pids: Vec<u32> = std::iter::once(self.pid as u32)
.chain(child_pids.iter().map(|&pid| pid as u32))
.collect();
if let Err(e) = tracker.update_pids(all_pids.clone()) {
log::warn!("Failed to update eBPF PIDs: {}", e);
}
let mut ebpf_metrics = tracker.get_metrics();
#[cfg(feature = "ebpf")]
if let Some(ref mut syscalls) = ebpf_metrics.syscalls {
let elapsed_time = (tree_ts_ms - self.t0_ms) as f64 / 1000.0;
syscalls.analysis = Some(crate::ebpf::metrics::generate_syscall_analysis(
syscalls,
elapsed_time,
));
}
if let Some(ref mut profiler) = self.offcpu_profiler {
profiler.update_pids(all_pids.clone());
let stats = profiler.get_stats();
if !stats.is_empty() {
ebpf_metrics.offcpu = Some(build_offcpu_metrics(&stats));
}
}
agg.ebpf = Some(ebpf_metrics);
}
}
#[cfg(not(feature = "ebpf"))]
{
}
#[cfg(feature = "gpu")]
if self.gpu_monitor.is_enabled() {
let all_pids: Vec<u32> = std::iter::once(self.pid as u32)
.chain(child_pids.iter().map(|&pid| pid as u32))
.collect();
agg.gpu = Some(self.gpu_monitor.sample_metrics(&all_pids));
}
#[cfg(not(feature = "gpu"))]
{
}
Some(agg)
} else {
None
};
ProcessTreeMetrics {
ts_ms: tree_ts_ms,
parent: parent_metrics,
children: child_metrics,
aggregated,
}
}
fn get_process_sys_net_rx_bytes(&self) -> u64 {
#[cfg(target_os = "linux")]
{
self.get_linux_process_net_stats().0
}
#[cfg(not(target_os = "linux"))]
{
0 }
}
fn get_process_sys_net_tx_bytes(&self) -> u64 {
#[cfg(target_os = "linux")]
{
self.get_linux_process_net_stats().1
}
#[cfg(not(target_os = "linux"))]
{
0 }
}
#[cfg(target_os = "linux")]
fn get_linux_process_net_stats(&self) -> (u64, u64) {
let net_dev_path = format!("/proc/{}/net/dev", self.pid);
let net_stats = if std::path::Path::new(&net_dev_path).exists() {
self.parse_net_dev(&net_dev_path)
} else {
self.parse_net_dev("/proc/net/dev")
};
let mut total_rx = 0u64;
let mut total_tx = 0u64;
for (interface, (rx, tx)) in net_stats {
if interface != "lo" {
total_rx += rx;
total_tx += tx;
}
}
(total_rx, total_tx)
}
#[cfg(target_os = "linux")]
fn parse_net_dev(&self, path: &str) -> std::collections::HashMap<String, (u64, u64)> {
let mut stats = std::collections::HashMap::new();
if let Ok(mut file) = std::fs::File::open(path) {
let mut contents = String::new();
if std::io::Read::read_to_string(&mut file, &mut contents).is_ok() {
for line in contents.lines().skip(2) {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 10 {
if let Some(interface) = parts[0].strip_suffix(':') {
if let (Ok(rx_bytes), Ok(tx_bytes)) =
(parts[1].parse::<u64>(), parts[9].parse::<u64>())
{
stats.insert(interface.to_string(), (rx_bytes, tx_bytes));
}
}
}
}
}
}
stats
}
#[cfg(target_os = "linux")]
fn get_process_cpu_core(pid: usize) -> Option<u32> {
let stat_path = format!("/proc/{pid}/stat");
if let Ok(contents) = std::fs::read_to_string(&stat_path) {
if let Some(last_paren) = contents.rfind(')') {
let after_comm = &contents[last_paren + 1..];
let fields: Vec<&str> = after_comm.split_whitespace().collect();
if fields.len() > 36 {
if let Ok(cpu) = fields[36].parse::<u32>() {
return Some(cpu);
}
}
}
}
None
}
#[cfg(not(target_os = "linux"))]
fn get_process_cpu_core(_pid: usize) -> Option<u32> {
None }
#[cfg(feature = "gpu")]
pub fn get_gpu_summary(&self) -> crate::gpu::GpuSummary {
let current_metrics = self.gpu_monitor.sample_metrics(&[self.pid as u32]);
self.gpu_monitor.get_summary(&[current_metrics])
}
#[cfg(feature = "gpu")]
pub fn is_gpu_enabled(&self) -> bool {
self.gpu_monitor.is_enabled()
}
#[cfg(feature = "gpu")]
pub fn gpu_device_count(&self) -> u32 {
self.gpu_monitor.device_count()
}
}
/// Condenses raw per-thread off-CPU statistics into an `OffCpuMetrics` report.
///
/// The input is keyed by `(pid, tid)`. The report contains:
/// * overall totals and extremes;
/// * per-thread statistics keyed `"pid:tid"`;
/// * the (at most) ten threads with the most off-CPU time, largest first.
///
/// `min_time_ns` ignores zero minimums and is 0 when no thread has a usable minimum.
/// `avg_time_ns` is 0 when there are no events.
#[cfg(feature = "ebpf")]
fn build_offcpu_metrics(
    stats: &std::collections::HashMap<(u32, u32), crate::ebpf::OffCpuStats>,
) -> crate::ebpf::metrics::OffCpuMetrics {
    use crate::ebpf::metrics::{OffCpuMetrics, ThreadOffCpuInfo, ThreadOffCpuStats};

    let total_time_ns = stats
        .values()
        .fold(0u64, |acc, s| acc.saturating_add(s.total_time_ns));
    let total_events = stats
        .values()
        .fold(0u64, |acc, s| acc.saturating_add(s.count));
    let max_time_ns = stats.values().map(|s| s.max_time_ns).max().unwrap_or(0);
    // `u64::MAX` doubles as the "no minimum seen" sentinel, so it never wins.
    let min_time_ns = stats
        .values()
        .map(|s| s.min_time_ns)
        .filter(|&m| m > 0 && m < u64::MAX)
        .min()
        .unwrap_or(0);
    let avg_time_ns = total_time_ns.checked_div(total_events).unwrap_or(0);

    let thread_stats: std::collections::HashMap<String, ThreadOffCpuStats> = stats
        .iter()
        .map(|((pid, tid), s)| {
            (
                format!("{pid}:{tid}"),
                ThreadOffCpuStats {
                    tid: *tid,
                    total_time_ns: s.total_time_ns,
                    count: s.count,
                    avg_time_ns: s.avg_time_ns,
                    max_time_ns: s.max_time_ns,
                    min_time_ns: s.min_time_ns,
                },
            )
        })
        .collect();

    let mut top_blocking_threads: Vec<ThreadOffCpuInfo> = Vec::with_capacity(stats.len());
    for ((pid, tid), s) in stats {
        let percentage = if total_time_ns > 0 {
            s.total_time_ns as f64 / total_time_ns as f64 * 100.0
        } else {
            0.0
        };
        top_blocking_threads.push(ThreadOffCpuInfo {
            tid: *tid,
            pid: *pid,
            total_time_ms: s.total_time_ns as f64 / 1_000_000.0,
            percentage,
        });
    }
    // Values derive from u64 counters, so they are finite and non-negative; the total
    // order agrees with the partial one here.
    top_blocking_threads.sort_by(|a, b| b.total_time_ms.total_cmp(&a.total_time_ms));
    top_blocking_threads.truncate(10);

    OffCpuMetrics {
        total_time_ns,
        total_events,
        avg_time_ns,
        max_time_ns,
        min_time_ns,
        thread_stats,
        top_blocking_threads,
        bottlenecks: vec![],
        stack_traces: vec![],
        stacks: None,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::constants::{defaults, delays, sampling};
use std::thread;
struct ProcessTestFixture {
cmd: Vec<String>,
base_interval: Duration,
max_interval: Duration,
ready_timeout: Duration,
}
impl ProcessTestFixture {
fn new(cmd: Vec<String>) -> Self {
Self {
cmd,
base_interval: defaults::BASE_INTERVAL,
max_interval: defaults::MAX_INTERVAL,
ready_timeout: delays::STARTUP,
}
}
fn create_monitor(&self) -> Result<ProcessMonitor, std::io::Error> {
ProcessMonitor::new(self.cmd.clone(), self.base_interval, self.max_interval)
}
fn create_monitor_from_pid(&self, pid: usize) -> Result<ProcessMonitor, std::io::Error> {
ProcessMonitor::from_pid(pid, self.base_interval, self.max_interval)
}
fn create_and_verify_running(&self) -> Result<(ProcessMonitor, usize), std::io::Error> {
let mut monitor = self.create_monitor()?;
let pid = monitor.get_pid();
std::thread::sleep(delays::STANDARD);
if !self.wait_for_condition(|| monitor.is_running()) {
return Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"Process did not start or was not detected",
));
}
Ok((monitor, pid))
}
fn wait_for_condition<F>(&self, mut condition: F) -> bool
where
F: FnMut() -> bool,
{
let start = std::time::Instant::now();
let mut delay_ms = 1;
while start.elapsed() < self.ready_timeout {
if condition() {
return true;
}
std::thread::sleep(Duration::from_millis(delay_ms));
delay_ms = std::cmp::min(delay_ms * 2, 50);
}
false
}
}
fn create_test_monitor(cmd: Vec<String>) -> Result<ProcessMonitor, std::io::Error> {
ProcessTestFixture::new(cmd).create_monitor()
}
#[allow(dead_code)]
fn create_test_monitor_from_pid(pid: usize) -> Result<ProcessMonitor, std::io::Error> {
let fixture = ProcessTestFixture {
cmd: vec![],
base_interval: defaults::BASE_INTERVAL,
max_interval: defaults::MAX_INTERVAL,
ready_timeout: delays::STARTUP,
};
fixture.create_monitor_from_pid(pid)
}
#[test]
fn test_from_pid() {
let cmd = if cfg!(target_os = "windows") {
vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 5".to_string(),
]
} else {
vec!["sleep".to_string(), "5".to_string()]
};
let fixture = ProcessTestFixture::new(cmd);
let (_, pid) = fixture.create_and_verify_running().unwrap();
let pid_monitor = fixture.create_monitor_from_pid(pid);
assert!(
pid_monitor.is_ok(),
"Should be able to attach to running process"
);
let mut pid_monitor = pid_monitor.unwrap();
assert!(
fixture.wait_for_condition(|| pid_monitor.is_running()),
"PID monitor should detect the running process"
);
}
#[test]
fn test_adaptive_interval() {
let cmd = vec!["sleep".to_string(), "10".to_string()];
let monitor = create_test_monitor(cmd).unwrap();
let base_interval = monitor.base_interval;
let initial = monitor.adaptive_interval();
assert!(initial >= base_interval);
assert!(initial <= base_interval * 2);
thread::sleep(Duration::from_secs(2));
let later = monitor.adaptive_interval();
assert!(later > initial); assert!(later <= monitor.max_interval); }
#[test]
fn test_is_running() {
let fixture = ProcessTestFixture::new(vec!["echo".to_string(), "hello".to_string()]);
let mut monitor = fixture.create_monitor().unwrap();
assert!(
fixture.wait_for_condition(|| !monitor.is_running()),
"Short-lived process should terminate"
);
let fixture = ProcessTestFixture {
cmd: vec!["sleep".to_string(), "2".to_string()], base_interval: defaults::BASE_INTERVAL,
max_interval: defaults::MAX_INTERVAL,
ready_timeout: Duration::from_secs(5), };
let (mut monitor, _) = fixture.create_and_verify_running().unwrap();
assert!(monitor.is_running(), "Process should be running initially");
assert!(
fixture.wait_for_condition(|| !monitor.is_running()),
"Process should terminate within the timeout period"
);
}
#[test]
fn test_metrics_collection() {
    // A process that lives ~3 s, long enough to take two samples.
    let cmd = if cfg!(target_os = "windows") {
        vec![
            "powershell".to_string(),
            "-Command".to_string(),
            "Start-Sleep -Seconds 3".to_string(),
        ]
    } else {
        vec!["sleep".to_string(), "3".to_string()]
    };
    let mut monitor = create_test_monitor(cmd).unwrap();
    thread::sleep(delays::STARTUP);

    // `expect` replaces the former `assert!(is_some())` + `if let Some(..)` pair.
    let m = monitor
        .sample_metrics()
        .expect("Should collect metrics from running process");
    assert!(
        m.thread_count > 0,
        "Process should have at least one thread"
    );

    if m.uptime_secs == 0 {
        // Uptime appears to be reported in whole seconds; wait a second and
        // re-sample to observe it advance.
        thread::sleep(Duration::from_secs(1));
        if let Some(m2) = monitor.sample_metrics() {
            println!("Process uptime after delay: {} seconds", m2.uptime_secs);
            #[cfg(target_os = "linux")]
            {
                assert!(
                    m2.uptime_secs > 0,
                    "Process uptime should increase after delay on Linux"
                );
            }
        }
    } else {
        println!("Process uptime: {} seconds", m.uptime_secs);
    }
}
#[test]
#[cfg_attr(
    not(target_os = "linux"),
    ignore = "This test relies on Linux-specific process detection behavior"
)]
fn test_child_process_detection() {
    // The shell backgrounds a `sleep` and exits right after `echo`. Whether the
    // sleep is still attributed to the shell at sampling time depends on timing.
    let cmd = if cfg!(target_os = "windows") {
        vec![
            "cmd".to_string(),
            "/C".to_string(),
            "timeout 2 >nul & echo child".to_string(),
        ]
    } else {
        vec![
            "sh".to_string(),
            "-c".to_string(),
            "sleep 2 & echo child".to_string(),
        ]
    };
    let mut monitor = create_test_monitor(cmd).unwrap();
    thread::sleep(sampling::SLOW);
    let children = monitor.get_child_pids();
    // The list may legitimately be empty, so emptiness is not asserted. The old
    // `is_empty() || !is_empty()` check was a tautology. Instead verify an
    // invariant that always holds: no PID is reported twice.
    let distinct: std::collections::HashSet<_> = children.iter().collect();
    assert_eq!(
        distinct.len(),
        children.len(),
        "Child PID list should not contain duplicates"
    );
}
#[test]
fn test_tree_metrics_structure() {
    let cmd = vec!["sleep".to_string(), "1".to_string()];
    let mut monitor = create_test_monitor(cmd).unwrap();
    thread::sleep(sampling::STANDARD);
    let tree_metrics = monitor.sample_tree_metrics();
    assert!(tree_metrics.parent.is_some(), "Should have parent metrics");
    // `expect` replaces the former `assert!(is_some())` + `if let Some(..)` pair.
    let agg = tree_metrics
        .aggregated
        .expect("Should have aggregated metrics");
    #[cfg(not(target_os = "linux"))]
    println!("Note: On non-Linux platforms, some process metrics may be limited");
    assert!(
        agg.process_count >= 1,
        "Should count at least the parent process"
    );
    assert!(agg.thread_count > 0, "Should have at least one thread");
}
#[test]
#[cfg_attr(
    not(target_os = "linux"),
    ignore = "This test relies on Linux-specific process aggregation behavior"
)]
fn test_child_process_aggregation() {
    let mut monitor = create_test_monitor(vec!["sleep".to_string(), "1".to_string()]).unwrap();
    thread::sleep(Duration::from_millis(100));

    // Aggregated totals include the parent, so each must be at least the parent's value.
    let (parent, agg) = match monitor.sample_tree_metrics() {
        tree => match (tree.parent, tree.aggregated) {
            (Some(parent), Some(agg)) => (parent, agg),
            // Nothing to compare when either side is missing.
            _ => return,
        },
    };

    assert!(
        agg.cpu_usage >= parent.cpu_usage,
        "Aggregated CPU should be >= parent CPU"
    );
    assert!(
        agg.mem_rss_kb >= parent.mem_rss_kb,
        "Aggregated memory should be >= parent memory"
    );
    assert!(
        agg.thread_count >= parent.thread_count,
        "Aggregated threads should be >= parent threads"
    );
    assert!(
        agg.process_count >= 1,
        "Should count at least the parent process"
    );
}
#[test]
fn test_empty_process_tree() {
    let mut monitor = create_test_monitor(vec!["sleep".to_string(), "1".to_string()]).unwrap();
    thread::sleep(Duration::from_millis(50));

    let tree_metrics = monitor.sample_tree_metrics();
    assert!(
        tree_metrics.parent.is_some(),
        "Should have parent metrics even with no children"
    );

    let child_count = tree_metrics.children.len();
    let (Some(parent), Some(agg)) = (tree_metrics.parent, tree_metrics.aggregated) else {
        return;
    };

    assert_eq!(
        agg.process_count,
        1 + child_count,
        "Process count should be parent + actual children"
    );

    // With no children, the aggregate must equal the parent's own figures exactly.
    if child_count == 0 {
        assert_eq!(
            agg.cpu_usage, parent.cpu_usage,
            "CPU should match parent when no children"
        );
        assert_eq!(
            agg.mem_rss_kb, parent.mem_rss_kb,
            "Memory should match parent when no children"
        );
        assert_eq!(
            agg.thread_count, parent.thread_count,
            "Threads should match parent when no children"
        );
    }
}
#[test]
#[cfg_attr(
    not(target_os = "linux"),
    ignore = "This test relies on Linux-specific process tree detection"
)]
fn test_recursive_child_detection() {
    // Builds a shell command that spawns nested background children.
    let argv: &[&str] = if cfg!(target_os = "windows") {
        &[
            "cmd",
            "/C",
            "timeout 3 >nul & (timeout 2 >nul & timeout 1 >nul)",
        ]
    } else {
        &["sh", "-c", "sleep 3 & (sleep 2 & sleep 1 &)"]
    };
    let cmd: Vec<String> = argv.iter().map(|arg| arg.to_string()).collect();

    let mut monitor = create_test_monitor(cmd).unwrap();
    thread::sleep(Duration::from_millis(300));

    // Smoke test: repeated descendant discovery must not panic.
    for _ in 0..2 {
        let _children = monitor.get_child_pids();
    }
}
#[test]
#[cfg_attr(
    not(target_os = "linux"),
    ignore = "This test relies on Linux-specific process monitoring behavior"
)]
fn test_child_process_lifecycle() {
    /// Takes one tree sample and returns its aggregated process count,
    /// falling back to 1 (the parent alone) when no aggregate is present.
    fn sampled_process_count(monitor: &mut ProcessMonitor) -> usize {
        monitor
            .sample_tree_metrics()
            .aggregated
            .map_or(1, |a| a.process_count)
    }

    // The shell starts three background sleeps of increasing length, then waits.
    // The tree should grow and then shrink as they finish.
    let cmd = if cfg!(target_os = "windows") {
        vec![
            "cmd".to_string(),
            "/C".to_string(),
            "start /b ping 127.0.0.1 -n 3 >nul".to_string(),
        ]
    } else {
        vec![
            "sh".to_string(),
            "-c".to_string(),
            "for i in 1 2 3; do sleep $i & done; sleep 0.5; wait".to_string(),
        ]
    };
    let mut monitor = create_test_monitor(cmd).unwrap();
    monitor.set_include_children(true);

    println!("Measuring baseline process count...");
    let mut baseline_samples = Vec::with_capacity(5);
    for i in 0..5 {
        let count = sampled_process_count(&mut monitor);
        baseline_samples.push(count);
        println!("Baseline sample {}: process count: {}", i + 1, count);
        thread::sleep(Duration::from_millis(100));
    }

    // The baseline is the most frequent sample. Ties resolve to the smaller count
    // so the result does not depend on HashMap iteration order.
    let mut frequencies = std::collections::HashMap::new();
    for &count in &baseline_samples {
        *frequencies.entry(count).or_insert(0usize) += 1;
    }
    let baseline_count = frequencies
        .into_iter()
        .max_by_key(|&(count, freq)| (freq, std::cmp::Reverse(count)))
        .map_or(1, |(count, _)| count);
    println!("Established baseline process count: {}", baseline_count);

    let mut max_count = baseline_count;
    let mut min_count_after_max: Option<usize> = None;
    let mut saw_increase = false;
    let mut saw_decrease = false;
    println!("Starting sampling to detect process lifecycle...");
    for i in 0..15 {
        thread::sleep(Duration::from_millis(200));
        let count = sampled_process_count(&mut monitor);
        println!("Sample {}: process count: {}", i + 1, count);
        if count > baseline_count && !saw_increase {
            saw_increase = true;
            println!(
                "Detected process count increase: {} -> {}",
                baseline_count, count
            );
        }
        max_count = max_count.max(count);
        if saw_increase && count < max_count {
            saw_decrease = true;
            min_count_after_max = Some(min_count_after_max.map_or(count, |m| m.min(count)));
            println!(
                "Detected process count decrease: {} -> {}",
                max_count, count
            );
        }
    }

    thread::sleep(Duration::from_millis(1000));
    let final_metrics = monitor.sample_tree_metrics();
    let final_count = final_metrics
        .aggregated
        .as_ref()
        .map_or(1, |a| a.process_count);
    println!("Final process count: {}", final_count);
    println!(
        "Test summary: baseline={}, max={}, min_after_max={:?}, final={}",
        baseline_count, max_count, min_count_after_max, final_count
    );
    if saw_increase {
        println!("✓ Successfully detected process count increase");
    } else {
        println!("⚠ Did not detect any process count increase");
    }
    if saw_decrease {
        println!("✓ Successfully detected process count decrease");
    } else {
        println!("⚠ Did not detect any process count decrease");
    }

    // Observing the increase/decrease is timing-dependent, so it is reported above
    // rather than asserted. The previous `max_count >= baseline_count` check was a
    // tautology (max_count starts at the baseline). Check the baseline instead: it
    // must count at least the parent.
    assert!(
        baseline_count >= 1,
        "Baseline should count at least the parent process"
    );
    assert!(
        final_metrics.aggregated.is_some(),
        "Final aggregated metrics should exist"
    );
}
#[test]
#[cfg_attr(
    not(target_os = "linux"),
    ignore = "This test relies on Linux-specific network I/O monitoring behavior"
)]
fn test_network_io_limitation_for_children() {
    let argv: [&str; 3] = if cfg!(target_os = "windows") {
        ["cmd", "/C", "timeout 1 >nul & echo test"]
    } else {
        ["sh", "-c", "sleep 1 & echo test"]
    };
    let mut monitor = create_test_monitor(argv.iter().map(|s| s.to_string()).collect()).unwrap();
    thread::sleep(Duration::from_millis(200));
    let tree_metrics = monitor.sample_tree_metrics();

    // Per-child network counters are a known limitation: every child reports zero.
    tree_metrics.children.iter().for_each(|child| {
        assert_eq!(
            child.metrics.sys_net_rx_bytes, 0,
            "Child network RX should be 0 (known limitation)"
        );
        assert_eq!(
            child.metrics.sys_net_tx_bytes, 0,
            "Child network TX should be 0 (known limitation)"
        );
    });

    // Consequently, the aggregate's network counters equal the parent's.
    if let (Some(parent), Some(agg)) = (tree_metrics.parent, tree_metrics.aggregated) {
        assert_eq!(
            agg.sys_net_rx_bytes, parent.sys_net_rx_bytes,
            "Aggregated network RX should equal parent (children are 0)"
        );
        assert_eq!(
            agg.sys_net_tx_bytes, parent.sys_net_tx_bytes,
            "Aggregated network TX should equal parent (children are 0)"
        );
    }
}
#[test]
fn test_aggregation_arithmetic() {
    let mut monitor = create_test_monitor(vec!["sleep".to_string(), "2".to_string()]).unwrap();
    thread::sleep(Duration::from_millis(100));
    let tree_metrics = monitor.sample_tree_metrics();

    let (Some(parent), Some(agg)) = (tree_metrics.parent, tree_metrics.aggregated) else {
        return;
    };
    let children = &tree_metrics.children;

    // Every aggregate field should be the parent's value plus the sum over children.
    let child_mem: u64 = children.iter().map(|c| c.metrics.mem_rss_kb).sum();
    let child_threads: usize = children.iter().map(|c| c.metrics.thread_count).sum();
    let child_cpu: f32 = children.iter().map(|c| c.metrics.cpu_usage).sum();

    assert_eq!(
        agg.mem_rss_kb,
        parent.mem_rss_kb + child_mem,
        "Memory aggregation should sum parent + children"
    );
    assert_eq!(
        agg.thread_count,
        parent.thread_count + child_threads,
        "Thread aggregation should sum parent + children"
    );
    assert_eq!(
        agg.process_count,
        children.len() + 1,
        "Process count should be parent + children"
    );
    // Float sums may differ in rounding, so CPU is compared with a tolerance.
    assert!(
        (agg.cpu_usage - (parent.cpu_usage + child_cpu)).abs() < 0.01,
        "CPU aggregation should approximately sum parent + children"
    );
}
#[test]
fn test_timestamp_functionality() {
    use std::thread;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Wall-clock "now" in Unix milliseconds, matching the samples' `ts_ms` unit.
    let unix_now_ms = || {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    };

    let mut monitor = create_test_monitor(vec!["sleep".to_string(), "2".to_string()]).unwrap();
    thread::sleep(Duration::from_millis(100));
    let first = monitor.sample_metrics().unwrap();
    thread::sleep(Duration::from_millis(50));
    let second = monitor.sample_metrics().unwrap();

    let now_ms = unix_now_ms();
    assert!(first.ts_ms <= now_ms, "Sample1 timestamp should not be in future");
    assert!(second.ts_ms <= now_ms, "Sample2 timestamp should not be in future");
    assert!(now_ms - first.ts_ms < 60000, "Sample1 timestamp should be recent");
    assert!(now_ms - second.ts_ms < 60000, "Sample2 timestamp should be recent");
    assert!(second.ts_ms >= first.ts_ms, "Timestamps should be monotonic");

    // Tree-level timestamps get a 1 s allowance past the reference time.
    let tree_metrics = monitor.sample_tree_metrics();
    let upper_bound = unix_now_ms() + 1000;
    assert!(
        tree_metrics.ts_ms <= upper_bound,
        "Tree timestamp should be reasonable"
    );
    if let Some(parent) = tree_metrics.parent {
        assert!(
            parent.ts_ms <= upper_bound,
            "Parent timestamp should be reasonable"
        );
    }
    if let Some(agg) = tree_metrics.aggregated {
        assert!(
            agg.ts_ms <= upper_bound,
            "Aggregated timestamp should be reasonable"
        );
    }
}
#[test]
fn test_enhanced_memory_metrics() {
    use std::thread;
    use std::time::{SystemTime, UNIX_EPOCH};
    let cmd = vec!["sleep".to_string(), "2".to_string()];
    let mut monitor = create_test_monitor(cmd).unwrap();
    thread::sleep(Duration::from_millis(200));

    // Memory may read as zero shortly after spawn; retry a few times.
    let mut metrics = monitor.sample_metrics().unwrap();
    for _ in 0..5 {
        if metrics.mem_rss_kb > 0 {
            break;
        }
        thread::sleep(Duration::from_millis(100));
        metrics = monitor.sample_metrics().unwrap();
    }
    if metrics.mem_rss_kb > 0 && metrics.mem_vms_kb > 0 {
        assert!(
            metrics.mem_vms_kb >= metrics.mem_rss_kb,
            "Virtual memory should be >= RSS when both > 0"
        );
    }
    let has_memory_data = metrics.mem_rss_kb > 0 || metrics.mem_vms_kb > 0;
    if !has_memory_data {
        println!("Warning: No memory data available from sysinfo - this can happen in test environments");
    }

    let metadata = monitor.get_metadata().unwrap();
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis() as u64;
    assert!(
        metadata.t0_ms <= now_ms,
        "Start time should not be in future"
    );
    assert!(
        now_ms - metadata.t0_ms < 60000,
        "Start time should be recent (within 60 seconds)"
    );

    // Apply the same "both values reported" guard used for the flat sample above.
    // Previously these checks were unconditional and failed whenever VMS was
    // reported as 0 while RSS was not.
    let tree_metrics = monitor.sample_tree_metrics();
    if let Some(parent) = tree_metrics.parent {
        if parent.mem_rss_kb > 0 && parent.mem_vms_kb > 0 {
            assert!(
                parent.mem_vms_kb >= parent.mem_rss_kb,
                "Parent VMS should be >= RSS"
            );
        }
    }
    if let Some(agg) = tree_metrics.aggregated {
        if agg.mem_rss_kb > 0 && agg.mem_vms_kb > 0 {
            assert!(
                agg.mem_vms_kb >= agg.mem_rss_kb,
                "Aggregated VMS should be >= RSS"
            );
        }
    }
}
#[test]
fn test_process_metadata() {
    use std::thread;
    use std::time::{SystemTime, UNIX_EPOCH};

    let mut monitor = create_test_monitor(vec!["sleep".to_string(), "2".to_string()]).unwrap();
    thread::sleep(Duration::from_millis(100));
    let md = monitor.get_metadata().unwrap();

    // Identity fields describe the spawned `sleep` command.
    assert!(md.pid > 0, "PID should be positive");
    assert!(!md.cmd.is_empty(), "Command should not be empty");
    assert_eq!(md.cmd[0], "sleep", "First command arg should be 'sleep'");
    assert!(!md.executable.is_empty(), "Executable path should not be empty");

    // The start time is a recent Unix timestamp in milliseconds.
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis() as u64;
    assert!(md.t0_ms <= now_ms, "Start time should not be in future");
    assert!(
        now_ms - md.t0_ms < 60000,
        "Start time should be recent (within 60 seconds)"
    );
    println!("t0_ms: {}, remainder: {}", md.t0_ms, md.t0_ms % 1000);

    assert!(
        monitor.sample_tree_metrics().parent.is_some(),
        "Tree should have parent metrics"
    );
}
#[test]
#[cfg_attr(
    not(target_os = "linux"),
    ignore = "This test may be flaky on non-Linux platforms due to process timing"
)]
fn test_t0_ms_precision() {
    use std::thread;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Wall-clock time in Unix milliseconds.
    let unix_now_ms = || {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_millis() as u64
    };

    // Bracket monitor creation with wall-clock readings; t0_ms must fall inside.
    let before_ms = unix_now_ms();
    let cmd = vec!["sleep".to_string(), "1".to_string()];
    let mut monitor = create_test_monitor(cmd).unwrap();
    let after_ms = unix_now_ms();

    thread::sleep(Duration::from_millis(50));
    let metadata = match monitor.get_metadata() {
        Some(md) => md,
        None => {
            println!("Process exited before metadata could be collected, skipping test");
            return;
        }
    };

    assert!(
        metadata.t0_ms > 1_000_000_000_000,
        "t0_ms should be a reasonable Unix timestamp in milliseconds"
    );
    assert!(
        metadata.t0_ms >= before_ms,
        "t0_ms should be after we started creating the monitor"
    );
    // This strict bound already implies the former `<= after_ms + 1000` and `> 0`
    // checks, which were dead and have been removed.
    assert!(
        metadata.t0_ms <= after_ms,
        "t0_ms should be before we finished creating the monitor"
    );
    println!("t0_ms: {}, remainder: {}", metadata.t0_ms, metadata.t0_ms % 1000);
}
#[test]
fn test_summary_from_json_file_function() {
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Two flat samples written one per line, 1000 ms apart.
    let mut temp_file = NamedTempFile::new().unwrap();
    writeln!(
        temp_file,
        r#"{{"ts_ms":1000,"cpu_usage":25.0,"mem_rss_kb":1024,"mem_vms_kb":2048,"disk_read_bytes":0,"disk_write_bytes":0,"sys_net_rx_bytes":0,"sys_net_tx_bytes":0,"thread_count":2,"process_count":1,"uptime_secs":10,"ebpf":null}}"#
    )
    .unwrap();
    writeln!(
        temp_file,
        r#"{{"ts_ms":2000,"cpu_usage":50.0,"mem_rss_kb":1536,"mem_vms_kb":3072,"disk_read_bytes":100,"disk_write_bytes":200,"sys_net_rx_bytes":50,"sys_net_tx_bytes":75,"thread_count":3,"process_count":2,"uptime_secs":15,"ebpf":null}}"#
    )
    .unwrap();
    temp_file.flush().unwrap();

    let summary = summary_from_json_file(temp_file.path()).unwrap();
    assert_eq!(summary.sample_count, 2);
    // The span between the first and last timestamps is 1000 ms.
    assert_eq!(summary.total_time_secs, 1.0);
}
#[test]
fn test_summary_from_json_file_with_tree_metrics() {
    use std::io::Write;
    use tempfile::NamedTempFile;

    // A single tree-shaped sample whose `aggregated` block carries the figures.
    let mut temp_file = NamedTempFile::new().unwrap();
    writeln!(
        temp_file,
        r#"{{"ts_ms":1000,"parent":null,"children":[],"aggregated":{{"ts_ms":1000,"cpu_usage":30.0,"mem_rss_kb":1024,"mem_vms_kb":2048,"disk_read_bytes":0,"disk_write_bytes":0,"sys_net_rx_bytes":0,"sys_net_tx_bytes":0,"thread_count":1,"process_count":1,"uptime_secs":5,"ebpf":null}}}}"#
    )
    .unwrap();
    temp_file.flush().unwrap();

    let summary = summary_from_json_file(temp_file.path()).unwrap();
    assert_eq!(summary.sample_count, 1);
    assert_eq!(summary.avg_cpu_usage, 30.0);
}
#[test]
fn test_summary_from_json_file_with_regular_metrics() {
    use std::io::Write;
    use tempfile::NamedTempFile;

    // A single flat sample without `process_count`.
    let mut temp_file = NamedTempFile::new().unwrap();
    writeln!(
        temp_file,
        r#"{{"ts_ms":1000,"cpu_usage":40.0,"mem_rss_kb":512,"mem_vms_kb":1024,"disk_read_bytes":0,"disk_write_bytes":0,"sys_net_rx_bytes":0,"sys_net_tx_bytes":0,"thread_count":1,"uptime_secs":8,"cpu_core":null}}"#
    )
    .unwrap();
    temp_file.flush().unwrap();

    let summary = summary_from_json_file(temp_file.path()).unwrap();
    assert_eq!(summary.sample_count, 1);
    assert_eq!(summary.avg_cpu_usage, 40.0);
}
#[test]
fn test_summary_from_json_file_empty() {
    use tempfile::NamedTempFile;
    // An empty file yields a zeroed summary rather than an error.
    let empty_file = NamedTempFile::new().unwrap();
    let summary = summary_from_json_file(empty_file.path()).unwrap();
    assert_eq!(summary.sample_count, 0);
    assert_eq!(summary.total_time_secs, 0.0);
}
#[test]
fn test_summary_from_json_file_with_empty_lines() {
    use std::io::Write;
    use tempfile::NamedTempFile;

    let mut temp_file = NamedTempFile::new().unwrap();
    // Blank and whitespace-only lines precede the one valid sample.
    writeln!(temp_file).unwrap();
    writeln!(temp_file, " ").unwrap();
    writeln!(
        temp_file,
        r#"{{"ts_ms":1000,"cpu_usage":35.0,"mem_rss_kb":768,"mem_vms_kb":1536,"disk_read_bytes":0,"disk_write_bytes":0,"sys_net_rx_bytes":0,"sys_net_tx_bytes":0,"thread_count":1,"uptime_secs":12,"cpu_core":null}}"#
    )
    .unwrap();
    // A trailing non-JSON line should be skipped, not counted.
    writeln!(temp_file, "invalid json line").unwrap();
    temp_file.flush().unwrap();

    let summary = summary_from_json_file(temp_file.path()).unwrap();
    assert_eq!(summary.sample_count, 1);
    assert_eq!(summary.avg_cpu_usage, 35.0);
}
#[test]
fn test_get_thread_count_functions() {
    let current_pid = std::process::id() as usize;
    let thread_count = get_thread_count(current_pid);
    assert!(thread_count >= 1);

    // 999999 can be a live PID on Linux hosts with a large `pid_max` (up to
    // 4194304). A PID above PID_MAX_LIMIT can never exist, so the fallback path
    // is exercised deterministically.
    let nonexistent_pid = u32::MAX as usize;
    let invalid_thread_count = get_thread_count(nonexistent_pid);
    assert_eq!(invalid_thread_count, DEFAULT_THREAD_COUNT);
}
#[test]
fn test_io_baseline_and_child_io_baseline() {
    let baseline = IoBaseline {
        disk_read_bytes: 100,
        disk_write_bytes: 200,
        syscall_read_bytes: None,
        syscall_write_bytes: None,
        page_faults_cached: None,
        page_faults_disk: None,
        sys_net_rx_bytes: 50,
        sys_net_tx_bytes: 75,
    };
    let child_baseline = ChildIoBaseline {
        pid: 123,
        disk_read_bytes: 300,
        disk_write_bytes: 400,
        syscall_read_bytes: None,
        syscall_write_bytes: None,
        page_faults_cached: None,
        page_faults_disk: None,
        sys_net_rx_bytes: 150,
        sys_net_tx_bytes: 175,
    };

    // Clone must preserve field values.
    let baseline_clone = baseline.clone();
    let child_baseline_clone = child_baseline.clone();
    assert_eq!(baseline.disk_read_bytes, baseline_clone.disk_read_bytes);
    assert_eq!(child_baseline.pid, child_baseline_clone.pid);

    // Debug output should name each type. Previously the child's Debug string was
    // built but never checked; the baseline assertion was repeated by mistake.
    let debug_str = format!("{:?}", baseline);
    assert!(debug_str.contains("IoBaseline"));
    let child_debug_str = format!("{:?}", child_baseline);
    assert!(
        child_debug_str.contains("ChildIoBaseline"),
        "Child baseline Debug output should name its type"
    );
}
#[test]
fn test_process_monitor_from_pid() {
    use crate::core::constants::sampling;
    // Attach to the test harness's own process, which is alive for the whole test.
    let own_pid = std::process::id() as usize;
    let mut monitor =
        ProcessMonitor::from_pid(own_pid, sampling::STANDARD, sampling::MAX_ADAPTIVE).unwrap();
    assert_eq!(monitor.get_pid(), own_pid);
    assert!(monitor.is_running());
    // A PID-attached monitor should hold no spawned child handle.
    assert!(monitor.child.is_none());
}
#[test]
fn test_process_monitor_from_pid_with_options() {
    use crate::core::constants::sampling;
    let own_pid = std::process::id() as usize;
    // The trailing `true` should be reflected in `since_process_start`.
    let since_process_start = true;
    let mut monitor = ProcessMonitor::from_pid_with_options(
        own_pid,
        sampling::STANDARD,
        sampling::MAX_ADAPTIVE,
        since_process_start,
    )
    .unwrap();
    assert_eq!(monitor.get_pid(), own_pid);
    assert!(monitor.since_process_start);
    assert!(monitor.is_running());
}
#[test]
fn test_process_monitor_new_with_empty_command() {
    use crate::core::constants::sampling;
    // Spawning requires at least a program name; an empty argv must be rejected.
    let no_args: Vec<String> = Vec::new();
    assert!(ProcessMonitor::new(no_args, sampling::STANDARD, sampling::MAX_ADAPTIVE).is_err());
}
#[test]
fn test_process_monitor_new_with_options_empty_command() {
    use crate::core::constants::sampling;
    // The options-taking constructor must also reject an empty argv.
    let no_args: Vec<String> = Vec::new();
    let outcome = ProcessMonitor::new_with_options(
        no_args,
        sampling::STANDARD,
        sampling::MAX_ADAPTIVE,
        false,
    );
    assert!(outcome.is_err());
}
#[test]
fn test_process_monitor_setters() {
    use crate::core::constants::sampling;
    let own_pid = std::process::id() as usize;
    let mut monitor = ProcessMonitor::from_pid_with_options(
        own_pid,
        sampling::STANDARD,
        sampling::MAX_ADAPTIVE,
        false,
    )
    .unwrap();

    // Exercise the configuration setters. The eBPF result is ignored because
    // enabling it may fail depending on the environment.
    monitor.set_include_children(false);
    monitor.set_debug_mode(true);
    let _ebpf_outcome = monitor.enable_ebpf();

    // None of the setters should change which process is monitored.
    assert_eq!(monitor.get_pid(), own_pid);
}
#[test]
fn test_process_monitor_invalid_pid() {
    use crate::core::constants::sampling;
    // NOTE(review): 999999 is assumed not to be a live PID. On Linux hosts with a
    // large pid_max this is not guaranteed; consider a PID above PID_MAX_LIMIT once
    // `from_pid`'s handling of very large values is confirmed.
    let bogus_pid = 999999;
    let attach = ProcessMonitor::from_pid(bogus_pid, sampling::STANDARD, sampling::MAX_ADAPTIVE);
    assert!(attach.is_err());
}
#[test]
fn test_process_monitor_sample_metrics_invalid_process() {
    use crate::core::constants::sampling;
    // `true` exits immediately. Sampling after it has (most likely) exited must
    // not panic; the result itself is not inspected.
    let mut monitor =
        ProcessMonitor::new(vec!["true".to_string()], sampling::FAST, sampling::STANDARD).unwrap();
    let settle = std::time::Duration::from_millis(100);
    std::thread::sleep(settle);
    let _ = monitor.sample_metrics();
}
#[test]
fn test_process_monitor_debug_traits() {
    use crate::core::constants::sampling;
    let own_pid = std::process::id() as usize;
    let monitor =
        ProcessMonitor::from_pid(own_pid, sampling::STANDARD, sampling::MAX_ADAPTIVE).unwrap();
    // The Debug representation should name the type.
    assert!(format!("{:?}", monitor).contains("ProcessMonitor"));
}
#[cfg(target_os = "linux")]
#[test]
fn test_get_linux_thread_count() {
    // The current process always has a /proc/<pid>/task directory with >= 1 entry.
    let current_pid = std::process::id() as usize;
    let thread_count = get_linux_thread_count(current_pid);
    assert!(thread_count.is_some());
    assert!(thread_count.unwrap() >= 1);

    // 999999 can be a live PID when pid_max is large (up to 4194304). A PID above
    // PID_MAX_LIMIT never has a /proc entry, so `None` is guaranteed.
    let nonexistent_pid = u32::MAX as usize;
    let invalid_thread_count = get_linux_thread_count(nonexistent_pid);
    assert!(invalid_thread_count.is_none());
}
#[cfg(not(target_os = "linux"))]
#[test]
fn test_get_linux_thread_count_non_linux() {
    // Off Linux, the /proc-based helper is a stub that always yields `None`.
    let own_pid = std::process::id() as usize;
    assert!(get_linux_thread_count(own_pid).is_none());
}
#[test]
fn test_process_result_type_alias() {
    // `ProcessResult<T>` carries a value on success...
    let success: ProcessResult<i32> = Ok(42);
    let Ok(value) = success else {
        panic!("Expected Ok but got Err");
    };
    assert_eq!(value, 42);

    // ...and accepts a `std::io::Error` on failure.
    let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "test error");
    let error: ProcessResult<i32> = Err(io_err);
    assert!(error.is_err());
}
#[test]
fn test_constants() {
    // Pin the fallback thread count and the /proc path fragments.
    assert_eq!(DEFAULT_THREAD_COUNT, 1);
    for (actual, expected) in [(LINUX_PROC_DIR, "/proc"), (LINUX_TASK_SUBDIR, "/task")] {
        assert_eq!(actual, expected);
    }
}
#[test]
fn test_summary_from_json_file_nonexistent() {
    // A missing file is reported as an error, not as an empty summary.
    let missing = "/nonexistent/path/file.json";
    assert!(summary_from_json_file(missing).is_err());
}
#[test]
fn test_process_monitor_with_short_lived_process() {
    use crate::core::constants::sampling;
    use std::time::Duration as Span;
    // Smoke test: query a ~100 ms process before and after it exits. Results are
    // timing-dependent, so only the absence of panics is checked.
    let argv = vec!["sleep".to_string(), "0.1".to_string()];
    let mut monitor = ProcessMonitor::new(argv, sampling::FAST, sampling::STANDARD).unwrap();
    std::thread::sleep(Span::from_millis(20));
    let _initial_state = monitor.is_running();
    std::thread::sleep(Span::from_millis(150));
    let _still_running = monitor.is_running();
    let _final_metrics = monitor.sample_metrics();
}
#[test]
fn test_get_include_children_functionality() {
    use crate::core::constants::sampling;
    let own_pid = std::process::id() as usize;
    let mut monitor = ProcessMonitor::from_pid_with_options(
        own_pid,
        sampling::STANDARD,
        sampling::MAX_ADAPTIVE,
        false,
    )
    .unwrap();

    // Child inclusion is on by default; the setter toggles it both ways.
    assert!(monitor.get_include_children());
    for wanted in [true, false] {
        monitor.set_include_children(wanted);
        assert_eq!(monitor.get_include_children(), wanted);
    }
}
#[test]
fn test_debug_logging_functionality() {
    use crate::core::constants::sampling;
    let own_pid = std::process::id() as usize;
    let mut monitor = ProcessMonitor::from_pid_with_options(
        own_pid,
        sampling::STANDARD,
        sampling::MAX_ADAPTIVE,
        false,
    )
    .unwrap();
    // Toggle debug mode off then on, then try eBPF with debug output active.
    // The eBPF outcome is environment-dependent and deliberately ignored.
    for enabled in [false, true] {
        monitor.set_debug_mode(enabled);
    }
    let _ebpf_outcome = monitor.enable_ebpf();
}
#[test]
fn test_process_attachment_scenarios() {
    use crate::core::constants::sampling;
    let own_pid = std::process::id() as usize;
    let mut monitor =
        ProcessMonitor::from_pid(own_pid, sampling::FAST, sampling::STANDARD).unwrap();
    // Repeated liveness checks on an attached live process stay true.
    for _ in 0..3 {
        assert!(monitor.is_running());
    }
    let _metrics = monitor.sample_metrics();
}
#[test]
fn test_summary_from_json_file_regular_metrics() {
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    let dir = tempdir().unwrap();
    let file_path = dir.path().join("regular_metrics.json");
    {
        // Two flat samples 5000 ms apart; the file is closed when this scope ends.
        let mut file = File::create(&file_path).unwrap();
        writeln!(
            file,
            r#"{{"ts_ms":1000000,"cpu_usage":50.0,"mem_rss_kb":102400,"mem_vms_kb":204800,"disk_read_bytes":0,"disk_write_bytes":0,"sys_net_rx_bytes":0,"sys_net_tx_bytes":0,"thread_count":1,"uptime_secs":10,"cpu_core":0}}"#
        )
        .unwrap();
        writeln!(
            file,
            r#"{{"ts_ms":1005000,"cpu_usage":60.0,"mem_rss_kb":112640,"mem_vms_kb":225280,"disk_read_bytes":100,"disk_write_bytes":200,"sys_net_rx_bytes":50,"sys_net_tx_bytes":75,"thread_count":2,"uptime_secs":15,"cpu_core":1}}"#
        )
        .unwrap();
    }

    let summary = summary_from_json_file(&file_path).unwrap();
    assert!(summary.total_time_secs >= 5.0);
    assert!(summary.sample_count > 0);
}
#[test]
fn test_adaptive_interval_edge_cases() {
    use crate::core::constants::sampling;
    let mut monitor = ProcessMonitor::new(
        vec!["echo".to_string(), "test".to_string()],
        sampling::FAST,
        sampling::STANDARD,
    )
    .unwrap();

    let interval1 = monitor.adaptive_interval();
    assert!(interval1.as_millis() > 0);

    // Pretend the process started `secs` ago. `Instant - Duration` panics when the
    // result would precede the clock's earliest representable point, which can
    // happen on some platforms shortly after boot. `checked_sub` lets the test
    // skip instead of crashing.
    let rewound = |secs: u64| {
        std::time::Instant::now().checked_sub(std::time::Duration::from_secs(secs))
    };

    let Some(started_15s_ago) = rewound(15) else {
        println!("Monotonic clock cannot be rewound 15s, skipping remaining checks");
        return;
    };
    monitor.start_time = started_15s_ago;
    let interval2 = monitor.adaptive_interval();
    assert!(interval2.as_millis() > 0);

    let Some(started_100s_ago) = rewound(100) else {
        println!("Monotonic clock cannot be rewound 100s, skipping remaining checks");
        return;
    };
    monitor.start_time = started_100s_ago;
    // After long enough, the interval saturates at the configured maximum.
    let interval3 = monitor.adaptive_interval();
    assert_eq!(interval3, sampling::STANDARD);
}
#[test]
fn test_process_metadata_edge_cases() {
    use crate::core::constants::sampling;
    let current_pid = std::process::id() as usize;
    let mut monitor =
        ProcessMonitor::from_pid(current_pid, sampling::STANDARD, sampling::MAX_ADAPTIVE)
            .unwrap();
    // `expect` replaces the former `assert!(is_some())` + `if let Some(..)` pair.
    let meta = monitor
        .get_metadata()
        .expect("metadata should be available for the running test process");
    assert!(meta.pid > 0);
    assert!(!meta.cmd.is_empty());
    assert!(!meta.executable.is_empty());
}
#[test]
fn test_tree_metrics_collection_edge_cases() {
    use crate::core::constants::sampling;
    use std::time::Duration as Span;
    // Smoke test: take tree samples while a ~100 ms process runs and after it has
    // likely exited. Neither call should panic.
    let argv = vec!["sleep".to_string(), "0.1".to_string()];
    let mut monitor = ProcessMonitor::new(argv, sampling::FAST, sampling::STANDARD).unwrap();
    monitor.set_include_children(true);
    for pause_ms in [50, 200] {
        std::thread::sleep(Span::from_millis(pause_ms));
        let _tree_metrics = monitor.sample_tree_metrics();
    }
}
#[test]
#[cfg_attr(
    not(target_os = "linux"),
    ignore = "This test relies on Linux-specific process discovery behavior"
)]
fn test_child_process_discovery() {
    use crate::core::constants::sampling;
    let current_pid = std::process::id() as usize;
    let mut monitor =
        ProcessMonitor::from_pid(current_pid, sampling::STANDARD, sampling::MAX_ADAPTIVE)
            .unwrap();
    // The harness may or may not have live children at this moment; other tests in
    // this file spawn processes and may run concurrently. So emptiness is not
    // asserted. The former `is_empty() || !is_empty()` check was a tautology.
    // Instead verify that no PID is reported twice.
    let child_pids = monitor.get_child_pids();
    let distinct: std::collections::HashSet<_> = child_pids.iter().collect();
    assert_eq!(
        distinct.len(),
        child_pids.len(),
        "Child PID list should not contain duplicates"
    );
}
#[test]
fn test_network_io_functions() {
    use crate::core::constants::sampling;
    let own_pid = std::process::id() as usize;
    let monitor =
        ProcessMonitor::from_pid(own_pid, sampling::STANDARD, sampling::MAX_ADAPTIVE).unwrap();
    // Smoke test: the RX/TX readers can be called repeatedly without panicking.
    for _ in 0..2 {
        let _rx_bytes = monitor.get_process_sys_net_rx_bytes();
        let _tx_bytes = monitor.get_process_sys_net_tx_bytes();
    }
}
#[test]
fn test_process_monitor_comprehensive_functionality() {
    use crate::core::constants::sampling;
    let argv = vec!["echo".to_string(), "comprehensive_test".to_string()];
    let since_process_start = true;
    let mut monitor = ProcessMonitor::new_with_options(
        argv,
        sampling::FAST,
        sampling::STANDARD,
        since_process_start,
    )
    .unwrap();

    // Configure every option; the eBPF outcome is environment-dependent.
    monitor.set_include_children(true);
    monitor.set_debug_mode(true);
    let _ebpf_result = monitor.enable_ebpf();
    assert!(monitor.get_include_children());
    assert!(monitor.get_pid() > 0);

    // Then drive the whole public query surface once. Only the absence of panics
    // is checked, because the short-lived `echo` may already have exited.
    std::thread::sleep(std::time::Duration::from_millis(10));
    let _is_running = monitor.is_running();
    let _metrics = monitor.sample_metrics();
    let _tree_metrics = monitor.sample_tree_metrics();
    let _child_pids = monitor.get_child_pids();
    let _metadata = monitor.get_metadata();
    let _interval = monitor.adaptive_interval();
}
#[test]
#[cfg(target_os = "linux")]
fn test_is_running_pid_monitor_after_process_exits() {
    // Spawn a process outside the monitor, attach by PID, then kill and reap it.
    let mut sleeper = std::process::Command::new("sleep")
        .arg("10")
        .spawn()
        .expect("failed to spawn sleep");
    let target_pid = sleeper.id() as usize;
    let mut monitor = ProcessMonitor::from_pid(
        target_pid,
        Duration::from_millis(100),
        Duration::from_millis(500),
    )
    .expect("failed to create pid-based monitor");

    // Errors are ignored: the goal is only that the process is gone afterwards.
    let _ = sleeper.kill();
    let _ = sleeper.wait();
    std::thread::sleep(Duration::from_millis(50));

    // A child-PID query in between must not mask the exit from `is_running`.
    let _ = monitor.get_child_pids();
    assert!(
        !monitor.is_running(),
        "pid-based monitor must report process as not running after exit"
    );
}
}