sparrow/runtime/
recorder.rs1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3use std::path::PathBuf;
4
5use crate::event::Event;
6
7#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Transcript {
13 pub run_id: String,
14 pub inputs: RunInputs,
15 pub events: Vec<Event>,
16 pub created_at: String,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct RunInputs {
21 pub task: String,
22 pub config_snapshot: serde_json::Value,
23 pub model_id: String,
24 pub repo_head: Option<String>,
25 pub timestamp: String,
26 pub agent: String,
27}
28
29impl Transcript {
30 pub fn new(run_id: String, inputs: RunInputs) -> Self {
31 Self {
32 run_id,
33 inputs,
34 events: Vec::new(),
35 created_at: chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
36 }
37 }
38
39 pub fn push_event(&mut self, event: &Event) {
40 self.events.push(event.clone());
41 }
42
43 pub fn event_count(&self) -> usize {
44 self.events.len()
45 }
46}
47
48pub trait Recorder: Send + Sync {
51 fn record(&self, event: &Event);
52 fn finalize(&self, run_id: &str) -> anyhow::Result<Transcript>;
53}
54
55pub trait Replayer: Send + Sync {
58 fn load(&self, run_id: &str) -> Option<Transcript>;
59 fn list_transcripts(&self) -> Vec<String>;
60 fn delete(&self, run_id: &str) -> anyhow::Result<()>;
61}
62
63pub struct FsRecorder {
66 transcripts_dir: PathBuf,
67 active: std::sync::Mutex<HashMap<String, Transcript>>,
68}
69
70impl FsRecorder {
71 pub fn new(transcripts_dir: PathBuf) -> Self {
72 std::fs::create_dir_all(&transcripts_dir).ok();
73 Self {
74 transcripts_dir,
75 active: std::sync::Mutex::new(HashMap::new()),
76 }
77 }
78
79 pub fn start_run(&self, run_id: String, inputs: RunInputs) {
80 let transcript = Transcript::new(run_id, inputs);
81 self.active
82 .lock()
83 .unwrap()
84 .insert(transcript.run_id.clone(), transcript);
85 }
86
87 fn run_dir(&self, run_id: &str) -> PathBuf {
88 self.transcripts_dir.join(run_id)
89 }
90}
91
92impl Recorder for FsRecorder {
93 fn record(&self, event: &Event) {
94 let run_id = event_run_id(event);
95 if let Some(transcript) = self.active.lock().unwrap().get_mut(run_id) {
96 transcript.push_event(event);
97 }
98 }
99
100 fn finalize(&self, run_id: &str) -> anyhow::Result<Transcript> {
101 let transcript = self
102 .active
103 .lock()
104 .unwrap()
105 .remove(run_id)
106 .ok_or_else(|| anyhow::anyhow!("No active transcript for {}", run_id))?;
107
108 let run_dir = self.run_dir(run_id);
109 std::fs::create_dir_all(&run_dir)?;
110
111 let inputs_json = serde_json::to_string_pretty(&transcript.inputs)?;
113 std::fs::write(run_dir.join("inputs.json"), inputs_json)?;
114
115 let mut events_jsonl = String::new();
117 for event in &transcript.events {
118 events_jsonl.push_str(&serde_json::to_string(event)?);
119 events_jsonl.push('\n');
120 }
121 std::fs::write(run_dir.join("events.jsonl"), events_jsonl)?;
122
123 let meta = serde_json::json!({
125 "run_id": transcript.run_id,
126 "event_count": transcript.events.len(),
127 "created_at": transcript.created_at,
128 });
129 std::fs::write(
130 run_dir.join("meta.json"),
131 serde_json::to_string_pretty(&meta)?,
132 )?;
133
134 tracing::info!(
135 "Transcript saved: {} ({} events)",
136 run_id,
137 transcript.events.len()
138 );
139
140 Ok(transcript)
141 }
142}
143
144fn event_run_id(event: &Event) -> &str {
145 match event {
146 Event::RunStarted { run, .. }
147 | Event::RouteSelected { run, .. }
148 | Event::ModelSwitched { run, .. }
149 | Event::ThinkingDelta { run, .. }
150 | Event::ReasoningDelta { run, .. }
151 | Event::Message { run, .. }
152 | Event::ToolUseProposed { run, .. }
153 | Event::ApprovalRequested { run, .. }
154 | Event::ApprovalResolved { run, .. }
155 | Event::ToolUseStarted { run, .. }
156 | Event::ToolOutput { run, .. }
157 | Event::DiffProposed { run, .. }
158 | Event::DiffApplied { run, .. }
159 | Event::TestResult { run, .. }
160 | Event::AgentSpawned { run, .. }
161 | Event::AgentStatus { run, .. }
162 | Event::CheckpointCreated { run, .. }
163 | Event::SkillLearned { run, .. }
164 | Event::CostUpdate { run, .. }
165 | Event::TokenUsage { run, .. }
166 | Event::TokenUsageEstimated { run, .. }
167 | Event::AutonomyChanged { run, .. }
168 | Event::RunFinished { run, .. }
169 | Event::Error { run, .. }
170 | Event::Compacted { run, .. } => &run.0,
171 }
172}
173
174impl Replayer for FsRecorder {
175 fn load(&self, run_id: &str) -> Option<Transcript> {
176 let run_dir = self.run_dir(run_id);
177 if !run_dir.exists() {
178 return None;
179 }
180
181 let inputs: RunInputs =
182 serde_json::from_str(&std::fs::read_to_string(run_dir.join("inputs.json")).ok()?)
183 .ok()?;
184
185 let events_text = std::fs::read_to_string(run_dir.join("events.jsonl")).ok()?;
186 let events: Vec<Event> = events_text
187 .lines()
188 .filter(|l| !l.is_empty())
189 .filter_map(|l| serde_json::from_str(l).ok())
190 .collect();
191
192 Some(Transcript {
193 run_id: run_id.to_string(),
194 inputs,
195 events,
196 created_at: String::new(),
197 })
198 }
199
200 fn list_transcripts(&self) -> Vec<String> {
201 let mut ids = Vec::new();
202 if let Ok(entries) = std::fs::read_dir(&self.transcripts_dir) {
203 for entry in entries.flatten() {
204 if entry.path().is_dir() {
205 if let Some(name) = entry.file_name().to_str() {
206 ids.push(name.to_string());
207 }
208 }
209 }
210 }
211 ids.sort();
212 ids
213 }
214
215 fn delete(&self, run_id: &str) -> anyhow::Result<()> {
216 let run_dir = self.run_dir(run_id);
217 if run_dir.exists() {
218 std::fs::remove_dir_all(&run_dir)?;
219 }
220 Ok(())
221 }
222}