Skip to main content

ralph_core/
session_player.rs

1//! Session player for replaying recorded JSONL sessions.
2//!
3//! `SessionPlayer` reads events from JSONL files and replays them with
4//! configurable timing. Supports terminal output replay (with ANSI colors),
5//! plain text mode (ANSI stripped), and step-through debugging.
6
7use ralph_proto::{TerminalWrite, UxEvent};
8use std::io::{self, BufRead, Write};
9use std::time::Duration;
10
11use crate::session_recorder::Record;
12
/// Selects how recorded terminal bytes are written back out during replay.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReplayMode {
    /// Emit the recorded bytes unchanged, so ANSI escape sequences (colors
    /// and other terminal control codes) reach the output as captured.
    Terminal,
    /// Remove ANSI escape sequences first and emit only the remaining text.
    Text,
}
21
22/// Configuration for session playback.
23#[derive(Debug, Clone)]
24pub struct PlayerConfig {
25    /// Replay speed multiplier (1.0 = original speed, 2.0 = 2x faster).
26    pub speed: f32,
27
28    /// If true, pause after each event and wait for Enter.
29    pub step_mode: bool,
30
31    /// Output mode for UX events.
32    pub replay_mode: ReplayMode,
33
34    /// Filter to specific event types (empty = all events).
35    pub event_filter: Vec<String>,
36}
37
38impl Default for PlayerConfig {
39    fn default() -> Self {
40        Self {
41            speed: 1.0,
42            step_mode: false,
43            replay_mode: ReplayMode::Terminal,
44            event_filter: Vec::new(),
45        }
46    }
47}
48
49impl PlayerConfig {
50    /// Creates a new config with terminal replay mode.
51    pub fn terminal() -> Self {
52        Self::default()
53    }
54
55    /// Creates a new config with text replay mode (ANSI stripped).
56    pub fn text() -> Self {
57        Self {
58            replay_mode: ReplayMode::Text,
59            ..Default::default()
60        }
61    }
62
63    /// Sets the speed multiplier.
64    pub fn with_speed(mut self, speed: f32) -> Self {
65        self.speed = speed.max(0.1); // Minimum 0.1x speed
66        self
67    }
68
69    /// Enables step-through mode.
70    pub fn with_step_mode(mut self) -> Self {
71        self.step_mode = true;
72        self
73    }
74
75    /// Filters to specific event types.
76    pub fn with_filter(mut self, events: Vec<String>) -> Self {
77        self.event_filter = events;
78        self
79    }
80}
81
82/// A parsed record with timing information for replay.
83#[derive(Debug, Clone)]
84pub struct TimestampedRecord {
85    /// The original record.
86    pub record: Record,
87
88    /// Offset from session start in milliseconds.
89    pub offset_ms: u64,
90}
91
92/// Plays back recorded sessions.
93///
94/// `SessionPlayer` reads JSONL records, extracts timing information,
95/// and replays events with configurable speed and output modes.
96///
97/// # Example
98///
99/// ```
100/// use ralph_core::{SessionPlayer, PlayerConfig};
101/// use std::io::Cursor;
102///
103/// let jsonl = r#"{"ts":1000,"event":"ux.terminal.write","data":{"bytes":"SGVsbG8=","stdout":true,"offset_ms":0}}
104/// {"ts":1100,"event":"ux.terminal.write","data":{"bytes":"V29ybGQ=","stdout":true,"offset_ms":100}}"#;
105///
106/// let reader = Cursor::new(jsonl);
107/// let player = SessionPlayer::from_reader(reader).unwrap();
108///
109/// assert_eq!(player.record_count(), 2);
110/// ```
111#[derive(Debug)]
112pub struct SessionPlayer {
113    /// Parsed records with timing.
114    records: Vec<TimestampedRecord>,
115
116    /// Playback configuration.
117    config: PlayerConfig,
118
119    /// Current playback position.
120    position: usize,
121}
122
123impl SessionPlayer {
124    /// Creates a player from a JSONL reader.
125    pub fn from_reader<R: BufRead>(reader: R) -> io::Result<Self> {
126        let mut records = Vec::new();
127        let mut first_ts: Option<u64> = None;
128
129        for line in reader.lines() {
130            let line = line?;
131            if line.trim().is_empty() {
132                continue;
133            }
134
135            let record: Record = serde_json::from_str(&line).map_err(|e| {
136                io::Error::new(
137                    io::ErrorKind::InvalidData,
138                    format!("Invalid JSON record: {}", e),
139                )
140            })?;
141
142            // Calculate offset from session start
143            let ts = record.ts;
144            let base_ts = *first_ts.get_or_insert(ts);
145            let offset_ms = ts.saturating_sub(base_ts);
146
147            records.push(TimestampedRecord { record, offset_ms });
148        }
149
150        Ok(Self {
151            records,
152            config: PlayerConfig::default(),
153            position: 0,
154        })
155    }
156
157    /// Creates a player from raw JSONL bytes.
158    pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
159        Self::from_reader(io::BufReader::new(bytes))
160    }
161
162    /// Sets the playback configuration.
163    pub fn with_config(mut self, config: PlayerConfig) -> Self {
164        self.config = config;
165        self
166    }
167
168    /// Returns the number of records.
169    pub fn record_count(&self) -> usize {
170        self.records.len()
171    }
172
173    /// Returns all records.
174    pub fn records(&self) -> &[TimestampedRecord] {
175        &self.records
176    }
177
178    /// Returns records filtered by event type.
179    pub fn filter_by_event(&self, event_prefix: &str) -> Vec<&TimestampedRecord> {
180        self.records
181            .iter()
182            .filter(|r| r.record.event.starts_with(event_prefix))
183            .collect()
184    }
185
186    /// Returns only UX terminal write events.
187    pub fn terminal_writes(&self) -> Vec<&TimestampedRecord> {
188        self.filter_by_event("ux.terminal.write")
189    }
190
191    /// Returns only metadata events.
192    pub fn metadata_events(&self) -> Vec<&TimestampedRecord> {
193        self.filter_by_event("_meta.")
194    }
195
196    /// Returns only bus events.
197    pub fn bus_events(&self) -> Vec<&TimestampedRecord> {
198        self.filter_by_event("bus.")
199    }
200
201    /// Resets playback to the beginning.
202    pub fn reset(&mut self) {
203        self.position = 0;
204    }
205
206    /// Replays all UX terminal events to the given writer.
207    ///
208    /// This is a synchronous replay that respects timing delays adjusted
209    /// by the speed multiplier. In step mode, it waits for Enter after
210    /// each event.
211    pub fn replay_terminal<W: Write>(&mut self, writer: &mut W) -> io::Result<()> {
212        self.reset();
213        let mut last_offset_ms: u64 = 0;
214
215        let terminal_writes = self.terminal_writes();
216        for record in terminal_writes {
217            // Calculate delay from previous event
218            let delay_ms = record.offset_ms.saturating_sub(last_offset_ms);
219            last_offset_ms = record.offset_ms;
220
221            // Apply speed multiplier
222            if !self.config.step_mode && delay_ms > 0 && self.config.speed > 0.0 {
223                let adjusted_delay = (delay_ms as f32 / self.config.speed) as u64;
224                if adjusted_delay > 0 {
225                    std::thread::sleep(Duration::from_millis(adjusted_delay));
226                }
227            }
228
229            // Parse and output the terminal write
230            if let Ok(ux_event) = Self::parse_ux_event(&record.record) {
231                if let UxEvent::TerminalWrite(write) = ux_event {
232                    self.output_terminal_write(writer, &write)?;
233                }
234            }
235
236            // Step mode: wait for Enter
237            if self.config.step_mode {
238                writer.flush()?;
239                let mut input = String::new();
240                io::stdin().read_line(&mut input)?;
241            }
242        }
243
244        writer.flush()
245    }
246
247    /// Outputs a terminal write event based on replay mode.
248    fn output_terminal_write<W: Write>(
249        &self,
250        writer: &mut W,
251        write: &TerminalWrite,
252    ) -> io::Result<()> {
253        let bytes = write.decode_bytes().map_err(|e| {
254            io::Error::new(
255                io::ErrorKind::InvalidData,
256                format!("Failed to decode base64: {}", e),
257            )
258        })?;
259
260        match self.config.replay_mode {
261            ReplayMode::Terminal => {
262                // Output raw bytes (preserves ANSI sequences)
263                writer.write_all(&bytes)?;
264            }
265            ReplayMode::Text => {
266                // Strip ANSI sequences
267                let stripped = strip_ansi(&bytes);
268                writer.write_all(&stripped)?;
269            }
270        }
271
272        Ok(())
273    }
274
275    /// Parses a Record's data field as a UxEvent.
276    fn parse_ux_event(record: &Record) -> Result<UxEvent, serde_json::Error> {
277        // The record stores data without the event tag, so we need to reconstruct
278        // the tagged format for UxEvent deserialization
279        let tagged = serde_json::json!({
280            "event": record.event,
281            "data": record.data,
282        });
283        serde_json::from_value(tagged)
284    }
285
286    /// Collects all terminal output as a single string (for snapshot testing).
287    pub fn collect_terminal_output(&self) -> io::Result<String> {
288        let mut output = Vec::new();
289
290        for record in self.terminal_writes() {
291            if let Ok(ux_event) = Self::parse_ux_event(&record.record) {
292                if let UxEvent::TerminalWrite(write) = ux_event {
293                    let bytes = write.decode_bytes().map_err(|e| {
294                        io::Error::new(
295                            io::ErrorKind::InvalidData,
296                            format!("Failed to decode base64: {}", e),
297                        )
298                    })?;
299                    output.extend_from_slice(&bytes);
300                }
301            }
302        }
303
304        String::from_utf8(output).map_err(|e| {
305            io::Error::new(
306                io::ErrorKind::InvalidData,
307                format!("Invalid UTF-8 in terminal output: {}", e),
308            )
309        })
310    }
311
312    /// Collects terminal output with ANSI codes stripped (for text snapshot testing).
313    pub fn collect_text_output(&self) -> io::Result<String> {
314        let raw = self.collect_terminal_output()?;
315        Ok(String::from_utf8_lossy(&strip_ansi(raw.as_bytes())).into_owned())
316    }
317
318    /// Collects terminal output with ANSI codes escaped (for ANSI snapshot testing).
319    pub fn collect_ansi_escaped(&self) -> io::Result<String> {
320        let raw = self.collect_terminal_output()?;
321        Ok(escape_ansi(&raw))
322    }
323}
324
/// Strips ANSI escape sequences from bytes.
///
/// Every ESC (`0x1b`) byte is removed, together with the sequence it starts:
///
/// - CSI: `ESC [` ... up to and including a final byte in `0x40..=0x7E`.
/// - OSC: `ESC ]` ... terminated by BEL (`0x07`) or ST (`ESC \`).
/// - DCS / SOS / PM / APC: `ESC P`, `ESC X`, `ESC ^`, `ESC _` ... terminated
///   by ST (`ESC \`).
/// - Escapes with intermediate bytes, such as the charset designation
///   `ESC ( B`: any run of bytes in `0x20..=0x2F` plus one final byte in
///   `0x30..=0x7E`.
/// - Two-byte escapes: `ESC` plus one byte in `0x30..=0x7E` (e.g. `ESC 7`).
///
/// An ESC followed by a byte that cannot continue an escape sequence (a
/// control character, another ESC, or a non-ASCII byte) is dropped on its
/// own, and the following byte is processed normally. This keeps newlines
/// and multi-byte UTF-8 characters intact. An unterminated sequence at the
/// end of the input is dropped.
fn strip_ansi(bytes: &[u8]) -> Vec<u8> {
    let mut result = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        if bytes[i] != 0x1b {
            result.push(bytes[i]);
            i += 1;
            continue;
        }

        // ESC: drop it and classify the sequence by the byte that follows.
        i += 1;
        let introducer = match bytes.get(i) {
            Some(&byte) => byte,
            None => break, // trailing lone ESC
        };

        match introducer {
            b'[' => {
                // CSI: skip parameter/intermediate bytes, then the final byte.
                i += 1;
                while i < bytes.len() && !(0x40..=0x7E).contains(&bytes[i]) {
                    i += 1;
                }
                if i < bytes.len() {
                    i += 1;
                }
            }
            // OSC may end with BEL as well as ST.
            b']' => i = skip_control_string(bytes, i + 1, true),
            // DCS, SOS, PM and APC strings end with ST only.
            b'P' | b'X' | b'^' | b'_' => i = skip_control_string(bytes, i + 1, false),
            0x20..=0x2F => {
                // Intermediate bytes followed by a single final byte.
                while i < bytes.len() && (0x20..=0x2F).contains(&bytes[i]) {
                    i += 1;
                }
                if i < bytes.len() && (0x30..=0x7E).contains(&bytes[i]) {
                    i += 1;
                }
            }
            // Plain two-byte escape.
            0x30..=0x7E => i += 1,
            // Not part of an escape sequence: leave the byte for the next
            // loop iteration so it is emitted (or, for ESC, re-parsed).
            _ => {}
        }
    }

    result
}

/// Returns the index just past the control string that starts at `i`.
///
/// The string ends at ST (`ESC \`), or at BEL when `bel_terminates` is set;
/// if no terminator is found, the end of `bytes` is returned.
fn skip_control_string(bytes: &[u8], mut i: usize, bel_terminates: bool) -> usize {
    while i < bytes.len() {
        match bytes[i] {
            0x07 if bel_terminates => return i + 1,
            0x1b if bytes.get(i + 1) == Some(&b'\\') => return i + 2,
            _ => i += 1,
        }
    }
    i
}
380
/// Makes ANSI sequences visible in snapshots.
///
/// Each ESC character (`0x1b`) is replaced by the four literal characters
/// `\x1b`, so escape sequences show up in readable diffs; all other
/// characters are copied through unchanged.
fn escape_ansi(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '\x1b' {
            escaped.push_str("\\x1b");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes one `ux.terminal.write` record as a single JSONL line.
    fn make_write_record(bytes: &[u8], stdout: bool, offset_ms: u64, base_ts: u64) -> String {
        let payload = TerminalWrite::new(bytes, stdout, offset_ms);
        let record = Record {
            ts: base_ts + offset_ms,
            event: "ux.terminal.write".to_string(),
            data: serde_json::to_value(&payload).unwrap(),
        };
        serde_json::to_string(&record).unwrap()
    }

    /// Joins records into JSONL text, terminating every record with `\n`.
    fn jsonl(lines: &[&str]) -> String {
        lines.iter().map(|line| format!("{}\n", line)).collect()
    }

    /// Builds a player from JSONL text, panicking if it does not parse.
    fn player_from(input: &str) -> SessionPlayer {
        SessionPlayer::from_bytes(input.as_bytes()).unwrap()
    }

    #[test]
    fn test_player_from_reader() {
        let first = make_write_record(b"Hello", true, 0, 1000);
        let second = make_write_record(b"World", true, 100, 1000);

        let player = player_from(&jsonl(&[first.as_str(), second.as_str()]));

        assert_eq!(player.record_count(), 2);
        assert_eq!(player.records[0].offset_ms, 0);
        assert_eq!(player.records[1].offset_ms, 100);
    }

    #[test]
    fn test_filter_by_event() {
        let write = make_write_record(b"test", true, 0, 1000);
        let meta = r#"{"ts":1000,"event":"_meta.loop_start","data":{"prompt_file":"PROMPT.md"}}"#;
        let bus = r#"{"ts":1050,"event":"bus.publish","data":{"topic":"task.start"}}"#;

        let player = player_from(&jsonl(&[write.as_str(), meta, bus]));

        assert_eq!(player.terminal_writes().len(), 1);
        assert_eq!(player.metadata_events().len(), 1);
        assert_eq!(player.bus_events().len(), 1);
    }

    #[test]
    fn test_collect_terminal_output() {
        let first = make_write_record(b"Hello, ", true, 0, 1000);
        let second = make_write_record(b"World!", true, 50, 1000);

        let player = player_from(&jsonl(&[first.as_str(), second.as_str()]));

        assert_eq!(player.collect_terminal_output().unwrap(), "Hello, World!");
    }

    #[test]
    fn test_strip_ansi() {
        assert_eq!(
            strip_ansi(b"Hello, \x1b[32mWorld\x1b[0m!"),
            b"Hello, World!"
        );
    }

    #[test]
    fn test_strip_ansi_complex() {
        // Several CSI sequences, two of them back to back.
        assert_eq!(
            strip_ansi(b"\x1b[1m\x1b[32mBold Green\x1b[0m Normal"),
            b"Bold Green Normal"
        );
    }

    #[test]
    fn test_escape_ansi() {
        assert_eq!(
            escape_ansi("Hello \x1b[32mWorld\x1b[0m"),
            "Hello \\x1b[32mWorld\\x1b[0m"
        );
    }

    #[test]
    fn test_collect_text_output() {
        let record = make_write_record(b"Hello \x1b[32mWorld\x1b[0m", true, 0, 1000);
        let player = player_from(&record);

        assert_eq!(player.collect_text_output().unwrap(), "Hello World");
    }

    #[test]
    fn test_collect_ansi_escaped() {
        let record = make_write_record(b"Hello \x1b[32mWorld\x1b[0m", true, 0, 1000);
        let player = player_from(&record);

        assert_eq!(
            player.collect_ansi_escaped().unwrap(),
            "Hello \\x1b[32mWorld\\x1b[0m"
        );
    }

    #[test]
    fn test_replay_terminal() {
        let first = make_write_record(b"Hello", true, 0, 1000);
        let second = make_write_record(b" World", true, 10, 1000);

        // A 100x speed-up keeps the recorded 10 ms gap from slowing the test.
        let fast = PlayerConfig::terminal().with_speed(100.0);
        let mut player =
            player_from(&jsonl(&[first.as_str(), second.as_str()])).with_config(fast);

        let mut sink = Vec::new();
        player.replay_terminal(&mut sink).unwrap();

        assert_eq!(String::from_utf8(sink).unwrap(), "Hello World");
    }

    #[test]
    fn test_replay_text_mode() {
        let record = make_write_record(b"\x1b[32mGreen\x1b[0m", true, 0, 1000);
        let mut player = player_from(&record).with_config(PlayerConfig::text());

        let mut sink = Vec::new();
        player.replay_terminal(&mut sink).unwrap();

        assert_eq!(String::from_utf8(sink).unwrap(), "Green");
    }

    #[test]
    fn test_player_config_builder() {
        let filter = vec!["ux.".to_string()];
        let config = PlayerConfig::terminal()
            .with_speed(2.0)
            .with_step_mode()
            .with_filter(filter);

        assert!((config.speed - 2.0).abs() < f32::EPSILON);
        assert!(config.step_mode);
        assert_eq!(config.event_filter, vec!["ux."]);
    }

    #[test]
    fn test_empty_input() {
        let player = SessionPlayer::from_bytes(b"").unwrap();
        assert_eq!(player.record_count(), 0);
    }

    #[test]
    fn test_whitespace_lines_skipped() {
        let record = make_write_record(b"test", true, 0, 1000);
        let padded = format!("\n  \n{}\n\n", record);

        assert_eq!(player_from(&padded).record_count(), 1);
    }
}
536}