athena_rs 2.0.2

Database gateway API
Documentation
//! Daemon / background server module.
//! Houses background tasks that run alongside the HTTP server.

use actix_web::rt::time::{Instant as RtInstant, interval_at};
use actix_web::web::Data;
use std::time::Duration;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::AppState;
use crate::data::client_connections::{insert_connection_snapshot, prune_connection_snapshots};

const DEFAULT_MONITOR_INTERVAL_SECS: u64 = 30;
const DEFAULT_MONITOR_RETENTION_HOURS: i64 = 24;

/// Spawn a background task that periodically records pool occupancy into the
/// `client_connections` table inside the logging database.
///
/// The monitor is intentionally lightweight: it samples counts from sqlx pool
/// metadata and writes a single row per client per interval. When the logging
/// client is misconfigured or unavailable, the monitor logs a warning and exits
/// quietly without impacting the HTTP server.
pub fn spawn_connection_monitor(app_state: Data<AppState>) {
    let Some(logging_client_name) = app_state.logging_client_name.clone() else {
        debug!("No logging client configured; skipping connection monitor");
        return;
    };

    let Some(logging_pool) = app_state.pg_registry.get_pool(&logging_client_name) else {
        warn!(client = %logging_client_name, "Logging client pool unavailable; skipping connection monitor");
        return;
    };

    let registry = app_state.pg_registry.clone();
    let host: String = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string());
    let instance_id: Uuid = Uuid::new_v4();
    let interval_secs: u64 = std::env::var("ATHENA_POOL_MONITOR_INTERVAL_SECS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_MONITOR_INTERVAL_SECS);
    let retention_hours: i64 = std::env::var("ATHENA_POOL_MONITOR_RETENTION_HOURS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_MONITOR_RETENTION_HOURS);

    info!(
        client = %logging_client_name,
        interval_secs,
        retention_hours,
        host,
        instance_id = %instance_id,
        "Starting connection pool monitor"
    );

    actix_web::rt::spawn(async move {
        let start = RtInstant::now();
        let mut ticker = interval_at(start, Duration::from_secs(interval_secs));

        loop {
            ticker.tick().await;

            let snapshots = registry.pool_snapshots();
            for snapshot in snapshots {
                if let Err(err) =
                    insert_connection_snapshot(&logging_pool, &snapshot, &host, instance_id).await
                {
                    warn!(
                        client = %snapshot.client_name,
                        error = %err,
                        "Failed to write client_connections snapshot"
                    );
                }
            }

            // Opportunistically prune old snapshots to keep the table bounded.
            if let Err(err) = prune_connection_snapshots(&logging_pool, retention_hours).await {
                debug!(error = %err, "Failed to prune old client_connections rows");
            }
        }
    });
}