1use ralph_proto::{Event, UxEvent};
8use serde::{Deserialize, Serialize};
9use std::io::{self, Write};
10use std::sync::Mutex;
11use std::time::{Instant, SystemTime, UNIX_EPOCH};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct Record {
19 pub ts: u64,
21
22 pub event: String,
24
25 pub data: serde_json::Value,
27}
28
29impl Record {
30 pub fn new(event: impl Into<String>, data: impl Serialize) -> Self {
32 let ts = SystemTime::now()
33 .duration_since(UNIX_EPOCH)
34 .unwrap_or_default()
35 .as_millis() as u64;
36
37 Self {
38 ts,
39 event: event.into(),
40 data: serde_json::to_value(data).unwrap_or(serde_json::Value::Null),
41 }
42 }
43
44 pub fn from_bus_event(event: &Event) -> Self {
46 Self::new("bus.publish", event)
47 }
48
49 pub fn from_ux_event(ux_event: &UxEvent) -> Self {
51 let event_type = match ux_event {
53 UxEvent::TerminalWrite(_) => "ux.terminal.write",
54 UxEvent::TerminalResize(_) => "ux.terminal.resize",
55 UxEvent::TerminalColorMode(_) => "ux.terminal.color_mode",
56 UxEvent::TuiFrame(_) => "ux.tui.frame",
57 };
58 Self::new(event_type, ux_event)
59 }
60
61 pub fn meta_loop_start(prompt_file: &str, max_iterations: u32, ux_mode: Option<&str>) -> Self {
63 Self::new(
64 "_meta.loop_start",
65 serde_json::json!({
66 "prompt_file": prompt_file,
67 "max_iterations": max_iterations,
68 "ux_mode": ux_mode.unwrap_or("cli"),
69 }),
70 )
71 }
72
73 pub fn meta_iteration(iteration: u32, elapsed_ms: u64, hat: &str) -> Self {
75 Self::new(
76 "_meta.iteration",
77 serde_json::json!({
78 "n": iteration,
79 "elapsed_ms": elapsed_ms,
80 "hat": hat,
81 }),
82 )
83 }
84
85 pub fn meta_termination(
87 reason: &str,
88 iterations: u32,
89 elapsed_secs: f64,
90 ux_writes: u32,
91 ) -> Self {
92 Self::new(
93 "_meta.termination",
94 serde_json::json!({
95 "reason": reason,
96 "iterations": iterations,
97 "elapsed_secs": elapsed_secs,
98 "ux_writes": ux_writes,
99 }),
100 )
101 }
102}
103
104pub struct SessionRecorder<W> {
128 writer: Mutex<W>,
130
131 start_time: Instant,
133
134 ux_write_count: Mutex<u32>,
136}
137
138impl<W: Write> SessionRecorder<W> {
139 pub fn new(writer: W) -> Self {
141 Self {
142 writer: Mutex::new(writer),
143 start_time: Instant::now(),
144 ux_write_count: Mutex::new(0),
145 }
146 }
147
148 pub fn record_bus_event(&self, event: &Event) {
150 let record = Record::from_bus_event(event);
151 self.write_record(&record);
152 }
153
154 pub fn record_ux_event(&self, ux_event: &UxEvent) {
156 if matches!(ux_event, UxEvent::TerminalWrite(_))
157 && let Ok(mut count) = self.ux_write_count.lock()
158 {
159 *count += 1;
160 }
161 let record = Record::from_ux_event(ux_event);
162 self.write_record(&record);
163 }
164
165 pub fn record_ux_events(&self, events: &[UxEvent]) {
167 for event in events {
168 self.record_ux_event(event);
169 }
170 }
171
172 pub fn record_meta(&self, record: Record) {
174 self.write_record(&record);
175 }
176
177 pub fn ux_write_count(&self) -> u32 {
179 self.ux_write_count.lock().map(|g| *g).unwrap_or(0)
180 }
181
182 pub fn elapsed(&self) -> std::time::Duration {
184 self.start_time.elapsed()
185 }
186
187 fn write_record(&self, record: &Record) {
189 if let Ok(mut writer) = self.writer.lock() {
190 if let Ok(json) = serde_json::to_string(record) {
192 let _ = writeln!(writer, "{}", json);
193 }
194 }
195 }
196
197 pub fn flush(&self) -> io::Result<()> {
199 self.writer
200 .lock()
201 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to acquire writer lock"))?
202 .flush()
203 }
204}
205
206impl<W: Write + Send + 'static> SessionRecorder<W> {
207 pub fn make_observer(recorder: std::sync::Arc<Self>) -> impl Fn(&Event) + Send + 'static {
220 move |event| {
221 recorder.record_bus_event(event);
222 }
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use super::*;
229
230 #[test]
231 fn test_record_bus_event() {
232 let mut output = Vec::new();
233 {
234 let recorder = SessionRecorder::new(&mut output);
235 let event = Event::new("task.start", "Begin work");
236 recorder.record_bus_event(&event);
237 }
238
239 let output_str = String::from_utf8_lossy(&output);
240 assert!(output_str.contains("bus.publish"));
241 assert!(output_str.contains("task.start"));
242 assert!(output_str.contains("Begin work"));
243 }
244
245 #[test]
246 fn test_record_ux_event() {
247 use ralph_proto::TerminalWrite;
248
249 let mut output = Vec::new();
250 {
251 let recorder = SessionRecorder::new(&mut output);
252 let ux_event = UxEvent::TerminalWrite(TerminalWrite::new(b"Hello", true, 100));
253 recorder.record_ux_event(&ux_event);
254 }
255
256 let output_str = String::from_utf8_lossy(&output);
257 assert!(output_str.contains("ux.terminal.write"));
258 assert!(output_str.contains("SGVsbG8=")); }
260
261 #[test]
262 fn test_record_metadata() {
263 let mut output = Vec::new();
264 {
265 let recorder = SessionRecorder::new(&mut output);
266 recorder.record_meta(Record::meta_loop_start("PROMPT.md", 100, Some("cli")));
267 recorder.record_meta(Record::meta_iteration(1, 5000, "default"));
268 recorder.record_meta(Record::meta_termination("CompletionPromise", 3, 25.5, 42));
269 }
270
271 let output_str = String::from_utf8_lossy(&output);
272 assert!(output_str.contains("_meta.loop_start"));
273 assert!(output_str.contains("_meta.iteration"));
274 assert!(output_str.contains("_meta.termination"));
275 assert!(output_str.contains("PROMPT.md"));
276 assert!(output_str.contains("CompletionPromise"));
277 }
278
279 #[test]
280 fn test_jsonl_format() {
281 let mut output = Vec::new();
282 {
283 let recorder = SessionRecorder::new(&mut output);
284 recorder.record_bus_event(&Event::new("test.1", "First"));
285 recorder.record_bus_event(&Event::new("test.2", "Second"));
286 }
287
288 let output_str = String::from_utf8_lossy(&output);
289 let lines: Vec<&str> = output_str.lines().collect();
290
291 assert_eq!(lines.len(), 2);
293
294 for line in lines {
296 let parsed: Result<serde_json::Value, _> = serde_json::from_str(line);
297 assert!(parsed.is_ok(), "Line should be valid JSON: {}", line);
298 }
299 }
300
301 #[test]
302 fn test_ux_write_count() {
303 use ralph_proto::{TerminalResize, TerminalWrite};
304
305 let output = Vec::new();
306 let recorder = SessionRecorder::new(output);
307
308 recorder.record_ux_event(&UxEvent::TerminalWrite(TerminalWrite::new(b"a", true, 0)));
310 recorder.record_ux_event(&UxEvent::TerminalResize(TerminalResize::new(80, 24, 10)));
311 recorder.record_ux_event(&UxEvent::TerminalWrite(TerminalWrite::new(b"b", true, 20)));
312
313 assert_eq!(recorder.ux_write_count(), 2);
315 }
316
317 #[test]
318 fn test_record_roundtrip() {
319 let event = Event::new("task.done", "Finished");
320 let record = Record::from_bus_event(&event);
321
322 let json = serde_json::to_string(&record).unwrap();
324 let parsed: Record = serde_json::from_str(&json).unwrap();
325
326 assert_eq!(parsed.event, "bus.publish");
327 assert!(parsed.ts > 0);
328 }
329}