athena_rs 3.23.0

Hyper-performant polyglot database driver
Documentation
//! Runtime snapshot loaders for `/metrics` endpoint enrichment.
//!
//! This module isolates short-lived cached aggregates and logging-database
//! queue snapshots from the main Prometheus text rendering path.

use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::Row;
use std::collections::HashMap;

use crate::AppState;

/// Key under which the last-24-hour aggregate entry is read from and written
/// to `app_state.cache`.
const LAST_24H_CACHE_KEY: &str = "metrics:last_24h";

/// Cached aggregate totals for last-24-hour traffic surfaces.
///
/// The `Default` value (all zeros) is what the loader returns when no logging
/// client or pool is available. Each field also falls back to `0` on its own
/// when its query or column decode fails.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub(super) struct Last24hMetrics {
    /// Total request count observed in `gateway_request_log` over the last 24h.
    pub(super) requests_last_24h: i64,
    /// Total management mutation count observed in `gateway_operation_log` over the last 24h.
    ///
    /// Only rows whose `path` matches `'/management/%'` are counted.
    pub(super) management_mutations_last_24h: i64,
}

/// Cache payload wrapper with timestamp for short TTL validation.
///
/// Serialized to JSON and stored under `LAST_24H_CACHE_KEY`; entries older
/// than 30 seconds are ignored by the reader and recomputed.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct Last24hMetricsCacheEntry {
    /// UNIX epoch seconds at which the entry was written (see `epoch_seconds`).
    cached_at_epoch_seconds: i64,
    /// The aggregate totals captured at `cached_at_epoch_seconds`.
    metrics: Last24hMetrics,
}

/// Deferred queue health/size snapshot used by Prometheus export.
///
/// The `Default` value (`storage_up == false`, zero counts, empty map) is
/// returned when no logging client or pool is available.
#[derive(Debug, Clone, Default)]
pub(super) struct DeferredQueueMetrics {
    /// Whether the deferred queue table could be queried successfully.
    ///
    /// Set to `false` when the per-status count query fails.
    pub(super) storage_up: bool,
    /// Total number of deferred queue rows across statuses.
    ///
    /// Accumulated with saturating addition from the per-status counts.
    pub(super) total: u64,
    /// Per-status row counts for deferred queue rows.
    ///
    /// Rows whose `status` column cannot be decoded are keyed as `"unknown"`.
    pub(super) by_status: HashMap<String, u64>,
    /// Age in seconds of the oldest queued/running deferred row.
    ///
    /// `0.0` when no such row exists or when the age query fails; never negative.
    pub(super) oldest_age_seconds: f64,
}

/// Loads cached (or live) last-24-hour request and management mutation totals.
///
/// Lookup order:
/// 1. A cache entry under `LAST_24H_CACHE_KEY` that deserializes cleanly and
///    whose age is between 0 and 30 seconds (inclusive) is returned as-is.
/// 2. Otherwise both totals are counted on the logging database pool, written
///    back to the cache stamped with the current time, and returned.
///
/// Returns `Last24hMetrics::default()` (all zeros, nothing cached) when no
/// logging client is configured or its pool is not registered. A failed query
/// or column decode contributes `0` for that total instead of an error, and
/// that value is cached like any other result.
pub(super) async fn last_24h_metrics(app_state: &AppState) -> Last24hMetrics {
    /// Maximum age, in seconds, for which a cached entry is served.
    const CACHE_TTL_SECONDS: i64 = 30;

    if let Some(cached) = app_state.cache.get(LAST_24H_CACHE_KEY).await
        && let Ok(entry) = serde_json::from_value::<Last24hMetricsCacheEntry>(cached)
        // Require 0 <= age <= TTL. An entry stamped in the future (wall clock
        // stepped backwards, or a corrupt payload) has a negative age and must
        // not be treated as fresh indefinitely. `saturating_sub` keeps an
        // extreme cached timestamp from overflowing the subtraction.
        && (0..=CACHE_TTL_SECONDS)
            .contains(&epoch_seconds().saturating_sub(entry.cached_at_epoch_seconds))
    {
        return entry.metrics;
    }

    let Some(logging_client_name) = app_state.logging_client_name.as_ref() else {
        return Last24hMetrics::default();
    };
    let Some(pool) = app_state.pg_registry.get_pool(logging_client_name) else {
        return Last24hMetrics::default();
    };

    // Best-effort: any query/decode failure degrades to 0 rather than failing
    // the metrics endpoint.
    let requests_last_24h: i64 = sqlx::query(
        r#"
        SELECT COUNT(*) AS total
        FROM gateway_request_log
        WHERE to_timestamp(time) >= now() - interval '24 hours'
        "#,
    )
    .fetch_one(&pool)
    .await
    .ok()
    .and_then(|row| row.try_get::<i64, _>("total").ok())
    .unwrap_or_default();

    let management_mutations_last_24h: i64 = sqlx::query(
        r#"
        SELECT COUNT(*) AS total
        FROM gateway_operation_log
        WHERE path LIKE '/management/%'
          AND to_timestamp(time) >= now() - interval '24 hours'
        "#,
    )
    .fetch_one(&pool)
    .await
    .ok()
    .and_then(|row| row.try_get::<i64, _>("total").ok())
    .unwrap_or_default();

    let value = Last24hMetrics {
        requests_last_24h,
        management_mutations_last_24h,
    };

    // Stamp the entry with the time of the write so readers can age it out.
    app_state
        .cache
        .insert(
            LAST_24H_CACHE_KEY.to_string(),
            json!(Last24hMetricsCacheEntry {
                cached_at_epoch_seconds: epoch_seconds(),
                metrics: value.clone(),
            }),
        )
        .await;
    value
}

/// Queries deferred queue health and size aggregates for metrics export.
///
/// Behaviour by outcome:
/// - No logging client configured, or its pool is not registered: returns
///   `DeferredQueueMetrics::default()` (`storage_up == false`).
/// - Per-status count query fails: returns with `storage_up == false` and all
///   counts left at their defaults.
/// - Otherwise `storage_up == true`, `total`/`by_status` reflect the grouped
///   counts, and `oldest_age_seconds` is the age of the oldest
///   `queued`/`running` row (`0.0` if none, or if that query fails).
pub(super) async fn deferred_queue_metrics(app_state: &AppState) -> DeferredQueueMetrics {
    let Some(logging_client_name) = app_state.logging_client_name.as_ref() else {
        return DeferredQueueMetrics::default();
    };
    let Some(pool) = app_state.pg_registry.get_pool(logging_client_name) else {
        return DeferredQueueMetrics::default();
    };

    let mut metrics = DeferredQueueMetrics {
        storage_up: true,
        ..DeferredQueueMetrics::default()
    };

    let rows: Vec<sqlx::postgres::PgRow> = match sqlx::query(
        r#"
        SELECT status, COUNT(*)::bigint AS total
        FROM public.gateway_deferred_request_queue
        GROUP BY status
        "#,
    )
    .fetch_all(&pool)
    .await
    {
        Ok(rows) => rows,
        Err(_) => {
            metrics.storage_up = false;
            return metrics;
        }
    };

    for row in rows {
        let status: String = row
            .try_get("status")
            .unwrap_or_else(|_| "unknown".to_string());
        let count: i64 = row.try_get("total").unwrap_or_default();
        // Negative counts cannot come from COUNT(*); clamp defensively to 0.
        let count = u64::try_from(count).unwrap_or(0);
        metrics.total = metrics.total.saturating_add(count);
        // Accumulate rather than overwrite: several rows can share a key when
        // their status fails to decode and falls back to "unknown". Overwriting
        // would make the per-status counts disagree with `total`.
        let slot = metrics.by_status.entry(status).or_insert(0);
        *slot = slot.saturating_add(count);
    }

    // The explicit cast matters: since PostgreSQL 14, EXTRACT() returns
    // `numeric`, which sqlx will not decode into `f64`. Without the cast the
    // decode fails and the gauge silently reads 0.
    metrics.oldest_age_seconds = sqlx::query(
        r#"
        SELECT COALESCE(EXTRACT(EPOCH FROM (now() - MIN(created_at))), 0)::double precision AS oldest_age_seconds
        FROM public.gateway_deferred_request_queue
        WHERE status IN ('queued', 'running')
        "#,
    )
    .fetch_one(&pool)
    .await
    .ok()
    .and_then(|row| row.try_get::<f64, _>("oldest_age_seconds").ok())
    .unwrap_or(0.0)
    // Guards against a negative age (row timestamps ahead of `now()`).
    .max(0.0);

    metrics
}

/// Current wall-clock time as whole seconds since the UNIX epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn epoch_seconds() -> i64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970-01-01: treat as zero elapsed time.
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_secs() as i64
}