iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, OnceLock};

use crate::core::topology::CoreId;

use super::{
    CoreMetrics, CoreMetricsAccumulator, CoreMetricsAggregate, HealthReport, HealthStatus, Metrics,
};

static QUERY_TOTAL: AtomicU64 = AtomicU64::new(0);
static QUERY_ERRORS: AtomicU64 = AtomicU64::new(0);
static QUERY_LATENCY_TOTAL_MICROS: AtomicU64 = AtomicU64::new(0);
static INGEST_ACCEPTED: AtomicU64 = AtomicU64::new(0);
static INGEST_REJECTED: AtomicU64 = AtomicU64::new(0);
const LATENCY_SAMPLE_WINDOW: usize = 4096;

fn core_metrics_cell() -> &'static Mutex<HashMap<CoreId, CoreMetricsAccumulator>> {
    static CELL: OnceLock<Mutex<HashMap<CoreId, CoreMetricsAccumulator>>> = OnceLock::new();
    CELL.get_or_init(|| Mutex::new(HashMap::new()))
}

fn latency_samples() -> &'static Mutex<VecDeque<u64>> {
    static SAMPLES: OnceLock<Mutex<VecDeque<u64>>> = OnceLock::new();
    SAMPLES.get_or_init(|| Mutex::new(VecDeque::with_capacity(LATENCY_SAMPLE_WINDOW)))
}

pub fn record_query_success(latency_micros: u128) {
    QUERY_TOTAL.fetch_add(1, Ordering::Relaxed);
    let latency = latency_micros.min(u64::MAX as u128) as u64;
    QUERY_LATENCY_TOTAL_MICROS.fetch_add(latency, Ordering::Relaxed);
    if let Ok(mut samples) = latency_samples().lock() {
        if samples.len() >= LATENCY_SAMPLE_WINDOW {
            samples.pop_front();
        }
        samples.push_back(latency);
    }
}

pub fn record_query_success_for_core(core_id: CoreId, latency_micros: u128) {
    let latency = latency_micros.min(u64::MAX as u128) as u64;
    if let Ok(mut map) = core_metrics_cell().lock() {
        let entry = map.entry(core_id).or_default();
        entry.query_total = entry.query_total.saturating_add(1);
        entry.query_latency_total_micros = entry.query_latency_total_micros.saturating_add(latency);
    }
}

pub fn record_query_error() {
    QUERY_TOTAL.fetch_add(1, Ordering::Relaxed);
    QUERY_ERRORS.fetch_add(1, Ordering::Relaxed);
}

pub fn record_query_error_for_core(core_id: CoreId) {
    if let Ok(mut map) = core_metrics_cell().lock() {
        let entry = map.entry(core_id).or_default();
        entry.query_total = entry.query_total.saturating_add(1);
        entry.query_errors = entry.query_errors.saturating_add(1);
    }
}

pub fn record_ingest(accepted: u64, rejected: u64) {
    INGEST_ACCEPTED.fetch_add(accepted, Ordering::Relaxed);
    INGEST_REJECTED.fetch_add(rejected, Ordering::Relaxed);
}

pub fn record_ingest_for_core(core_id: CoreId, accepted: u64, rejected: u64) {
    if let Ok(mut map) = core_metrics_cell().lock() {
        let entry = map.entry(core_id).or_default();
        entry.ingest_accepted = entry.ingest_accepted.saturating_add(accepted);
        entry.ingest_rejected = entry.ingest_rejected.saturating_add(rejected);
    }
}

pub fn metrics_snapshot() -> Metrics {
    let query_total = QUERY_TOTAL.load(Ordering::Relaxed);
    let query_errors = QUERY_ERRORS.load(Ordering::Relaxed);
    let query_latency_total = QUERY_LATENCY_TOTAL_MICROS.load(Ordering::Relaxed);
    let query_avg_latency_micros = if query_total > 0 {
        query_latency_total / query_total
    } else {
        0
    };

    let (query_p95_latency_micros, query_p99_latency_micros) = latency_percentiles();

    Metrics {
        query_total,
        query_errors,
        query_avg_latency_micros,
        query_p95_latency_micros,
        query_p99_latency_micros,
        ingest_accepted: INGEST_ACCEPTED.load(Ordering::Relaxed),
        ingest_rejected: INGEST_REJECTED.load(Ordering::Relaxed),
    }
}

pub fn health() -> HealthReport {
    let metrics = metrics_snapshot();
    let error_rate = if metrics.query_total > 0 {
        metrics.query_errors as f64 / metrics.query_total as f64
    } else {
        0.0
    };

    let (status, reason) = if error_rate > 0.2 {
        (
            HealthStatus::Unhealthy,
            "query error rate above 20%".to_string(),
        )
    } else if error_rate > 0.05 {
        (
            HealthStatus::Degraded,
            "query error rate above 5%".to_string(),
        )
    } else {
        (HealthStatus::Healthy, "within error budget".to_string())
    };

    HealthReport {
        status,
        reason,
        error_rate,
    }
}

pub fn metrics_prometheus_text() -> String {
    let m = metrics_snapshot();
    let mut out = String::new();
    out.push_str("# TYPE iridium_query_total counter\n");
    out.push_str(&format!("iridium_query_total {}\n", m.query_total));
    out.push_str("# TYPE iridium_query_errors counter\n");
    out.push_str(&format!("iridium_query_errors {}\n", m.query_errors));
    out.push_str("# TYPE iridium_query_avg_latency_micros gauge\n");
    out.push_str(&format!(
        "iridium_query_avg_latency_micros {}\n",
        m.query_avg_latency_micros
    ));
    out.push_str("# TYPE iridium_query_p95_latency_micros gauge\n");
    out.push_str(&format!(
        "iridium_query_p95_latency_micros {}\n",
        m.query_p95_latency_micros
    ));
    out.push_str("# TYPE iridium_query_p99_latency_micros gauge\n");
    out.push_str(&format!(
        "iridium_query_p99_latency_micros {}\n",
        m.query_p99_latency_micros
    ));
    out.push_str("# TYPE iridium_ingest_accepted counter\n");
    out.push_str(&format!("iridium_ingest_accepted {}\n", m.ingest_accepted));
    out.push_str("# TYPE iridium_ingest_rejected counter\n");
    out.push_str(&format!("iridium_ingest_rejected {}\n", m.ingest_rejected));
    out
}

pub fn reset_metrics() {
    QUERY_TOTAL.store(0, Ordering::Relaxed);
    QUERY_ERRORS.store(0, Ordering::Relaxed);
    QUERY_LATENCY_TOTAL_MICROS.store(0, Ordering::Relaxed);
    INGEST_ACCEPTED.store(0, Ordering::Relaxed);
    INGEST_REJECTED.store(0, Ordering::Relaxed);
    if let Ok(mut samples) = latency_samples().lock() {
        samples.clear();
    }
    if let Ok(mut map) = core_metrics_cell().lock() {
        map.clear();
    }
}

pub fn core_metrics_snapshot() -> Vec<CoreMetrics> {
    let Ok(map) = core_metrics_cell().lock() else {
        return Vec::new();
    };
    let mut out = map
        .iter()
        .map(|(core_id, acc)| CoreMetrics {
            core_id: *core_id,
            query_total: acc.query_total,
            query_errors: acc.query_errors,
            query_avg_latency_micros: if acc.query_total > 0 {
                acc.query_latency_total_micros / acc.query_total
            } else {
                0
            },
            ingest_accepted: acc.ingest_accepted,
            ingest_rejected: acc.ingest_rejected,
        })
        .collect::<Vec<CoreMetrics>>();
    out.sort_by_key(|m| m.core_id);
    out
}

pub fn core_metrics_aggregate_snapshot() -> CoreMetricsAggregate {
    let snapshot = core_metrics_snapshot();
    let mut out = CoreMetricsAggregate::default();
    for metrics in snapshot {
        out.query_total = out.query_total.saturating_add(metrics.query_total);
        out.query_errors = out.query_errors.saturating_add(metrics.query_errors);
        out.ingest_accepted = out.ingest_accepted.saturating_add(metrics.ingest_accepted);
        out.ingest_rejected = out.ingest_rejected.saturating_add(metrics.ingest_rejected);
    }
    let Ok(map) = core_metrics_cell().lock() else {
        return out;
    };
    let latency_total: u64 = map
        .values()
        .map(|acc| acc.query_latency_total_micros)
        .fold(0_u64, |acc, value| acc.saturating_add(value));
    if out.query_total > 0 {
        out.query_avg_latency_micros = latency_total / out.query_total;
    }
    out
}

fn latency_percentiles() -> (u64, u64) {
    let Ok(samples) = latency_samples().lock() else {
        return (0, 0);
    };
    if samples.is_empty() {
        return (0, 0);
    }
    let mut sorted = samples.iter().copied().collect::<Vec<u64>>();
    sorted.sort_unstable();
    let p95 = percentile_from_sorted(&sorted, 0.95);
    let p99 = percentile_from_sorted(&sorted, 0.99);
    (p95, p99)
}

fn percentile_from_sorted(sorted: &[u64], percentile: f64) -> u64 {
    if sorted.is_empty() {
        return 0;
    }
    let rank = ((sorted.len() - 1) as f64 * percentile).ceil() as usize;
    sorted[rank.min(sorted.len() - 1)]
}