#[allow(unused_imports)]
use crate::error::{Error, Result};
#[cfg(feature = "metrics")]
use crate::metrics::MetricsCollector;
use arc_swap::ArcSwap;
use std::collections::VecDeque;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
#[allow(unused_imports)]
use async_std::task::JoinHandle as AsyncJoinHandle;
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
#[cfg(feature = "tokio")]
#[allow(unused_imports)]
use tokio::task::JoinHandle;
#[cfg(feature = "tokio")]
#[allow(unused_imports)]
use tokio::time;
#[cfg(target_os = "linux")]
use std::fs::File;
#[cfg(target_os = "linux")]
use std::io::{BufRead, BufReader};
#[cfg(target_os = "linux")]
use std::num::NonZeroI64;
#[cfg(target_os = "macos")]
use std::process::Command;
#[cfg(all(target_os = "windows", feature = "windows-monitoring"))]
use windows::Win32::System::Diagnostics::ToolHelp::{
CreateToolhelp32Snapshot, Thread32First, Thread32Next, TH32CS_SNAPTHREAD, THREADENTRY32,
};
#[cfg(all(target_os = "windows", feature = "windows-monitoring"))]
use windows::Win32::System::ProcessStatus::{GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS};
#[cfg(all(target_os = "windows", feature = "windows-monitoring"))]
use windows::Win32::System::Threading::{GetProcessTimes, OpenProcess, PROCESS_QUERY_INFORMATION};
#[cfg(all(target_os = "windows", feature = "windows-monitoring"))]
use windows::Win32::Foundation::{CloseHandle, FILETIME};
#[derive(Debug, Clone)]
pub struct ResourceUsage {
timestamp: Instant,
memory_bytes: u64,
cpu_percent: f64,
thread_count: u32,
}
#[derive(Debug, Clone)]
pub enum Alert {
MemorySoftLimit {
limit_bytes: u64,
current_bytes: u64,
},
}
impl ResourceUsage {
#[must_use]
pub fn new(memory_bytes: u64, cpu_percent: f64, thread_count: u32) -> Self {
Self {
timestamp: Instant::now(),
memory_bytes,
cpu_percent,
thread_count,
}
}
#[must_use]
pub const fn memory_bytes(&self) -> u64 {
self.memory_bytes
}
#[must_use]
#[allow(clippy::cast_precision_loss)]
pub fn memory_mb(&self) -> f64 {
self.memory_bytes as f64 / 1_048_576.0
}
#[must_use]
pub const fn cpu_percent(&self) -> f64 {
self.cpu_percent
}
#[must_use]
pub const fn thread_count(&self) -> u32 {
self.thread_count
}
#[must_use]
pub fn age(&self) -> Duration {
self.timestamp.elapsed()
}
}
#[allow(dead_code)]
pub struct ResourceTracker {
sample_interval: Duration,
current_usage: Arc<ArcSwap<ResourceUsage>>,
history: Arc<RwLock<VecDeque<ResourceUsage>>>,
max_history: usize,
#[cfg(feature = "tokio")]
task_handle: Option<tokio::task::JoinHandle<()>>,
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
task_handle: Option<async_std::task::JoinHandle<()>>,
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
task_handle: Option<std::thread::JoinHandle<()>>,
pid: u32,
memory_soft_limit_bytes: Option<u64>,
#[allow(clippy::type_complexity)]
on_alert: Option<Arc<dyn Fn(Alert) + Send + Sync + 'static>>,
#[cfg(feature = "metrics")]
metrics: Option<MetricsCollector>,
}
impl ResourceTracker {
#[must_use]
pub fn new(sample_interval: Duration) -> Self {
let initial_usage = ResourceUsage::new(0, 0.0, 0);
let current_pid = std::process::id();
Self {
sample_interval,
current_usage: Arc::new(ArcSwap::from_pointee(initial_usage)),
history: Arc::new(RwLock::new(VecDeque::new())),
max_history: 60, task_handle: None,
pid: current_pid,
memory_soft_limit_bytes: None,
on_alert: None,
#[cfg(feature = "metrics")]
metrics: None,
}
}
#[must_use]
pub const fn with_pid(mut self, pid: u32) -> Self {
self.pid = pid;
self
}
#[must_use]
pub const fn with_max_history(mut self, max_entries: usize) -> Self {
self.max_history = max_entries;
self
}
#[must_use]
pub const fn with_memory_soft_limit_bytes(mut self, bytes: u64) -> Self {
self.memory_soft_limit_bytes = Some(bytes);
self
}
#[must_use]
pub fn with_alert_handler<F>(mut self, f: F) -> Self
where
F: Fn(Alert) + Send + Sync + 'static,
{
self.on_alert = Some(Arc::new(f));
self
}
#[cfg(feature = "metrics")]
#[must_use]
pub fn with_metrics(mut self, metrics: MetricsCollector) -> Self {
self.metrics = Some(metrics);
self
}
#[must_use]
pub fn with_alert_to_tracing(mut self) -> Self {
self.on_alert = Some(Arc::new(|alert| match alert {
Alert::MemorySoftLimit {
limit_bytes,
current_bytes,
} => {
tracing::warn!(
target: "proc_daemon::resources",
limit_bytes,
current_bytes,
"Resource alert: soft memory limit exceeded"
);
}
}));
self
}
#[cfg(all(feature = "tokio", not(feature = "async-std")))]
pub fn start(&mut self) -> Result<()> {
if self.task_handle.is_some() {
return Ok(()); }
let sample_interval = self.sample_interval;
let usage_history = Arc::clone(&self.history);
let current_usage = Arc::clone(&self.current_usage);
let pid = self.pid;
let max_history = self.max_history;
let memory_soft_limit_bytes = self.memory_soft_limit_bytes;
let on_alert = self.on_alert.clone();
#[cfg(feature = "metrics")]
let metrics = self.metrics.clone();
let handle = tokio::spawn(async move {
let mut interval_timer = time::interval(sample_interval);
let mut last_cpu_time = 0.0;
let mut last_timestamp = Instant::now();
#[cfg(feature = "metrics")]
let mut last_tick = Instant::now();
loop {
interval_timer.tick().await;
#[cfg(feature = "metrics")]
let tick_now = Instant::now();
if let Ok(usage) =
Self::sample_resource_usage(pid, &mut last_cpu_time, &mut last_timestamp)
{
current_usage.store(Arc::new(usage.clone()));
{
let mut hist = usage_history.write().unwrap();
hist.push_back(usage.clone());
while hist.len() > max_history {
hist.pop_front();
}
drop(hist); }
if let Some(limit) = memory_soft_limit_bytes {
if usage.memory_bytes() > limit {
if let Some(cb) = on_alert.as_ref() {
cb(Alert::MemorySoftLimit {
limit_bytes: limit,
current_bytes: usage.memory_bytes(),
});
}
}
}
#[cfg(feature = "metrics")]
if let Some(m) = metrics.as_ref() {
m.set_gauge("proc.memory_bytes", usage.memory_bytes());
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
let cpu_milli = (usage.cpu_percent() * 1000.0).max(0.0).round() as u64;
m.set_gauge("proc.cpu_milli_percent", cpu_milli);
m.set_gauge("proc.thread_count", u64::from(usage.thread_count()));
m.increment_counter("proc.samples_total", 1);
m.record_histogram(
"proc.sample_interval",
tick_now.saturating_duration_since(last_tick),
);
last_tick = tick_now;
}
}
}
});
self.task_handle = Some(handle);
Ok(())
}
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
#[allow(clippy::missing_errors_doc)]
pub fn start(&mut self) -> Result<()> {
if self.task_handle.is_some() {
return Ok(()); }
let sample_interval = self.sample_interval;
let usage_history = Arc::clone(&self.history);
let current_usage = Arc::clone(&self.current_usage);
let pid = self.pid;
let max_history = self.max_history; let memory_soft_limit_bytes = self.memory_soft_limit_bytes;
let on_alert = self.on_alert.clone();
#[cfg(feature = "metrics")]
let metrics = self.metrics.clone();
let handle = async_std::task::spawn(async move {
let mut last_cpu_time = 0.0;
let mut last_timestamp = Instant::now();
#[cfg(feature = "metrics")]
let mut last_tick = Instant::now();
loop {
async_std::task::sleep(sample_interval).await;
#[cfg(feature = "metrics")]
let tick_now = Instant::now();
if let Ok(usage) =
Self::sample_resource_usage(pid, &mut last_cpu_time, &mut last_timestamp)
{
let new_arc = Arc::new(usage.clone());
current_usage.store(new_arc);
{
let mut hist = usage_history.write().unwrap();
hist.push_back(usage.clone());
while hist.len() > max_history {
hist.pop_front();
}
}
if let Some(limit) = memory_soft_limit_bytes {
if usage.memory_bytes() > limit {
if let Some(cb) = on_alert.as_ref() {
cb(Alert::MemorySoftLimit {
limit_bytes: limit,
current_bytes: usage.memory_bytes(),
});
}
}
}
#[cfg(feature = "metrics")]
if let Some(m) = metrics.as_ref() {
m.set_gauge("proc.memory_bytes", usage.memory_bytes());
let cpu_milli = (usage.cpu_percent() * 1000.0).max(0.0).round() as u64;
m.set_gauge("proc.cpu_milli_percent", cpu_milli);
m.set_gauge("proc.thread_count", u64::from(usage.thread_count()));
m.increment_counter("proc.samples_total", 1);
m.record_histogram(
"proc.sample_interval",
tick_now.saturating_duration_since(last_tick),
);
}
#[cfg(feature = "metrics")]
{
last_tick = tick_now;
}
}
}
});
self.task_handle = Some(handle);
Ok(())
}
#[cfg(all(feature = "tokio", not(feature = "async-std")))]
pub async fn stop(&mut self) {
if let Some(handle) = self.task_handle.take() {
handle.abort();
let _ = handle.await;
}
}
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
pub fn stop(&mut self) {
self.task_handle.take();
}
#[must_use]
pub fn current_usage(&self) -> ResourceUsage {
self.current_usage.load_full().as_ref().clone()
}
#[must_use]
pub fn history(&self) -> Vec<ResourceUsage> {
self.history
.read()
.map_or_else(|_| Vec::new(), |history| history.iter().cloned().collect())
}
#[allow(unused_variables, dead_code)]
#[allow(clippy::needless_pass_by_ref_mut)]
fn sample_resource_usage(
pid: u32,
last_cpu_time: &mut f64,
last_timestamp: &mut Instant,
) -> Result<ResourceUsage> {
#[cfg(target_os = "linux")]
{
let memory = Self::get_memory_linux(pid)?;
let (cpu, threads) = Self::get_cpu_linux(pid, last_cpu_time, last_timestamp)?;
Ok(ResourceUsage::new(memory, cpu, threads))
}
#[cfg(target_os = "macos")]
{
let memory = Self::get_memory_macos(pid)?;
let (cpu, threads) = Self::get_cpu_macos(pid)?;
Ok(ResourceUsage::new(memory, cpu, threads))
}
#[cfg(target_os = "windows")]
{
let memory = Self::get_memory_windows(pid)?;
let (cpu, threads) = Self::get_cpu_windows(pid, last_cpu_time, last_timestamp)?;
Ok(ResourceUsage::new(memory, cpu, threads))
}
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
{
Ok(ResourceUsage::new(0, 0.0, 0))
}
}
#[cfg(target_os = "linux")]
fn get_memory_linux(pid: u32) -> Result<u64> {
let path = format!("/proc/{pid}/status");
let file = File::open(&path).map_err(|e| {
Error::io_with_source(format!("Failed to open {path} for memory stats"), e)
})?;
let reader = BufReader::new(file);
let mut memory_bytes = 0;
for line in reader.lines() {
let line = line.map_err(|e| {
Error::io_with_source("Failed to read process memory stats".to_string(), e)
})?;
if line.starts_with("VmRSS:") {
let parts: Vec<&str> = line.split_whitespace().collect();
if !parts.is_empty() {
if let Ok(kb) = parts[1].parse::<u64>() {
memory_bytes = kb * 1024;
break;
}
}
}
}
Ok(memory_bytes)
}
#[cfg(target_os = "linux")]
#[allow(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::similar_names
)]
fn get_cpu_linux(
pid: u32,
last_cpu_time: &mut f64,
last_timestamp: &mut Instant,
) -> Result<(f64, u32)> {
let path = format!("/proc/{pid}/stat");
let file = File::open(&path).map_err(|e| {
Error::io_with_source(format!("Failed to open {path} for CPU stats"), e)
})?;
let reader = BufReader::new(file);
let mut cpu_percent = 0.0;
let mut thread_count: u32 = 0;
if let Ok(line) = reader.lines().next().ok_or_else(|| {
Error::runtime("Failed to read CPU stats from proc filesystem".to_string())
}) {
let line = line.map_err(|e| {
Error::io_with_source("Failed to read process CPU stats".to_string(), e)
})?;
if let Some((cpu_time, threads)) = Self::parse_proc_stat(&line) {
thread_count = threads;
let now = Instant::now();
if *last_timestamp != now {
let time_diff = now.duration_since(*last_timestamp).as_secs_f64();
if time_diff > 0.0 {
let num_cores = num_cpus::get() as f64;
let cpu_time_diff = cpu_time - *last_cpu_time;
let ticks = Self::linux_clk_tck();
cpu_percent = (cpu_time_diff / ticks) / time_diff * 100.0 / num_cores;
}
}
*last_cpu_time = cpu_time;
*last_timestamp = now;
}
}
Ok((cpu_percent, thread_count))
}
#[cfg(target_os = "linux")]
fn parse_proc_stat(line: &str) -> Option<(f64, u32)> {
let open = line.find('(')?;
let close = line.rfind(')')?;
if close <= open {
return None;
}
let rest = line.get((close + 1)..)?;
let parts: Vec<&str> = rest.split_whitespace().collect();
if parts.len() <= 17 {
return None;
}
let utime = parts.get(11)?.parse::<f64>().unwrap_or(0.0);
let stime = parts.get(12)?.parse::<f64>().unwrap_or(0.0);
let child_user_time = parts.get(13)?.parse::<f64>().unwrap_or(0.0);
let child_system_time = parts.get(14)?.parse::<f64>().unwrap_or(0.0);
let thread_count = parts.get(17)?.parse::<u32>().unwrap_or(0);
let current_cpu_time = utime + stime + child_user_time + child_system_time;
Some((current_cpu_time, thread_count))
}
#[cfg(target_os = "linux")]
#[allow(clippy::cast_precision_loss)]
#[allow(unsafe_code)]
fn linux_clk_tck() -> f64 {
#[cfg_attr(not(target_os = "linux"), allow(unused_unsafe))]
let ticks = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
NonZeroI64::new(ticks).map_or(100.0, |v| v.get() as f64)
}
#[cfg(target_os = "macos")]
#[allow(dead_code)]
fn get_memory_macos(pid: u32) -> Result<u64> {
let output = Command::new("/bin/ps")
.args(["-xo", "rss=", "-p", &pid.to_string()])
.output()
.map_err(|e| {
Error::io_with_source(
"Failed to execute ps command for memory stats".to_string(),
e,
)
})?;
let memory_kb = String::from_utf8_lossy(&output.stdout)
.trim()
.parse::<u64>()
.unwrap_or(0);
Ok(memory_kb * 1024)
}
#[cfg(target_os = "macos")]
#[allow(dead_code)]
fn get_cpu_macos(pid: u32) -> Result<(f64, u32)> {
let output = Command::new("/bin/ps")
.args(["-xo", "%cpu,thcount=", "-p", &pid.to_string()])
.output()
.map_err(|e| {
Error::io_with_source("Failed to execute ps command for CPU stats".to_string(), e)
})?;
let stats = String::from_utf8_lossy(&output.stdout);
let parts: Vec<&str> = stats.split_whitespace().collect();
let cpu_percent = if parts.is_empty() {
0.0
} else {
parts[0].parse::<f64>().unwrap_or(0.0)
};
let thread_count = if parts.len() > 1 {
parts[1].parse::<u32>().unwrap_or(0)
} else {
0
};
Ok((cpu_percent, thread_count))
}
#[cfg(all(target_os = "windows", feature = "windows-monitoring"))]
#[allow(unsafe_code)]
fn get_memory_windows(pid: u32) -> Result<u64> {
let mut pmc = PROCESS_MEMORY_COUNTERS::default();
let handle =
unsafe { OpenProcess(PROCESS_QUERY_INFORMATION, false, pid) }.map_err(|e| {
Error::runtime_with_source(
format!("Failed to open process {} for memory stats", pid),
e,
)
})?;
let result = unsafe {
GetProcessMemoryInfo(
handle,
&mut pmc,
std::mem::size_of::<PROCESS_MEMORY_COUNTERS>() as u32,
)
}
.map_err(|e| {
Error::runtime_with_source("Failed to get process memory info".to_string(), e)
});
unsafe { CloseHandle(handle) };
result?;
Ok(u64::try_from(pmc.WorkingSetSize).unwrap_or(pmc.WorkingSetSize as u64))
}
#[cfg(all(target_os = "windows", not(feature = "windows-monitoring")))]
fn get_memory_windows(_pid: u32) -> Result<u64> {
Err(Error::runtime(
"Windows monitoring not enabled. Enable the 'windows-monitoring' feature".to_string(),
))
}
#[cfg(all(target_os = "windows", feature = "windows-monitoring"))]
#[allow(unsafe_code, clippy::cast_precision_loss)]
fn get_cpu_windows(
pid: u32,
last_cpu_time: &mut f64,
last_timestamp: &mut Instant,
) -> Result<(f64, u32)> {
let mut cpu_percent = 0.0;
let mut thread_count = 0;
let handle =
unsafe { OpenProcess(PROCESS_QUERY_INFORMATION, false, pid) }.map_err(|e| {
Error::runtime_with_source(
format!("Failed to open process {} for CPU stats", pid),
e,
)
})?;
unsafe {
let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0).map_err(|e| {
Error::runtime_with_source(
"Failed to create ToolHelp snapshot for threads".to_string(),
e,
)
})?;
let mut entry: THREADENTRY32 = std::mem::zeroed();
entry.dwSize = std::mem::size_of::<THREADENTRY32>() as u32;
if Thread32First(snapshot, &mut entry).is_ok() {
loop {
if entry.th32OwnerProcessID == pid {
thread_count += 1;
}
if Thread32Next(snapshot, &mut entry).is_err() {
break;
}
}
}
let _ = CloseHandle(snapshot);
}
let mut creation_time = FILETIME::default();
let mut exit_time = FILETIME::default();
let mut kernel_time = FILETIME::default();
let mut user_time = FILETIME::default();
let result = unsafe {
GetProcessTimes(
handle,
&mut creation_time,
&mut exit_time,
&mut kernel_time,
&mut user_time,
)
};
if result.is_ok() {
let kernel_ns = Self::filetime_to_ns(&kernel_time);
let user_ns = Self::filetime_to_ns(&user_time);
let total_time = (kernel_ns + user_ns) as f64 / 1_000_000_000.0;
let now = Instant::now();
if *last_timestamp != now {
let time_diff = now.duration_since(*last_timestamp).as_secs_f64();
if time_diff > 0.0 {
let time_diff_cpu = total_time - *last_cpu_time;
let num_cores = num_cpus::get() as f64;
cpu_percent = (time_diff_cpu / time_diff) * 100.0 / num_cores;
}
}
*last_cpu_time = total_time;
*last_timestamp = now;
}
unsafe { CloseHandle(handle) };
Ok((cpu_percent, thread_count))
}
#[cfg(all(target_os = "windows", not(feature = "windows-monitoring")))]
fn get_cpu_windows(
_pid: u32,
_last_cpu_time: &mut f64,
_last_timestamp: &mut Instant,
) -> Result<(f64, u32)> {
Err(Error::runtime(
"Windows monitoring not enabled. Enable the 'windows-monitoring' feature".to_string(),
))
}
#[cfg(all(target_os = "windows", feature = "windows-monitoring"))]
fn filetime_to_ns(ft: &windows::Win32::Foundation::FILETIME) -> u64 {
let high = (ft.dwHighDateTime as u64) << 32;
let low = ft.dwLowDateTime as u64;
(high + low) * 100
}
}
impl Drop for ResourceTracker {
fn drop(&mut self) {
if let Some(handle) = self.task_handle.take() {
#[cfg(feature = "tokio")]
handle.abort();
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
drop(handle); }
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[cfg(feature = "tokio")]
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn test_resource_tracker_creation() {
let tracker = ResourceTracker::new(Duration::from_secs(1));
assert_eq!(tracker.max_history, 60);
assert_eq!(tracker.sample_interval, Duration::from_secs(1));
}
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
#[async_std::test]
async fn test_resource_tracker_creation() {
let tracker = ResourceTracker::new(Duration::from_secs(1));
assert_eq!(tracker.max_history, 60);
assert_eq!(tracker.sample_interval, Duration::from_secs(1));
}
#[cfg(feature = "tokio")]
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn test_resource_usage_methods() {
let usage = ResourceUsage::new(1_048_576, 5.5, 4);
assert_eq!(usage.memory_bytes(), 1_048_576);
let epsilon: f64 = 1e-6;
assert!((usage.memory_mb() - 1.0).abs() < epsilon);
assert!((usage.cpu_percent() - 5.5).abs() < epsilon);
assert_eq!(usage.thread_count(), 4);
assert!(usage.age() >= Duration::from_nanos(0));
}
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
#[async_std::test]
async fn test_resource_usage_methods() {
let usage = ResourceUsage::new(1_048_576, 5.5, 4);
assert_eq!(usage.memory_bytes(), 1_048_576);
let epsilon: f64 = 1e-6;
assert!((usage.memory_mb() - 1.0).abs() < epsilon);
assert!((usage.cpu_percent() - 5.5).abs() < epsilon);
assert_eq!(usage.thread_count(), 4);
assert!(usage.age() >= Duration::from_nanos(0));
}
#[cfg(feature = "tokio")]
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn test_tracker_with_max_history() {
let tracker = ResourceTracker::new(Duration::from_secs(1)).with_max_history(100);
assert_eq!(tracker.max_history, 100);
}
#[cfg(all(target_os = "windows", feature = "windows-monitoring"))]
#[cfg_attr(miri, ignore)]
#[test]
fn test_windows_toolhelp_thread_count_path() {
let pid = std::process::id();
let mut last_cpu_time = 0.0;
let mut last_timestamp = Instant::now();
let usage = ResourceTracker::sample_resource_usage(
pid,
&mut last_cpu_time,
&mut last_timestamp,
)
.expect(
"Windows sample_resource_usage should succeed with windows-monitoring feature enabled",
);
assert!(
usage.thread_count() >= 1,
"expected at least 1 thread, got {}",
usage.thread_count()
);
}
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
#[async_std::test]
async fn test_tracker_with_max_history() {
let tracker = ResourceTracker::new(Duration::from_secs(1)).with_max_history(100);
assert_eq!(tracker.max_history, 100);
}
}