1use ralph_proto::{Event, UxEvent};
8use serde::{Deserialize, Serialize};
9use std::io::{self, Write};
10use std::sync::Mutex;
11use std::time::{Instant, SystemTime, UNIX_EPOCH};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct Record {
19 pub ts: u64,
21
22 pub event: String,
24
25 pub data: serde_json::Value,
27}
28
29impl Record {
30 pub fn new(event: impl Into<String>, data: impl Serialize) -> Self {
32 let ts = SystemTime::now()
33 .duration_since(UNIX_EPOCH)
34 .unwrap_or_default()
35 .as_millis() as u64;
36
37 Self {
38 ts,
39 event: event.into(),
40 data: serde_json::to_value(data).unwrap_or(serde_json::Value::Null),
41 }
42 }
43
44 pub fn from_bus_event(event: &Event) -> Self {
46 Self::new("bus.publish", event)
47 }
48
49 pub fn from_ux_event(ux_event: &UxEvent) -> Self {
51 let event_type = match ux_event {
53 UxEvent::TerminalWrite(_) => "ux.terminal.write",
54 UxEvent::TerminalResize(_) => "ux.terminal.resize",
55 UxEvent::TerminalColorMode(_) => "ux.terminal.color_mode",
56 UxEvent::TuiFrame(_) => "ux.tui.frame",
57 };
58 Self::new(event_type, ux_event)
59 }
60
61 pub fn meta_loop_start(prompt_file: &str, max_iterations: u32, ux_mode: Option<&str>) -> Self {
63 Self::new(
64 "_meta.loop_start",
65 serde_json::json!({
66 "prompt_file": prompt_file,
67 "max_iterations": max_iterations,
68 "ux_mode": ux_mode.unwrap_or("cli"),
69 }),
70 )
71 }
72
73 pub fn meta_iteration(iteration: u32, elapsed_ms: u64, hat: &str) -> Self {
75 Self::new(
76 "_meta.iteration",
77 serde_json::json!({
78 "n": iteration,
79 "elapsed_ms": elapsed_ms,
80 "hat": hat,
81 }),
82 )
83 }
84
85 pub fn meta_termination(
87 reason: &str,
88 iterations: u32,
89 elapsed_secs: f64,
90 ux_writes: u32,
91 ) -> Self {
92 Self::new(
93 "_meta.termination",
94 serde_json::json!({
95 "reason": reason,
96 "iterations": iterations,
97 "elapsed_secs": elapsed_secs,
98 "ux_writes": ux_writes,
99 }),
100 )
101 }
102}
103
104pub struct SessionRecorder<W> {
128 writer: Mutex<W>,
130
131 start_time: Instant,
133
134 ux_write_count: Mutex<u32>,
136}
137
138impl<W: Write> SessionRecorder<W> {
139 pub fn new(writer: W) -> Self {
141 Self {
142 writer: Mutex::new(writer),
143 start_time: Instant::now(),
144 ux_write_count: Mutex::new(0),
145 }
146 }
147
148 pub fn record_bus_event(&self, event: &Event) {
150 let record = Record::from_bus_event(event);
151 self.write_record(&record);
152 }
153
154 pub fn record_ux_event(&self, ux_event: &UxEvent) {
156 if matches!(ux_event, UxEvent::TerminalWrite(_)) {
157 if let Ok(mut count) = self.ux_write_count.lock() {
158 *count += 1;
159 }
160 }
161 let record = Record::from_ux_event(ux_event);
162 self.write_record(&record);
163 }
164
165 pub fn record_ux_events(&self, events: &[UxEvent]) {
167 for event in events {
168 self.record_ux_event(event);
169 }
170 }
171
172 pub fn record_meta(&self, record: Record) {
174 self.write_record(&record);
175 }
176
177 pub fn ux_write_count(&self) -> u32 {
179 self.ux_write_count.lock().map(|g| *g).unwrap_or(0)
180 }
181
182 pub fn elapsed(&self) -> std::time::Duration {
184 self.start_time.elapsed()
185 }
186
187 fn write_record(&self, record: &Record) {
189 if let Ok(mut writer) = self.writer.lock() {
190 if let Ok(json) = serde_json::to_string(record) {
192 let _ = writeln!(writer, "{}", json);
193 }
194 }
195 }
196
197 pub fn flush(&self) -> io::Result<()> {
199 self.writer.lock().map_err(|_| {
200 io::Error::new(io::ErrorKind::Other, "Failed to acquire writer lock")
201 })?.flush()
202 }
203}
204
205impl<W: Write + Send + 'static> SessionRecorder<W> {
206 pub fn make_observer(
219 recorder: std::sync::Arc<Self>,
220 ) -> impl Fn(&Event) + Send + 'static {
221 move |event| {
222 recorder.record_bus_event(event);
223 }
224 }
225}
226
227#[cfg(test)]
228mod tests {
229 use super::*;
230
231 #[test]
232 fn test_record_bus_event() {
233 let mut output = Vec::new();
234 {
235 let recorder = SessionRecorder::new(&mut output);
236 let event = Event::new("task.start", "Begin work");
237 recorder.record_bus_event(&event);
238 }
239
240 let output_str = String::from_utf8_lossy(&output);
241 assert!(output_str.contains("bus.publish"));
242 assert!(output_str.contains("task.start"));
243 assert!(output_str.contains("Begin work"));
244 }
245
246 #[test]
247 fn test_record_ux_event() {
248 use ralph_proto::TerminalWrite;
249
250 let mut output = Vec::new();
251 {
252 let recorder = SessionRecorder::new(&mut output);
253 let ux_event = UxEvent::TerminalWrite(TerminalWrite::new(b"Hello", true, 100));
254 recorder.record_ux_event(&ux_event);
255 }
256
257 let output_str = String::from_utf8_lossy(&output);
258 assert!(output_str.contains("ux.terminal.write"));
259 assert!(output_str.contains("SGVsbG8=")); }
261
262 #[test]
263 fn test_record_metadata() {
264 let mut output = Vec::new();
265 {
266 let recorder = SessionRecorder::new(&mut output);
267 recorder.record_meta(Record::meta_loop_start("PROMPT.md", 100, Some("cli")));
268 recorder.record_meta(Record::meta_iteration(1, 5000, "default"));
269 recorder.record_meta(Record::meta_termination("CompletionPromise", 3, 25.5, 42));
270 }
271
272 let output_str = String::from_utf8_lossy(&output);
273 assert!(output_str.contains("_meta.loop_start"));
274 assert!(output_str.contains("_meta.iteration"));
275 assert!(output_str.contains("_meta.termination"));
276 assert!(output_str.contains("PROMPT.md"));
277 assert!(output_str.contains("CompletionPromise"));
278 }
279
280 #[test]
281 fn test_jsonl_format() {
282 let mut output = Vec::new();
283 {
284 let recorder = SessionRecorder::new(&mut output);
285 recorder.record_bus_event(&Event::new("test.1", "First"));
286 recorder.record_bus_event(&Event::new("test.2", "Second"));
287 }
288
289 let output_str = String::from_utf8_lossy(&output);
290 let lines: Vec<&str> = output_str.lines().collect();
291
292 assert_eq!(lines.len(), 2);
294
295 for line in lines {
297 let parsed: Result<serde_json::Value, _> = serde_json::from_str(line);
298 assert!(parsed.is_ok(), "Line should be valid JSON: {}", line);
299 }
300 }
301
302 #[test]
303 fn test_ux_write_count() {
304 use ralph_proto::{TerminalResize, TerminalWrite};
305
306 let output = Vec::new();
307 let recorder = SessionRecorder::new(output);
308
309 recorder.record_ux_event(&UxEvent::TerminalWrite(TerminalWrite::new(b"a", true, 0)));
311 recorder.record_ux_event(&UxEvent::TerminalResize(TerminalResize::new(80, 24, 10)));
312 recorder.record_ux_event(&UxEvent::TerminalWrite(TerminalWrite::new(b"b", true, 20)));
313
314 assert_eq!(recorder.ux_write_count(), 2);
316 }
317
318 #[test]
319 fn test_record_roundtrip() {
320 let event = Event::new("task.done", "Finished");
321 let record = Record::from_bus_event(&event);
322
323 let json = serde_json::to_string(&record).unwrap();
325 let parsed: Record = serde_json::from_str(&json).unwrap();
326
327 assert_eq!(parsed.event, "bus.publish");
328 assert!(parsed.ts > 0);
329 }
330}