use crate::collectors::Collector;
use anyhow::Result;
use futures::future::BoxFuture;
use prometheus::{Counter, Gauge, IntGauge, Opts, Registry};
use sqlx::PgPool;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use sysinfo::{Pid, System};
use tracing::{debug, instrument, warn};
/// Prometheus collector that reports resource usage of the exporter's own process.
///
/// The metric fields are prometheus handles: clones share the same underlying values, which
/// is what lets `register_metrics` register clones while `collect_stats` updates `self`.
/// The sampling state sits behind an `Arc<Mutex<_>>`, so clones share it as well.
#[derive(Clone)]
pub struct ProcessCollector {
    // Identity and shared sampling state of the observed process.
    /// PID of the current process, captured at construction.
    pid: Pid,
    /// Number of CPUs reported at construction (never less than 1).
    num_cores: usize,
    /// Bookkeeping carried between collections.
    state: Arc<Mutex<CollectorState>>,

    // Exported metrics.
    /// Cumulative CPU time of the process, in seconds.
    cpu_seconds_total: Counter,
    /// CPU count, set once at construction.
    cpu_cores: IntGauge,
    /// Resident set size, in bytes.
    resident_memory_bytes: IntGauge,
    /// Virtual memory size, in bytes.
    virtual_memory_bytes: IntGauge,
    /// Number of open file descriptors.
    open_fds: IntGauge,
    /// Number of OS threads.
    threads: IntGauge,
    /// Process start time, in seconds since the Unix epoch.
    start_time_seconds: Gauge,
}
/// Mutable sampling state shared by a `ProcessCollector` and its clones.
struct CollectorState {
    /// When the previous successful collection ran; `None` until the process was first sampled.
    last_collection: Option<Instant>,
    /// Process CPU time seen at the previous successful collection; `None` until then.
    last_cpu_time: Option<Duration>,
    /// sysinfo handle, refreshed on every collection.
    system: System,
}
impl Default for ProcessCollector {
fn default() -> Self {
Self::new()
}
}
impl ProcessCollector {
pub fn new() -> Self {
let cpu_seconds_total = Counter::with_opts(Opts::new(
"pg_exporter_process_cpu_seconds_total",
"Total user and system CPU time spent in seconds (cumulative across all cores)",
))
.expect("pg_exporter_process_cpu_seconds_total");
let cpu_cores = IntGauge::with_opts(Opts::new(
"pg_exporter_process_cpu_cores",
"Number of CPU cores available to the system",
))
.expect("pg_exporter_process_cpu_cores");
let resident_memory_bytes = IntGauge::with_opts(Opts::new(
"pg_exporter_process_resident_memory_bytes",
"Resident memory size in bytes (RSS)",
))
.expect("pg_exporter_process_resident_memory_bytes");
let virtual_memory_bytes = IntGauge::with_opts(Opts::new(
"pg_exporter_process_virtual_memory_bytes",
"Virtual memory size in bytes (VSZ)",
))
.expect("pg_exporter_process_virtual_memory_bytes");
let open_fds = IntGauge::with_opts(Opts::new(
"pg_exporter_process_open_fds",
"Number of open file descriptors",
))
.expect("pg_exporter_process_open_fds");
let threads = IntGauge::with_opts(Opts::new(
"pg_exporter_process_threads",
"Number of OS threads in the process",
))
.expect("pg_exporter_process_threads");
let start_time_seconds = Gauge::with_opts(Opts::new(
"pg_exporter_process_start_time_seconds",
"Start time of the process since unix epoch in seconds",
))
.expect("pg_exporter_process_start_time_seconds");
let system = System::new_all();
let num_cores = system.cpus().len().max(1); let state = Arc::new(Mutex::new(CollectorState {
system,
last_cpu_time: None,
last_collection: None,
}));
let pid = Pid::from(std::process::id() as usize);
cpu_cores.set(num_cores as i64);
let start_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();
start_time_seconds.set(start_time);
Self {
cpu_seconds_total,
cpu_cores,
resident_memory_bytes,
virtual_memory_bytes,
open_fds,
threads,
start_time_seconds,
state,
pid,
num_cores,
}
}
fn get_cpu_time(process: &sysinfo::Process) -> Duration {
let cpu_seconds = process.run_time();
Duration::from_secs(cpu_seconds)
}
fn collect_stats(&self) {
let now = Instant::now();
let mut state = match self.state.lock() {
Ok(guard) => guard,
Err(poisoned) => {
warn!("State mutex was poisoned, recovering");
poisoned.into_inner()
}
};
state.system.refresh_all();
if let Some(process) = state.system.process(self.pid) {
let rss = process.memory();
let vsz = process.virtual_memory();
self.resident_memory_bytes.set(rss as i64);
self.virtual_memory_bytes.set(vsz as i64);
let current_cpu_time = Self::get_cpu_time(process);
if let (Some(last_cpu), Some(last_time)) = (state.last_cpu_time, state.last_collection) {
let elapsed = now.duration_since(last_time);
if elapsed.as_secs_f64() > 0.1 {
let cpu_delta = current_cpu_time.saturating_sub(last_cpu);
let cpu_seconds = cpu_delta.as_secs_f64();
if cpu_seconds > 0.0 {
self.cpu_seconds_total.inc_by(cpu_seconds);
debug!(
cpu_delta_seconds = cpu_seconds,
elapsed_seconds = elapsed.as_secs_f64(),
cpu_percent = (cpu_seconds / elapsed.as_secs_f64()) * 100.0,
"CPU time delta"
);
}
}
}
state.last_cpu_time = Some(current_cpu_time);
state.last_collection = Some(now);
#[cfg(target_os = "linux")]
{
if let Ok(entries) = std::fs::read_dir(format!("/proc/{}/task", self.pid)) {
let thread_count = entries.count() as i64;
self.threads.set(thread_count);
}
}
#[cfg(not(target_os = "linux"))]
{
self.threads.set(1);
}
#[cfg(target_os = "linux")]
{
if let Ok(entries) = std::fs::read_dir(format!("/proc/{}/fd", self.pid)) {
let fd_count = entries.count() as i64;
self.open_fds.set(fd_count);
}
}
#[cfg(not(target_os = "linux"))]
{
self.open_fds.set(0);
}
debug!(
rss_mb = rss / 1024 / 1024,
vsz_mb = vsz / 1024 / 1024,
cpu_seconds_total = self.cpu_seconds_total.get(),
cpu_cores = self.num_cores,
threads = self.threads.get(),
fds = self.open_fds.get(),
"collected process metrics"
);
}
}
}
impl Collector for ProcessCollector {
fn name(&self) -> &'static str {
"metrics.process"
}
fn register_metrics(&self, registry: &Registry) -> Result<()> {
registry.register(Box::new(self.cpu_seconds_total.clone()))?;
registry.register(Box::new(self.cpu_cores.clone()))?;
registry.register(Box::new(self.resident_memory_bytes.clone()))?;
registry.register(Box::new(self.virtual_memory_bytes.clone()))?;
registry.register(Box::new(self.open_fds.clone()))?;
registry.register(Box::new(self.threads.clone()))?;
registry.register(Box::new(self.start_time_seconds.clone()))?;
Ok(())
}
#[instrument(skip(self, _pool), level = "debug")]
fn collect<'a>(&'a self, _pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
self.collect_stats();
Ok(())
})
}
fn enabled_by_default(&self) -> bool {
false
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::hint::black_box;
    use std::time::{Duration, Instant};

    /// Keeps the current thread busy for roughly `duration` so that CPU time accrues.
    ///
    /// `black_box` stops the optimiser from folding the loop away in optimised test builds.
    fn burn_cpu(duration: Duration) {
        let start = Instant::now();
        let mut sum = 0u64;
        while start.elapsed() < duration {
            for i in 0..10_000u64 {
                sum = black_box(sum.wrapping_add(i));
            }
        }
        black_box(sum);
    }

    #[test]
    fn test_process_collector_new() {
        let collector = ProcessCollector::new();
        assert!(collector.start_time_seconds.get() > 0.0);
        assert_eq!(collector.cpu_cores.get(), collector.num_cores as i64);
        assert!(collector.num_cores > 0);
    }

    #[test]
    fn test_process_collector_registers_without_error() {
        let collector = ProcessCollector::new();
        let registry = Registry::new();
        assert!(collector.register_metrics(&registry).is_ok());

        let metric_names: Vec<String> = registry
            .gather()
            .iter()
            .map(|m| m.name().to_string())
            .collect();
        for expected in [
            "pg_exporter_process_cpu_seconds_total",
            "pg_exporter_process_cpu_cores",
            "pg_exporter_process_resident_memory_bytes",
            "pg_exporter_process_virtual_memory_bytes",
            "pg_exporter_process_threads",
            "pg_exporter_process_open_fds",
            "pg_exporter_process_start_time_seconds",
        ] {
            assert!(
                metric_names.iter().any(|name| name == expected),
                "metric {expected} was not registered; got {metric_names:?}"
            );
        }
    }

    #[test]
    fn test_process_collector_collects_stats() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        assert!(collector.resident_memory_bytes.get() > 0);
        assert!(collector.virtual_memory_bytes.get() > 0);
        assert!(collector.virtual_memory_bytes.get() >= collector.resident_memory_bytes.get());
        assert!(collector.threads.get() >= 1);
        #[cfg(target_os = "linux")]
        assert!(collector.open_fds.get() >= 3);
    }

    #[test]
    fn test_cpu_time_tracking_first_collection() {
        let collector = ProcessCollector::new();
        let initial_cpu = collector.cpu_seconds_total.get();
        collector.collect_stats();
        // The first collection only records a baseline.
        assert_eq!(collector.cpu_seconds_total.get(), initial_cpu);
    }

    #[test]
    fn test_cpu_time_tracking_increments() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        let cpu_after_first = collector.cpu_seconds_total.get();
        burn_cpu(Duration::from_millis(100));
        collector.collect_stats();
        assert!(collector.cpu_seconds_total.get() >= cpu_after_first);
    }

    #[test]
    fn test_cpu_time_reasonable_range() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        burn_cpu(Duration::from_millis(100));
        collector.collect_stats();
        let cpu_time = collector.cpu_seconds_total.get();
        assert!(cpu_time >= 0.0);
        assert!(cpu_time < 3600.0);
    }

    #[test]
    fn test_memory_metrics_reasonable() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        let rss_mb = collector.resident_memory_bytes.get() / 1024 / 1024;
        let vsz_mb = collector.virtual_memory_bytes.get() / 1024 / 1024;
        assert!(rss_mb > 1);
        assert!(rss_mb < 10_000);
        assert!(vsz_mb > rss_mb);
        assert!(vsz_mb < 100_000);
    }

    #[test]
    fn test_multiple_collections_dont_panic() {
        let collector = ProcessCollector::new();
        for _ in 0..10 {
            collector.collect_stats();
        }
        assert!(collector.resident_memory_bytes.get() > 0);
        assert!(collector.cpu_seconds_total.get() >= 0.0);
    }

    #[test]
    fn test_collector_state_initialized() {
        let collector = ProcessCollector::new();
        let state = collector.state.lock().unwrap();
        assert!(state.last_cpu_time.is_none());
        assert!(state.last_collection.is_none());
    }

    #[test]
    fn test_collector_state_updated_after_collection() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        let state = collector.state.lock().unwrap();
        assert!(state.last_cpu_time.is_some());
        assert!(state.last_collection.is_some());
    }

    #[test]
    fn test_get_cpu_time() {
        let collector = ProcessCollector::new();
        let mut state = collector.state.lock().unwrap();
        state.system.refresh_all();
        let process = state
            .system
            .process(collector.pid)
            .expect("Could not find own process");
        // Smoke test: the helper must be callable on our own process entry.
        let cpu_time = ProcessCollector::get_cpu_time(process);
        assert!(cpu_time.as_secs_f64() >= 0.0);
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn test_thread_count_linux() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        assert!(collector.threads.get() >= 1);
        assert!(collector.threads.get() < 1000);
    }

    #[test]
    #[cfg(target_os = "linux")]
    fn test_file_descriptors_linux() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        // stdin, stdout and stderr are always open.
        assert!(collector.open_fds.get() >= 3);
    }
}