// collet 0.1.1
//
// Relentless agentic coding orchestrator with zero-drop agent loops.
// Documentation
//! Observer — collects (task, trajectory, feedback) triples into JSONL logs.
//!
//! Each solve batch produces a `batch_NNNN.jsonl` file under
//! `evolution/observations/`.

use std::io::Write;
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use serde_json::json;

use super::types::Observation;

/// Collects observations and persists them as JSONL.
///
/// Each call to `collect` writes one `batch_NNNN.jsonl` file into
/// `observations_dir` and then advances `batch_id`.
///
/// `Clone` is intentionally not derived: two copies would share the same
/// `batch_id` and write to the same batch file.
#[derive(Debug)]
pub struct Observer {
    /// Directory that holds the `batch_NNNN.jsonl` files.
    observations_dir: PathBuf,
    /// Number used in the file name of the next batch to be written.
    batch_id: u32,
}

impl Observer {
    pub fn new(evolution_dir: impl AsRef<Path>) -> Self {
        let observations_dir = evolution_dir.as_ref().join("observations");
        let _ = std::fs::create_dir_all(&observations_dir);
        let batch_id = next_batch_id(&observations_dir);
        Self {
            observations_dir,
            batch_id,
        }
    }

    /// Write a batch of observations to a JSONL file.  Returns the file path.
    pub fn collect(&mut self, observations: &[Observation]) -> Result<PathBuf> {
        let batch_file = self
            .observations_dir
            .join(format!("batch_{:04}.jsonl", self.batch_id));

        let mut file = std::fs::File::create(&batch_file)
            .with_context(|| format!("Failed to create {}", batch_file.display()))?;

        for obs in observations {
            let record = json!({
                // Flat fields (backward compat)
                "task_id": obs.task.id,
                "task_input": obs.task.input,
                "agent_output": obs.trajectory.output,
                "steps": obs.trajectory.steps,
                "conversation": obs.trajectory.conversation,
                "success": obs.feedback.success,
                "score": obs.feedback.score,
                "feedback_detail": obs.feedback.detail,
                "timestamp": chrono::Utc::now().to_rfc3339(),

                // Nested structure for engines
                "task": {
                    "id": obs.task.id,
                    "input": obs.task.input,
                    "metadata": obs.task.metadata,
                },
                "trajectory": {
                    "output": obs.trajectory.output,
                    "steps": obs.trajectory.steps,
                },
                "feedback": {
                    "success": obs.feedback.success,
                    "score": obs.feedback.score,
                    "detail": obs.feedback.detail,
                    "raw": obs.feedback.raw,
                },
            });

            serde_json::to_writer(&mut file, &record)?;
            writeln!(file)?;
        }

        tracing::info!(
            count = observations.len(),
            batch = batch_file.display().to_string(),
            "Wrote observations"
        );

        self.batch_id += 1;
        Ok(batch_file)
    }

    /// Read the most recent N batches of observations.
    pub fn get_recent_logs(&self, n_batches: usize) -> Result<Vec<serde_json::Value>> {
        let mut batch_files: Vec<_> = std::fs::read_dir(&self.observations_dir)?
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.path()
                    .file_name()
                    .and_then(|n| n.to_str())
                    .map(|n| n.starts_with("batch_") && n.ends_with(".jsonl"))
                    .unwrap_or(false)
            })
            .map(|e| e.path())
            .collect();

        batch_files.sort();
        let recent = if batch_files.len() > n_batches {
            &batch_files[batch_files.len() - n_batches..]
        } else {
            &batch_files
        };

        let mut records = Vec::new();
        for path in recent {
            let content = std::fs::read_to_string(path)?;
            for line in content.lines() {
                if !line.trim().is_empty() {
                    records.push(serde_json::from_str(line)?);
                }
            }
        }
        Ok(records)
    }

    /// Compute aggregate stats across all observations.
    pub fn get_summary_stats(&self) -> Result<serde_json::Value> {
        let all = self.get_recent_logs(9999)?;
        if all.is_empty() {
            return Ok(json!({"total": 0, "success_rate": 0.0, "avg_score": 0.0}));
        }
        let successes = all
            .iter()
            .filter(|r| r.get("success").and_then(|v| v.as_bool()).unwrap_or(false))
            .count();
        let scores: Vec<f64> = all
            .iter()
            .filter_map(|r| r.get("score").and_then(|v| v.as_f64()))
            .collect();
        let avg = if scores.is_empty() {
            0.0
        } else {
            scores.iter().sum::<f64>() / scores.len() as f64
        };
        Ok(json!({
            "total": all.len(),
            "success_rate": successes as f64 / all.len() as f64,
            "avg_score": avg,
        }))
    }
}

/// Returns the batch number to use next for `dir`.
///
/// Scans the directory for entries whose file stem has the form
/// `batch_<number>` and returns one more than the largest number found.
/// Returns 1 when the directory cannot be read or holds no such entry.
/// Entries that cannot be read, or whose name is not valid UTF-8, are ignored.
fn next_batch_id(dir: &Path) -> u32 {
    let Ok(listing) = std::fs::read_dir(dir) else {
        return 1;
    };

    let highest = listing
        .flatten()
        .filter_map(|item| {
            let path = item.path();
            let stem = path.file_stem()?.to_str()?;
            stem.strip_prefix("batch_")?.parse::<u32>().ok()
        })
        .max()
        .unwrap_or(0);

    highest + 1
}