use crate::collectors::Collector;
use anyhow::Result;
use futures::future::BoxFuture;
use prometheus::{Counter, Gauge, IntGauge, Opts, Registry};
use sqlx::PgPool;
use std::sync::{Arc, Mutex};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use sysinfo::{Pid, System};
use tracing::{debug, instrument, warn};
#[derive(Clone)]
pub struct ProcessCollector {
cpu_seconds_total: Counter,
cpu_cores: IntGauge,
resident_memory_bytes: IntGauge,
virtual_memory_bytes: IntGauge,
open_fds: IntGauge,
threads: IntGauge,
start_time_seconds: Gauge,
system: Arc<Mutex<System>>,
pid: Pid,
num_cores: usize,
}
impl Default for ProcessCollector {
fn default() -> Self {
Self::new()
}
}
impl ProcessCollector {
pub fn new() -> Self {
let cpu_seconds_total = Counter::with_opts(Opts::new(
"pg_exporter_process_cpu_seconds_total",
"Total user and system CPU time spent in seconds (cumulative across all cores)",
))
.expect("pg_exporter_process_cpu_seconds_total");
let cpu_cores = IntGauge::with_opts(Opts::new(
"pg_exporter_process_cpu_cores",
"Number of CPU cores available to the system",
))
.expect("pg_exporter_process_cpu_cores");
let resident_memory_bytes = IntGauge::with_opts(Opts::new(
"pg_exporter_process_resident_memory_bytes",
"Resident memory size in bytes (RSS)",
))
.expect("pg_exporter_process_resident_memory_bytes");
let virtual_memory_bytes = IntGauge::with_opts(Opts::new(
"pg_exporter_process_virtual_memory_bytes",
"Virtual memory size in bytes (VSZ)",
))
.expect("pg_exporter_process_virtual_memory_bytes");
let open_fds = IntGauge::with_opts(Opts::new(
"pg_exporter_process_open_fds",
"Number of open file descriptors",
))
.expect("pg_exporter_process_open_fds");
let threads = IntGauge::with_opts(Opts::new(
"pg_exporter_process_threads",
"Number of OS threads in the process",
))
.expect("pg_exporter_process_threads");
let start_time_seconds = Gauge::with_opts(Opts::new(
"pg_exporter_process_start_time_seconds",
"Start time of the process since unix epoch in seconds",
))
.expect("pg_exporter_process_start_time_seconds");
let system = System::new_all();
let num_cores = system.cpus().len().max(1); let system = Arc::new(Mutex::new(system));
let pid = Pid::from(std::process::id() as usize);
cpu_cores.set(num_cores as i64);
let start_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64();
start_time_seconds.set(start_time);
Self {
cpu_seconds_total,
cpu_cores,
resident_memory_bytes,
virtual_memory_bytes,
open_fds,
threads,
start_time_seconds,
system,
pid,
num_cores,
}
}
fn collect_stats(&self) {
let mut system = match self.system.lock() {
Ok(guard) => guard,
Err(poisoned) => {
warn!("System mutex was poisoned, recovering");
poisoned.into_inner()
}
};
system.refresh_all();
if let Some(process) = system.process(self.pid) {
let rss = process.memory();
let vsz = process.virtual_memory();
self.resident_memory_bytes.set(rss as i64);
self.virtual_memory_bytes.set(vsz as i64);
let cpu_percent = process.cpu_usage() as f64;
let cores_in_use = cpu_percent / 100.0;
let estimated_interval_seconds = 5.0;
let cpu_seconds_delta = cores_in_use * estimated_interval_seconds;
if cpu_seconds_delta > 0.0 {
self.cpu_seconds_total.inc_by(cpu_seconds_delta);
}
#[cfg(target_os = "linux")]
{
if let Ok(entries) = std::fs::read_dir(format!("/proc/{}/task", self.pid)) {
let thread_count = entries.count() as i64;
self.threads.set(thread_count);
}
}
#[cfg(not(target_os = "linux"))]
{
self.threads.set(1);
}
#[cfg(target_os = "linux")]
{
if let Ok(entries) = std::fs::read_dir(format!("/proc/{}/fd", self.pid)) {
let fd_count = entries.count() as i64;
self.open_fds.set(fd_count);
}
}
#[cfg(not(target_os = "linux"))]
{
self.open_fds.set(0);
}
debug!(
rss_mb = rss / 1024 / 1024,
vsz_mb = vsz / 1024 / 1024,
cpu_seconds_total = self.cpu_seconds_total.get(),
cpu_cores = self.num_cores,
threads = self.threads.get(),
fds = self.open_fds.get(),
"collected process metrics"
);
}
}
}
impl Collector for ProcessCollector {
fn name(&self) -> &'static str {
"metrics.process"
}
fn register_metrics(&self, registry: &Registry) -> Result<()> {
registry.register(Box::new(self.cpu_seconds_total.clone()))?;
registry.register(Box::new(self.cpu_cores.clone()))?;
registry.register(Box::new(self.resident_memory_bytes.clone()))?;
registry.register(Box::new(self.virtual_memory_bytes.clone()))?;
registry.register(Box::new(self.open_fds.clone()))?;
registry.register(Box::new(self.threads.clone()))?;
registry.register(Box::new(self.start_time_seconds.clone()))?;
Ok(())
}
#[instrument(skip(self, _pool), level = "debug")]
fn collect<'a>(&'a self, _pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
self.collect_stats();
Ok(())
})
}
fn enabled_by_default(&self) -> bool {
false
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Construction must record a positive start timestamp.
    #[test]
    fn test_process_collector_new() {
        let collector = ProcessCollector::new();
        assert!(collector.start_time_seconds.get() > 0.0);
    }

    /// All metrics register cleanly on a fresh registry.
    #[test]
    fn test_process_collector_registers_without_error() {
        let collector = ProcessCollector::new();
        let registry = Registry::new();
        assert!(collector.register_metrics(&registry).is_ok());
    }

    /// A single collection populates memory, thread and (on Linux) descriptor gauges.
    #[test]
    fn test_process_collector_collects_stats() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        assert!(collector.resident_memory_bytes.get() > 0);
        assert!(collector.virtual_memory_bytes.get() > 0);
        assert!(collector.threads.get() >= 1);
        // stdin, stdout and stderr are expected to be open.
        #[cfg(target_os = "linux")]
        assert!(collector.open_fds.get() >= 3);
    }

    /// The CPU-time counter must never go backwards across collections.
    #[test]
    fn test_cpu_seconds_total_never_decreases() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        let first = collector.cpu_seconds_total.get();
        collector.collect_stats();
        assert!(collector.cpu_seconds_total.get() >= first);
    }
}