use std::collections::HashMap;
use std::fs::File;
use std::io::BufRead;
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::time::Duration;
/// Background aggregator that summarizes per-engine metrics.
///
/// Reads `events.jsonl` from `output_dir` and appends one summary record per run to
/// `aggregated_stats.jsonl` in the same directory.
pub struct MetricsAggregator {
/// Directory holding both the `events.jsonl` input and the `aggregated_stats.jsonl` output.
output_dir: PathBuf,
/// Length of one aggregation interval in minutes; `start` converts it to seconds for the
/// background worker's wait between runs.
window_minutes: u64,
/// Join handle of the background worker: `None` until `start` stores one, and `None` again
/// once `stop` or `Drop` has taken it.
worker_handle: Arc<Mutex<Option<std::thread::JoinHandle<()>>>>,
/// Set to `true` by `stop` and `Drop` to ask the background worker to exit.
shutdown_flag: Arc<AtomicBool>,
}
impl MetricsAggregator {
    /// Longest single sleep of the background worker before it re-checks the shutdown flag.
    ///
    /// Keeping this short bounds how long `stop` blocks while joining the worker.
    const SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(100);

    /// Creates an idle aggregator; no thread is spawned until `start` is called.
    ///
    /// * `output_dir` - directory containing `events.jsonl`; `aggregated_stats.jsonl` is
    ///   written next to it.
    /// * `window_minutes` - minutes between two background aggregation runs. A value of `0`
    ///   makes the worker run back to back without pausing.
    pub fn new(output_dir: PathBuf, window_minutes: u64) -> Self {
        Self {
            output_dir,
            window_minutes,
            worker_handle: Arc::new(Mutex::new(None)),
            shutdown_flag: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Spawns the background worker through `crate::io::spawn_blocking`.
    ///
    /// The worker waits one window, aggregates, and repeats until shutdown is requested.
    /// It owns a clone of the `Arc`, so the aggregator stays alive until the worker exits.
    ///
    /// Calling `start` while a worker handle is already stored is a no-op, so two workers
    /// never aggregate the same file side by side. The shutdown flag is never cleared:
    /// a worker started after `stop` exits immediately.
    pub fn start(self: Arc<Self>) {
        // A poisoned lock only means another thread panicked while holding it; the
        // `Option` inside is still usable, so recover it instead of panicking too.
        let mut slot = self
            .worker_handle
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if slot.is_some() {
            return;
        }

        // Saturate rather than overflow for absurdly large windows.
        let interval = Duration::from_secs(self.window_minutes.saturating_mul(60));
        let shutdown = Arc::clone(&self.shutdown_flag);
        let worker = Arc::clone(&self);
        let handle = crate::io::spawn_blocking(move || {
            while Self::wait_for_next_run(&shutdown, interval) {
                if let Err(e) = worker.aggregate_window() {
                    eprintln!("[aggregator] Error: {}", e);
                }
            }
        });
        *slot = Some(handle);
    }

    /// Requests shutdown, joins the worker, then runs one final aggregation.
    ///
    /// The worker is joined *before* the final aggregation so the two can never run
    /// concurrently and append overlapping records. Failures (a panicked worker, a failed
    /// final aggregation) are reported on stderr as single-line JSON and never propagated.
    pub fn stop(&self) {
        self.shutdown_flag.store(true, Ordering::SeqCst);

        // Take the handle in its own statement so the mutex is released before joining.
        let worker = self
            .worker_handle
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        if let Some(handle) = worker {
            if let Err(payload) = handle.join() {
                let any = &*payload;
                let reason: &str = if let Some(s) = any.downcast_ref::<&str>() {
                    *s
                } else if let Some(s) = any.downcast_ref::<String>() {
                    s.as_str()
                } else {
                    "thread panicked"
                };
                eprintln!(
                    "{{\"component\":\"aggregator\",\"error\":\"{}\",\"trace_id\":null}}",
                    Self::json_escape(reason)
                );
            }
        }

        if let Err(e) = self.aggregate_window() {
            eprintln!(
                "{{\"component\":\"aggregator\",\"op\":\"final_aggregate\",\"error\":\"{}\",\"trace_id\":null}}",
                Self::json_escape(&e.to_string())
            );
        }
    }

    /// Waits for `interval`, waking at least every `SHUTDOWN_POLL_INTERVAL` to look at
    /// `shutdown`.
    ///
    /// Returns `true` when the full interval elapsed and another aggregation should run,
    /// `false` as soon as shutdown has been requested.
    fn wait_for_next_run(shutdown: &AtomicBool, interval: Duration) -> bool {
        // `None` means the deadline is not representable; then only shutdown ends the wait.
        let deadline = std::time::Instant::now().checked_add(interval);
        loop {
            if shutdown.load(Ordering::SeqCst) {
                return false;
            }
            let now = std::time::Instant::now();
            let remaining = match deadline {
                Some(deadline) if deadline > now => deadline - now,
                Some(_) => return true,
                None => Self::SHUTDOWN_POLL_INTERVAL,
            };
            std::thread::sleep(std::cmp::min(remaining, Self::SHUTDOWN_POLL_INTERVAL));
        }
    }

    /// Escapes `raw` so it can be embedded between double quotes in a JSON document.
    fn json_escape(raw: &str) -> String {
        let mut out = String::with_capacity(raw.len());
        for c in raw.chars() {
            match c {
                '"' => out.push_str("\\\""),
                '\\' => out.push_str("\\\\"),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
                c => out.push(c),
            }
        }
        out
    }

    /// Scans `events.jsonl` and appends one summary record when any engine produced data.
    ///
    /// Every line of the file is read on each call; no time-based filtering happens here.
    /// A missing events file is not an error. Lines that are not valid UTF-8 and events
    /// without an `engine_id` are skipped.
    ///
    /// # Errors
    /// Returns any other I/O error from opening or reading the events file, and any error
    /// from writing the summary.
    fn aggregate_window(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let events_path = self.output_dir.join("events.jsonl");
        // Open directly instead of `exists()` + open, which races with file removal.
        let file = match File::open(&events_path) {
            Ok(file) => file,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
            Err(e) => return Err(e.into()),
        };

        let mut engine_stats: HashMap<String, EngineStats> = HashMap::new();
        for line in BufReader::new(file).lines() {
            let line = match line {
                Ok(line) => line,
                // The offending bytes are already consumed, so the next line reads
                // normally; one corrupt line should not discard the whole run.
                Err(e) if e.kind() == std::io::ErrorKind::InvalidData => continue,
                Err(e) => return Err(e.into()),
            };
            let event = match parse_event_line(&line) {
                Some(event) => event,
                None => continue,
            };
            if event.engine_id.is_empty() {
                continue;
            }
            let entry = engine_stats.entry(event.engine_id).or_default();
            if let Some(latency) = event.latency {
                entry.latencies.push(latency);
            }
            if let Some(quality) = event.quality {
                entry.quality_scores.push(quality);
            }
            // Only events carrying a `success` flag count as requests.
            if let Some(succeeded) = event.success {
                entry.total_requests += 1;
                if succeeded {
                    entry.successful_requests += 1;
                }
            }
        }

        if engine_stats.is_empty() {
            return Ok(());
        }
        self.write_aggregated_stats(&engine_stats)
    }

    /// Appends one JSON line describing every engine in `engine_stats` to
    /// `aggregated_stats.jsonl` via `crate::config::atomic_append`.
    ///
    /// Engines are emitted sorted by id so identical input always yields identical output
    /// (apart from the timestamp).
    ///
    /// # Errors
    /// Propagates the error returned by `crate::config::atomic_append`.
    fn write_aggregated_stats(
        &self,
        engine_stats: &HashMap<String, EngineStats>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let output_path = self.output_dir.join("aggregated_stats.jsonl");
        let ts = chrono_fallback();

        let mut engines: Vec<(&String, &EngineStats)> = engine_stats.iter().collect();
        engines.sort_by(|left, right| left.0.cmp(right.0));
        let engines_json: Vec<String> = engines
            .into_iter()
            .map(|(engine_id, stats)| Self::engine_json(engine_id, stats))
            .collect();

        let full = format!(
            "{{\"timestamp\":\"{}\",\"engines\":[{}]}}\n",
            ts,
            engines_json.join(",")
        );
        crate::config::atomic_append(&output_path, full.as_bytes())?;
        Ok(())
    }

    /// Renders the JSON object for a single engine.
    ///
    /// `engine_id` is inserted verbatim: `extract_string_field` copies the raw text found
    /// between the quotes in the event line, so escaping it again would double-escape it.
    /// Statistics without data are rendered as `null`.
    fn engine_json(engine_id: &str, stats: &EngineStats) -> String {
        let mut sorted = stats.latencies.to_vec();
        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

        let success_rate = if stats.total_requests > 0 {
            stats.successful_requests as f64 / stats.total_requests as f64
        } else {
            0.0
        };

        format!(
            "{{\"engine_id\":\"{}\",\"requests\":{},\"success_rate\":{},\"latency\":{{\"min\":{},\"max\":{},\"avg\":{},\"p50\":{},\"p95\":{},\"p99\":{}}},\"quality\":{{\"avg\":{}}}}}",
            engine_id,
            stats.total_requests,
            success_rate,
            option_to_number(sorted.first().copied()),
            option_to_number(sorted.last().copied()),
            option_to_number(Self::mean(&sorted)),
            option_to_number(percentile(&sorted, 50)),
            option_to_number(percentile(&sorted, 95)),
            option_to_number(percentile(&sorted, 99)),
            option_to_number(Self::mean(&stats.quality_scores)),
        )
    }

    /// Arithmetic mean of `values`, or `None` for an empty slice.
    fn mean(values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            None
        } else {
            Some(values.iter().sum::<f64>() / values.len() as f64)
        }
    }
}
impl Drop for MetricsAggregator {
    /// Requests shutdown and joins the background worker if its handle is still stored.
    ///
    /// Never panics: a panic inside `drop` while another panic is unwinding aborts the
    /// process, so a poisoned mutex is recovered instead of unwrapped.
    fn drop(&mut self) {
        self.shutdown_flag.store(true, Ordering::SeqCst);

        // Take the handle in its own statement so the mutex is released before joining.
        let worker = self
            .worker_handle
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        let handle = match worker {
            Some(handle) => handle,
            None => return,
        };

        // The worker owns an `Arc` of the aggregator, so the last reference may be released
        // on the worker thread itself. A thread cannot join itself; detach instead.
        if handle.thread().id() == std::thread::current().id() {
            return;
        }

        if let Err(payload) = handle.join() {
            let any = &*payload;
            if let Some(s) = any.downcast_ref::<&str>() {
                eprintln!("[aggregator] worker thread panicked: {}", s);
            } else if let Some(s) = any.downcast_ref::<String>() {
                eprintln!("[aggregator] worker thread panicked: {}", s);
            } else {
                eprintln!("[aggregator] worker thread panicked with unknown payload");
            }
        }
    }
}
/// Per-engine accumulator filled while scanning the events file.
#[derive(Default)]
struct EngineStats {
/// Every latency value seen for the engine, in the order the events were read.
latencies: Vec<f64>,
/// Every quality score seen for the engine, in the order the events were read.
quality_scores: Vec<f64>,
/// Number of events that carried a `success` flag.
total_requests: u64,
/// Number of those events whose `success` flag was `true`.
successful_requests: u64,
}
/// Picks the `p`-th percentile from an ascending slice by rounding to the nearest index.
///
/// The index is `round(p / 100 * (len - 1))`. Returns `None` for an empty slice or when
/// the index falls outside the slice (possible for `p > 100`).
fn percentile(sorted: &[f64], p: u8) -> Option<f64> {
    match sorted.len() {
        0 => None,
        len => {
            let fraction = f64::from(p) / 100.0;
            let last_position = len as f64 - 1.0;
            let rank = (fraction * last_position).round() as usize;
            sorted.get(rank).copied()
        }
    }
}
/// Fields extracted from one line of the events file.
struct EngineEvent {
/// Value of the `engine_id` field; empty when the field was not found.
engine_id: String,
/// Value of `latency_ms`, or of `latency` when `latency_ms` yielded nothing.
latency: Option<f64>,
/// Value of the `success` field when it was found as `true` or `false`.
success: Option<bool>,
/// Value of the `quality_score` field.
quality: Option<f64>,
}
/// Builds an [`EngineEvent`] from one raw line of the events file.
///
/// Always returns `Some`: a field that cannot be extracted becomes an empty `engine_id`
/// or `None`. Latency is taken from `latency_ms` first and from `latency` as a fallback.
fn parse_event_line(s: &str) -> Option<EngineEvent> {
    let engine_id = match extract_string_field(s, "engine_id") {
        Some(id) => id,
        None => String::new(),
    };
    let latency = match extract_number_field(s, "latency_ms") {
        Some(value) => Some(value),
        None => extract_number_field(s, "latency"),
    };
    let event = EngineEvent {
        engine_id,
        latency,
        success: extract_bool_field(s, "success"),
        quality: extract_number_field(s, "quality_score"),
    };
    Some(event)
}
/// Extracts the string value of `"key": "..."` from a single JSON line.
///
/// This is a lightweight text scan, not a JSON parser:
/// * an occurrence of `"key"` only counts when it is followed (after optional whitespace)
///   by `:`, so the same text appearing as a string *value* is skipped;
/// * the value must start with `"` right after the colon; numbers, `null` and other
///   non-string values yield `None` instead of picking up a later field;
/// * backslash escapes are honored when looking for the closing quote, but the content is
///   returned verbatim (escape sequences are not decoded).
///
/// Returns `None` when the key is absent, the value is not a string, or the string is
/// unterminated.
fn extract_string_field(s: &str, key: &str) -> Option<String> {
    let needle = format!("\"{}\"", key);
    for (pos, _) in s.match_indices(needle.as_str()) {
        let after_key = s[pos + needle.len()..].trim_start();
        if !after_key.starts_with(':') {
            // This occurrence is a value (or something else), not the key itself.
            continue;
        }
        let value = after_key[1..].trim_start();
        if !value.starts_with('"') {
            return None;
        }
        let body = &value[1..];
        let mut escaped = false;
        for (idx, ch) in body.char_indices() {
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                return Some(body[..idx].to_string());
            }
        }
        return None;
    }
    None
}
/// Extracts the numeric value of `"key": <number>` from a single JSON line.
///
/// This is a lightweight text scan, not a JSON parser:
/// * an occurrence of `"key"` only counts when it is followed (after optional whitespace)
///   by `:`, so the same text appearing as a string *value* is skipped;
/// * the number must start right after the colon; `null`, words and other non-numeric
///   values yield `None` instead of picking up digits from a later field;
/// * one opening quote is tolerated so that quoted numbers such as `"42"` are still read;
/// * exponent notation (`1.5e3`) is supported.
///
/// Returns `None` when the key is absent or the value does not parse to a finite `f64`.
fn extract_number_field(s: &str, key: &str) -> Option<f64> {
    let needle = format!("\"{}\"", key);
    for (pos, _) in s.match_indices(needle.as_str()) {
        let after_key = s[pos + needle.len()..].trim_start();
        if !after_key.starts_with(':') {
            // This occurrence is a value (or something else), not the key itself.
            continue;
        }
        let mut value = after_key[1..].trim_start();
        if value.starts_with('"') {
            value = &value[1..];
        }
        let end = value
            .find(|c: char| {
                !(c.is_ascii_digit() || c == '.' || c == '-' || c == '+' || c == 'e' || c == 'E')
            })
            .unwrap_or(value.len());
        return value[..end].parse::<f64>().ok().filter(|v| v.is_finite());
    }
    None
}
/// Extracts the boolean value of `"key": true|false` from a single JSON line.
///
/// This is a lightweight text scan, not a JSON parser:
/// * an occurrence of `"key"` only counts when it is followed (after optional whitespace)
///   by `:`, so the same text appearing as a string *value* is skipped;
/// * the value must be exactly the bare word `true` or `false`; quoted values and longer
///   words that merely start with `true`/`false` yield `None`.
///
/// Returns `None` when the key is absent or the value is not a boolean literal.
fn extract_bool_field(s: &str, key: &str) -> Option<bool> {
    let needle = format!("\"{}\"", key);
    for (pos, _) in s.match_indices(needle.as_str()) {
        let after_key = s[pos + needle.len()..].trim_start();
        if !after_key.starts_with(':') {
            // This occurrence is a value (or something else), not the key itself.
            continue;
        }
        let value = after_key[1..].trim_start();
        let end = value
            .find(|c: char| !c.is_ascii_alphanumeric() && c != '_')
            .unwrap_or(value.len());
        return match &value[..end] {
            "true" => Some(true),
            "false" => Some(false),
            _ => None,
        };
    }
    None
}
/// Renders an optional float as a JSON number token.
///
/// Finite values use their `Display` form; `None`, NaN and infinities become `null`.
fn option_to_number(v: Option<f64>) -> String {
    v.filter(|candidate| candidate.is_finite())
        .map_or_else(|| String::from("null"), |finite| finite.to_string())
}
/// Returns the current time as whole seconds since the Unix epoch, in decimal.
///
/// Falls back to `"0"` when the system clock reports a time before the epoch.
fn chrono_fallback() -> String {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_secs().to_string())
        .unwrap_or_else(|_| String::from("0"))
}