athena_rs 3.26.3

Hyper performant polyglot Database driver
//! Cluster mirror probing helpers for health endpoints.
//!
//! This module encapsulates remote mirror probing and capability mapping so
//! `health.rs` can focus on cache and payload orchestration.

use futures::future::join_all;
use serde_json::Value;
use std::time::{Duration, Instant};

use crate::api::health_capabilities::{ClusterCapabilities, ClusterCapabilityStatus};
use crate::api::health_contracts::ClusterMirrorHealth;
use crate::data::cluster::ClusterMirrorRecord;

/// Probes configured mirrors and returns `(mirror_url, probe)` tuples.
pub(super) async fn probe_cluster_mirrors(
    client: reqwest::Client,
    mirrors: Vec<ClusterMirrorRecord>,
    unavailable_capabilities: fn(&str) -> ClusterCapabilities,
) -> Vec<(String, ClusterMirrorHealth)> {
    join_all(mirrors.into_iter().map(|mirror| {
        let client = client.clone();
        async move {
            let root_url: String = mirror.url.clone();
            let ping_url: String = format!("{}/ping", mirror.url.trim_end_matches('/'));
            let openapi_url: String = format!("{}/openapi.yaml", mirror.url.trim_end_matches('/'));

            let ping_started: Instant = Instant::now();
            let ping_response: Result<reqwest::Response, reqwest::Error> = client
                .get(&ping_url)
                .timeout(Duration::from_secs(3))
                .send()
                .await;
            let latency_ms = ping_response
                .as_ref()
                .ok()
                .map(|_| ping_started.elapsed().as_secs_f64() * 1000.0);

            let root_response = client
                .get(&root_url)
                .timeout(Duration::from_secs(4))
                .send()
                .await;
            let root_json: Option<Value> = match root_response {
                Ok(response) => response.json::<Value>().await.ok(),
                Err(_) => None,
            };

            let openapi_started: Instant = Instant::now();
            let download_response: Result<reqwest::Response, reqwest::Error> = client
                .get(&openapi_url)
                .timeout(Duration::from_secs(5))
                .send()
                .await;
            let download_bytes_per_sec: Option<f64> = match download_response {
                Ok(response) => match response.bytes().await {
                    Ok(bytes) => {
                        let elapsed = openapi_started.elapsed().as_secs_f64().max(0.001);
                        Some(bytes.len() as f64 / elapsed)
                    }
                    Err(_) => None,
                },
                Err(_) => None,
            };

            let openapi_ok: bool = download_bytes_per_sec.is_some();
            let status: &str = if root_json.is_some() && latency_ms.is_some() {
                "online"
            } else {
                "offline"
            };

            let cargo_toml_version: Option<String> = root_json
                .as_ref()
                .and_then(|json| {
                    json.get("cargo_toml_version")
                        .or_else(|| json.get("version"))
                })
                .and_then(Value::as_str)
                .map(str::to_string);
            let message: Option<String> = root_json
                .as_ref()
                .and_then(|json| json.get("message"))
                .and_then(Value::as_str)
                .map(str::to_string);

            let mirror_capabilities = build_mirror_capabilities(
                root_json.as_ref(),
                openapi_ok,
                status,
                unavailable_capabilities,
            );

            (
                mirror.url,
                ClusterMirrorHealth {
                    url: root_url,
                    status: status.to_string(),
                    latency_ms,
                    download_bytes_per_sec,
                    cargo_toml_version,
                    message,
                    capabilities: mirror_capabilities,
                },
            )
        }
    }))
    .await
}

/// Builds mirror capability payload from mirror root metadata and probe outcomes.
fn build_mirror_capabilities(
    root_json: Option<&Value>,
    openapi_ok: bool,
    status: &str,
    unavailable_capabilities: fn(&str) -> ClusterCapabilities,
) -> ClusterCapabilities {
    root_json
        .and_then(|json| json.get("capabilities"))
        .and_then(|value| serde_json::from_value::<ClusterCapabilities>(value.clone()).ok())
        .map(|caps| ClusterCapabilities {
            openapi_download: openapi_download_status(openapi_ok),
            management_api: route_advertisement_status(
                root_json,
                "/management/capabilities",
                "GET",
            ),
            gateway_fetch: route_advertisement_status(root_json, "/gateway/fetch", "POST"),
            ..caps
        })
        .unwrap_or_else(|| {
            let reason = if status == "offline" {
                "mirror offline"
            } else {
                "capabilities not reported by mirror"
            };
            let mut caps = unavailable_capabilities(reason);
            caps.openapi_download = openapi_download_status(openapi_ok);
            caps.management_api =
                route_advertisement_status(root_json, "/management/capabilities", "GET");
            caps.gateway_fetch = route_advertisement_status(root_json, "/gateway/fetch", "POST");
            caps
        })
}

/// Builds `openapi_download` capability status from probe outcome.
fn openapi_download_status(openapi_ok: bool) -> ClusterCapabilityStatus {
    ClusterCapabilityStatus {
        available: openapi_ok,
        detail: if openapi_ok {
            "Downloaded /openapi.yaml successfully".to_string()
        } else {
            "Failed to download /openapi.yaml".to_string()
        },
    }
}

/// Builds route capability status from route-advertisement metadata.
fn route_advertisement_status(
    root_json: Option<&Value>,
    path: &str,
    method: &str,
) -> ClusterCapabilityStatus {
    let availability = root_json
        .and_then(|json| route_supports(json, path, method))
        .unwrap_or(false);

    let detail = root_json
        .and_then(|json| route_supports(json, path, method))
        .map(|ok| {
            if ok {
                format!("Route {path} ({method}) is advertised")
            } else {
                format!("Route {path} ({method}) not advertised")
            }
        })
        .unwrap_or_else(|| "Mirror did not include routes metadata".to_string());

    ClusterCapabilityStatus {
        available: availability,
        detail,
    }
}

/// Returns whether a mirror root payload advertises a route+method pair.
fn route_supports(root_json: &Value, path: &str, method: &str) -> Option<bool> {
    let routes: &Vec<Value> = root_json.get("routes")?.as_array()?;
    let method_upper: String = method.to_uppercase();
    let found: bool = routes.iter().any(|route| {
        let route_path: Option<&str> = route.get("path").and_then(Value::as_str);
        let methods: Option<&Vec<Value>> = route.get("methods").and_then(Value::as_array);
        route_path == Some(path)
            && methods
                .map(|items| {
                    items.iter().any(|m| {
                        m.as_str()
                            .map(|s| s.eq_ignore_ascii_case(&method_upper))
                            .unwrap_or(false)
                    })
                })
                .unwrap_or(false)
    });
    Some(found)
}