athena_rs 3.26.1

Hyper-performant polyglot database driver
Documentation
//! Cluster-health payload loading for `GET /health/cluster`.
//!
//! This module orchestrates cache lookup, mirror-source selection, mirror
//! probing, and final payload assembly so route handlers remain lightweight.

use crate::AppState;
use crate::api::client_context::logging_pool;
use crate::api::health_capabilities::unavailable_capabilities;
use crate::api::health_cluster_cache::{read_fresh_cluster_payload, write_cluster_payload_cache};
use crate::api::health_cluster_payload::{
    apply_cluster_probe_metrics, build_cluster_health_payload,
};
use crate::api::health_contracts::{ClusterHealthPayload, ClusterMirrorHealth};
use crate::api::health_mirror_probe::probe_cluster_mirrors;
use crate::data::cluster::{
    ClusterMirrorRecord, fallback_cluster_mirrors, list_active_cluster_mirrors,
};

use std::time::{SystemTime, UNIX_EPOCH};

/// Cache key under which the cluster-health payload is looked up and stored.
///
/// `load_cluster_payload` passes this same key to both the cache read and the
/// cache write so the two always address the same entry.
const CLUSTER_CACHE_KEY: &str = "health:cluster";

/// Produces the cluster-health payload for the current request.
///
/// A payload that `read_fresh_cluster_payload` reports as fresh is returned
/// as-is. Otherwise the mirror list is loaded, every mirror is probed, the
/// payload is rebuilt from the probe results, written back to the cache, and
/// returned.
pub(super) async fn load_cluster_payload(app_state: &AppState) -> ClusterHealthPayload {
    // A single timestamp is used for both the freshness check and the cache write.
    let requested_at = epoch_seconds();

    let cache_hit = read_fresh_cluster_payload(app_state, CLUSTER_CACHE_KEY, requested_at).await;
    match cache_hit {
        Some(fresh) => fresh,
        None => {
            let records = load_active_cluster_mirrors(app_state).await;
            let client = app_state.client.clone();
            let probe_results =
                probe_cluster_mirrors(client, records, unavailable_capabilities).await;

            let mirror_health: Vec<ClusterMirrorHealth> =
                apply_cluster_probe_metrics(app_state, probe_results);
            let rebuilt: ClusterHealthPayload =
                build_cluster_health_payload(app_state, mirror_health);

            write_cluster_payload_cache(app_state, CLUSTER_CACHE_KEY, &rebuilt, requested_at)
                .await;
            rebuilt
        }
    }
}

/// Fetches the active mirror rows through the logging pool.
///
/// Lookup is best-effort: if the pool cannot be obtained or the query fails,
/// the error is discarded and the selection is left to
/// `prefer_active_mirrors_or_fallback`, which substitutes the static defaults.
async fn load_active_cluster_mirrors(app_state: &AppState) -> Vec<ClusterMirrorRecord> {
    let queried_rows = if let Ok(pool) = logging_pool(app_state) {
        // A failed query is treated the same as having no pool at all.
        list_active_cluster_mirrors(&pool).await.ok()
    } else {
        None
    };

    prefer_active_mirrors_or_fallback(queried_rows)
}

/// Uses active mirror rows when present; otherwise falls back to static defaults.
fn prefer_active_mirrors_or_fallback(
    active_mirrors: Option<Vec<ClusterMirrorRecord>>,
) -> Vec<ClusterMirrorRecord> {
    active_mirrors
        .filter(|rows| !rows.is_empty())
        .unwrap_or_else(fallback_cluster_mirrors)
}

/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `duration_since`
/// fails and the zero duration is used, so the result is `0` rather than a
/// panic. The `u64` second count is converted with a checked conversion and
/// saturates at `i64::MAX` instead of silently wrapping to a negative value.
fn epoch_seconds() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    // `as i64` would wrap on overflow; clamp explicitly instead.
    i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Constructs an active, priority-zero mirror record pointing at `url`.
    fn mirror(url: &str) -> ClusterMirrorRecord {
        let url = url.to_owned();
        ClusterMirrorRecord {
            url,
            is_active: true,
            priority: 0,
        }
    }

    /// A non-empty list of active rows is used instead of the fallback.
    #[test]
    fn prefers_non_empty_active_mirror_rows() {
        let expected_url = "https://example-mirror";

        let selected = prefer_active_mirrors_or_fallback(Some(vec![mirror(expected_url)]));

        assert_eq!(selected.len(), 1);
        assert_eq!(selected[0].url, expected_url);
    }

    /// Both a missing list and an empty list resolve to the static fallback.
    #[test]
    fn falls_back_when_active_mirror_rows_are_missing_or_empty() {
        let expected = fallback_cluster_mirrors();

        for absent_or_empty in [None, Some(Vec::new())] {
            let selected = prefer_active_mirrors_or_fallback(absent_or_empty);
            assert_eq!(selected.len(), expected.len());
            assert_eq!(selected[0].url, expected[0].url);
        }
    }
}