use actix_web::rt::time::{Instant as RtInstant, interval_at};
use actix_web::web::Data;
use std::env::var;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Interval;
use tracing::{debug, info, warn};
use uuid::Uuid;
// ANSI SGR escape sequences used to decorate log messages in this module.
/// Bold text.
const B: &str = "\x1b[1m";
/// Yellow foreground.
const Y: &str = "\x1b[33m";
/// Red foreground.
const R: &str = "\x1b[31m";
/// Green foreground (underscore-prefixed: not referenced in the visible code).
const _G: &str = "\x1b[32m";
/// Reset all attributes.
const Z: &str = "\x1b[0m";
use crate::AppState;
use crate::data::client_connections::{insert_connection_snapshot, prune_connection_snapshots};
use crate::data::vacuum_health::{
DEFAULT_STATEMENT_TIMEOUT_MS, VacuumHealthThresholds, collect_vacuum_catalog_rows,
insert_vacuum_health_failure, insert_vacuum_health_snapshot, prune_vacuum_health_snapshots,
rollup_vacuum_rows,
};
use crate::drivers::postgresql::pool_manager::ConnectionPoolSnapshot;
use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, PostgresClientRegistry};
/// Fallback for `ATHENA_POOL_MONITOR_INTERVAL_SECS`: seconds between pool snapshots.
const DEFAULT_MONITOR_INTERVAL_SECS: u64 = 30;

/// Fallback for `ATHENA_POOL_MONITOR_RETENTION_HOURS`: retention passed to the
/// connection-snapshot prune call.
const DEFAULT_MONITOR_RETENTION_HOURS: i64 = 24;

/// Fallback for `ATHENA_VACUUM_HEALTH_INTERVAL_SECS`: one hour between collections.
const DEFAULT_VACUUM_HEALTH_INTERVAL_SECS: u64 = 60 * 60;

/// Fallback for `ATHENA_VACUUM_HEALTH_RETENTION_DAYS`: retention passed to the
/// vacuum-health prune call.
const DEFAULT_VACUUM_HEALTH_RETENTION_DAYS: i64 = 30;

/// Fallback for `ATHENA_CLIENT_RECONNECT_INTERVAL_SECS`: seconds between reconnect sweeps.
const DEFAULT_RECONNECT_INTERVAL_SECS: u64 = 20;
pub fn spawn_connection_monitor(app_state: Data<AppState>) {
let Some(logging_client_name) = app_state.logging_client_name.clone() else {
debug!("No logging client configured; skipping connection monitor");
return;
};
let Some(logging_pool) = app_state.pg_registry.get_pool(&logging_client_name) else {
warn!(
client = %logging_client_name,
"Logging client pool unavailable; skipping connection monitor"
);
return;
};
let registry: Arc<PostgresClientRegistry> = app_state.pg_registry.clone();
let host: String = var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
let instance_id: Uuid = Uuid::new_v4();
let interval_secs: u64 = var("ATHENA_POOL_MONITOR_INTERVAL_SECS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_MONITOR_INTERVAL_SECS);
let retention_hours: i64 = var("ATHENA_POOL_MONITOR_RETENTION_HOURS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_MONITOR_RETENTION_HOURS);
info!(
client = %logging_client_name,
interval_secs,
retention_hours,
host,
instance_id = %instance_id,
"Starting connection pool monitor"
);
actix_web::rt::spawn(async move {
let start: RtInstant = RtInstant::now();
let mut ticker: Interval = interval_at(start, Duration::from_secs(interval_secs));
loop {
ticker.tick().await;
let snapshots: Vec<ConnectionPoolSnapshot> = registry.pool_snapshots();
let total: usize = snapshots.len();
let mut failed: i32 = 0;
let mut last_err: Option<sqlx::Error> = None;
for snapshot in &snapshots {
if let Err(err) =
insert_connection_snapshot(&logging_pool, snapshot, &host, instance_id).await
{
failed += 1;
if last_err.is_none() {
last_err = Some(err);
}
}
}
if failed > 0 {
warn!(
failed,
total,
error = %last_err.unwrap(),
"{}âš {} {}Failed to write client_connections snapshot(s){} ({}failed{}={}, total={}); {}logging DB may be unreachable{}",
Y, Z, B, Z, B, Z, failed, total, R, Z
);
}
if let Err(err) = prune_connection_snapshots(&logging_pool, retention_hours).await {
debug!(error = %err, "Failed to prune old client_connections rows");
}
}
});
}
/// Spawns a background task that periodically collects vacuum-health data from every
/// connected client and stores it in the logging database.
///
/// Returns without spawning anything when `app_state.logging_client_name` is `None`,
/// or when the registry has no pool for that client.
///
/// On every tick the task re-reads `VacuumHealthThresholds::from_env()` and, for each
/// registered client whose `pool_connected` flag is set and whose pool can be obtained:
/// - runs `collect_vacuum_catalog_rows` with the configured statement timeout;
/// - on success, rolls the rows up with `rollup_vacuum_rows` and writes them with
///   `insert_vacuum_health_snapshot`;
/// - on failure, logs a warning and records the error text with
///   `insert_vacuum_health_failure`.
///
/// After all clients are processed it calls `prune_vacuum_health_snapshots`. Write and
/// prune failures are logged and never stop the loop.
///
/// # Environment
/// - `ATHENA_VACUUM_HEALTH_INTERVAL_SECS`: tick period in seconds. Unset, unparsable
///   or zero values fall back to `DEFAULT_VACUUM_HEALTH_INTERVAL_SECS`.
/// - `ATHENA_VACUUM_HEALTH_RETENTION_DAYS`: retention handed to the prune call
///   (fallback `DEFAULT_VACUUM_HEALTH_RETENTION_DAYS`).
/// - `ATHENA_VACUUM_HEALTH_STATEMENT_TIMEOUT_MS`: timeout handed to the catalog query
///   (fallback `DEFAULT_STATEMENT_TIMEOUT_MS`).
pub fn spawn_vacuum_health_collector(app_state: Data<AppState>) {
    let Some(logging_client_name) = app_state.logging_client_name.clone() else {
        debug!("No logging client configured; skipping vacuum health collector");
        return;
    };
    let Some(logging_pool) = app_state.pg_registry.get_pool(&logging_client_name) else {
        warn!(
            client = %logging_client_name,
            "Logging client pool unavailable; skipping vacuum health collector"
        );
        return;
    };

    let registry: Arc<PostgresClientRegistry> = app_state.pg_registry.clone();
    let host: String = var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
    let instance_id: Uuid = Uuid::new_v4();

    // `interval_at` panics on a zero period, which would silently kill the spawned
    // task; treat `0` like any other invalid value and use the default instead.
    let interval_secs: u64 = var("ATHENA_VACUUM_HEALTH_INTERVAL_SECS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .filter(|&secs| secs > 0)
        .unwrap_or(DEFAULT_VACUUM_HEALTH_INTERVAL_SECS);
    let retention_days: i64 = var("ATHENA_VACUUM_HEALTH_RETENTION_DAYS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_VACUUM_HEALTH_RETENTION_DAYS);
    let statement_timeout_ms: u64 = var("ATHENA_VACUUM_HEALTH_STATEMENT_TIMEOUT_MS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_STATEMENT_TIMEOUT_MS);

    info!(
        client = %logging_client_name,
        interval_secs,
        retention_days,
        statement_timeout_ms,
        host,
        instance_id = %instance_id,
        "Starting vacuum health collector"
    );

    actix_web::rt::spawn(async move {
        // The interval starts at "now", so the first tick is due immediately.
        let start: RtInstant = RtInstant::now();
        let mut ticker: Interval = interval_at(start, Duration::from_secs(interval_secs));
        loop {
            ticker.tick().await;

            // Thresholds are re-read on every tick rather than captured once.
            let thresholds: VacuumHealthThresholds = VacuumHealthThresholds::from_env();
            let clients = registry.list_registered_clients();
            for reg in clients {
                if !reg.pool_connected {
                    continue;
                }
                // The registry may no longer have a pool for this client even though
                // the listing marked it as connected; skip it in that case.
                let Some(target_pool) = registry.get_pool(&reg.client_name) else {
                    continue;
                };
                let client_name = reg.client_name.as_str();

                match collect_vacuum_catalog_rows(&target_pool, statement_timeout_ms).await {
                    Ok(rows) => {
                        let rollup = rollup_vacuum_rows(&rows, &thresholds);
                        if let Err(err) = insert_vacuum_health_snapshot(
                            &logging_pool,
                            client_name,
                            Some(host.as_str()),
                            Some(instance_id),
                            &rollup,
                            &rows,
                        )
                        .await
                        {
                            warn!(
                                client = %client_name,
                                error = %err,
                                "Failed to write vacuum_health snapshot to logging database"
                            );
                        }
                    }
                    Err(err) => {
                        warn!(
                            client = %client_name,
                            error = %err,
                            "Vacuum health catalog query failed"
                        );
                        // Persist the failure as well, so the logging database
                        // reflects that a collection was attempted.
                        let msg = err.to_string();
                        if let Err(insert_err) = insert_vacuum_health_failure(
                            &logging_pool,
                            client_name,
                            Some(host.as_str()),
                            Some(instance_id),
                            &msg,
                        )
                        .await
                        {
                            warn!(
                                client = %client_name,
                                error = %insert_err,
                                "Failed to record vacuum_health failure row"
                            );
                        }
                    }
                }
            }

            if let Err(err) = prune_vacuum_health_snapshots(&logging_pool, retention_days).await {
                debug!(error = %err, "Failed to prune old vacuum_health_snapshots rows");
            }
        }
    });
}
/// Spawns a background task that periodically retries disconnected Postgres clients.
///
/// On every tick the task lists the registered clients and, for each one that is
/// active, not frozen and currently without a connected pool, rebuilds a
/// `ClientConnectionTarget` from the registered metadata and passes it to
/// `registry.upsert_client`. A success is logged at `info`, a failure at `debug`.
/// After the sweep it calls `registry.sync_connection_status()`.
///
/// # Environment
/// - `ATHENA_CLIENT_RECONNECT_INTERVAL_SECS`: tick period in seconds. Unset,
///   unparsable or zero values fall back to `DEFAULT_RECONNECT_INTERVAL_SECS`.
pub fn spawn_registry_reconnect_worker(app_state: Data<AppState>) {
    let registry: Arc<PostgresClientRegistry> = app_state.pg_registry.clone();

    // `interval_at` panics on a zero period, which would silently kill the spawned
    // task; treat `0` like any other invalid value and use the default instead.
    let interval_secs: u64 = var("ATHENA_CLIENT_RECONNECT_INTERVAL_SECS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .filter(|&secs| secs > 0)
        .unwrap_or(DEFAULT_RECONNECT_INTERVAL_SECS);

    info!(interval_secs, "Starting Postgres client reconnect worker");

    actix_web::rt::spawn(async move {
        // The interval starts at "now", so the first tick is due immediately.
        let start: RtInstant = RtInstant::now();
        let mut ticker: Interval = interval_at(start, Duration::from_secs(interval_secs));
        loop {
            ticker.tick().await;

            let clients = registry.list_registered_clients();
            for client in clients {
                // Only active, unfrozen clients that lack a connected pool are retried.
                if !client.is_active || client.is_frozen || client.pool_connected {
                    continue;
                }

                let target = ClientConnectionTarget {
                    client_name: client.client_name.clone(),
                    source: client.source.clone(),
                    description: client.description.clone(),
                    pg_uri: client.pg_uri.clone(),
                    pg_uri_env_var: client.pg_uri_env_var.clone(),
                    config_uri_template: client.config_uri_template.clone(),
                    is_active: client.is_active,
                    is_frozen: client.is_frozen,
                };

                match registry.upsert_client(target).await {
                    Ok(()) => info!(client = %client.client_name, "Reconnected Postgres client"),
                    // Logged at debug: a failed attempt is retried on the next tick.
                    Err(err) => debug!(
                        client = %client.client_name,
                        error = %err,
                        "Reconnect attempt failed"
                    ),
                }
            }

            registry.sync_connection_status();
        }
    });
}