#![allow(dead_code)]
use prometheus::{
register_counter_vec, register_histogram, CounterVec, Encoder, Histogram, TextEncoder,
};
use std::sync::OnceLock;
/// Handles to the Prometheus collectors used by the application.
///
/// The collectors are created and registered in `AppMetrics::new`; the
/// process-wide instance is reached through `AppMetrics::global`.
pub struct AppMetrics {
/// Counter of ingested aircraft observations, labelled by `status`.
pub ingested_observations_total: CounterVec,
/// Counter of generated anomaly alerts, labelled by `anomaly_type`.
pub alerts_total: CounterVec,
/// Histogram of ingestion batch processing time; buckets span 1 to 5000.
pub ingestion_latency_ms: Histogram,
/// Histogram of anomaly detection run time; buckets span 10 to 10000.
pub detection_latency_ms: Histogram,
/// Running total of time spent on database operations, labelled by
/// `operation`. Note that this is a counter (registered as
/// `db_operation_latency_ms_total`), not a histogram.
pub db_operation_latency_ms: CounterVec,
}
/// Process-wide metrics instance; written once by `AppMetrics::init` and read
/// by `AppMetrics::global`.
static METRICS: OnceLock<AppMetrics> = OnceLock::new();
impl AppMetrics {
fn new() -> prometheus::Result<Self> {
Ok(Self {
ingested_observations_total: register_counter_vec!(
"ingested_observations_total",
"Total number of aircraft observations ingested",
&["status"]
)?,
alerts_total: register_counter_vec!(
"alerts_total",
"Total number of anomaly alerts generated",
&["anomaly_type"]
)?,
ingestion_latency_ms: register_histogram!(
"ingestion_latency_ms",
"Time taken to process ingestion batches in milliseconds",
vec![1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0]
)?,
detection_latency_ms: register_histogram!(
"detection_latency_ms",
"Time taken to run anomaly detection analysis in milliseconds",
vec![10.0, 50.0, 100.0, 500.0, 1000.0, 5000.0, 10000.0]
)?,
db_operation_latency_ms: register_counter_vec!(
"db_operation_latency_ms_total",
"Total time spent on database operations",
&["operation"]
)?,
})
}
pub fn init() -> prometheus::Result<()> {
let metrics = Self::new()?;
METRICS
.set(metrics)
.map_err(|_| prometheus::Error::Msg("Metrics already initialized".to_string()))?;
Ok(())
}
pub fn global() -> &'static AppMetrics {
METRICS.get().expect("Metrics not initialized")
}
pub fn record_ingestion_success(&self, count: usize) {
self.ingested_observations_total
.with_label_values(&["success"])
.inc_by(count as f64);
}
pub fn record_ingestion_error(&self) {
self.ingested_observations_total
.with_label_values(&["error"])
.inc();
}
pub fn record_alert(&self, anomaly_type: &str) {
self.alerts_total.with_label_values(&[anomaly_type]).inc();
}
pub fn time_ingestion<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
let timer = self.ingestion_latency_ms.start_timer();
let result = f();
timer.observe_duration();
result
}
pub fn time_detection<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
let timer = self.detection_latency_ms.start_timer();
let result = f();
timer.observe_duration();
result
}
pub fn record_db_operation(&self, operation: &str, duration_ms: f64) {
self.db_operation_latency_ms
.with_label_values(&[operation])
.inc_by(duration_ms);
}
pub fn export(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer)?;
Ok(String::from_utf8(buffer)?)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the global metrics, initializing them at most once for the
    /// whole test binary.
    ///
    /// Tests run on parallel threads; `Once` makes every test wait until the
    /// single initialization attempt has finished before calling `global()`.
    fn initialized_metrics() -> &'static AppMetrics {
        static INIT: std::sync::Once = std::sync::Once::new();
        INIT.call_once(|| {
            // An error here means the metrics were already initialized
            // elsewhere; a genuine failure surfaces as a panic in `global()`.
            let _ = AppMetrics::init();
        });
        AppMetrics::global()
    }

    #[test]
    fn test_metrics_initialization() {
        let metrics = initialized_metrics();

        metrics.record_ingestion_success(5);
        metrics.record_ingestion_error();
        metrics.record_alert("temporal");
        let timed_text = metrics.time_ingestion(|| "test".to_string());
        let timed_number = metrics.time_detection(|| 42);
        metrics.record_db_operation("insert", 125.5);

        // The timing helpers must hand back the closure's result unchanged.
        assert_eq!(timed_text, "test");
        assert_eq!(timed_number, 42);

        // Counters are shared across tests, so only lower bounds are stable.
        let ingested = metrics
            .ingested_observations_total
            .with_label_values(&["success"])
            .get();
        assert!(ingested >= 5.0);

        let metrics_text = metrics.export().expect("Metrics export should succeed");
        assert!(metrics_text.contains("ingested_observations_total"));
        assert!(metrics_text.contains("alerts_total"));
        assert!(metrics_text.contains("detection_latency_ms"));
        assert!(metrics_text.contains("db_operation_latency_ms_total"));
    }

    #[test]
    fn test_timing_operations() {
        let metrics = initialized_metrics();
        let result = metrics.time_ingestion(|| {
            std::thread::sleep(std::time::Duration::from_millis(1));
            "completed"
        });
        assert_eq!(result, "completed");
        let export = metrics.export().expect("Should export");
        assert!(export.contains("ingestion_latency_ms"));
    }
}