ralph_core/
session_recorder.rs

//! Session recorder for writing events to JSONL files.
//!
//! `SessionRecorder` captures routing events from the EventBus and UX events
//! (terminal output) and writes them in one unified JSONL format for replay
//! and analysis.
6
7use ralph_proto::{Event, UxEvent};
8use serde::{Deserialize, Serialize};
9use std::io::{self, Write};
10use std::sync::Mutex;
11use std::time::{Instant, SystemTime, UNIX_EPOCH};
12
13/// A timestamped record in the JSONL session file.
14///
15/// Records use internal tagging to distinguish event types while maintaining
16/// a flat structure for easy parsing.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct Record {
19    /// Unix timestamp in milliseconds when the event was recorded.
20    pub ts: u64,
21
22    /// The event type discriminator (e.g., "bus.publish", "ux.terminal.write").
23    pub event: String,
24
25    /// The event data, serialized based on event type.
26    pub data: serde_json::Value,
27}
28
29impl Record {
30    /// Creates a new record with the current timestamp.
31    pub fn new(event: impl Into<String>, data: impl Serialize) -> Self {
32        let ts = SystemTime::now()
33            .duration_since(UNIX_EPOCH)
34            .unwrap_or_default()
35            .as_millis() as u64;
36
37        Self {
38            ts,
39            event: event.into(),
40            data: serde_json::to_value(data).unwrap_or(serde_json::Value::Null),
41        }
42    }
43
44    /// Creates a record for an EventBus event.
45    pub fn from_bus_event(event: &Event) -> Self {
46        Self::new("bus.publish", event)
47    }
48
49    /// Creates a record for a UX event.
50    pub fn from_ux_event(ux_event: &UxEvent) -> Self {
51        // Extract the event type from the UxEvent's serde tag
52        let event_type = match ux_event {
53            UxEvent::TerminalWrite(_) => "ux.terminal.write",
54            UxEvent::TerminalResize(_) => "ux.terminal.resize",
55            UxEvent::TerminalColorMode(_) => "ux.terminal.color_mode",
56            UxEvent::TuiFrame(_) => "ux.tui.frame",
57        };
58        Self::new(event_type, ux_event)
59    }
60
61    /// Creates a metadata record for loop start.
62    pub fn meta_loop_start(prompt_file: &str, max_iterations: u32, ux_mode: Option<&str>) -> Self {
63        Self::new(
64            "_meta.loop_start",
65            serde_json::json!({
66                "prompt_file": prompt_file,
67                "max_iterations": max_iterations,
68                "ux_mode": ux_mode.unwrap_or("cli"),
69            }),
70        )
71    }
72
73    /// Creates a metadata record for an iteration.
74    pub fn meta_iteration(iteration: u32, elapsed_ms: u64, hat: &str) -> Self {
75        Self::new(
76            "_meta.iteration",
77            serde_json::json!({
78                "n": iteration,
79                "elapsed_ms": elapsed_ms,
80                "hat": hat,
81            }),
82        )
83    }
84
85    /// Creates a metadata record for termination.
86    pub fn meta_termination(
87        reason: &str,
88        iterations: u32,
89        elapsed_secs: f64,
90        ux_writes: u32,
91    ) -> Self {
92        Self::new(
93            "_meta.termination",
94            serde_json::json!({
95                "reason": reason,
96                "iterations": iterations,
97                "elapsed_secs": elapsed_secs,
98                "ux_writes": ux_writes,
99            }),
100        )
101    }
102}
103
/// Writes session events to a JSONL sink, one JSON object per line.
///
/// All mutable state lives behind mutexes, so every recording method takes
/// `&self` and a single recorder can be shared between threads (for example
/// as an EventBus observer). A record is handed to the writer at the moment it
/// is recorded; the recorder keeps no queue of its own.
///
/// # Example
///
/// ```
/// use ralph_core::SessionRecorder;
/// use ralph_proto::Event;
///
/// let mut output = Vec::new();
/// let recorder = SessionRecorder::new(&mut output);
///
/// // Record a bus event
/// let event = Event::new("task.start", "Begin implementation");
/// recorder.record_bus_event(&event);
///
/// // Release the borrow on `output`, then inspect what was written
/// drop(recorder);
/// let output_str = String::from_utf8_lossy(&output);
/// assert!(output_str.contains("bus.publish"));
/// ```
pub struct SessionRecorder<W> {
    /// Destination for the JSON lines; the mutex serializes concurrent writers.
    writer: Mutex<W>,

    /// Moment the recorder was created; `elapsed()` is measured from here.
    start_time: Instant,

    /// How many `UxEvent::TerminalWrite` events have been recorded so far.
    ux_write_count: Mutex<u32>,
}
137
138impl<W: Write> SessionRecorder<W> {
139    /// Creates a new session recorder writing to the given output.
140    pub fn new(writer: W) -> Self {
141        Self {
142            writer: Mutex::new(writer),
143            start_time: Instant::now(),
144            ux_write_count: Mutex::new(0),
145        }
146    }
147
148    /// Records an EventBus event.
149    pub fn record_bus_event(&self, event: &Event) {
150        let record = Record::from_bus_event(event);
151        self.write_record(&record);
152    }
153
154    /// Records a UX event.
155    pub fn record_ux_event(&self, ux_event: &UxEvent) {
156        if matches!(ux_event, UxEvent::TerminalWrite(_))
157            && let Ok(mut count) = self.ux_write_count.lock()
158        {
159            *count += 1;
160        }
161        let record = Record::from_ux_event(ux_event);
162        self.write_record(&record);
163    }
164
165    /// Records multiple UX events.
166    pub fn record_ux_events(&self, events: &[UxEvent]) {
167        for event in events {
168            self.record_ux_event(event);
169        }
170    }
171
172    /// Records a metadata event.
173    pub fn record_meta(&self, record: Record) {
174        self.write_record(&record);
175    }
176
177    /// Returns the number of UX write events recorded.
178    pub fn ux_write_count(&self) -> u32 {
179        self.ux_write_count.lock().map(|g| *g).unwrap_or(0)
180    }
181
182    /// Returns the elapsed time since recording started.
183    pub fn elapsed(&self) -> std::time::Duration {
184        self.start_time.elapsed()
185    }
186
187    /// Writes a record to the output.
188    fn write_record(&self, record: &Record) {
189        if let Ok(mut writer) = self.writer.lock() {
190            // Ignore write errors - recording should not interrupt execution
191            if let Ok(json) = serde_json::to_string(record) {
192                let _ = writeln!(writer, "{}", json);
193            }
194        }
195    }
196
197    /// Flushes the underlying writer.
198    pub fn flush(&self) -> io::Result<()> {
199        self.writer
200            .lock()
201            .map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to acquire writer lock"))?
202            .flush()
203    }
204}
205
206impl<W: Write + Send + 'static> SessionRecorder<W> {
207    /// Creates an observer closure suitable for EventBus::set_observer.
208    ///
209    /// The returned closure holds a reference to this recorder and calls
210    /// `record_bus_event` for each event received.
211    ///
212    /// # Example
213    ///
214    /// ```ignore
215    /// let recorder = Arc::new(SessionRecorder::new(file));
216    /// let observer = SessionRecorder::make_observer(Arc::clone(&recorder));
217    /// event_bus.set_observer(observer);
218    /// ```
219    pub fn make_observer(recorder: std::sync::Arc<Self>) -> impl Fn(&Event) + Send + 'static {
220        move |event| {
221            recorder.record_bus_event(event);
222        }
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `fill` against a recorder backed by an in-memory buffer and returns
    /// everything the recorder wrote, decoded lossily as UTF-8.
    fn capture(fill: impl FnOnce(&SessionRecorder<&mut Vec<u8>>)) -> String {
        let mut buffer = Vec::new();
        {
            // Inner scope ends the recorder's borrow of `buffer` before it is read.
            let recorder = SessionRecorder::new(&mut buffer);
            fill(&recorder);
        }
        String::from_utf8_lossy(&buffer).into_owned()
    }

    /// A bus event is written with its tag, topic, and payload.
    #[test]
    fn test_record_bus_event() {
        let written = capture(|recorder| {
            recorder.record_bus_event(&Event::new("task.start", "Begin work"));
        });

        for needle in ["bus.publish", "task.start", "Begin work"] {
            assert!(written.contains(needle));
        }
    }

    /// A terminal write is tagged correctly and its bytes appear base64-encoded.
    #[test]
    fn test_record_ux_event() {
        use ralph_proto::TerminalWrite;

        let written = capture(|recorder| {
            let write = TerminalWrite::new(b"Hello", true, 100);
            recorder.record_ux_event(&UxEvent::TerminalWrite(write));
        });

        assert!(written.contains("ux.terminal.write"));
        assert!(written.contains("SGVsbG8=")); // "Hello" in base64
    }

    /// All three metadata record kinds are written with their payload values.
    #[test]
    fn test_record_metadata() {
        let written = capture(|recorder| {
            recorder.record_meta(Record::meta_loop_start("PROMPT.md", 100, Some("cli")));
            recorder.record_meta(Record::meta_iteration(1, 5000, "default"));
            recorder.record_meta(Record::meta_termination("CompletionPromise", 3, 25.5, 42));
        });

        for needle in [
            "_meta.loop_start",
            "_meta.iteration",
            "_meta.termination",
            "PROMPT.md",
            "CompletionPromise",
        ] {
            assert!(written.contains(needle));
        }
    }

    /// Two records produce exactly two lines, each of which parses as JSON.
    #[test]
    fn test_jsonl_format() {
        let written = capture(|recorder| {
            recorder.record_bus_event(&Event::new("test.1", "First"));
            recorder.record_bus_event(&Event::new("test.2", "Second"));
        });

        // Should have exactly 2 lines (JSONL format)
        assert_eq!(written.lines().count(), 2);

        // Each line should be valid JSON
        for line in written.lines() {
            assert!(
                serde_json::from_str::<serde_json::Value>(line).is_ok(),
                "Line should be valid JSON: {}",
                line
            );
        }
    }

    /// Only `TerminalWrite` events advance the UX write counter.
    #[test]
    fn test_ux_write_count() {
        use ralph_proto::{TerminalResize, TerminalWrite};

        let recorder = SessionRecorder::new(Vec::new());

        // Two writes with a resize in between
        let events = [
            UxEvent::TerminalWrite(TerminalWrite::new(b"a", true, 0)),
            UxEvent::TerminalResize(TerminalResize::new(80, 24, 10)),
            UxEvent::TerminalWrite(TerminalWrite::new(b"b", true, 20)),
        ];
        for event in &events {
            recorder.record_ux_event(event);
        }

        // Only TerminalWrite events should be counted
        assert_eq!(recorder.ux_write_count(), 2);
    }

    /// A record survives a serialize/deserialize round trip.
    #[test]
    fn test_record_roundtrip() {
        let original = Record::from_bus_event(&Event::new("task.done", "Finished"));

        let json = serde_json::to_string(&original).unwrap();
        let restored: Record = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.event, "bus.publish");
        assert!(restored.ts > 0);
    }
}