use crate::collectors::Collector;
use anyhow::Result;
use futures::future::BoxFuture;
use prometheus::{Counter, Gauge, IntGauge, Opts, Registry};
use sqlx::PgPool;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use sysinfo::{Pid, System};
use tracing::{debug, instrument, warn};
/// Exposes resource usage of the exporter's own OS process as Prometheus metrics.
///
/// The metric handles are cloned into the registry in `register_metrics` and then
/// updated through `self` in `collect_stats`, so clones of this struct report into the
/// same underlying metric values.
#[derive(Clone)]
pub struct ProcessCollector {
    // --- what is being observed ---
    /// PID of the process being reported (taken from `std::process::id()` in `new`).
    pid: Pid,
    /// sysinfo state, refreshed for `pid` on every scrape; shared between clones.
    system: Arc<Mutex<System>>,

    // --- exported metrics ---
    /// `pg_exporter_process_cpu_seconds_total`.
    cpu_seconds_total: Counter,
    /// `pg_exporter_process_start_time_seconds`.
    start_time_seconds: Gauge,
    /// `pg_exporter_process_resident_memory_bytes`.
    resident_memory_bytes: IntGauge,
    /// `pg_exporter_process_virtual_memory_bytes`.
    virtual_memory_bytes: IntGauge,
    /// `pg_exporter_process_open_fds`.
    open_fds: IntGauge,
    /// `pg_exporter_process_threads`.
    threads: IntGauge,
}
impl Default for ProcessCollector {
fn default() -> Self {
Self::new()
}
}
impl ProcessCollector {
    /// Clock ticks per second used by the `utime`/`stime` fields of
    /// `/proc/<pid>/stat` (Linux `USER_HZ`).
    ///
    /// Hard-coded rather than queried via `sysconf(_SC_CLK_TCK)` to avoid a libc
    /// dependency; 100 is the value on mainstream Linux targets and is the same
    /// constant the Go `prometheus/procfs` package uses.
    #[cfg(target_os = "linux")]
    const USER_HZ: f64 = 100.0;

    /// Creates the collector and its (not yet registered) metrics.
    ///
    /// `pg_exporter_process_start_time_seconds` is set once here. It uses the
    /// start time sysinfo reports for this process. If that is unavailable, it
    /// falls back to the current wall-clock time.
    ///
    /// # Panics
    ///
    /// Panics if a metric cannot be constructed, which can only happen if one of
    /// the hard-coded metric names / help strings below is invalid.
    pub fn new() -> Self {
        let cpu_seconds_total = Counter::with_opts(Opts::new(
            "pg_exporter_process_cpu_seconds_total",
            "Total user and system CPU time spent in seconds",
        ))
        .expect("pg_exporter_process_cpu_seconds_total");
        let resident_memory_bytes = IntGauge::with_opts(Opts::new(
            "pg_exporter_process_resident_memory_bytes",
            "Resident memory size in bytes (RSS)",
        ))
        .expect("pg_exporter_process_resident_memory_bytes");
        let virtual_memory_bytes = IntGauge::with_opts(Opts::new(
            "pg_exporter_process_virtual_memory_bytes",
            "Virtual memory size in bytes (VSZ)",
        ))
        .expect("pg_exporter_process_virtual_memory_bytes");
        let open_fds = IntGauge::with_opts(Opts::new(
            "pg_exporter_process_open_fds",
            "Number of open file descriptors",
        ))
        .expect("pg_exporter_process_open_fds");
        let threads = IntGauge::with_opts(Opts::new(
            "pg_exporter_process_threads",
            "Number of OS threads in the process",
        ))
        .expect("pg_exporter_process_threads");
        let start_time_seconds = Gauge::with_opts(Opts::new(
            "pg_exporter_process_start_time_seconds",
            "Start time of the process since unix epoch in seconds",
        ))
        .expect("pg_exporter_process_start_time_seconds");

        let system = System::new_all();
        let pid = Pid::from(std::process::id() as usize);

        // The metric promises the *process* start time, which can precede the
        // moment this collector is constructed; use the OS value when available.
        let start_time = system
            .process(pid)
            .map(|process| process.start_time() as f64)
            .filter(|&secs| secs > 0.0)
            .unwrap_or_else(|| {
                SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64()
            });
        start_time_seconds.set(start_time);

        Self {
            cpu_seconds_total,
            resident_memory_bytes,
            virtual_memory_bytes,
            open_fds,
            threads,
            start_time_seconds,
            system: Arc::new(Mutex::new(system)),
            pid,
        }
    }

    /// Updates every process metric from the current state of this process.
    ///
    /// If sysinfo cannot find the process, nothing is updated on this call.
    /// Individual sources that cannot be read (e.g. an unreadable `/proc`
    /// entry) leave their metric at its previous value.
    fn collect_stats(&self) {
        let Some((rss, vsz)) = self.read_memory() else {
            debug!("own process not found by sysinfo; skipping process metrics");
            return;
        };
        // Saturate instead of wrapping on the (theoretical) u64 -> i64 overflow.
        self.resident_memory_bytes
            .set(i64::try_from(rss).unwrap_or(i64::MAX));
        self.virtual_memory_bytes
            .set(i64::try_from(vsz).unwrap_or(i64::MAX));

        self.update_cpu_seconds();
        self.update_threads_and_fds();

        debug!(
            rss_mb = rss / 1024 / 1024,
            vsz_mb = vsz / 1024 / 1024,
            cpu_seconds = self.cpu_seconds_total.get(),
            threads = self.threads.get(),
            fds = self.open_fds.get(),
            "collected process metrics"
        );
    }

    /// Refreshes sysinfo's entry for this process and returns `(memory,
    /// virtual_memory)` as reported by sysinfo, or `None` if the process is not
    /// found. The mutex guard is dropped before returning, so it is not held
    /// during the `/proc` reads that follow.
    fn read_memory(&self) -> Option<(u64, u64)> {
        // Recover from poisoning rather than stop reporting; the refresh below
        // overwrites this process's entry anyway.
        let mut system = self.system.lock().unwrap_or_else(|poisoned| {
            warn!("System mutex was poisoned, recovering");
            poisoned.into_inner()
        });
        system.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[self.pid]), true);
        system
            .process(self.pid)
            .map(|process| (process.memory(), process.virtual_memory()))
    }

    /// Advances `cpu_seconds_total` to the process's current user + system CPU time.
    fn update_cpu_seconds(&self) {
        let Some(cpu_seconds) = self.read_cpu_seconds() else {
            return;
        };
        // Prometheus counters only go up: add the delta since the last scrape.
        let delta = cpu_seconds - self.cpu_seconds_total.get();
        if delta > 0.0 {
            self.cpu_seconds_total.inc_by(delta);
        }
    }

    /// Total user + system CPU seconds consumed by the process, read from
    /// `/proc/<pid>/stat`; `None` if the file cannot be read or parsed.
    #[cfg(target_os = "linux")]
    fn read_cpu_seconds(&self) -> Option<f64> {
        let stat = std::fs::read_to_string(format!("/proc/{}/stat", self.pid)).ok()?;
        Self::parse_stat_cpu_seconds(&stat)
    }

    /// No portable CPU-time source is used on this platform. The counter is left
    /// unchanged rather than fed with wall-clock time.
    #[cfg(not(target_os = "linux"))]
    fn read_cpu_seconds(&self) -> Option<f64> {
        None
    }

    /// Parses `utime + stime` (in seconds) out of `/proc/<pid>/stat` content.
    ///
    /// Field 2 (`comm`) is parenthesised and may itself contain spaces or `)`,
    /// so fields are counted from the *last* `)`. After it, field 3 (`state`) is
    /// index 0. That puts `utime` (field 14) at index 11 and `stime` (field 15)
    /// at index 12. Returns `None` if the content is malformed.
    #[cfg(target_os = "linux")]
    fn parse_stat_cpu_seconds(stat: &str) -> Option<f64> {
        let (_, after_comm) = stat.rsplit_once(')')?;
        let mut fields = after_comm.split_whitespace().skip(11);
        let utime: u64 = fields.next()?.parse().ok()?;
        let stime: u64 = fields.next()?.parse().ok()?;
        Some(utime.saturating_add(stime) as f64 / Self::USER_HZ)
    }

    /// Number of entries in `/proc/<pid>/<subdir>`, or `None` if unreadable.
    ///
    /// For `fd`, the count includes the descriptor `read_dir` itself holds open
    /// while iterating.
    #[cfg(target_os = "linux")]
    fn count_proc_entries(&self, subdir: &str) -> Option<i64> {
        std::fs::read_dir(format!("/proc/{}/{}", self.pid, subdir))
            .ok()
            .map(|entries| entries.count() as i64)
    }

    /// Updates `threads` and `open_fds` from `/proc/<pid>/task` and `/proc/<pid>/fd`.
    #[cfg(target_os = "linux")]
    fn update_threads_and_fds(&self) {
        if let Some(thread_count) = self.count_proc_entries("task") {
            self.threads.set(thread_count);
        }
        if let Some(fd_count) = self.count_proc_entries("fd") {
            self.open_fds.set(fd_count);
        }
    }

    /// No source is read on this platform; fixed placeholder values are reported.
    #[cfg(not(target_os = "linux"))]
    fn update_threads_and_fds(&self) {
        self.threads.set(1);
        self.open_fds.set(0);
    }
}
impl Collector for ProcessCollector {
fn name(&self) -> &'static str {
"metrics.process"
}
fn register_metrics(&self, registry: &Registry) -> Result<()> {
registry.register(Box::new(self.cpu_seconds_total.clone()))?;
registry.register(Box::new(self.resident_memory_bytes.clone()))?;
registry.register(Box::new(self.virtual_memory_bytes.clone()))?;
registry.register(Box::new(self.open_fds.clone()))?;
registry.register(Box::new(self.threads.clone()))?;
registry.register(Box::new(self.start_time_seconds.clone()))?;
Ok(())
}
#[instrument(skip(self, _pool), level = "debug")]
fn collect<'a>(&'a self, _pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
self.collect_stats();
Ok(())
})
}
fn enabled_by_default(&self) -> bool {
false
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The start-time gauge is populated at construction.
    #[test]
    fn test_process_collector_new() {
        let collector = ProcessCollector::new();
        assert!(collector.start_time_seconds.get() > 0.0);
    }

    #[test]
    fn test_process_collector_registers_without_error() {
        let collector = ProcessCollector::new();
        let registry = Registry::new();
        assert!(collector.register_metrics(&registry).is_ok());
    }

    /// Registering the same metrics twice in one registry must be reported as an error.
    #[test]
    fn test_process_collector_rejects_duplicate_registration() {
        let collector = ProcessCollector::new();
        let registry = Registry::new();
        collector
            .register_metrics(&registry)
            .expect("first registration succeeds");
        assert!(collector.register_metrics(&registry).is_err());
    }

    #[test]
    fn test_process_collector_metadata() {
        let collector = ProcessCollector::new();
        assert_eq!(collector.name(), "metrics.process");
        assert!(!collector.enabled_by_default());
    }

    #[test]
    fn test_process_collector_collects_stats() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        assert!(collector.resident_memory_bytes.get() > 0);
        assert!(collector.virtual_memory_bytes.get() > 0);
        assert!(collector.threads.get() >= 1);
        #[cfg(target_os = "linux")]
        assert!(collector.open_fds.get() >= 3);
    }

    /// Repeated scrapes must never make the CPU counter go backwards.
    #[test]
    fn test_cpu_seconds_total_never_decreases() {
        let collector = ProcessCollector::new();
        collector.collect_stats();
        let first = collector.cpu_seconds_total.get();
        collector.collect_stats();
        assert!(collector.cpu_seconds_total.get() >= first);
    }
}