use actix_web::rt::time::{Instant as RtInstant, interval_at};
use actix_web::web::Data;
use std::env::var;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Interval;
use tracing::{debug, info, warn};
use uuid::Uuid;
use crate::AppState;
use crate::config_validation::runtime_env_settings;
use crate::data::vacuum_health::{
VacuumHealthThresholds, VacuumRollup, collect_vacuum_catalog_rows,
insert_vacuum_health_failure, insert_vacuum_health_snapshot, prune_vacuum_health_snapshots,
rollup_vacuum_rows,
};
use crate::drivers::postgresql::sqlx_driver::{PostgresClientRegistry, RegisteredClient};
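
/// Returns true when a sqlx error's Display text matches the message sqlx
/// produces when the connection pool times out waiting for a free connection.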
fn is_pool_timeout(err: &sqlx::Error) -> bool {
err.to_string()
.to_ascii_lowercase()
.contains("pool timed out while waiting for an open connection")
}
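
/// Spawns a background task that periodically sweeps every registered
/// Postgres client, rolls its vacuum catalog rows up into a health summary,
/// and persists the result through the logging client's pool. Does nothing
/// when no logging client is configured or its pool is unavailable.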
pub fn spawn_vacuum_health_collector(app_state: Data<AppState>) {
let Some(logging_client_name) = app_state.logging_client_name.clone() else {
debug!("No logging client configured; skipping vacuum health collector");
return;
};
let Some(logging_pool) = app_state.pg_registry.get_pool(&logging_client_name) else {
warn!(
client = %logging_client_name,
"Logging client pool unavailable; skipping vacuum health collector"
);
return;
};
let registry: Arc<PostgresClientRegistry> = app_state.pg_registry.clone();
let runtime_env: &crate::config_validation::RuntimeEnvSettings = runtime_env_settings();
let host: String = var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
let instance_id: Uuid = Uuid::new_v4();
let interval_secs: u64 = runtime_env.vacuum_health_interval_secs;
let retention_days: i64 = runtime_env.vacuum_health_retention_days;
let statement_timeout_ms: u64 = runtime_env.vacuum_health_statement_timeout_ms;
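    // Rollup thresholds: the dead-tuple and XID-risk percentages above which
    // a table is flagged in the health summary.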
let thresholds: VacuumHealthThresholds = VacuumHealthThresholds {
bloat_dead_pct: runtime_env.vacuum_health_bloat_dead_pct,
xid_risk_pct: runtime_env.vacuum_health_xid_risk_pct,
};
info!(
client = %logging_client_name,
interval_secs,
retention_days,
statement_timeout_ms,
host,
instance_id = %instance_id,
"Starting vacuum health collector"
);
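    // Run the collector as a detached task on the actix (tokio) runtime.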
actix_web::rt::spawn(async move {
let start: RtInstant = RtInstant::now();
let mut ticker: Interval = interval_at(start, Duration::from_secs(interval_secs));
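        // Anchoring the interval at `start` (i.e. now) makes the first tick
        // fire immediately; later ticks arrive every `interval_secs`.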
loop {
ticker.tick().await;
let clients: Vec<RegisteredClient> = registry.list_registered_clients();
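            // Set when a write to the logging database times out on the pool,
            // so the rest of the sweep can be abandoned early.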
let mut logging_pool_saturated: bool = false;
for reg in clients {
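                // Skip clients whose pool is not connected; nothing to sample.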
if !reg.pool_connected {
continue;
}
                let Some(target_pool) = registry.get_pool(&reg.client_name) else {
continue;
};
let client_name: &str = reg.client_name.as_str();
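                // Query the target's vacuum catalog stats, bounded by the
                // configured statement timeout so a slow target cannot stall
                // the whole sweep.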
match collect_vacuum_catalog_rows(&target_pool, statement_timeout_ms).await {
Ok(rows) => {
let rollup: VacuumRollup = rollup_vacuum_rows(&rows, &thresholds);
if let Err(err) = insert_vacuum_health_snapshot(
&logging_pool,
client_name,
Some(host.as_str()),
Some(instance_id),
&rollup,
&rows,
)
.await
{
warn!(
target_client = %client_name,
logging_client = %logging_client_name,
error = %err,
"Failed to write vacuum_health snapshot to logging database"
);
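                            // The logging pool itself is saturated; abort the
                            // sweep instead of queueing further writes.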
if is_pool_timeout(&err) {
logging_pool_saturated = true;
break;
}
}
}
Err(err) => {
warn!(
target_client = %client_name,
error = %err,
"Vacuum health catalog query failed"
);
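                        // Record the collection failure so the missing
                        // snapshot is accounted for in the logging database.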
let msg: String = err.to_string();
if let Err(insert_err) = insert_vacuum_health_failure(
&logging_pool,
client_name,
Some(host.as_str()),
Some(instance_id),
&msg,
)
.await
{
warn!(
target_client = %client_name,
logging_client = %logging_client_name,
error = %insert_err,
"Failed to record vacuum_health failure row"
);
if is_pool_timeout(&insert_err) {
logging_pool_saturated = true;
break;
}
}
}
}
}
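            // Skip retention pruning this round to take pressure off the
            // logging pool; the next tick retries the full sweep.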
if logging_pool_saturated {
warn!(
logging_client = %logging_client_name,
"Vacuum health sweep interrupted early due to logging pool saturation"
);
continue;
}
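            // Enforce the retention window; pruning failures are non-fatal.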
if let Err(err) = prune_vacuum_health_snapshots(&logging_pool, retention_days).await {
debug!(error = %err, "Failed to prune old vacuum_health_snapshots rows");
}
}
});
}