child_ai_json_engine 0.0.2

This crate provides a JSON engine for collecting, aggregating, and managing models.
Documentation
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// blocking I/O - call from a dedicated thread or use `spawn_blocking` when running inside an async runtime
use crate::config::atomic;
use crate::config::json::Value as JsonValue;

/// Periodic aggregator that summarises per-engine metrics found in
/// `events.jsonl` inside `output_dir` and appends the summaries to
/// `aggregated_stats.jsonl` in the same directory.
///
/// Derives `Debug` so the aggregator can be logged or embedded in other
/// `Debug` types.
#[derive(Debug)]
pub struct MetricsAggregator {
    /// Directory that holds both the input event log and the aggregated output.
    output_dir: PathBuf,
    /// Length of one aggregation window, in minutes.
    window_minutes: u64,
    /// Set to `true` to ask the background thread to stop.
    stop_flag: Arc<AtomicBool>,
}

impl MetricsAggregator {
    pub fn new(output_dir: PathBuf, window_minutes: u64) -> Self {
        Self {
            output_dir,
            window_minutes,
            stop_flag: Arc::new(AtomicBool::new(false)),
        }
    }

    pub fn start(self: Arc<Self>) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            let interval_seconds = self.window_minutes * 60;
            loop {
                if self.stop_flag.load(Ordering::SeqCst) {
                    break;
                }
                thread::sleep(Duration::from_secs(interval_seconds));
                if self.stop_flag.load(Ordering::SeqCst) {
                    break;
                }
                if let Err(e) = self.aggregate_window() {
                    eprintln!("[aggregator] aggregate error={}", e);
                }
            }

            // final aggregation before exit
            if let Err(e) = self.aggregate_window() {
                eprintln!("[aggregator] final aggregate error={}", e);
            }
        })
    }

    pub fn request_stop(&self) {
        self.stop_flag.store(true, Ordering::SeqCst);
    }

    pub fn shutdown_and_join(self: Arc<Self>, handle: std::thread::JoinHandle<()>) {
        self.request_stop();
        match handle.join() {
            Ok(_) => {}
            Err(e) => eprintln!("[aggregator] thread join failed: {:?}", e),
        }
    }

    fn aggregate_window(&self) -> std::io::Result<()> {
        let events_path = self.output_dir.join("events.jsonl");
        if !events_path.exists() {
            return Ok(());
        }

        let file = File::open(&events_path)?;
        let reader = BufReader::new(file);

        let mut engine_stats: HashMap<String, EngineStats> = HashMap::new();

        for line in reader.lines() {
            let line = line?;
            if let Ok(event) = line.parse::<JsonValue>() {
                if let Some(engine_id) = get_str(&event, "engine_id") {
                    self.process_event(&event, engine_id, &mut engine_stats);
                }
            }
        }

        if !engine_stats.is_empty() {
            self.write_aggregated_stats(&engine_stats)?;
        }

        Ok(())
    }

    fn process_event(
        &self,
        event: &JsonValue,
        engine_id: &str,
        stats: &mut HashMap<String, EngineStats>,
    ) {
        let entry = stats.entry(engine_id.to_string()).or_default();

        if let Some(latency) = get_f64_nested(event, &["metrics", "latency_ms"]) {
            entry.latencies.push(latency);
        }

        if let Some(success) = get_bool_nested(event, &["metrics", "success"]) {
            entry.total_requests += 1;
            if success {
                entry.successful_requests += 1;
            }
        }

        if let Some(quality) = get_f64(event, "quality_score") {
            entry.quality_scores.push(quality);
        }
    }

    fn write_aggregated_stats(&self, stats: &HashMap<String, EngineStats>) -> std::io::Result<()> {
        let output_path = self.output_dir.join("aggregated_stats.jsonl");
        let timestamp = atomic::current_time_rfc3339();
        let mut out_bytes = Vec::new();
        for (engine_id, engine_stats) in stats {
            let mut latencies = engine_stats.latencies.clone();
            latencies.sort_by(|a, b| a.partial_cmp(b).unwrap());

            let mut metrics = std::collections::BTreeMap::new();
            metrics.insert(
                "total_requests".to_string(),
                JsonValue::Number(engine_stats.total_requests as f64),
            );
            metrics.insert(
                "successful_requests".to_string(),
                JsonValue::Number(engine_stats.successful_requests as f64),
            );
            let success_rate = if engine_stats.total_requests > 0 {
                engine_stats.successful_requests as f64 / engine_stats.total_requests as f64
            } else {
                0.0
            };
            metrics.insert("success_rate".to_string(), JsonValue::Number(success_rate));
            metrics.insert(
                "latency_p50".to_string(),
                percentile(&latencies, 50)
                    .map(JsonValue::Number)
                    .unwrap_or(JsonValue::Null),
            );
            metrics.insert(
                "latency_p95".to_string(),
                percentile(&latencies, 95)
                    .map(JsonValue::Number)
                    .unwrap_or(JsonValue::Null),
            );
            metrics.insert(
                "latency_p99".to_string(),
                percentile(&latencies, 99)
                    .map(JsonValue::Number)
                    .unwrap_or(JsonValue::Null),
            );
            metrics.insert(
                "latency_min".to_string(),
                latencies
                    .first()
                    .copied()
                    .map(JsonValue::Number)
                    .unwrap_or(JsonValue::Null),
            );
            metrics.insert(
                "latency_max".to_string(),
                latencies
                    .last()
                    .copied()
                    .map(JsonValue::Number)
                    .unwrap_or(JsonValue::Null),
            );
            let avg_quality = if !engine_stats.quality_scores.is_empty() {
                engine_stats.quality_scores.iter().sum::<f64>()
                    / engine_stats.quality_scores.len() as f64
            } else {
                0.0
            };
            metrics.insert(
                "avg_quality_score".to_string(),
                JsonValue::Number(avg_quality),
            );

            let mut out = std::collections::BTreeMap::new();
            out.insert(
                "timestamp".to_string(),
                JsonValue::String(timestamp.clone()),
            );
            out.insert(
                "engine_id".to_string(),
                JsonValue::String(engine_id.clone()),
            );
            out.insert(
                "window_minutes".to_string(),
                JsonValue::Number(self.window_minutes as f64),
            );
            out.insert("metrics".to_string(), JsonValue::Object(metrics));

            let aggregated = JsonValue::Object(out);
            let s = format!("{}", aggregated);
            out_bytes.extend_from_slice(s.as_bytes());
            out_bytes.push(b'\n');
        }

        crate::config::atomic::atomic_append(&output_path, &out_bytes)?;
        crate::metrics::inc_writes_flushed();
        eprintln!(
            "[aggregator] wrote aggregated entries={} path={}",
            stats.len(),
            output_path.display()
        );
        Ok(())
    }
}

/// Running per-engine totals collected while scanning the event log.
///
/// `Default` yields zeroed counters and empty sample lists.
#[derive(Default)]
struct EngineStats {
    /// Number of events that carried a boolean `metrics.success`.
    total_requests: u64,
    /// Subset of `total_requests` whose `metrics.success` was `true`.
    successful_requests: u64,
    /// Every `metrics.latency_ms` sample seen, in arrival order.
    latencies: Vec<f64>,
    /// Every top-level `quality_score` sample seen, in arrival order.
    quality_scores: Vec<f64>,
}

/// Returns the `p`-th percentile of an ascending-sorted slice using
/// nearest-rank selection (`round(p / 100 * (len - 1))`).
///
/// Returns `None` for an empty slice. `p` is clamped to `100`, so values
/// above 100 yield the last element instead of indexing out of bounds.
fn percentile(sorted: &[f64], p: u8) -> Option<f64> {
    // `checked_sub` doubles as the empty-slice guard.
    let last = sorted.len().checked_sub(1)?;
    let fraction = f64::from(p.min(100)) / 100.0;
    let index = ((fraction * last as f64).round() as usize).min(last);
    sorted.get(index).copied()
}

/// Looks up `key` in a JSON object and returns its value as a string slice.
///
/// Yields `None` when `v` is not an object, the key is absent, or the value
/// is not a JSON string.
fn get_str<'a>(v: &'a JsonValue, key: &str) -> Option<&'a str> {
    match v {
        JsonValue::Object(fields) => match fields.get(key) {
            Some(JsonValue::String(text)) => Some(text.as_str()),
            _ => None,
        },
        _ => None,
    }
}

/// Looks up `key` in a JSON object and returns its value as an `f64`.
///
/// Yields `None` when `v` is not an object, the key is absent, or the value
/// is not a JSON number.
fn get_f64(v: &JsonValue, key: &str) -> Option<f64> {
    match v {
        JsonValue::Object(fields) => match fields.get(key) {
            Some(JsonValue::Number(number)) => Some(*number),
            _ => None,
        },
        _ => None,
    }
}

/// Walks `path` through nested JSON objects and returns the number stored at
/// the final key.
///
/// Returns `None` if `path` is empty, any intermediate value is missing or
/// not an object, or the final value is not a number. An empty path
/// previously underflowed `path.len() - 1` and panicked.
fn get_f64_nested(v: &JsonValue, path: &[&str]) -> Option<f64> {
    let (leaf, parents) = path.split_last()?;
    let mut cur = v;
    for key in parents {
        match cur {
            JsonValue::Object(m) => cur = m.get(*key)?,
            _ => return None,
        }
    }
    get_f64(cur, *leaf)
}

/// Walks `path` through nested JSON objects and returns the boolean stored at
/// the final key.
///
/// Returns `None` if `path` is empty, any intermediate value is missing or
/// not an object, or the final value is not a boolean. An empty path
/// previously underflowed `path.len() - 1` and panicked.
fn get_bool_nested(v: &JsonValue, path: &[&str]) -> Option<bool> {
    let (leaf, parents) = path.split_last()?;
    let mut cur = v;
    for key in parents {
        match cur {
            JsonValue::Object(m) => cur = m.get(*key)?,
            _ => return None,
        }
    }
    match cur {
        JsonValue::Object(m) => match m.get(*leaf) {
            Some(JsonValue::Bool(b)) => Some(*b),
            _ => None,
        },
        _ => None,
    }
}