use crate::collectors::{Collector, i64_to_f64, util::get_excluded_databases};
use anyhow::Result;
use futures::future::BoxFuture;
use prometheus::{GaugeVec, IntGauge, IntGaugeVec, Opts, Registry};
use sqlx::{PgPool, Row};
use tracing::{debug, info_span, instrument};
use tracing_futures::Instrument as _;
/// Collector that publishes progress gauges for vacuum operations read from
/// `pg_stat_progress_vacuum`.
///
/// Every gauge vector is labelled by `database` and `table`; `global_active` is a
/// single unlabelled gauge.
#[derive(Clone)]
pub struct VacuumProgressCollector {
    /// `pg_vacuum_in_progress`: 1 for each table with a vacuum currently running.
    in_progress: IntGaugeVec,
    /// `pg_vacuum_heap_progress`: heap blocks scanned divided by total heap blocks.
    heap_progress: GaugeVec,
    /// `pg_vacuum_heap_vacuumed`: number of heap blocks vacuumed.
    heap_vacuumed: IntGaugeVec,
    /// `pg_vacuum_index_vacuum_count`: number of index vacuum passes.
    index_vacuum_count: IntGaugeVec,
    /// `pg_vacuum_active`: 1 when at least one vacuum is in progress, otherwise 0.
    global_active: IntGauge,
    /// `pg_vacuum_is_autovacuum`: 1 for an autovacuum worker, 0 for a manual vacuum.
    is_autovacuum: IntGaugeVec,
    /// `pg_vacuum_duration_seconds`: how long the vacuum has been running, in seconds.
    duration_seconds: IntGaugeVec,
}
/// The default collector is exactly the one produced by [`VacuumProgressCollector::new`].
impl Default for VacuumProgressCollector {
    /// Delegates to [`VacuumProgressCollector::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl VacuumProgressCollector {
    /// Builds the collector with all gauges constructed but not yet registered.
    ///
    /// # Panics
    ///
    /// Panics if the `prometheus` crate rejects one of the metric definitions. Names,
    /// help texts and label names are all literals, so a failure here is a programming
    /// error rather than a runtime condition.
    #[must_use]
    // `expect` is acceptable here: every argument is a fixed literal.
    #[allow(clippy::expect_used)]
    pub fn new() -> Self {
        // Label names shared by every per-table gauge vector.
        const TABLE_LABELS: [&str; 2] = ["database", "table"];

        let in_progress_opts = Opts::new(
            "pg_vacuum_in_progress",
            "Is a vacuum currently running (1=yes,0=no)",
        );
        let in_progress = IntGaugeVec::new(in_progress_opts, &TABLE_LABELS)
            .expect("valid pg_vacuum_in_progress opts");

        let heap_progress_opts = Opts::new(
            "pg_vacuum_heap_progress",
            "Progress of heap blocks scanned (0.0-1.0 ratio)",
        );
        let heap_progress = GaugeVec::new(heap_progress_opts, &TABLE_LABELS)
            .expect("valid pg_vacuum_heap_progress opts");

        let heap_vacuumed_opts =
            Opts::new("pg_vacuum_heap_vacuumed", "Number of heap blocks vacuumed");
        let heap_vacuumed = IntGaugeVec::new(heap_vacuumed_opts, &TABLE_LABELS)
            .expect("valid pg_vacuum_heap_vacuumed opts");

        let index_vacuum_count_opts = Opts::new(
            "pg_vacuum_index_vacuum_count",
            "Number of index vacuum passes",
        );
        let index_vacuum_count = IntGaugeVec::new(index_vacuum_count_opts, &TABLE_LABELS)
            .expect("valid pg_vacuum_index_vacuum_count opts");

        // The only unlabelled metric: a single flag covering all databases.
        let global_active_opts = Opts::new(
            "pg_vacuum_active",
            "Are there any vacuums in progress (1=yes,0=no)",
        );
        let global_active =
            IntGauge::with_opts(global_active_opts).expect("valid pg_vacuum_active opts");

        let is_autovacuum_opts = Opts::new(
            "pg_vacuum_is_autovacuum",
            "Whether the vacuum is an autovacuum (1) or manual (0)",
        );
        let is_autovacuum = IntGaugeVec::new(is_autovacuum_opts, &TABLE_LABELS)
            .expect("valid pg_vacuum_is_autovacuum opts");

        let duration_seconds_opts = Opts::new(
            "pg_vacuum_duration_seconds",
            "How long the vacuum has been running in seconds",
        );
        let duration_seconds = IntGaugeVec::new(duration_seconds_opts, &TABLE_LABELS)
            .expect("valid pg_vacuum_duration_seconds opts");

        Self {
            in_progress,
            heap_progress,
            heap_vacuumed,
            index_vacuum_count,
            global_active,
            is_autovacuum,
            duration_seconds,
        }
    }

    /// Resets every per-table gauge vector so that series belonging to vacuums that
    /// are no longer reported do not linger.
    ///
    /// `global_active` is deliberately left alone; the caller sets it explicitly.
    fn reset_progress_metrics(&self) {
        let Self {
            in_progress,
            heap_progress,
            heap_vacuumed,
            index_vacuum_count,
            global_active: _,
            is_autovacuum,
            duration_seconds,
        } = self;

        in_progress.reset();
        heap_progress.reset();
        heap_vacuumed.reset();
        index_vacuum_count.reset();
        is_autovacuum.reset();
        duration_seconds.reset();
    }
}
impl Collector for VacuumProgressCollector {
    /// Static identifier of this collector.
    fn name(&self) -> &'static str {
        "vacuum_progress"
    }

    /// Registers every gauge owned by this collector with `registry`.
    ///
    /// # Errors
    ///
    /// Returns the first registration error reported by the registry; gauges
    /// registered before the failure stay registered.
    #[instrument(
        skip(self, registry),
        level = "info",
        err,
        fields(collector = "vacuum_progress")
    )]
    fn register_metrics(&self, registry: &Registry) -> Result<()> {
        // Per-table progress gauges.
        registry.register(Box::new(self.in_progress.clone()))?;
        registry.register(Box::new(self.heap_progress.clone()))?;
        registry.register(Box::new(self.heap_vacuumed.clone()))?;
        registry.register(Box::new(self.index_vacuum_count.clone()))?;

        // Global flag, then the remaining per-table gauges.
        registry.register(Box::new(self.global_active.clone()))?;
        registry.register(Box::new(self.is_autovacuum.clone()))?;
        registry.register(Box::new(self.duration_seconds.clone()))?;

        Ok(())
    }

    /// Queries `pg_stat_progress_vacuum` and republishes all vacuum gauges.
    ///
    /// Per-table series are reset on every successful query and repopulated from the
    /// returned rows; `pg_vacuum_active` is set to 1 when at least one row is returned
    /// and to 0 otherwise.
    ///
    /// # Errors
    ///
    /// Fails if the query fails or if `database_name` / `table_name` cannot be decoded.
    /// In the query-failure case the previously published values are left untouched.
    #[instrument(
        skip(self, pool),
        level = "info",
        err,
        fields(collector="vacuum_progress", otel.kind="internal")
    )]
    fn collect<'a>(&'a self, pool: &'a PgPool) -> BoxFuture<'a, Result<()>> {
        Box::pin(async move {
            // Owned copy of the exclusion list, bound below as the `$1` array parameter.
            let excluded_dbs: Vec<String> = get_excluded_databases().to_vec();

            let db_span = info_span!(
                "db.query",
                otel.kind = "client",
                db.system = "postgresql",
                db.operation = "SELECT",
                db.statement = "SELECT progress from pg_stat_progress_vacuum joined with pg_database (filtered)",
                db.sql.table = "pg_stat_progress_vacuum"
            );

            // The SQL text is kept flush-left so the literal sent to the server is unchanged.
            // NOTE(review): pg_class / pg_namespace are per-database catalogs, so relation
            // names for vacuums running in other databases presumably fall back to the
            // relid text (or could match an unrelated local OID) — confirm.
            let vacuum_rows = sqlx::query(
                r"
SELECT
COALESCE(d.datname, 'unknown') AS database_name,
COALESCE(n.nspname || '.' || c.relname, p.relid::text) AS table_name,
p.heap_blks_total,
p.heap_blks_scanned,
p.heap_blks_vacuumed,
p.index_vacuum_count,
COALESCE(a.backend_type = 'autovacuum worker', false) AS is_autovacuum,
COALESCE(EXTRACT(EPOCH FROM (now() - a.xact_start))::bigint, 0) AS duration_seconds
FROM pg_stat_progress_vacuum p
LEFT JOIN pg_database d ON d.oid = p.datid
LEFT JOIN pg_class c ON c.oid = p.relid
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_stat_activity a ON a.pid = p.pid
WHERE (d.datname IS NULL OR NOT (d.datname = ANY($1)))
",
            )
            .bind(&excluded_dbs)
            .fetch_all(pool)
            .instrument(db_span)
            .await?;

            // Entered only after the last `.await`, so the guard never crosses a yield point.
            let update_span = info_span!(
                "vacuum_progress.update_metrics",
                active_rows = vacuum_rows.len()
            );
            let _guard = update_span.enter();

            // Drop series from the previous scrape before publishing the current ones.
            self.reset_progress_metrics();

            if vacuum_rows.is_empty() {
                self.global_active.set(0);
                debug!("no active vacuum operations");
                return Ok(());
            }

            self.global_active.set(1);

            for row in vacuum_rows {
                // The two label columns are mandatory; a decode failure aborts the collection.
                let database: String = row.try_get("database_name")?;
                let table: String = row.try_get("table_name")?;

                // Numeric columns are best-effort: anything undecodable is reported as 0.
                let int_col = |column: &str| row.try_get::<i64, _>(column).unwrap_or(0);
                let heap_total = int_col("heap_blks_total");
                let heap_scanned = int_col("heap_blks_scanned");
                let heap_vacuumed = int_col("heap_blks_vacuumed");
                let index_passes = int_col("index_vacuum_count");
                let autovacuum = row.try_get::<bool, _>("is_autovacuum").unwrap_or(false);
                let elapsed_secs = int_col("duration_seconds");

                // Guard against division by zero when the total is not positive.
                let progress_ratio = match heap_total {
                    total if total > 0 => i64_to_f64(heap_scanned) / i64_to_f64(total),
                    _ => 0.0,
                };

                let labels = [database.as_str(), table.as_str()];
                self.in_progress.with_label_values(&labels).set(1);
                self.heap_progress
                    .with_label_values(&labels)
                    .set(progress_ratio);
                self.heap_vacuumed
                    .with_label_values(&labels)
                    .set(heap_vacuumed);
                self.index_vacuum_count
                    .with_label_values(&labels)
                    .set(index_passes);
                self.is_autovacuum
                    .with_label_values(&labels)
                    .set(i64::from(autovacuum));
                self.duration_seconds
                    .with_label_values(&labels)
                    .set(elapsed_secs);

                debug!(
                    database = %database,
                    table = %table,
                    heap_total,
                    heap_scanned,
                    heap_vacuumed = heap_vacuumed,
                    index_vacuum_count = index_passes,
                    progress_ratio = %format!("{progress_ratio:.2}"),
                    is_autovacuum = autovacuum,
                    duration_seconds = elapsed_secs,
                    "updated vacuum progress metrics"
                );
            }

            Ok(())
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Publishes a few per-table series, resets the collector, and checks that none of
    /// the affected metric families still exposes a series afterwards.
    #[test]
    fn test_reset_progress_metrics_clears_previous_table_series() -> Result<()> {
        let collector = VacuumProgressCollector::new();
        let registry = Registry::new();
        collector.register_metrics(&registry)?;

        collector
            .in_progress
            .with_label_values(&["postgres", "public.test_table"])
            .set(1);
        collector
            .heap_progress
            .with_label_values(&["postgres", "public.test_table"])
            .set(0.5);
        collector
            .duration_seconds
            .with_label_values(&["postgres", "public.test_table"])
            .set(42);

        collector.reset_progress_metrics();

        // Snapshot the registry once; every lookup below inspects the same state.
        let families = registry.gather();
        for metric_name in [
            "pg_vacuum_in_progress",
            "pg_vacuum_heap_progress",
            "pg_vacuum_duration_seconds",
        ] {
            let metric_family = families
                .iter()
                .find(|family| family.name() == metric_name);
            // A family that is absent from the snapshot has no series, so that case passes.
            if let Some(metric_family) = metric_family {
                assert!(
                    metric_family.get_metric().is_empty(),
                    "metric {metric_name} should have no stale series after reset"
                );
            }
        }
        Ok(())
    }
}