use chrono::{DateTime, Utc};
use rust_decimal::Decimal;
use serde::Serialize;
use sqlx::postgres::PgPool;
use sqlx::{Error, Postgres, Transaction};
use uuid::Uuid;
/// Default dead-tuple percentage at or above which a table counts as bloated.
pub const DEFAULT_BLOAT_DEAD_PCT: f64 = 10.0;
/// Default percentage of `autovacuum_freeze_max_age` at or above which a table
/// counts as an XID-freeze risk.
pub const DEFAULT_XID_RISK_PCT: f64 = 50.0;
/// Default `statement_timeout` (milliseconds) for catalog collection; presumably
/// passed by callers to `collect_vacuum_catalog_rows` — verify at call sites.
pub const DEFAULT_STATEMENT_TIMEOUT_MS: u64 = 60_000;

/// Percentage thresholds used to classify tables when rolling up a snapshot.
#[derive(Debug, Clone, Copy)]
pub struct VacuumHealthThresholds {
    /// Minimum dead-tuple percentage (0–100 scale) that flags a table as bloated.
    pub bloat_dead_pct: f64,
    /// Minimum XID age, as a percentage of `autovacuum_freeze_max_age`, that
    /// flags a table as a freeze risk.
    pub xid_risk_pct: f64,
}

impl Default for VacuumHealthThresholds {
    /// Uses [`DEFAULT_BLOAT_DEAD_PCT`] and [`DEFAULT_XID_RISK_PCT`].
    fn default() -> Self {
        Self {
            bloat_dead_pct: DEFAULT_BLOAT_DEAD_PCT,
            xid_risk_pct: DEFAULT_XID_RISK_PCT,
        }
    }
}

impl VacuumHealthThresholds {
    /// Builds thresholds from the environment, falling back to the defaults.
    ///
    /// Reads `ATHENA_VACUUM_HEALTH_BLOAT_DEAD_PCT` and
    /// `ATHENA_VACUUM_HEALTH_XID_RISK_PCT`. A variable that is unset, not
    /// valid UTF-8, unparseable, non-finite (`NaN`, `inf`) or negative is
    /// ignored and the matching default is used. This matters because a `NaN`
    /// threshold would make every `>=` comparison false and silently disable
    /// all alerting.
    pub fn from_env() -> Self {
        Self {
            bloat_dead_pct: Self::pct_from_env(
                "ATHENA_VACUUM_HEALTH_BLOAT_DEAD_PCT",
                DEFAULT_BLOAT_DEAD_PCT,
            ),
            xid_risk_pct: Self::pct_from_env(
                "ATHENA_VACUUM_HEALTH_XID_RISK_PCT",
                DEFAULT_XID_RISK_PCT,
            ),
        }
    }

    /// Reads env var `name` and validates it with [`Self::parse_pct`].
    fn pct_from_env(name: &str, default: f64) -> f64 {
        Self::parse_pct(std::env::var(name).ok().as_deref(), default)
    }

    /// Parses a percentage (surrounding whitespace allowed). Returns `default`
    /// when `raw` is `None`, unparseable, non-finite, or negative.
    fn parse_pct(raw: Option<&str>, default: f64) -> f64 {
        raw.and_then(|v| v.trim().parse::<f64>().ok())
            .filter(|v| v.is_finite() && *v >= 0.0)
            .unwrap_or(default)
    }
}
/// One table's vacuum statistics as read from the target database by
/// `collect_vacuum_catalog_rows` (decoded by column name via `sqlx::FromRow`).
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct VacuumCatalogRow {
// Schema of the table (`pg_stat_user_tables.schemaname`).
pub schemaname: String,
// Table name (`pg_stat_user_tables.relname`).
pub relname: String,
// Estimated dead tuples.
pub n_dead_tup: i64,
// Estimated live tuples.
pub n_live_tup: i64,
// Dead tuples as a percentage (0–100) of live + dead; 0.0 when both are zero.
pub dead_pct: f64,
// Last manual VACUUM, if any.
pub last_vacuum: Option<DateTime<Utc>>,
// Last autovacuum run, if any.
pub last_autovacuum: Option<DateTime<Utc>>,
// `age(relfrozenxid)` of the table.
pub xid_age: i64,
// `xid_age` as a percentage of `freeze_max_age`; None when freeze_max_age <= 0.
pub xid_age_pct: Option<f64>,
// Server's `autovacuum_freeze_max_age` (fallback 200000000); same on every row.
pub freeze_max_age: i64,
}
/// Snapshot-level counters produced by `rollup_vacuum_rows` from a set of
/// `VacuumCatalogRow`s.
#[derive(Debug, Clone, Copy)]
pub struct VacuumRollup {
// Sum of `n_dead_tup` over all rows.
pub total_dead_rows: i64,
// Rows whose `dead_pct` is >= the bloat threshold.
pub tables_with_bloat: i32,
// Rows whose `xid_age_pct` is Some and >= the XID-risk threshold.
pub xid_freeze_risk: i32,
// Rows matching either condition above (each table counted once).
pub tables_needing_vacuum: i32,
// `freeze_max_age` of the first row; None when there were no rows.
pub freeze_max_age: Option<i64>,
}
/// Reads per-table vacuum and XID-age statistics from the target database.
///
/// The query runs inside its own transaction, so `SET LOCAL statement_timeout`
/// applies only to this collection and is discarded on commit. One row is
/// returned per user table from `pg_stat_user_tables`, ordered by schema and
/// then table name. Partitioned parent tables are skipped if the server lists
/// them.
///
/// `statement_timeout_ms` is clamped to `i32::MAX`, because PostgreSQL stores
/// `statement_timeout` as a 32-bit integer and rejects larger values. `0` is
/// passed through unchanged; PostgreSQL treats it as "no timeout".
///
/// # Errors
/// Returns any `sqlx` error from beginning the transaction, applying the
/// timeout, running the catalog query (including a timeout cancellation), or
/// committing.
pub async fn collect_vacuum_catalog_rows(
    target_pool: &PgPool,
    statement_timeout_ms: u64,
) -> Result<Vec<VacuumCatalogRow>, Error> {
    // `SET` cannot take bind parameters. `ms` is an integer, so the formatted
    // statement cannot carry injected SQL.
    let ms: i32 = i32::try_from(statement_timeout_ms).unwrap_or(i32::MAX);
    let mut tx: Transaction<'_, Postgres> = target_pool.begin().await?;
    sqlx::query(&format!("SET LOCAL statement_timeout = {ms}"))
        .execute(&mut *tx)
        .await?;
    // - freeze_cfg wraps the pg_settings lookup in a scalar subquery, so it
    //   always yields exactly one row. A lookup that matched nothing would
    //   otherwise make the CROSS JOIN silently drop every table.
    // - Partitioned parents (relkind 'p') have no storage and an invalid
    //   relfrozenxid, for which age() returns INT_MAX. They would otherwise
    //   always be reported as XID freeze risks.
    let rows: Vec<VacuumCatalogRow> = sqlx::query_as(
        r#"
WITH freeze_cfg AS (
SELECT COALESCE(
(SELECT setting::bigint FROM pg_settings WHERE name = 'autovacuum_freeze_max_age'),
200000000
) AS freeze_max_age
)
SELECT
s.schemaname AS schemaname,
s.relname AS relname,
s.n_dead_tup::bigint AS n_dead_tup,
s.n_live_tup::bigint AS n_live_tup,
CASE
WHEN (s.n_live_tup + s.n_dead_tup) > 0 THEN
(100.0 * s.n_dead_tup::float8 / (s.n_live_tup + s.n_dead_tup)::float8)
ELSE 0.0
END AS dead_pct,
s.last_vacuum AS last_vacuum,
s.last_autovacuum AS last_autovacuum,
age(c.relfrozenxid)::bigint AS xid_age,
CASE
WHEN f.freeze_max_age > 0 THEN
(100.0 * age(c.relfrozenxid)::float8 / f.freeze_max_age::float8)
ELSE NULL
END AS xid_age_pct,
f.freeze_max_age AS freeze_max_age
FROM pg_stat_user_tables s
INNER JOIN pg_class c ON c.oid = s.relid
CROSS JOIN freeze_cfg f
WHERE c.relkind <> 'p'
ORDER BY s.schemaname, s.relname
"#,
    )
    .fetch_all(&mut *tx)
    .await?;
    tx.commit().await?;
    Ok(rows)
}
/// Folds per-table catalog rows into snapshot-level counters.
///
/// A table is "bloated" when `dead_pct >= thresholds.bloat_dead_pct`. It is an
/// "XID freeze risk" when `xid_age_pct` is present and
/// `>= thresholds.xid_risk_pct`. A table matching either condition counts once
/// toward `tables_needing_vacuum`. `freeze_max_age` is taken from the first
/// row, and is `None` for an empty slice.
pub fn rollup_vacuum_rows(
    rows: &[VacuumCatalogRow],
    thresholds: &VacuumHealthThresholds,
) -> VacuumRollup {
    let mut acc = VacuumRollup {
        total_dead_rows: 0,
        tables_with_bloat: 0,
        xid_freeze_risk: 0,
        tables_needing_vacuum: 0,
        freeze_max_age: rows.first().map(|first| first.freeze_max_age),
    };
    for table in rows.iter() {
        acc.total_dead_rows += table.n_dead_tup;
        let is_bloated = table.dead_pct >= thresholds.bloat_dead_pct;
        // A missing percentage (freeze_max_age <= 0 upstream) never counts as risk.
        let is_xid_risky =
            matches!(table.xid_age_pct, Some(pct) if pct >= thresholds.xid_risk_pct);
        acc.tables_with_bloat += i32::from(is_bloated);
        acc.xid_freeze_risk += i32::from(is_xid_risky);
        acc.tables_needing_vacuum += i32::from(is_bloated || is_xid_risky);
    }
    acc
}
/// Persists a successful collection for one client.
///
/// Inserts one `vacuum_health_snapshots` row (with `collection_error` NULL),
/// then one `vacuum_health_table_stats` row per entry in `rows` that references
/// it. All inserts share one transaction, which is committed only after the
/// last insert. Returns the snapshot's generated `id`.
///
/// # Errors
/// Returns the first `sqlx` error encountered. The transaction is then dropped
/// without `commit`, which sqlx rolls back, so no partial snapshot is persisted.
pub async fn insert_vacuum_health_snapshot(
    logging_pool: &PgPool,
    client_name: &str,
    host: Option<&str>,
    instance_id: Option<Uuid>,
    rollup: &VacuumRollup,
    rows: &[VacuumCatalogRow],
) -> Result<i64, Error> {
    const SNAPSHOT_SQL: &str = r#"
INSERT INTO vacuum_health_snapshots (
client_name,
host,
instance_id,
total_dead_rows,
tables_with_bloat,
xid_freeze_risk,
tables_needing_vacuum,
freeze_max_age,
collection_error
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NULL)
RETURNING id
"#;
    const TABLE_STAT_SQL: &str = r#"
INSERT INTO vacuum_health_table_stats (
snapshot_id,
schemaname,
relname,
n_dead_tup,
n_live_tup,
dead_pct,
last_vacuum,
last_autovacuum,
xid_age,
xid_age_pct_of_freeze_max
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
"#;

    let mut txn: Transaction<'_, Postgres> = logging_pool.begin().await?;

    let snapshot_id: i64 = sqlx::query_scalar(SNAPSHOT_SQL)
        .bind(client_name)
        .bind(host)
        .bind(instance_id)
        .bind(rollup.total_dead_rows)
        .bind(rollup.tables_with_bloat)
        .bind(rollup.xid_freeze_risk)
        .bind(rollup.tables_needing_vacuum)
        .bind(rollup.freeze_max_age)
        .fetch_one(&mut *txn)
        .await?;

    for stat in rows.iter() {
        let insert = sqlx::query(TABLE_STAT_SQL)
            .bind(snapshot_id)
            .bind(&stat.schemaname)
            .bind(&stat.relname)
            .bind(stat.n_dead_tup)
            .bind(stat.n_live_tup)
            .bind(stat.dead_pct)
            .bind(stat.last_vacuum)
            .bind(stat.last_autovacuum)
            .bind(stat.xid_age)
            .bind(stat.xid_age_pct);
        insert.execute(&mut *txn).await?;
    }

    txn.commit().await?;
    Ok(snapshot_id)
}
/// Records a failed collection attempt for one client.
///
/// Inserts a `vacuum_health_snapshots` row whose counters are all `0`, whose
/// `freeze_max_age` is NULL, and whose `collection_error` is the supplied
/// message. No per-table rows are written. Returns the new row's `id`.
///
/// # Errors
/// Returns any `sqlx` error raised by the insert.
pub async fn insert_vacuum_health_failure(
    logging_pool: &PgPool,
    client_name: &str,
    host: Option<&str>,
    instance_id: Option<Uuid>,
    collection_error: &str,
) -> Result<i64, sqlx::Error> {
    const FAILURE_SQL: &str = r#"
INSERT INTO vacuum_health_snapshots (
client_name,
host,
instance_id,
total_dead_rows,
tables_with_bloat,
xid_freeze_risk,
tables_needing_vacuum,
freeze_max_age,
collection_error
)
VALUES ($1, $2, $3, 0, 0, 0, 0, NULL, $4)
RETURNING id
"#;

    // The scalar's type (i64) is inferred from this function's return type.
    sqlx::query_scalar(FAILURE_SQL)
        .bind(client_name)
        .bind(host)
        .bind(instance_id)
        .bind(collection_error)
        .fetch_one(logging_pool)
        .await
}
/// Deletes vacuum-health snapshots whose `recorded_at` is more than
/// `retention_days` days before the current (application) clock.
///
/// Returns the number of `vacuum_health_snapshots` rows deleted.
/// NOTE(review): whether matching `vacuum_health_table_stats` rows are removed
/// too depends on the schema's foreign keys (e.g. `ON DELETE CASCADE`), which
/// are not visible here — confirm.
///
/// A negative `retention_days` is treated as a misconfiguration and deletes
/// nothing; without the guard the cutoff would land in the future and every
/// snapshot would be wiped. A retention window too large for `chrono` to
/// subtract also deletes nothing instead of panicking.
///
/// # Errors
/// Returns any `sqlx` error raised by the `DELETE`. This includes PostgreSQL
/// rejecting a cutoff earlier than its timestamp range.
pub async fn prune_vacuum_health_snapshots(
    logging_pool: &PgPool,
    retention_days: i64,
) -> Result<u64, sqlx::Error> {
    // Largest day count `chrono::Duration::days` accepts without panicking
    // (its range is i64::MAX milliseconds).
    const MAX_DURATION_DAYS: i64 = i64::MAX / 86_400_000;

    if retention_days < 0 {
        return Ok(0);
    }
    let days: i64 = retention_days.min(MAX_DURATION_DAYS);
    // `checked_sub_signed` returns None when the cutoff would precede the
    // earliest representable `DateTime<Utc>`; nothing can be older than that.
    let Some(cutoff) = Utc::now().checked_sub_signed(chrono::Duration::days(days)) else {
        return Ok(0);
    };
    let affected = sqlx::query("DELETE FROM vacuum_health_snapshots WHERE recorded_at < $1")
        .bind(cutoff)
        .execute(logging_pool)
        .await?
        .rows_affected();
    Ok(affected)
}
/// A stored `vacuum_health_snapshots` row, as read back by
/// `list_latest_vacuum_health_summaries` and `get_latest_vacuum_health_detail`.
/// Also serialized (serde) for API output.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct VacuumHealthSnapshotRecord {
// Generated snapshot id (returned by the inserts in this module).
pub id: i64,
// Not written by the inserts here — presumably a DB-side default (e.g. now()); confirm schema.
pub recorded_at: DateTime<Utc>,
pub client_name: String,
pub host: Option<String>,
pub instance_id: Option<Uuid>,
// Rollup counters; all 0 for rows written by insert_vacuum_health_failure.
pub total_dead_rows: i64,
pub tables_with_bloat: i32,
pub xid_freeze_risk: i32,
pub tables_needing_vacuum: i32,
// NULL for failure rows and for successful collections with no tables.
pub freeze_max_age: Option<i64>,
// NULL on success; the error message for failed collections.
pub collection_error: Option<String>,
}
/// A stored `vacuum_health_table_stats` row belonging to one snapshot, as read
/// back by `get_latest_vacuum_health_detail`. Also serialized (serde) for API output.
#[derive(Debug, Clone, Serialize, sqlx::FromRow)]
pub struct VacuumHealthTableStatRecord {
pub id: i64,
// References vacuum_health_snapshots.id.
pub snapshot_id: i64,
pub schemaname: String,
pub relname: String,
pub n_dead_tup: i64,
pub n_live_tup: i64,
// Decoded as Decimal (presumably a NUMERIC column — confirm schema) and
// serialized to JSON as a float, or null when None.
#[serde(with = "rust_decimal::serde::float_option")]
pub dead_pct: Option<Decimal>,
pub last_vacuum: Option<DateTime<Utc>>,
pub last_autovacuum: Option<DateTime<Utc>>,
// Option here, although inserts always supply a value; presumably a nullable column.
pub xid_age: Option<i64>,
// Stored from VacuumCatalogRow::xid_age_pct; serialized as a float or null.
#[serde(with = "rust_decimal::serde::float_option")]
pub xid_age_pct_of_freeze_max: Option<Decimal>,
}
/// Returns the most recent snapshot for every client, ordered by client name.
///
/// Uses `DISTINCT ON (client_name)`, which keeps the first row of each client
/// group under the `ORDER BY`. `id DESC` breaks ties between snapshots that
/// share a `recorded_at`, so the choice is deterministic and matches
/// `get_latest_vacuum_health_detail`. Without it, PostgreSQL may return either
/// tied row. This assumes ids grow with insertion order — TODO confirm.
///
/// # Errors
/// Returns any `sqlx` error from the query or row decoding.
pub async fn list_latest_vacuum_health_summaries(
    logging_pool: &PgPool,
) -> Result<Vec<VacuumHealthSnapshotRecord>, sqlx::Error> {
    sqlx::query_as(
        r#"
SELECT DISTINCT ON (client_name)
id,
recorded_at,
client_name,
host,
instance_id,
total_dead_rows,
tables_with_bloat,
xid_freeze_risk,
tables_needing_vacuum,
freeze_max_age,
collection_error
FROM vacuum_health_snapshots
ORDER BY client_name, recorded_at DESC, id DESC
"#,
    )
    .fetch_all(logging_pool)
    .await
}
/// Loads the most recent snapshot for `client_name` together with its per-table stats.
///
/// Returns `Ok(None)` when the client has no snapshots. The latest snapshot is
/// chosen by `recorded_at DESC, id DESC`. The `id` tiebreaker makes the choice
/// deterministic when two snapshots share a timestamp, and keeps it consistent
/// with `list_latest_vacuum_health_summaries`. Table rows are ordered by
/// `dead_pct` descending (NULLs last), then schema and table name.
///
/// NOTE(review): the two reads are separate statements on the pool, not one
/// transaction. A concurrent prune between them could leave the table list
/// empty, depending on FK/cascade rules not visible here.
///
/// # Errors
/// Returns any `sqlx` error from either query or row decoding.
pub async fn get_latest_vacuum_health_detail(
    logging_pool: &PgPool,
    client_name: &str,
) -> Result<Option<(VacuumHealthSnapshotRecord, Vec<VacuumHealthTableStatRecord>)>, sqlx::Error> {
    let snapshot: Option<VacuumHealthSnapshotRecord> = sqlx::query_as(
        r#"
SELECT
id,
recorded_at,
client_name,
host,
instance_id,
total_dead_rows,
tables_with_bloat,
xid_freeze_risk,
tables_needing_vacuum,
freeze_max_age,
collection_error
FROM vacuum_health_snapshots
WHERE client_name = $1
ORDER BY recorded_at DESC, id DESC
LIMIT 1
"#,
    )
    .bind(client_name)
    .fetch_optional(logging_pool)
    .await?;
    let Some(snap) = snapshot else {
        return Ok(None);
    };
    let tables: Vec<VacuumHealthTableStatRecord> = sqlx::query_as(
        r#"
SELECT
id,
snapshot_id,
schemaname,
relname,
n_dead_tup,
n_live_tup,
dead_pct,
last_vacuum,
last_autovacuum,
xid_age,
xid_age_pct_of_freeze_max
FROM vacuum_health_table_stats
WHERE snapshot_id = $1
ORDER BY vacuum_health_table_stats.dead_pct DESC NULLS LAST, schemaname, relname
"#,
    )
    .bind(snap.id)
    .fetch_all(logging_pool)
    .await?;
    Ok(Some((snap, tables)))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal catalog row with the given dead-tuple and XID percentages.
    fn row(dead_pct: f64, xid_pct: Option<f64>) -> VacuumCatalogRow {
        VacuumCatalogRow {
            schemaname: "public".to_string(),
            relname: "t".to_string(),
            n_dead_tup: 0,
            n_live_tup: 100,
            dead_pct,
            last_vacuum: None,
            last_autovacuum: None,
            xid_age: 0,
            xid_age_pct: xid_pct,
            freeze_max_age: 200_000_000,
        }
    }

    /// Thresholds used by every rollup test: 10% bloat, 50% XID risk.
    fn thresholds() -> VacuumHealthThresholds {
        VacuumHealthThresholds {
            bloat_dead_pct: 10.0,
            xid_risk_pct: 50.0,
        }
    }

    #[test]
    fn rollup_counts_bloat_and_xid_and_union() {
        let rows: Vec<VacuumCatalogRow> =
            vec![row(11.0, None), row(0.0, Some(60.0)), row(5.0, Some(40.0))];
        let r: VacuumRollup = rollup_vacuum_rows(&rows, &thresholds());
        assert_eq!(r.tables_with_bloat, 1);
        assert_eq!(r.xid_freeze_risk, 1);
        assert_eq!(r.tables_needing_vacuum, 2);
    }

    #[test]
    fn rollup_thresholds_are_inclusive_and_union_counts_once() {
        let rows = vec![row(10.0, Some(50.0))];
        let r = rollup_vacuum_rows(&rows, &thresholds());
        assert_eq!(r.tables_with_bloat, 1);
        assert_eq!(r.xid_freeze_risk, 1);
        assert_eq!(r.tables_needing_vacuum, 1);
    }

    #[test]
    fn rollup_sums_dead_rows_and_takes_freeze_max_age() {
        let rows = vec![
            VacuumCatalogRow {
                n_dead_tup: 3,
                ..row(0.0, None)
            },
            VacuumCatalogRow {
                n_dead_tup: 4,
                ..row(0.0, None)
            },
        ];
        let r = rollup_vacuum_rows(&rows, &thresholds());
        assert_eq!(r.total_dead_rows, 7);
        assert_eq!(r.freeze_max_age, Some(200_000_000));
    }

    #[test]
    fn vacuum_query_does_not_use_reserved_freeze_cte_name() {
        // Both needles are assembled at runtime so that this test's own source
        // text cannot satisfy (or trip) the checks.
        let bad_cte: String = ["WITH ", "freeze", " AS"].concat();
        let good_cte: String = ["WITH ", "freeze_cfg"].concat();
        let src = include_str!(concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/src/data/vacuum_health.rs"
        ));
        let hits: Vec<_> = src.match_indices(&bad_cte[..]).collect();
        assert!(
            hits.is_empty(),
            "CTE must not be named 'freeze' — reserved keyword in some backends. \
Found at: {:?}",
            hits
        );
        assert!(
            src.contains(&good_cte[..]),
            "CTE should be named 'freeze_cfg'"
        );
    }

    #[test]
    fn rollup_all_clean_rows_gives_zero_counters() {
        let rows = vec![row(0.0, Some(5.0)), row(1.0, Some(10.0))];
        let r = rollup_vacuum_rows(&rows, &thresholds());
        assert_eq!(r.tables_with_bloat, 0);
        assert_eq!(r.xid_freeze_risk, 0);
        assert_eq!(r.tables_needing_vacuum, 0);
    }

    #[test]
    fn rollup_empty_rows_is_zero() {
        let r = rollup_vacuum_rows(&[], &thresholds());
        assert_eq!(r.total_dead_rows, 0);
        assert_eq!(r.tables_with_bloat, 0);
        assert_eq!(r.xid_freeze_risk, 0);
        assert_eq!(r.tables_needing_vacuum, 0);
        assert!(r.freeze_max_age.is_none());
    }
}