use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, OnceLock};
use crate::core::topology::CoreId;
use super::{
CoreMetrics, CoreMetricsAccumulator, CoreMetricsAggregate, HealthReport, HealthStatus, Metrics,
};
/// Global count of recorded queries; bumped by both `record_query_success` and
/// `record_query_error`.
static QUERY_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Global count of failed queries; bumped by `record_query_error`.
static QUERY_ERRORS: AtomicU64 = AtomicU64::new(0);
/// Running sum, in microseconds, of the latencies passed to `record_query_success`.
static QUERY_LATENCY_TOTAL_MICROS: AtomicU64 = AtomicU64::new(0);
/// Running sum of the `accepted` counts passed to `record_ingest`.
static INGEST_ACCEPTED: AtomicU64 = AtomicU64::new(0);
/// Running sum of the `rejected` counts passed to `record_ingest`.
static INGEST_REJECTED: AtomicU64 = AtomicU64::new(0);
/// Maximum number of recent latency samples retained for the percentile estimates.
const LATENCY_SAMPLE_WINDOW: usize = 4096;
/// Returns the shared registry of per-core accumulators.
///
/// The registry is created empty the first time this function is called and lives for the
/// rest of the process.
fn core_metrics_cell() -> &'static Mutex<HashMap<CoreId, CoreMetricsAccumulator>> {
    type Registry = Mutex<HashMap<CoreId, CoreMetricsAccumulator>>;
    static REGISTRY: OnceLock<Registry> = OnceLock::new();
    REGISTRY.get_or_init(Registry::default)
}
/// Returns the shared buffer of recent query latencies (microseconds).
///
/// The buffer is allocated on first use with room for `LATENCY_SAMPLE_WINDOW` entries.
fn latency_samples() -> &'static Mutex<VecDeque<u64>> {
    static WINDOW: OnceLock<Mutex<VecDeque<u64>>> = OnceLock::new();
    WINDOW.get_or_init(|| {
        let buffer = VecDeque::with_capacity(LATENCY_SAMPLE_WINDOW);
        Mutex::new(buffer)
    })
}
/// Records one successful query in the global counters.
///
/// `latency_micros` is clamped to `u64::MAX`, added to the running latency total, and pushed
/// into the sample window used for percentiles. When the window already holds
/// `LATENCY_SAMPLE_WINDOW` entries the oldest one is evicted first. If the sample mutex is
/// poisoned the sample is skipped; the counters are still updated.
pub fn record_query_success(latency_micros: u128) {
    // Clamp instead of truncating so an oversized value cannot wrap to a small one.
    let clamped = if latency_micros > u128::from(u64::MAX) {
        u64::MAX
    } else {
        latency_micros as u64
    };

    QUERY_TOTAL.fetch_add(1, Ordering::Relaxed);
    QUERY_LATENCY_TOTAL_MICROS.fetch_add(clamped, Ordering::Relaxed);

    let Ok(mut window) = latency_samples().lock() else {
        return;
    };
    if window.len() >= LATENCY_SAMPLE_WINDOW {
        window.pop_front();
    }
    window.push_back(clamped);
}
/// Records one successful query against `core_id`'s accumulator.
///
/// The accumulator is created on first use. `latency_micros` is clamped to `u64::MAX` and
/// all additions saturate. Nothing is recorded if the registry mutex is poisoned.
pub fn record_query_success_for_core(core_id: CoreId, latency_micros: u128) {
    let clamped = if latency_micros > u128::from(u64::MAX) {
        u64::MAX
    } else {
        latency_micros as u64
    };

    let Ok(mut registry) = core_metrics_cell().lock() else {
        return;
    };
    let acc = registry.entry(core_id).or_default();
    acc.query_total = acc.query_total.saturating_add(1);
    acc.query_latency_total_micros = acc.query_latency_total_micros.saturating_add(clamped);
}
/// Records one failed query in the global counters.
///
/// A failure counts towards both the query total and the error total.
pub fn record_query_error() {
    for counter in [&QUERY_TOTAL, &QUERY_ERRORS] {
        counter.fetch_add(1, Ordering::Relaxed);
    }
}
/// Records one failed query against `core_id`'s accumulator.
///
/// The accumulator is created on first use and both its query total and error total are
/// incremented (saturating). Nothing is recorded if the registry mutex is poisoned.
pub fn record_query_error_for_core(core_id: CoreId) {
    let Ok(mut registry) = core_metrics_cell().lock() else {
        return;
    };
    let acc = registry.entry(core_id).or_default();
    acc.query_total = acc.query_total.saturating_add(1);
    acc.query_errors = acc.query_errors.saturating_add(1);
}
/// Adds an ingest batch outcome to the global counters.
///
/// `accepted` and `rejected` are added to their respective running totals.
pub fn record_ingest(accepted: u64, rejected: u64) {
    let updates = [(&INGEST_ACCEPTED, accepted), (&INGEST_REJECTED, rejected)];
    for (counter, delta) in updates {
        counter.fetch_add(delta, Ordering::Relaxed);
    }
}
/// Adds an ingest batch outcome to `core_id`'s accumulator.
///
/// The accumulator is created on first use and both additions saturate. Nothing is recorded
/// if the registry mutex is poisoned.
pub fn record_ingest_for_core(core_id: CoreId, accepted: u64, rejected: u64) {
    let Ok(mut registry) = core_metrics_cell().lock() else {
        return;
    };
    let acc = registry.entry(core_id).or_default();
    acc.ingest_accepted = acc.ingest_accepted.saturating_add(accepted);
    acc.ingest_rejected = acc.ingest_rejected.saturating_add(rejected);
}
/// Builds a point-in-time view of the global counters.
///
/// The average latency is the running latency total divided by the query total, or `0`
/// when no query has been recorded. The p95/p99 values come from the recent-sample window.
/// Each counter is loaded independently, so the fields are not guaranteed to be mutually
/// consistent while other threads are recording.
///
/// NOTE(review): the divisor also counts failed queries, which contribute no latency, so
/// errors pull the average down — confirm this is the intended definition.
pub fn metrics_snapshot() -> Metrics {
    let total = QUERY_TOTAL.load(Ordering::Relaxed);
    let errors = QUERY_ERRORS.load(Ordering::Relaxed);
    let latency_sum = QUERY_LATENCY_TOTAL_MICROS.load(Ordering::Relaxed);
    let (p95, p99) = latency_percentiles();

    Metrics {
        query_total: total,
        query_errors: errors,
        // `checked_div` yields `None` only for a zero divisor, i.e. no queries yet.
        query_avg_latency_micros: latency_sum.checked_div(total).unwrap_or(0),
        query_p95_latency_micros: p95,
        query_p99_latency_micros: p99,
        ingest_accepted: INGEST_ACCEPTED.load(Ordering::Relaxed),
        ingest_rejected: INGEST_REJECTED.load(Ordering::Relaxed),
    }
}
pub fn health() -> HealthReport {
let metrics = metrics_snapshot();
let error_rate = if metrics.query_total > 0 {
metrics.query_errors as f64 / metrics.query_total as f64
} else {
0.0
};
let (status, reason) = if error_rate > 0.2 {
(
HealthStatus::Unhealthy,
"query error rate above 20%".to_string(),
)
} else if error_rate > 0.05 {
(
HealthStatus::Degraded,
"query error rate above 5%".to_string(),
)
} else {
(HealthStatus::Healthy, "within error budget".to_string())
};
HealthReport {
status,
reason,
error_rate,
}
}
/// Renders the global metrics snapshot as Prometheus text exposition lines.
///
/// Each metric is emitted as a `# TYPE <name> <kind>` line followed by a `<name> <value>`
/// line, in the fixed order listed in the table below.
pub fn metrics_prometheus_text() -> String {
    let snapshot = metrics_snapshot();
    let series: [(&str, &str, u64); 7] = [
        ("iridium_query_total", "counter", snapshot.query_total),
        ("iridium_query_errors", "counter", snapshot.query_errors),
        (
            "iridium_query_avg_latency_micros",
            "gauge",
            snapshot.query_avg_latency_micros,
        ),
        (
            "iridium_query_p95_latency_micros",
            "gauge",
            snapshot.query_p95_latency_micros,
        ),
        (
            "iridium_query_p99_latency_micros",
            "gauge",
            snapshot.query_p99_latency_micros,
        ),
        ("iridium_ingest_accepted", "counter", snapshot.ingest_accepted),
        ("iridium_ingest_rejected", "counter", snapshot.ingest_rejected),
    ];

    let mut text = String::new();
    for (name, kind, value) in series {
        text.push_str(&format!("# TYPE {} {}\n{} {}\n", name, kind, name, value));
    }
    text
}
/// Resets all recorded metrics: zeroes the global counters, empties the latency sample
/// window, and drops every per-core accumulator.
///
/// A poisoned mutex is skipped, leaving that piece of state untouched.
pub fn reset_metrics() {
    let counters = [
        &QUERY_TOTAL,
        &QUERY_ERRORS,
        &QUERY_LATENCY_TOTAL_MICROS,
        &INGEST_ACCEPTED,
        &INGEST_REJECTED,
    ];
    for counter in counters {
        counter.store(0, Ordering::Relaxed);
    }

    if let Ok(mut window) = latency_samples().lock() {
        window.clear();
    }
    if let Ok(mut registry) = core_metrics_cell().lock() {
        registry.clear();
    }
}
/// Returns one `CoreMetrics` row per core that has recorded anything, ordered by core id.
///
/// Each row's average latency is that core's latency total divided by its query total, or
/// `0` when the core has no queries. An empty vector is returned if the registry mutex is
/// poisoned.
///
/// NOTE(review): the per-core query total also counts failed queries, which add no
/// latency, so errors lower the average — confirm this is the intended definition.
pub fn core_metrics_snapshot() -> Vec<CoreMetrics> {
    let registry = match core_metrics_cell().lock() {
        Ok(guard) => guard,
        Err(_) => return Vec::new(),
    };

    let mut rows = Vec::with_capacity(registry.len());
    for (&core_id, acc) in registry.iter() {
        rows.push(CoreMetrics {
            core_id,
            query_total: acc.query_total,
            query_errors: acc.query_errors,
            // `checked_div` yields `None` only for a zero divisor, i.e. no queries yet.
            query_avg_latency_micros: acc
                .query_latency_total_micros
                .checked_div(acc.query_total)
                .unwrap_or(0),
            ingest_accepted: acc.ingest_accepted,
            ingest_rejected: acc.ingest_rejected,
        });
    }

    // Map iteration order is arbitrary; sort so callers get a stable ordering.
    rows.sort_by_key(|row| row.core_id);
    rows
}
/// Sums every core's counters into a single `CoreMetricsAggregate`.
///
/// All totals use saturating addition. The average latency is the summed per-core latency
/// total divided by the summed query total; it is left at its default when no queries have
/// been recorded. A default aggregate is returned if the registry mutex is poisoned.
///
/// The registry is locked exactly once and every figure is derived from that single view.
/// Taking the counts and the latency sum under separate lock acquisitions would let a
/// concurrent recorder slip in between them and skew the average.
pub fn core_metrics_aggregate_snapshot() -> CoreMetricsAggregate {
    let mut out = CoreMetricsAggregate::default();
    let Ok(registry) = core_metrics_cell().lock() else {
        return out;
    };

    let mut latency_total = 0_u64;
    for acc in registry.values() {
        out.query_total = out.query_total.saturating_add(acc.query_total);
        out.query_errors = out.query_errors.saturating_add(acc.query_errors);
        out.ingest_accepted = out.ingest_accepted.saturating_add(acc.ingest_accepted);
        out.ingest_rejected = out.ingest_rejected.saturating_add(acc.ingest_rejected);
        latency_total = latency_total.saturating_add(acc.query_latency_total_micros);
    }

    if out.query_total > 0 {
        out.query_avg_latency_micros = latency_total / out.query_total;
    }
    out
}
/// Computes `(p95, p99)` over the retained latency samples, in microseconds.
///
/// Returns `(0, 0)` when no samples have been recorded or the sample mutex is poisoned.
fn latency_percentiles() -> (u64, u64) {
    let Ok(samples) = latency_samples().lock() else {
        return (0, 0);
    };
    if samples.is_empty() {
        return (0, 0);
    }
    let mut sorted: Vec<u64> = samples.iter().copied().collect();
    // Release the mutex before sorting so threads recording new samples are not blocked
    // behind the sort; only the private copy is needed from here on.
    drop(samples);

    sorted.sort_unstable();
    let p95 = percentile_from_sorted(&sorted, 0.95);
    let p99 = percentile_from_sorted(&sorted, 0.99);
    (p95, p99)
}
/// Picks the value at the given percentile from an ascending-sorted slice.
///
/// The index is `ceil((len - 1) * percentile)`, clamped to the last element. `percentile`
/// is a fraction (e.g. `0.95`). An empty slice yields `0`.
fn percentile_from_sorted(sorted: &[u64], percentile: f64) -> u64 {
    let last_index = match sorted.len().checked_sub(1) {
        Some(index) => index,
        None => return 0,
    };
    // The float-to-usize cast saturates, so negative or NaN ranks land on index 0.
    let scaled = (last_index as f64 * percentile).ceil() as usize;
    let index = if scaled > last_index {
        last_index
    } else {
        scaled
    };
    sorted[index]
}