Skip to main content

sparrow/runtime/
recorder.rs

1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::path::PathBuf;
4
5use crate::event::Event;
6
7// ─── Transcript ─────────────────────────────────────────────────────────────────
8
9/// A run transcript: inputs.json + events.jsonl + checkpoint refs.
10/// Makes runs replayable, diffable, shareable, auditable.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Transcript {
13    pub run_id: String,
14    pub inputs: RunInputs,
15    pub events: Vec<Event>,
16    pub created_at: String,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct RunInputs {
21    pub task: String,
22    pub config_snapshot: serde_json::Value,
23    pub model_id: String,
24    pub repo_head: Option<String>,
25    pub timestamp: String,
26    pub agent: String,
27}
28
29impl Transcript {
30    pub fn new(run_id: String, inputs: RunInputs) -> Self {
31        Self {
32            run_id,
33            inputs,
34            events: Vec::new(),
35            created_at: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
36        }
37    }
38
39    pub fn push_event(&mut self, event: &Event) {
40        self.events.push(event.clone());
41    }
42
43    pub fn event_count(&self) -> usize {
44        self.events.len()
45    }
46}
47
48// ─── THE RECORDER TRAIT ─────────────────────────────────────────────────────────
49
50pub trait Recorder: Send + Sync {
51    fn record(&self, event: &Event);
52    fn finalize(&self, run_id: &str) -> anyhow::Result<Transcript>;
53}
54
55// ─── THE REPLAYER TRAIT ─────────────────────────────────────────────────────────
56
57pub trait Replayer: Send + Sync {
58    fn load(&self, run_id: &str) -> Option<Transcript>;
59    fn list_transcripts(&self) -> Vec<String>;
60    fn delete(&self, run_id: &str) -> anyhow::Result<()>;
61}
62
63// ─── Filesystem-backed recorder/replayer ────────────────────────────────────────
64
65pub struct FsRecorder {
66    transcripts_dir: PathBuf,
67    active: std::sync::Mutex<HashMap<String, Transcript>>,
68}
69
70impl FsRecorder {
71    pub fn new(transcripts_dir: PathBuf) -> Self {
72        std::fs::create_dir_all(&transcripts_dir).ok();
73        Self {
74            transcripts_dir,
75            active: std::sync::Mutex::new(HashMap::new()),
76        }
77    }
78
79    pub fn start_run(&self, run_id: String, inputs: RunInputs) {
80        let transcript = Transcript::new(run_id, inputs);
81        self.active
82            .lock()
83            .unwrap()
84            .insert(transcript.run_id.clone(), transcript);
85    }
86
87    fn run_dir(&self, run_id: &str) -> PathBuf {
88        self.transcripts_dir.join(run_id)
89    }
90}
91
92impl Recorder for FsRecorder {
93    fn record(&self, event: &Event) {
94        let run_id = event_run_id(event);
95        if let Some(transcript) = self.active.lock().unwrap().get_mut(run_id) {
96            transcript.push_event(event);
97        }
98    }
99
100    fn finalize(&self, run_id: &str) -> anyhow::Result<Transcript> {
101        let transcript = self
102            .active
103            .lock()
104            .unwrap()
105            .remove(run_id)
106            .ok_or_else(|| anyhow::anyhow!("No active transcript for {}", run_id))?;
107
108        let run_dir = self.run_dir(run_id);
109        std::fs::create_dir_all(&run_dir)?;
110
111        // Write inputs.json
112        let inputs_json = serde_json::to_string_pretty(&transcript.inputs)?;
113        std::fs::write(run_dir.join("inputs.json"), inputs_json)?;
114
115        // Write events.jsonl
116        let mut events_jsonl = String::new();
117        for event in &transcript.events {
118            events_jsonl.push_str(&serde_json::to_string(event)?);
119            events_jsonl.push('\n');
120        }
121        std::fs::write(run_dir.join("events.jsonl"), events_jsonl)?;
122
123        // Write metadata
124        let meta = serde_json::json!({
125            "run_id": transcript.run_id,
126            "event_count": transcript.events.len(),
127            "created_at": transcript.created_at,
128        });
129        std::fs::write(
130            run_dir.join("meta.json"),
131            serde_json::to_string_pretty(&meta)?,
132        )?;
133
134        tracing::info!(
135            "Transcript saved: {} ({} events)",
136            run_id,
137            transcript.events.len()
138        );
139
140        Ok(transcript)
141    }
142}
143
144fn event_run_id(event: &Event) -> &str {
145    match event {
146        Event::RunStarted { run, .. }
147        | Event::RouteSelected { run, .. }
148        | Event::ModelSwitched { run, .. }
149        | Event::ThinkingDelta { run, .. }
150        | Event::ReasoningDelta { run, .. }
151        | Event::Message { run, .. }
152        | Event::ToolUseProposed { run, .. }
153        | Event::ApprovalRequested { run, .. }
154        | Event::ApprovalResolved { run, .. }
155        | Event::ToolUseStarted { run, .. }
156        | Event::ToolOutput { run, .. }
157        | Event::DiffProposed { run, .. }
158        | Event::DiffApplied { run, .. }
159        | Event::TestResult { run, .. }
160        | Event::AgentSpawned { run, .. }
161        | Event::AgentStatus { run, .. }
162        | Event::CheckpointCreated { run, .. }
163        | Event::SkillLearned { run, .. }
164        | Event::CostUpdate { run, .. }
165        | Event::TokenUsage { run, .. }
166        | Event::TokenUsageEstimated { run, .. }
167        | Event::AutonomyChanged { run, .. }
168        | Event::RunFinished { run, .. }
169        | Event::Error { run, .. }
170        | Event::Compacted { run, .. } => &run.0,
171    }
172}
173
174impl Replayer for FsRecorder {
175    fn load(&self, run_id: &str) -> Option<Transcript> {
176        let run_dir = self.run_dir(run_id);
177        if !run_dir.exists() {
178            return None;
179        }
180
181        let inputs: RunInputs =
182            serde_json::from_str(&std::fs::read_to_string(run_dir.join("inputs.json")).ok()?)
183                .ok()?;
184
185        let events_text = std::fs::read_to_string(run_dir.join("events.jsonl")).ok()?;
186        let events: Vec<Event> = events_text
187            .lines()
188            .filter(|l| !l.is_empty())
189            .filter_map(|l| serde_json::from_str(l).ok())
190            .collect();
191
192        Some(Transcript {
193            run_id: run_id.to_string(),
194            inputs,
195            events,
196            created_at: String::new(),
197        })
198    }
199
200    fn list_transcripts(&self) -> Vec<String> {
201        let mut ids = Vec::new();
202        if let Ok(entries) = std::fs::read_dir(&self.transcripts_dir) {
203            for entry in entries.flatten() {
204                if entry.path().is_dir() {
205                    if let Some(name) = entry.file_name().to_str() {
206                        ids.push(name.to_string());
207                    }
208                }
209            }
210        }
211        ids.sort();
212        ids
213    }
214
215    fn delete(&self, run_id: &str) -> anyhow::Result<()> {
216        let run_dir = self.run_dir(run_id);
217        if run_dir.exists() {
218            std::fs::remove_dir_all(&run_dir)?;
219        }
220        Ok(())
221    }
222}