use std::io::Write;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use serde_json::json;
use super::types::Observation;
/// Writes batches of observations to JSON-lines files and reads them back.
#[derive(Debug)]
pub struct Observer {
    /// Directory holding the `batch_<id>.jsonl` files.
    observations_dir: PathBuf,
    /// Id used for the next file written by `collect`.
    batch_id: u32,
}
impl Observer {
pub fn new(evolution_dir: impl AsRef<Path>) -> Self {
let observations_dir = evolution_dir.as_ref().join("observations");
let _ = std::fs::create_dir_all(&observations_dir);
let batch_id = next_batch_id(&observations_dir);
Self {
observations_dir,
batch_id,
}
}
pub fn collect(&mut self, observations: &[Observation]) -> Result<PathBuf> {
let batch_file = self
.observations_dir
.join(format!("batch_{:04}.jsonl", self.batch_id));
let mut file = std::fs::File::create(&batch_file)
.with_context(|| format!("Failed to create {}", batch_file.display()))?;
for obs in observations {
let record = json!({
"task_id": obs.task.id,
"task_input": obs.task.input,
"agent_output": obs.trajectory.output,
"steps": obs.trajectory.steps,
"conversation": obs.trajectory.conversation,
"success": obs.feedback.success,
"score": obs.feedback.score,
"feedback_detail": obs.feedback.detail,
"timestamp": chrono::Utc::now().to_rfc3339(),
"task": {
"id": obs.task.id,
"input": obs.task.input,
"metadata": obs.task.metadata,
},
"trajectory": {
"output": obs.trajectory.output,
"steps": obs.trajectory.steps,
},
"feedback": {
"success": obs.feedback.success,
"score": obs.feedback.score,
"detail": obs.feedback.detail,
"raw": obs.feedback.raw,
},
});
serde_json::to_writer(&mut file, &record)?;
writeln!(file)?;
}
tracing::info!(
count = observations.len(),
batch = batch_file.display().to_string(),
"Wrote observations"
);
self.batch_id += 1;
Ok(batch_file)
}
pub fn get_recent_logs(&self, n_batches: usize) -> Result<Vec<serde_json::Value>> {
let mut batch_files: Vec<_> = std::fs::read_dir(&self.observations_dir)?
.filter_map(|e| e.ok())
.filter(|e| {
e.path()
.file_name()
.and_then(|n| n.to_str())
.map(|n| n.starts_with("batch_") && n.ends_with(".jsonl"))
.unwrap_or(false)
})
.map(|e| e.path())
.collect();
batch_files.sort();
let recent = if batch_files.len() > n_batches {
&batch_files[batch_files.len() - n_batches..]
} else {
&batch_files
};
let mut records = Vec::new();
for path in recent {
let content = std::fs::read_to_string(path)?;
for line in content.lines() {
if !line.trim().is_empty() {
records.push(serde_json::from_str(line)?);
}
}
}
Ok(records)
}
pub fn get_summary_stats(&self) -> Result<serde_json::Value> {
let all = self.get_recent_logs(9999)?;
if all.is_empty() {
return Ok(json!({"total": 0, "success_rate": 0.0, "avg_score": 0.0}));
}
let successes = all
.iter()
.filter(|r| r.get("success").and_then(|v| v.as_bool()).unwrap_or(false))
.count();
let scores: Vec<f64> = all
.iter()
.filter_map(|r| r.get("score").and_then(|v| v.as_f64()))
.collect();
let avg = if scores.is_empty() {
0.0
} else {
scores.iter().sum::<f64>() / scores.len() as f64
};
Ok(json!({
"total": all.len(),
"success_rate": successes as f64 / all.len() as f64,
"avg_score": avg,
}))
}
}
/// Returns the id to use for the next batch file in `dir`.
///
/// Scans `dir` for entries whose file stem is `batch_<N>` (any extension), where
/// `N` parses as a `u32`. Returns the largest such `N` plus one, or `1` when there
/// are none or `dir` cannot be read. The increment saturates at `u32::MAX`, so a
/// stray `batch_4294967295.*` entry cannot trigger an overflow panic.
fn next_batch_id(dir: &Path) -> u32 {
    let max_id = match std::fs::read_dir(dir) {
        Ok(entries) => entries
            .flatten()
            .filter_map(|entry| {
                let path = entry.path();
                let stem = path.file_stem()?.to_str()?;
                stem.strip_prefix("batch_")?.parse::<u32>().ok()
            })
            .max()
            .unwrap_or(0),
        // A missing or unreadable directory is treated as empty.
        Err(_) => 0,
    };
    max_id.saturating_add(1)
}