use crate::collectors::{Collector, util::get_excluded_databases};
use anyhow::Result;
use futures::future::BoxFuture;
use prometheus::{IntGauge, IntGaugeVec, Opts, Registry};
use sqlx::{PgPool, Row};
use std::collections::{HashMap, HashSet};
use tracing::{debug, info_span, instrument};
use tracing_futures::Instrument as _;
#[derive(Clone)]
pub struct VacuumStatsCollector {
db_freeze_age_xids: IntGaugeVec, freeze_max_age_xids: IntGauge, db_freeze_age_pct_of_max: IntGaugeVec, autovac_workers: IntGaugeVec, }
impl Default for VacuumStatsCollector {
fn default() -> Self {
Self::new()
}
}
impl VacuumStatsCollector {
#[must_use]
#[allow(clippy::expect_used)]
pub fn new() -> Self {
let db_freeze_age_xids = IntGaugeVec::new(
Opts::new(
"pg_vacuum_database_freeze_age_xids",
"Age in transactions (xids) since database freeze (age(datfrozenxid)).",
),
&["datname"],
)
.expect("create pg_vacuum_database_freeze_age_xids");
let freeze_max_age_xids = IntGauge::with_opts(Opts::new(
"pg_vacuum_freeze_max_age_xids",
"Configured autovacuum_freeze_max_age (xids).",
))
.expect("create pg_vacuum_freeze_max_age_xids");
let db_freeze_age_pct_of_max = IntGaugeVec::new(
Opts::new(
"pg_vacuum_database_freeze_age_pct_of_max",
"Freeze age as percent of autovacuum_freeze_max_age (0..100).",
),
&["datname"],
)
.expect("create pg_vacuum_database_freeze_age_pct_of_max");
let autovac_workers = IntGaugeVec::new(
Opts::new(
"pg_vacuum_autovacuum_workers",
"Number of autovacuum workers currently running per database.",
),
&["datname"],
)
.expect("create pg_vacuum_autovacuum_workers");
Self {
db_freeze_age_xids,
freeze_max_age_xids,
db_freeze_age_pct_of_max,
autovac_workers,
}
}
}
impl Collector for VacuumStatsCollector {
fn name(&self) -> &'static str {
"vacuum_stats"
}
#[instrument(
skip(self, registry),
level = "info",
err,
fields(collector = "vacuum_stats")
)]
fn register_metrics(&self, registry: &Registry) -> Result<()> {
registry.register(Box::new(self.db_freeze_age_xids.clone()))?;
registry.register(Box::new(self.freeze_max_age_xids.clone()))?;
registry.register(Box::new(self.db_freeze_age_pct_of_max.clone()))?;
registry.register(Box::new(self.autovac_workers.clone()))?;
Ok(())
}
#[instrument(
skip(self, pool),
level = "info",
err,
fields(collector="vacuum_stats", otel.kind="internal")
)]
fn collect<'a>(&'a self, pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
let excluded: Vec<String> = get_excluded_databases().to_vec();
let q_freeze_max = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement = "SELECT current_setting('autovacuum_freeze_max_age')",
);
let freeze_max_age_xids: i64 = sqlx::query_scalar(
r"SELECT current_setting('autovacuum_freeze_max_age')::bigint",
)
.fetch_one(pool)
.instrument(q_freeze_max)
.await?;
let q_db_freeze_age = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement = "SELECT datname, age(datfrozenxid) FROM pg_database",
db.sql.table = "pg_database"
);
let rows = sqlx::query(
r"
SELECT
datname,
age(datfrozenxid)::bigint AS freeze_age
FROM pg_database
WHERE datallowconn
AND NOT datistemplate
AND NOT (datname = ANY($1))
ORDER BY datname
",
)
.bind(&excluded)
.fetch_all(pool)
.instrument(q_db_freeze_age)
.await?;
let mut seen_dbs: HashSet<String> = HashSet::new();
let mut freeze_age_values: HashMap<String, i64> = HashMap::new();
let mut freeze_pct_values: HashMap<String, i64> = HashMap::new();
for row in &rows {
let datname: String = row
.try_get::<Option<String>, _>("datname")?
.unwrap_or_else(|| "[unknown]".to_string());
let age_xids: i64 = row.try_get::<i64, _>("freeze_age").unwrap_or(0);
seen_dbs.insert(datname.clone());
freeze_age_values.insert(datname.clone(), age_xids);
let pct = if freeze_max_age_xids > 0 {
let numerator = i128::from(age_xids).saturating_mul(100);
let denominator = i128::from(freeze_max_age_xids);
if denominator > 0 {
let rounded = numerator.saturating_add(denominator / 2) / denominator;
i64::try_from(rounded.clamp(0, 100)).unwrap_or(0)
} else {
0
}
} else {
0
};
freeze_pct_values.insert(datname.clone(), pct);
debug!(
datname = %datname,
age_xids,
freeze_max_age_xids,
pct_of_max = pct,
"updated freeze age metrics"
);
}
let q_workers = info_span!(
"db.query",
otel.kind = "client",
db.system = "postgresql",
db.operation = "SELECT",
db.statement =
"SELECT count(*) FROM pg_stat_activity WHERE backend_type='autovacuum worker'",
db.sql.table = "pg_stat_activity"
);
let worker_rows = sqlx::query(
r"
SELECT
datname,
COUNT(*)::bigint AS workers
FROM pg_stat_activity
WHERE backend_type = 'autovacuum worker'
AND NOT (COALESCE(datname,'') = ANY($1))
GROUP BY datname
ORDER BY datname
",
)
.bind(&excluded)
.fetch_all(pool)
.instrument(q_workers)
.await?;
let mut worker_map: HashMap<String, i64> = HashMap::new();
for row in &worker_rows {
let datname: String = row
.try_get::<Option<String>, _>("datname")?
.unwrap_or_else(|| "[unknown]".to_string());
let workers: i64 = row.try_get::<i64, _>("workers").unwrap_or(0);
worker_map.insert(datname.clone(), workers);
}
for db in seen_dbs {
worker_map.entry(db).or_insert(0);
}
self.db_freeze_age_xids.reset();
self.db_freeze_age_pct_of_max.reset();
self.autovac_workers.reset();
self.freeze_max_age_xids.set(freeze_max_age_xids);
for (datname, age_xids) in freeze_age_values {
self.db_freeze_age_xids
.with_label_values(&[&datname])
.set(age_xids);
}
for (datname, pct) in freeze_pct_values {
self.db_freeze_age_pct_of_max
.with_label_values(&[&datname])
.set(pct);
}
for (datname, workers) in worker_map {
self.autovac_workers
.with_label_values(&[&datname])
.set(workers);
}
Ok(())
})
}
}