athena_rs 3.4.7

Database driver
Documentation
//! Vacuum health background worker.
//!
//! Periodically samples `pg_stat_user_tables` and XID pressure from every
//! connected Postgres client, then stores per-table rollups in the logging
//! database for health dashboards and alerting.

use actix_web::rt::time::{Instant as RtInstant, interval_at};
use actix_web::web::Data;
use std::env::var;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Interval;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::AppState;
use crate::config_validation::runtime_env_settings;
use crate::data::vacuum_health::{
    VacuumHealthThresholds, VacuumRollup, collect_vacuum_catalog_rows,
    insert_vacuum_health_failure, insert_vacuum_health_snapshot, prune_vacuum_health_snapshots,
    rollup_vacuum_rows,
};
use crate::drivers::postgresql::sqlx_driver::{PostgresClientRegistry, RegisteredClient};

fn is_pool_timeout(err: &sqlx::Error) -> bool {
    err.to_string()
        .to_ascii_lowercase()
        .contains("pool timed out while waiting for an open connection")
}

/// Spawns a background task that samples `pg_stat_user_tables` / XID age on each
/// registered Postgres client and stores rows in `vacuum_health_*` tables on the
/// logging database.
pub fn spawn_vacuum_health_collector(app_state: Data<AppState>) {
    let Some(logging_client_name) = app_state.logging_client_name.clone() else {
        debug!("No logging client configured; skipping vacuum health collector");
        return;
    };

    let Some(logging_pool) = app_state.pg_registry.get_pool(&logging_client_name) else {
        warn!(
            client = %logging_client_name,
            "Logging client pool unavailable; skipping vacuum health collector"
        );
        return;
    };

    let registry: Arc<PostgresClientRegistry> = app_state.pg_registry.clone();
    let runtime_env: &crate::config_validation::RuntimeEnvSettings = runtime_env_settings();
    let host: String = var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
    let instance_id: Uuid = Uuid::new_v4();
    let interval_secs: u64 = runtime_env.vacuum_health_interval_secs;
    let retention_days: i64 = runtime_env.vacuum_health_retention_days;
    let statement_timeout_ms: u64 = runtime_env.vacuum_health_statement_timeout_ms;
    let thresholds: VacuumHealthThresholds = VacuumHealthThresholds {
        bloat_dead_pct: runtime_env.vacuum_health_bloat_dead_pct,
        xid_risk_pct: runtime_env.vacuum_health_xid_risk_pct,
    };

    info!(
        client = %logging_client_name,
        interval_secs,
        retention_days,
        statement_timeout_ms,
        host,
        instance_id = %instance_id,
        "Starting vacuum health collector"
    );

    actix_web::rt::spawn(async move {
        let start: RtInstant = RtInstant::now();
        let mut ticker: Interval = interval_at(start, Duration::from_secs(interval_secs));

        loop {
            ticker.tick().await;
            let clients: Vec<RegisteredClient> = registry.list_registered_clients();
            let mut logging_pool_saturated: bool = false;

            for reg in clients {
                if !reg.pool_connected {
                    continue;
                }
                let Some(target_pool) = registry.get_pool(&reg.client_name) else {
                    continue;
                };

                let client_name: &str = reg.client_name.as_str();
                match collect_vacuum_catalog_rows(&target_pool, statement_timeout_ms).await {
                    Ok(rows) => {
                        let rollup: VacuumRollup = rollup_vacuum_rows(&rows, &thresholds);
                        if let Err(err) = insert_vacuum_health_snapshot(
                            &logging_pool,
                            client_name,
                            Some(host.as_str()),
                            Some(instance_id),
                            &rollup,
                            &rows,
                        )
                        .await
                        {
                            warn!(
                                target_client = %client_name,
                                logging_client = %logging_client_name,
                                error = %err,
                                "Failed to write vacuum_health snapshot to logging database"
                            );
                            if is_pool_timeout(&err) {
                                logging_pool_saturated = true;
                                break;
                            }
                        }
                    }
                    Err(err) => {
                        warn!(
                            target_client = %client_name,
                            error = %err,
                            "Vacuum health catalog query failed"
                        );
                        let msg: String = err.to_string();
                        if let Err(insert_err) = insert_vacuum_health_failure(
                            &logging_pool,
                            client_name,
                            Some(host.as_str()),
                            Some(instance_id),
                            &msg,
                        )
                        .await
                        {
                            warn!(
                                target_client = %client_name,
                                logging_client = %logging_client_name,
                                error = %insert_err,
                                "Failed to record vacuum_health failure row"
                            );
                            if is_pool_timeout(&insert_err) {
                                logging_pool_saturated = true;
                                break;
                            }
                        }
                    }
                }
            }

            if logging_pool_saturated {
                warn!(
                    logging_client = %logging_client_name,
                    "Vacuum health sweep interrupted early due to logging pool saturation"
                );
                continue;
            }

            if let Err(err) = prune_vacuum_health_snapshots(&logging_pool, retention_days).await {
                debug!(error = %err, "Failed to prune old vacuum_health_snapshots rows");
            }
        }
    });
}