Skip to main content

ralph_core/
session_recorder.rs

1//! Session recorder for writing events to JSONL files.
2//!
3//! `SessionRecorder` captures events from both the EventBus (routing events)
4//! and UX captures (terminal output) into a unified JSONL format for replay
5//! and analysis.
6
7use ralph_proto::{Event, UxEvent};
8use serde::{Deserialize, Serialize};
9use std::io::{self, Write};
10use std::sync::Mutex;
11use std::time::{Instant, SystemTime, UNIX_EPOCH};
12
13/// A timestamped record in the JSONL session file.
14///
15/// Records use internal tagging to distinguish event types while maintaining
16/// a flat structure for easy parsing.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct Record {
19    /// Unix timestamp in milliseconds when the event was recorded.
20    pub ts: u64,
21
22    /// The event type discriminator (e.g., "bus.publish", "ux.terminal.write").
23    pub event: String,
24
25    /// The event data, serialized based on event type.
26    pub data: serde_json::Value,
27}
28
29impl Record {
30    /// Creates a new record with the current timestamp.
31    pub fn new(event: impl Into<String>, data: impl Serialize) -> Self {
32        let ts = SystemTime::now()
33            .duration_since(UNIX_EPOCH)
34            .unwrap_or_default()
35            .as_millis() as u64;
36
37        Self {
38            ts,
39            event: event.into(),
40            data: serde_json::to_value(data).unwrap_or(serde_json::Value::Null),
41        }
42    }
43
44    /// Creates a record for an EventBus event.
45    pub fn from_bus_event(event: &Event) -> Self {
46        Self::new("bus.publish", event)
47    }
48
49    /// Creates a record for a UX event.
50    pub fn from_ux_event(ux_event: &UxEvent) -> Self {
51        // Extract the event type from the UxEvent's serde tag
52        let event_type = match ux_event {
53            UxEvent::TerminalWrite(_) => "ux.terminal.write",
54            UxEvent::TerminalResize(_) => "ux.terminal.resize",
55            UxEvent::TerminalColorMode(_) => "ux.terminal.color_mode",
56            UxEvent::TuiFrame(_) => "ux.tui.frame",
57        };
58        Self::new(event_type, ux_event)
59    }
60
61    /// Creates a metadata record for loop start.
62    pub fn meta_loop_start(prompt_file: &str, max_iterations: u32, ux_mode: Option<&str>) -> Self {
63        Self::new(
64            "_meta.loop_start",
65            serde_json::json!({
66                "prompt_file": prompt_file,
67                "max_iterations": max_iterations,
68                "ux_mode": ux_mode.unwrap_or("cli"),
69            }),
70        )
71    }
72
73    /// Creates a metadata record for an iteration.
74    pub fn meta_iteration(iteration: u32, elapsed_ms: u64, hat: &str) -> Self {
75        Self::new(
76            "_meta.iteration",
77            serde_json::json!({
78                "n": iteration,
79                "elapsed_ms": elapsed_ms,
80                "hat": hat,
81            }),
82        )
83    }
84
85    /// Creates a metadata record for termination.
86    pub fn meta_termination(
87        reason: &str,
88        iterations: u32,
89        elapsed_secs: f64,
90        ux_writes: u32,
91    ) -> Self {
92        Self::new(
93            "_meta.termination",
94            serde_json::json!({
95                "reason": reason,
96                "iterations": iterations,
97                "elapsed_secs": elapsed_secs,
98                "ux_writes": ux_writes,
99            }),
100        )
101    }
102}
103
104/// Records session events to a JSONL output.
105///
106/// The recorder is thread-safe and can be used as an EventBus observer.
107/// It writes each event as a JSON line immediately for crash resilience.
108///
109/// # Example
110///
111/// ```
112/// use ralph_core::SessionRecorder;
113/// use ralph_proto::Event;
114///
115/// let mut output = Vec::new();
116/// let recorder = SessionRecorder::new(&mut output);
117///
118/// // Record a bus event
119/// let event = Event::new("task.start", "Begin implementation");
120/// recorder.record_bus_event(&event);
121///
122/// // Flush and check output
123/// drop(recorder);
124/// let output_str = String::from_utf8_lossy(&output);
125/// assert!(output_str.contains("bus.publish"));
126/// ```
127pub struct SessionRecorder<W> {
128    /// The output writer, wrapped in a mutex for thread-safe access.
129    writer: Mutex<W>,
130
131    /// Start time for calculating session-relative offsets.
132    start_time: Instant,
133
134    /// Counter for UX write events recorded.
135    ux_write_count: Mutex<u32>,
136}
137
138impl<W: Write> SessionRecorder<W> {
139    /// Creates a new session recorder writing to the given output.
140    pub fn new(writer: W) -> Self {
141        Self {
142            writer: Mutex::new(writer),
143            start_time: Instant::now(),
144            ux_write_count: Mutex::new(0),
145        }
146    }
147
148    /// Records an EventBus event.
149    pub fn record_bus_event(&self, event: &Event) {
150        let record = Record::from_bus_event(event);
151        self.write_record(&record);
152    }
153
154    /// Records a UX event.
155    pub fn record_ux_event(&self, ux_event: &UxEvent) {
156        if matches!(ux_event, UxEvent::TerminalWrite(_)) {
157            if let Ok(mut count) = self.ux_write_count.lock() {
158                *count += 1;
159            }
160        }
161        let record = Record::from_ux_event(ux_event);
162        self.write_record(&record);
163    }
164
165    /// Records multiple UX events.
166    pub fn record_ux_events(&self, events: &[UxEvent]) {
167        for event in events {
168            self.record_ux_event(event);
169        }
170    }
171
172    /// Records a metadata event.
173    pub fn record_meta(&self, record: Record) {
174        self.write_record(&record);
175    }
176
177    /// Returns the number of UX write events recorded.
178    pub fn ux_write_count(&self) -> u32 {
179        self.ux_write_count.lock().map(|g| *g).unwrap_or(0)
180    }
181
182    /// Returns the elapsed time since recording started.
183    pub fn elapsed(&self) -> std::time::Duration {
184        self.start_time.elapsed()
185    }
186
187    /// Writes a record to the output.
188    fn write_record(&self, record: &Record) {
189        if let Ok(mut writer) = self.writer.lock() {
190            // Ignore write errors - recording should not interrupt execution
191            if let Ok(json) = serde_json::to_string(record) {
192                let _ = writeln!(writer, "{}", json);
193            }
194        }
195    }
196
197    /// Flushes the underlying writer.
198    pub fn flush(&self) -> io::Result<()> {
199        self.writer.lock().map_err(|_| {
200            io::Error::new(io::ErrorKind::Other, "Failed to acquire writer lock")
201        })?.flush()
202    }
203}
204
205impl<W: Write + Send + 'static> SessionRecorder<W> {
206    /// Creates an observer closure suitable for EventBus::set_observer.
207    ///
208    /// The returned closure holds a reference to this recorder and calls
209    /// `record_bus_event` for each event received.
210    ///
211    /// # Example
212    ///
213    /// ```ignore
214    /// let recorder = Arc::new(SessionRecorder::new(file));
215    /// let observer = SessionRecorder::make_observer(Arc::clone(&recorder));
216    /// event_bus.set_observer(observer);
217    /// ```
218    pub fn make_observer(
219        recorder: std::sync::Arc<Self>,
220    ) -> impl Fn(&Event) + Send + 'static {
221        move |event| {
222            recorder.record_bus_event(event);
223        }
224    }
225}
226
227#[cfg(test)]
228mod tests {
229    use super::*;
230
231    #[test]
232    fn test_record_bus_event() {
233        let mut output = Vec::new();
234        {
235            let recorder = SessionRecorder::new(&mut output);
236            let event = Event::new("task.start", "Begin work");
237            recorder.record_bus_event(&event);
238        }
239
240        let output_str = String::from_utf8_lossy(&output);
241        assert!(output_str.contains("bus.publish"));
242        assert!(output_str.contains("task.start"));
243        assert!(output_str.contains("Begin work"));
244    }
245
246    #[test]
247    fn test_record_ux_event() {
248        use ralph_proto::TerminalWrite;
249
250        let mut output = Vec::new();
251        {
252            let recorder = SessionRecorder::new(&mut output);
253            let ux_event = UxEvent::TerminalWrite(TerminalWrite::new(b"Hello", true, 100));
254            recorder.record_ux_event(&ux_event);
255        }
256
257        let output_str = String::from_utf8_lossy(&output);
258        assert!(output_str.contains("ux.terminal.write"));
259        assert!(output_str.contains("SGVsbG8=")); // "Hello" in base64
260    }
261
262    #[test]
263    fn test_record_metadata() {
264        let mut output = Vec::new();
265        {
266            let recorder = SessionRecorder::new(&mut output);
267            recorder.record_meta(Record::meta_loop_start("PROMPT.md", 100, Some("cli")));
268            recorder.record_meta(Record::meta_iteration(1, 5000, "default"));
269            recorder.record_meta(Record::meta_termination("CompletionPromise", 3, 25.5, 42));
270        }
271
272        let output_str = String::from_utf8_lossy(&output);
273        assert!(output_str.contains("_meta.loop_start"));
274        assert!(output_str.contains("_meta.iteration"));
275        assert!(output_str.contains("_meta.termination"));
276        assert!(output_str.contains("PROMPT.md"));
277        assert!(output_str.contains("CompletionPromise"));
278    }
279
280    #[test]
281    fn test_jsonl_format() {
282        let mut output = Vec::new();
283        {
284            let recorder = SessionRecorder::new(&mut output);
285            recorder.record_bus_event(&Event::new("test.1", "First"));
286            recorder.record_bus_event(&Event::new("test.2", "Second"));
287        }
288
289        let output_str = String::from_utf8_lossy(&output);
290        let lines: Vec<&str> = output_str.lines().collect();
291
292        // Should have exactly 2 lines (JSONL format)
293        assert_eq!(lines.len(), 2);
294
295        // Each line should be valid JSON
296        for line in lines {
297            let parsed: Result<serde_json::Value, _> = serde_json::from_str(line);
298            assert!(parsed.is_ok(), "Line should be valid JSON: {}", line);
299        }
300    }
301
302    #[test]
303    fn test_ux_write_count() {
304        use ralph_proto::{TerminalResize, TerminalWrite};
305
306        let output = Vec::new();
307        let recorder = SessionRecorder::new(output);
308
309        // Record some UX events
310        recorder.record_ux_event(&UxEvent::TerminalWrite(TerminalWrite::new(b"a", true, 0)));
311        recorder.record_ux_event(&UxEvent::TerminalResize(TerminalResize::new(80, 24, 10)));
312        recorder.record_ux_event(&UxEvent::TerminalWrite(TerminalWrite::new(b"b", true, 20)));
313
314        // Only TerminalWrite events should be counted
315        assert_eq!(recorder.ux_write_count(), 2);
316    }
317
318    #[test]
319    fn test_record_roundtrip() {
320        let event = Event::new("task.done", "Finished");
321        let record = Record::from_bus_event(&event);
322
323        // Serialize and deserialize
324        let json = serde_json::to_string(&record).unwrap();
325        let parsed: Record = serde_json::from_str(&json).unwrap();
326
327        assert_eq!(parsed.event, "bus.publish");
328        assert!(parsed.ts > 0);
329    }
330}