parent_ai_json_engine 0.0.2

This crate provides a JSON engine for collecting, aggregating, and managing models.
Documentation
use std::collections::HashMap;
use std::fs::File;
use std::io::BufRead;
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc, Mutex,
};
use std::time::Duration;

/// Periodically summarises the per-engine metrics found in `events.jsonl`
/// under `output_dir` and appends each summary to `aggregated_stats.jsonl`.
pub struct MetricsAggregator {
    /// Directory holding the `events.jsonl` input and the
    /// `aggregated_stats.jsonl` output.
    output_dir: PathBuf,
    /// Length of one aggregation window in minutes; the worker waits this long
    /// between aggregation passes.
    window_minutes: u64,
    /// Join handle of the background worker: `None` until `start` stores one,
    /// and taken out again by `stop` or `Drop`.
    worker_handle: Arc<Mutex<Option<std::thread::JoinHandle<()>>>>,
    /// Set to `true` by `stop` and by `Drop`; the worker loop exits once it
    /// observes the flag.
    shutdown_flag: Arc<AtomicBool>,
}

impl MetricsAggregator {
    pub fn new(output_dir: PathBuf, window_minutes: u64) -> Self {
        Self {
            output_dir,
            window_minutes,
            worker_handle: Arc::new(Mutex::new(None)),
            shutdown_flag: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Spawns the background worker that runs one aggregation pass per window.
    ///
    /// The worker keeps its own `Arc` to the aggregator and loops until the
    /// shutdown flag is set (by `stop` or `Drop`). The flag is never cleared
    /// again, so a worker started after `stop` exits immediately.
    ///
    /// Calling `start` while a handle is already stored is a no-op; previously
    /// the stored handle was overwritten and the earlier worker kept running
    /// unjoined.
    ///
    /// Aggregation errors are reported on stderr and do not end the loop.
    pub fn start(self: Arc<Self>) {
        // Upper bound on how long the worker sleeps before re-checking the
        // shutdown flag, so `stop`/`Drop` are not stuck in `join` for a whole
        // window.
        const POLL_SLICE: Duration = Duration::from_millis(250);

        // Keep the slot locked across the spawn so two concurrent `start`
        // calls cannot both create a worker. A poisoned lock still guards a
        // usable `Option`, so recover it instead of panicking.
        let mut slot = self
            .worker_handle
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if slot.is_some() {
            return;
        }

        // Saturate instead of overflowing for absurdly large windows.
        let interval = Duration::from_secs(self.window_minutes.saturating_mul(60));
        let shutdown = Arc::clone(&self.shutdown_flag);
        let self_for_thread = Arc::clone(&self);
        let handle = crate::io::spawn_blocking(move || loop {
            // Wait out one window in short slices. At least one sleep always
            // happens, so a zero-length window still yields the CPU.
            let mut remaining = interval;
            loop {
                if shutdown.load(Ordering::SeqCst) {
                    return;
                }
                let slice = remaining.min(POLL_SLICE);
                std::thread::sleep(slice);
                remaining -= slice;
                if remaining.is_zero() {
                    break;
                }
            }
            // Shutdown may have been requested during the last slice.
            if shutdown.load(Ordering::SeqCst) {
                return;
            }
            if let Err(e) = self_for_thread.aggregate_window() {
                eprintln!("[aggregator] Error: {}", e);
            }
        });
        *slot = Some(handle);
    }

    /// Requests shutdown, runs one final aggregation pass and then waits for
    /// the worker thread (if any) to exit.
    ///
    /// Nothing is returned: a failed final aggregation or a worker panic is
    /// reported on stderr as a single-line JSON object. Text placed in the
    /// `error` field is JSON-escaped so the line stays parseable even when the
    /// message contains quotes, backslashes or control characters.
    pub fn stop(&self) {
        // Escapes `raw` for use inside a JSON string literal.
        fn json_escape(raw: &str) -> String {
            let mut out = String::with_capacity(raw.len());
            for c in raw.chars() {
                match c {
                    '"' => out.push_str("\\\""),
                    '\\' => out.push_str("\\\\"),
                    '\n' => out.push_str("\\n"),
                    '\r' => out.push_str("\\r"),
                    '\t' => out.push_str("\\t"),
                    c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
                    c => out.push(c),
                }
            }
            out
        }

        // request shutdown and perform a final aggregation
        self.shutdown_flag.store(true, Ordering::SeqCst);
        if let Err(e) = self.aggregate_window() {
            eprintln!(
                "{{\"component\":\"aggregator\",\"op\":\"final_aggregate\",\"error\":\"{}\",\"trace_id\":null}}",
                json_escape(&e.to_string())
            );
        }

        // Take the handle in its own statement so the mutex guard is released
        // before the potentially long `join`. A poisoned lock still guards a
        // usable `Option`, so recover it instead of panicking.
        let worker = self
            .worker_handle
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        let handle = match worker {
            Some(h) => h,
            None => return,
        };

        if let Err(panic) = handle.join() {
            // Panic payloads are usually `&str` or `String`; anything else is
            // reported with a generic message.
            let payload = &*panic;
            let message = if let Some(s) = payload.downcast_ref::<&str>() {
                json_escape(s)
            } else if let Some(s) = payload.downcast_ref::<String>() {
                json_escape(s)
            } else {
                String::from("thread panicked")
            };
            eprintln!(
                "{{\"component\":\"aggregator\",\"error\":\"{}\",\"trace_id\":null}}",
                message
            );
        }
    }

    /// Reads every line of `events.jsonl` in the output directory, groups the
    /// parsed metrics by engine id and hands the result to
    /// `write_aggregated_stats`.
    ///
    /// Returns `Ok(())` without writing anything when the events file does not
    /// exist or when no event carried a non-empty engine id.
    ///
    /// # Errors
    /// Propagates failures from opening or reading the events file and from
    /// writing the aggregated record.
    fn aggregate_window(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let events_path = self.output_dir.join("events.jsonl");
        if !events_path.exists() {
            return Ok(());
        }

        let mut per_engine: HashMap<String, EngineStats> = HashMap::new();
        for raw in BufReader::new(File::open(&events_path)?).lines() {
            let raw = raw?;
            // Events without an engine id cannot be attributed to anything.
            let event = match parse_event_line(&raw) {
                Some(ev) if !ev.engine_id.is_empty() => ev,
                _ => continue,
            };

            let stats = per_engine.entry(event.engine_id).or_default();
            // `extend` with an `Option` pushes the value only when present.
            stats.latencies.extend(event.latency);
            stats.quality_scores.extend(event.quality);
            // Only events that report an outcome count as requests.
            if let Some(ok) = event.success {
                stats.total_requests += 1;
                stats.successful_requests += u64::from(ok);
            }
        }

        if per_engine.is_empty() {
            return Ok(());
        }
        self.write_aggregated_stats(&per_engine)
    }

    /// Serialises one aggregation record and appends it as a single JSON line
    /// to `aggregated_stats.jsonl` in the output directory.
    ///
    /// The record holds a timestamp and, per engine: the request count, the
    /// success rate (`0.0` when there were no requests), latency
    /// min/max/avg/p50/p95/p99 and the average quality score. Statistics
    /// without data are written as `null`.
    ///
    /// Engines are emitted in ascending `engine_id` order. `HashMap` iteration
    /// order is unspecified, so without the sort identical input could produce
    /// differently ordered records from run to run.
    ///
    /// # Errors
    /// Propagates the error returned by `crate::config::atomic_append`.
    fn write_aggregated_stats(
        &self,
        engine_stats: &HashMap<String, EngineStats>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let output_path = self.output_dir.join("aggregated_stats.jsonl");

        let ts = chrono_fallback();

        // Keys are unique, so an unstable sort is sufficient.
        let mut ordered: Vec<(&String, &EngineStats)> = engine_stats.iter().collect();
        ordered.sort_unstable_by(|a, b| a.0.cmp(b.0));

        let mut engines_json: Vec<String> = Vec::with_capacity(ordered.len());

        for (engine_id, stats) in ordered {
            // Percentiles need ascending order; incomparable values are
            // treated as equal rather than aborting the sort.
            let mut sorted = stats.latencies.to_vec();
            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

            // `first`, `last` and `percentile` already yield `None` for an
            // empty slice; only the average needs an explicit guard.
            let lat_min = sorted.first().copied();
            let lat_max = sorted.last().copied();
            let lat_avg = if sorted.is_empty() {
                None
            } else {
                Some(sorted.iter().sum::<f64>() / sorted.len() as f64)
            };
            let p50 = percentile(&sorted, 50);
            let p95 = percentile(&sorted, 95);
            let p99 = percentile(&sorted, 99);

            let quality_avg = if stats.quality_scores.is_empty() {
                None
            } else {
                Some(stats.quality_scores.iter().sum::<f64>() / stats.quality_scores.len() as f64)
            };

            let success_rate = if stats.total_requests > 0 {
                stats.successful_requests as f64 / stats.total_requests as f64
            } else {
                0.0
            };

            let engine_json = format!(
                "{{\"engine_id\":\"{}\",\"requests\":{},\"success_rate\":{},\"latency\":{{\"min\":{},\"max\":{},\"avg\":{},\"p50\":{},\"p95\":{},\"p99\":{}}},\"quality\":{{\"avg\":{}}}}}",
                engine_id,
                stats.total_requests,
                success_rate,
                option_to_number(lat_min),
                option_to_number(lat_max),
                option_to_number(lat_avg),
                option_to_number(p50),
                option_to_number(p95),
                option_to_number(p99),
                option_to_number(quality_avg),
            );
            engines_json.push(engine_json);
        }

        let full = format!(
            "{{\"timestamp\":\"{}\",\"engines\":[{}]}}\n",
            ts,
            engines_json.join(",")
        );
        crate::config::atomic_append(&output_path, full.as_bytes())?;
        Ok(())
    }
}

/// Signals shutdown and joins the worker thread when the aggregator is dropped.
///
/// Unlike `stop`, no final aggregation pass is performed here.
impl Drop for MetricsAggregator {
    fn drop(&mut self) {
        self.shutdown_flag.store(true, Ordering::SeqCst);

        // Never panic inside `drop`: a poisoned lock still guards a usable
        // `Option`, so recover it. Taking the handle in its own statement also
        // releases the guard before the `join` below.
        let worker = self
            .worker_handle
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        let handle = match worker {
            Some(h) => h,
            None => return,
        };

        // The worker owns an `Arc` to the aggregator, so the final reference
        // can be released on the worker thread itself. A thread cannot join
        // itself; dropping the handle detaches it instead.
        if handle.thread().id() == std::thread::current().id() {
            return;
        }

        if let Err(e) = handle.join() {
            let any = &*e;
            if let Some(s) = any.downcast_ref::<&str>() {
                eprintln!("[aggregator] worker thread panicked: {}", s);
            } else if let Some(s) = any.downcast_ref::<String>() {
                eprintln!("[aggregator] worker thread panicked: {}", s);
            } else {
                eprintln!("[aggregator] worker thread panicked with unknown payload");
            }
        }
    }
}

/// Running totals collected for a single engine during one aggregation pass.
#[derive(Default)]
struct EngineStats {
    /// Every latency value seen for the engine, in file order.
    /// NOTE(review): presumably milliseconds, since the `latency_ms` key is
    /// tried first, but a plain `latency` key is accepted too; confirm units.
    latencies: Vec<f64>,
    /// Every `quality_score` value seen for the engine.
    quality_scores: Vec<f64>,
    /// Number of events that carried a `success` field.
    total_requests: u64,
    /// Number of those events whose `success` field was `true`.
    successful_requests: u64,
}

/// Picks the `p`-th percentile from an ascending slice by rounding the
/// fractional position `p / 100 * (len - 1)` to the nearest index.
///
/// Returns `None` for an empty slice, or when `p` is above 100 and the rounded
/// index falls past the end.
fn percentile(sorted: &[f64], p: u8) -> Option<f64> {
    // `checked_sub` doubles as the emptiness test.
    let last_index = sorted.len().checked_sub(1)?;
    let position = (f64::from(p) / 100.0 * last_index as f64).round();
    sorted.get(position as usize).copied()
}

/// Fields pulled out of one line of the events file by `parse_event_line`.
struct EngineEvent {
    /// Value of the `engine_id` key; empty when the key was not found.
    engine_id: String,
    /// Value of `latency_ms`, or of `latency` when `latency_ms` is absent.
    latency: Option<f64>,
    /// Value of the `success` key, if it was a boolean literal.
    success: Option<bool>,
    /// Value of the `quality_score` key, if present.
    quality: Option<f64>,
}

/// Extracts the known metric fields from one line of the events file.
///
/// Always returns `Some`: a missing `engine_id` becomes an empty string and
/// every other missing field becomes `None`. The latency is taken from
/// `latency_ms` and falls back to `latency`.
fn parse_event_line(s: &str) -> Option<EngineEvent> {
    let latency = match extract_number_field(s, "latency_ms") {
        Some(ms) => Some(ms),
        None => extract_number_field(s, "latency"),
    };
    let event = EngineEvent {
        engine_id: extract_string_field(s, "engine_id").unwrap_or_default(),
        latency,
        success: extract_bool_field(s, "success"),
        quality: extract_number_field(s, "quality_score"),
    };
    Some(event)
}

/// Returns the raw contents of the string value stored under `key` in the
/// JSON-like line `s`.
///
/// An occurrence of `"key"` only counts as the key when the next
/// non-whitespace character is `:`, so a string *value* that happens to equal
/// the key name is skipped. The value itself must start with `"` right after
/// the colon (whitespace aside); for `null`, numbers or other non-string
/// values the function returns `None` instead of picking up text from a later
/// field.
///
/// Backslash escapes are honoured when looking for the closing quote but are
/// not decoded: the returned text is exactly what stood between the quotes.
/// Returns `None` when the key is missing or the string is unterminated.
fn extract_string_field(s: &str, key: &str) -> Option<String> {
    let needle = format!("\"{}\"", key);
    let mut cursor = 0;
    while let Some(found) = s[cursor..].find(&needle) {
        // `needle` ends in an ASCII quote, so `cursor` stays on a char boundary.
        cursor += found + needle.len();
        let value = match s[cursor..].trim_start().strip_prefix(':') {
            Some(after_colon) => after_colon.trim_start(),
            // Not followed by a colon: this was a value, keep looking.
            None => continue,
        };
        let body = value.strip_prefix('"')?;

        // Scan for the first quote that is not preceded by an active escape.
        let mut escaped = false;
        for (idx, ch) in body.char_indices() {
            match ch {
                _ if escaped => escaped = false,
                '\\' => escaped = true,
                '"' => return Some(body[..idx].to_string()),
                _ => {}
            }
        }
        return None;
    }
    None
}

/// Returns the numeric value stored under `key` in the JSON-like line `s`.
///
/// An occurrence of `"key"` only counts as the key when the next
/// non-whitespace character is `:`. The number must start right after the
/// colon (whitespace and one optional opening quote aside, so `"42"` is still
/// accepted); for `null`, booleans or text the function returns `None` instead
/// of scanning ahead into a later field.
///
/// Exponent notation such as `1.5e3` is parsed in full. Values that overflow
/// to a non-finite `f64` are rejected. Returns `None` when the key is missing
/// or the value is not a number.
fn extract_number_field(s: &str, key: &str) -> Option<f64> {
    let needle = format!("\"{}\"", key);
    let mut cursor = 0;
    while let Some(found) = s[cursor..].find(&needle) {
        // `needle` ends in an ASCII quote, so `cursor` stays on a char boundary.
        cursor += found + needle.len();
        let value = match s[cursor..].trim_start().strip_prefix(':') {
            Some(after_colon) => after_colon.trim_start(),
            // Not followed by a colon: this was a value, keep looking.
            None => continue,
        };
        // Tolerate numbers that were serialised as strings.
        let value = value.strip_prefix('"').unwrap_or(value);

        // Longest prefix made of characters that can appear in a number; the
        // float parser then decides whether the token is well-formed.
        let end = value
            .find(|c: char| !(c.is_ascii_digit() || matches!(c, '.' | '-' | '+' | 'e' | 'E')))
            .unwrap_or(value.len());
        return value[..end].parse::<f64>().ok().filter(|v| v.is_finite());
    }
    None
}

/// Returns the boolean stored under `key` in the JSON-like line `s`.
///
/// Looks for the first occurrence of `"key"`, then the first `:` after it, and
/// checks whether the text following the colon (leading whitespace skipped)
/// begins with `true` or `false`. Anything else yields `None`.
fn extract_bool_field(s: &str, key: &str) -> Option<bool> {
    let quoted_key = format!("\"{}\"", key);
    let from_key = &s[s.find(&quoted_key)?..];
    let colon_at = from_key.find(':')?;
    let value = from_key[colon_at + 1..].trim_start();

    if value.starts_with("true") {
        Some(true)
    } else if value.starts_with("false") {
        Some(false)
    } else {
        None
    }
}

/// Renders an optional float as a JSON number, or as `null` when the value is
/// absent or not finite (NaN and infinities have no JSON representation).
fn option_to_number(v: Option<f64>) -> String {
    v.filter(|x| x.is_finite())
        .map_or_else(|| String::from("null"), |x| x.to_string())
}

/// Returns the current time as whole seconds since the Unix epoch, formatted
/// as a decimal string; yields `"0"` if the system clock reads earlier than
/// the epoch.
fn chrono_fallback() -> String {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs().to_string())
        .unwrap_or_else(|_| String::from("0"))
}