athena_rs 3.3.0

Database gateway API
Documentation
//! Daemon / background server module.
//! Houses background tasks that run alongside the HTTP server.

use actix_web::rt::time::{Instant as RtInstant, interval_at};
use actix_web::web::Data;
use std::env::var;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Interval;
use tracing::{debug, info, warn};
use uuid::Uuid;

/// ANSI SGR escape sequence: bold.
///
/// The constants in this group are spliced into log messages to style them.
/// NOTE(review): the visible code interpolates them unconditionally; no TTY
/// detection happens in this module, so non-terminal sinks receive the raw
/// escapes unless some downstream layer strips them — confirm.
const B: &str = "\x1b[1m"; // bold
/// ANSI SGR escape sequence: yellow foreground.
const Y: &str = "\x1b[33m"; // yellow
/// ANSI SGR escape sequence: red foreground.
const R: &str = "\x1b[31m"; // red
/// ANSI SGR escape sequence: green foreground. Not referenced in the visible
/// code; the leading underscore keeps the unused-item lint quiet.
const _G: &str = "\x1b[32m"; // green
/// ANSI SGR escape sequence: reset all attributes.
const Z: &str = "\x1b[0m"; // reset

use crate::AppState;
use crate::data::client_connections::{insert_connection_snapshot, prune_connection_snapshots};
use crate::data::vacuum_health::{
    DEFAULT_STATEMENT_TIMEOUT_MS, VacuumHealthThresholds, collect_vacuum_catalog_rows,
    insert_vacuum_health_failure, insert_vacuum_health_snapshot, prune_vacuum_health_snapshots,
    rollup_vacuum_rows,
};
use crate::drivers::postgresql::pool_manager::ConnectionPoolSnapshot;
use crate::drivers::postgresql::sqlx_driver::{ClientConnectionTarget, PostgresClientRegistry};

/// Fallback sampling period (seconds) for the connection pool monitor when
/// `ATHENA_POOL_MONITOR_INTERVAL_SECS` is unset or unparsable.
const DEFAULT_MONITOR_INTERVAL_SECS: u64 = 30;
/// Fallback retention (hours) passed to `prune_connection_snapshots` when
/// `ATHENA_POOL_MONITOR_RETENTION_HOURS` is unset or unparsable.
const DEFAULT_MONITOR_RETENTION_HOURS: i64 = 24;

/// Fallback sampling period (seconds) for the vacuum health collector when
/// `ATHENA_VACUUM_HEALTH_INTERVAL_SECS` is unset or unparsable.
const DEFAULT_VACUUM_HEALTH_INTERVAL_SECS: u64 = 3600;
/// Fallback retention (days) passed to `prune_vacuum_health_snapshots` when
/// `ATHENA_VACUUM_HEALTH_RETENTION_DAYS` is unset or unparsable.
const DEFAULT_VACUUM_HEALTH_RETENTION_DAYS: i64 = 30;
/// Fallback retry period (seconds) for the registry reconnect worker when
/// `ATHENA_CLIENT_RECONNECT_INTERVAL_SECS` is unset or unparsable.
const DEFAULT_RECONNECT_INTERVAL_SECS: u64 = 20;

/// Spawn a background task that periodically records pool occupancy into the
/// `client_connections` table inside the logging database.
///
/// The monitor is intentionally lightweight: it samples counts from sqlx pool
/// metadata and writes a single row per client per interval. When the logging
/// client is misconfigured or unavailable, the monitor logs a warning and exits
/// quietly without impacting the HTTP server.
///
/// **Important:** The spawned task uses `PgPool` (sqlx); it must run on the
/// Actix/Tokio runtime. Do not run pool or sqlx operations from `spawn_blocking`
/// or other threads that lack a Tokio context, or you may see "this functionality
/// requires a Tokio context" panics.
pub fn spawn_connection_monitor(app_state: Data<AppState>) {
    let Some(logging_client_name) = app_state.logging_client_name.clone() else {
        debug!("No logging client configured; skipping connection monitor");
        return;
    };

    let Some(logging_pool) = app_state.pg_registry.get_pool(&logging_client_name) else {
        warn!(
            client = %logging_client_name,
            "Logging client pool unavailable; skipping connection monitor"
        );
        return;
    };

    let registry: Arc<PostgresClientRegistry> = app_state.pg_registry.clone();
    let host: String = var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
    let instance_id: Uuid = Uuid::new_v4();
    let interval_secs: u64 = var("ATHENA_POOL_MONITOR_INTERVAL_SECS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_MONITOR_INTERVAL_SECS);
    let retention_hours: i64 = var("ATHENA_POOL_MONITOR_RETENTION_HOURS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_MONITOR_RETENTION_HOURS);

    info!(
        client = %logging_client_name,
        interval_secs,
        retention_hours,
        host,
        instance_id = %instance_id,
        "Starting connection pool monitor"
    );

    actix_web::rt::spawn(async move {
        let start: RtInstant = RtInstant::now();
        let mut ticker: Interval = interval_at(start, Duration::from_secs(interval_secs));

        loop {
            ticker.tick().await;

            let snapshots: Vec<ConnectionPoolSnapshot> = registry.pool_snapshots();
            let total: usize = snapshots.len();
            let mut failed: i32 = 0;
            let mut last_err: Option<sqlx::Error> = None;
            for snapshot in &snapshots {
                if let Err(err) =
                    insert_connection_snapshot(&logging_pool, snapshot, &host, instance_id).await
                {
                    failed += 1;
                    if last_err.is_none() {
                        last_err = Some(err);
                    }
                }
            }
            if failed > 0 {
                warn!(
                    failed,
                    total,
                    error = %last_err.unwrap(),
                    "{}âš {} {}Failed to write client_connections snapshot(s){} ({}failed{}={}, total={}); {}logging DB may be unreachable{}",
                    Y, Z, B, Z, B, Z, failed, total, R, Z
                );
            }

            // Opportunistically prune old snapshots to keep the table bounded.
            if let Err(err) = prune_connection_snapshots(&logging_pool, retention_hours).await {
                debug!(error = %err, "Failed to prune old client_connections rows");
            }
        }
    });
}

/// Spawn a background task that samples `pg_stat_user_tables` / XID age on each
/// registered Postgres client and stores rows in `vacuum_health_*` tables on the
/// logging database.
///
/// If no logging client is configured, or its pool cannot be obtained, the
/// function logs and returns without spawning anything.
///
/// On every tick the task re-reads the thresholds via
/// `VacuumHealthThresholds::from_env`, walks the registered clients, skips any
/// whose pool is not connected, and for the rest either stores a snapshot or,
/// when the catalog query fails, stores a failure row carrying the error text.
/// Old snapshots are pruned at the end of each tick.
///
/// Configuration (environment variables, read once at spawn time):
/// - `ATHENA_VACUUM_HEALTH_INTERVAL_SECS`: sampling period in seconds. Unset,
///   unparsable, or `0` falls back to `DEFAULT_VACUUM_HEALTH_INTERVAL_SECS`.
/// - `ATHENA_VACUUM_HEALTH_RETENTION_DAYS`: retention passed to the prune step.
/// - `ATHENA_VACUUM_HEALTH_STATEMENT_TIMEOUT_MS`: timeout handed to the catalog
///   query helper.
/// - `HOSTNAME`: recorded with each row; `"unknown"` when unset.
pub fn spawn_vacuum_health_collector(app_state: Data<AppState>) {
    let Some(logging_client_name) = app_state.logging_client_name.clone() else {
        debug!("No logging client configured; skipping vacuum health collector");
        return;
    };

    let Some(logging_pool) = app_state.pg_registry.get_pool(&logging_client_name) else {
        warn!(
            client = %logging_client_name,
            "Logging client pool unavailable; skipping vacuum health collector"
        );
        return;
    };

    let registry: Arc<PostgresClientRegistry> = app_state.pg_registry.clone();
    let host: String = var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
    let instance_id: Uuid = Uuid::new_v4();
    // `interval_at` panics on a zero period, which would kill the collector task;
    // treat `0` like any other invalid value and use the default instead.
    let interval_secs: u64 = var("ATHENA_VACUUM_HEALTH_INTERVAL_SECS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .filter(|secs| *secs > 0)
        .unwrap_or(DEFAULT_VACUUM_HEALTH_INTERVAL_SECS);
    let retention_days: i64 = var("ATHENA_VACUUM_HEALTH_RETENTION_DAYS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_VACUUM_HEALTH_RETENTION_DAYS);
    let statement_timeout_ms: u64 = var("ATHENA_VACUUM_HEALTH_STATEMENT_TIMEOUT_MS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_STATEMENT_TIMEOUT_MS);

    info!(
        client = %logging_client_name,
        interval_secs,
        retention_days,
        statement_timeout_ms,
        host,
        instance_id = %instance_id,
        "Starting vacuum health collector"
    );

    actix_web::rt::spawn(async move {
        // Starting at `now` makes the first tick complete immediately.
        let start: RtInstant = RtInstant::now();
        let mut ticker: Interval = interval_at(start, Duration::from_secs(interval_secs));

        loop {
            ticker.tick().await;

            // Re-read on every tick, so the thresholds are not frozen at spawn time.
            let thresholds: VacuumHealthThresholds = VacuumHealthThresholds::from_env();
            let clients = registry.list_registered_clients();

            for reg in clients {
                // Only sample clients that currently have a usable pool.
                if !reg.pool_connected {
                    continue;
                }
                let Some(target_pool) = registry.get_pool(&reg.client_name) else {
                    continue;
                };

                let client_name = reg.client_name.as_str();
                match collect_vacuum_catalog_rows(&target_pool, statement_timeout_ms).await {
                    Ok(rows) => {
                        let rollup = rollup_vacuum_rows(&rows, &thresholds);
                        if let Err(err) = insert_vacuum_health_snapshot(
                            &logging_pool,
                            client_name,
                            Some(host.as_str()),
                            Some(instance_id),
                            &rollup,
                            &rows,
                        )
                        .await
                        {
                            warn!(
                                client = %client_name,
                                error = %err,
                                "Failed to write vacuum_health snapshot to logging database"
                            );
                        }
                    }
                    Err(err) => {
                        warn!(
                            client = %client_name,
                            error = %err,
                            "Vacuum health catalog query failed"
                        );
                        // Persist the failure too, so gaps in the snapshot series
                        // are explained in the logging database itself.
                        let msg = err.to_string();
                        if let Err(insert_err) = insert_vacuum_health_failure(
                            &logging_pool,
                            client_name,
                            Some(host.as_str()),
                            Some(instance_id),
                            &msg,
                        )
                        .await
                        {
                            warn!(
                                client = %client_name,
                                error = %insert_err,
                                "Failed to record vacuum_health failure row"
                            );
                        }
                    }
                }
            }

            // Best-effort pruning; a failure here only delays cleanup to a later tick.
            if let Err(err) = prune_vacuum_health_snapshots(&logging_pool, retention_days).await {
                debug!(error = %err, "Failed to prune old vacuum_health_snapshots rows");
            }
        }
    });
}

/// Spawn a background worker that retries connecting unavailable active clients.
///
/// On every tick the worker lists the registered clients and, for each one that
/// is active, not frozen, and currently without a connected pool, re-submits its
/// connection details through `upsert_client`. After the pass it calls
/// `sync_connection_status` on the registry.
///
/// The retry period comes from `ATHENA_CLIENT_RECONNECT_INTERVAL_SECS` (seconds),
/// read once at spawn time. Unset, unparsable, or `0` falls back to
/// `DEFAULT_RECONNECT_INTERVAL_SECS`.
pub fn spawn_registry_reconnect_worker(app_state: Data<AppState>) {
    let registry: Arc<PostgresClientRegistry> = app_state.pg_registry.clone();
    // `interval_at` panics on a zero period, which would kill the worker task;
    // treat `0` like any other invalid value and use the default instead.
    let interval_secs: u64 = var("ATHENA_CLIENT_RECONNECT_INTERVAL_SECS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .filter(|secs| *secs > 0)
        .unwrap_or(DEFAULT_RECONNECT_INTERVAL_SECS);

    info!(interval_secs, "Starting Postgres client reconnect worker");
    actix_web::rt::spawn(async move {
        // Starting at `now` makes the first tick complete immediately.
        let start: RtInstant = RtInstant::now();
        let mut ticker: Interval = interval_at(start, Duration::from_secs(interval_secs));
        loop {
            ticker.tick().await;
            let clients = registry.list_registered_clients();
            for client in clients {
                // Only retry clients that are supposed to be up but are not.
                if !client.is_active || client.is_frozen || client.pool_connected {
                    continue;
                }
                let target = ClientConnectionTarget {
                    client_name: client.client_name.clone(),
                    source: client.source.clone(),
                    description: client.description.clone(),
                    pg_uri: client.pg_uri.clone(),
                    pg_uri_env_var: client.pg_uri_env_var.clone(),
                    config_uri_template: client.config_uri_template.clone(),
                    is_active: client.is_active,
                    is_frozen: client.is_frozen,
                };
                match registry.upsert_client(target).await {
                    Ok(()) => info!(client = %client.client_name, "Reconnected Postgres client"),
                    // Failures are expected while the target is down, hence `debug`.
                    Err(err) => debug!(
                        client = %client.client_name,
                        error = %err,
                        "Reconnect attempt failed"
                    ),
                }
            }
            registry.sync_connection_status();
        }
    });
}