use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crate::config::atomic;
use crate::config::json::Value as JsonValue;
/// Periodically summarizes per-engine metrics read from `events.jsonl` in `output_dir`.
///
/// Construct with [`MetricsAggregator::new`], wrap in an [`Arc`], and launch the
/// background thread with [`MetricsAggregator::start`].
#[derive(Debug)]
pub struct MetricsAggregator {
    /// Directory holding `events.jsonl`; `aggregated_stats.jsonl` is appended here.
    output_dir: PathBuf,
    /// Length of one aggregation window, in minutes.
    window_minutes: u64,
    /// Set by `request_stop`; checked by the background thread to decide when to exit.
    stop_flag: Arc<AtomicBool>,
}
impl MetricsAggregator {
pub fn new(output_dir: PathBuf, window_minutes: u64) -> Self {
Self {
output_dir,
window_minutes,
stop_flag: Arc::new(AtomicBool::new(false)),
}
}
pub fn start(self: Arc<Self>) -> thread::JoinHandle<()> {
thread::spawn(move || {
let interval_seconds = self.window_minutes * 60;
loop {
if self.stop_flag.load(Ordering::SeqCst) {
break;
}
thread::sleep(Duration::from_secs(interval_seconds));
if self.stop_flag.load(Ordering::SeqCst) {
break;
}
if let Err(e) = self.aggregate_window() {
eprintln!("[aggregator] aggregate error={}", e);
}
}
if let Err(e) = self.aggregate_window() {
eprintln!("[aggregator] final aggregate error={}", e);
}
})
}
pub fn request_stop(&self) {
self.stop_flag.store(true, Ordering::SeqCst);
}
pub fn shutdown_and_join(self: Arc<Self>, handle: std::thread::JoinHandle<()>) {
self.request_stop();
match handle.join() {
Ok(_) => {}
Err(e) => eprintln!("[aggregator] thread join failed: {:?}", e),
}
}
fn aggregate_window(&self) -> std::io::Result<()> {
let events_path = self.output_dir.join("events.jsonl");
if !events_path.exists() {
return Ok(());
}
let file = File::open(&events_path)?;
let reader = BufReader::new(file);
let mut engine_stats: HashMap<String, EngineStats> = HashMap::new();
for line in reader.lines() {
let line = line?;
if let Ok(event) = line.parse::<JsonValue>() {
if let Some(engine_id) = get_str(&event, "engine_id") {
self.process_event(&event, engine_id, &mut engine_stats);
}
}
}
if !engine_stats.is_empty() {
self.write_aggregated_stats(&engine_stats)?;
}
Ok(())
}
fn process_event(
&self,
event: &JsonValue,
engine_id: &str,
stats: &mut HashMap<String, EngineStats>,
) {
let entry = stats.entry(engine_id.to_string()).or_default();
if let Some(latency) = get_f64_nested(event, &["metrics", "latency_ms"]) {
entry.latencies.push(latency);
}
if let Some(success) = get_bool_nested(event, &["metrics", "success"]) {
entry.total_requests += 1;
if success {
entry.successful_requests += 1;
}
}
if let Some(quality) = get_f64(event, "quality_score") {
entry.quality_scores.push(quality);
}
}
fn write_aggregated_stats(&self, stats: &HashMap<String, EngineStats>) -> std::io::Result<()> {
let output_path = self.output_dir.join("aggregated_stats.jsonl");
let timestamp = atomic::current_time_rfc3339();
let mut out_bytes = Vec::new();
for (engine_id, engine_stats) in stats {
let mut latencies = engine_stats.latencies.clone();
latencies.sort_by(|a, b| a.partial_cmp(b).unwrap());
let mut metrics = std::collections::BTreeMap::new();
metrics.insert(
"total_requests".to_string(),
JsonValue::Number(engine_stats.total_requests as f64),
);
metrics.insert(
"successful_requests".to_string(),
JsonValue::Number(engine_stats.successful_requests as f64),
);
let success_rate = if engine_stats.total_requests > 0 {
engine_stats.successful_requests as f64 / engine_stats.total_requests as f64
} else {
0.0
};
metrics.insert("success_rate".to_string(), JsonValue::Number(success_rate));
metrics.insert(
"latency_p50".to_string(),
percentile(&latencies, 50)
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null),
);
metrics.insert(
"latency_p95".to_string(),
percentile(&latencies, 95)
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null),
);
metrics.insert(
"latency_p99".to_string(),
percentile(&latencies, 99)
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null),
);
metrics.insert(
"latency_min".to_string(),
latencies
.first()
.copied()
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null),
);
metrics.insert(
"latency_max".to_string(),
latencies
.last()
.copied()
.map(JsonValue::Number)
.unwrap_or(JsonValue::Null),
);
let avg_quality = if !engine_stats.quality_scores.is_empty() {
engine_stats.quality_scores.iter().sum::<f64>()
/ engine_stats.quality_scores.len() as f64
} else {
0.0
};
metrics.insert(
"avg_quality_score".to_string(),
JsonValue::Number(avg_quality),
);
let mut out = std::collections::BTreeMap::new();
out.insert(
"timestamp".to_string(),
JsonValue::String(timestamp.clone()),
);
out.insert(
"engine_id".to_string(),
JsonValue::String(engine_id.clone()),
);
out.insert(
"window_minutes".to_string(),
JsonValue::Number(self.window_minutes as f64),
);
out.insert("metrics".to_string(), JsonValue::Object(metrics));
let aggregated = JsonValue::Object(out);
let s = format!("{}", aggregated);
out_bytes.extend_from_slice(s.as_bytes());
out_bytes.push(b'\n');
}
crate::config::atomic::atomic_append(&output_path, &out_bytes)?;
crate::metrics::inc_writes_flushed();
eprintln!(
"[aggregator] wrote aggregated entries={} path={}",
stats.len(),
output_path.display()
);
Ok(())
}
}
/// Per-engine accumulators built up while scanning the events file once.
#[derive(Default)]
struct EngineStats {
    /// Number of events that carried a boolean `metrics.success`.
    total_requests: u64,
    /// How many of those events had `metrics.success == true`.
    successful_requests: u64,
    /// `metrics.latency_ms` samples, in the order they were read.
    latencies: Vec<f64>,
    /// `quality_score` samples, in the order they were read.
    quality_scores: Vec<f64>,
}
/// Returns the element of the ascending-sorted slice `sorted` at index
/// `round(p / 100 * (len - 1))`, or `None` when the slice is empty.
///
/// `p` is clamped to `100`, so any `p >= 100` yields the last (largest) element
/// instead of indexing out of bounds and panicking.
fn percentile(sorted: &[f64], p: u8) -> Option<f64> {
    let last = sorted.len().checked_sub(1)?;
    let fraction = f64::from(p.min(100)) / 100.0;
    let index = (fraction * last as f64).round() as usize;
    // The clamp on `p` already keeps `index <= last`; `min` is a cheap guarantee.
    sorted.get(index.min(last)).copied()
}
/// Looks up `key` in a JSON object and returns it as a string slice.
///
/// Yields `None` if `v` is not an object, the key is absent, or the value is not a string.
fn get_str<'a>(v: &'a JsonValue, key: &str) -> Option<&'a str> {
    match v {
        JsonValue::Object(fields) => match fields.get(key) {
            Some(JsonValue::String(text)) => Some(text.as_str()),
            _ => None,
        },
        _ => None,
    }
}
/// Looks up `key` in a JSON object and returns it as an `f64`.
///
/// Yields `None` if `v` is not an object, the key is absent, or the value is not a number.
fn get_f64(v: &JsonValue, key: &str) -> Option<f64> {
    let field = match v {
        JsonValue::Object(fields) => fields.get(key)?,
        _ => return None,
    };
    match field {
        JsonValue::Number(n) => Some(*n),
        _ => None,
    }
}
/// Follows `path` through nested JSON objects and returns the final value as an `f64`.
///
/// Every key but the last must name an object. Returns `None` if any step is missing
/// or not an object, if the last value is not a number, or if `path` is empty
/// (previously an empty path underflowed `path.len() - 1` and panicked).
fn get_f64_nested(v: &JsonValue, path: &[&str]) -> Option<f64> {
    let (&last, parents) = path.split_last()?;
    let mut cur = v;
    for &key in parents {
        match cur {
            JsonValue::Object(m) => cur = m.get(key)?,
            _ => return None,
        }
    }
    get_f64(cur, last)
}
/// Follows `path` through nested JSON objects and returns the final value as a `bool`.
///
/// Every key but the last must name an object. Returns `None` if any step is missing
/// or not an object, if the last value is not a boolean, or if `path` is empty
/// (previously an empty path underflowed `path.len() - 1` and panicked).
fn get_bool_nested(v: &JsonValue, path: &[&str]) -> Option<bool> {
    let (&last, parents) = path.split_last()?;
    let mut cur = v;
    for &key in parents {
        match cur {
            JsonValue::Object(m) => cur = m.get(key)?,
            _ => return None,
        }
    }
    match cur {
        JsonValue::Object(m) => match m.get(last)? {
            JsonValue::Bool(b) => Some(*b),
            _ => None,
        },
        _ => None,
    }
}