Skip to main content

ralph_core/
session_player.rs

1//! Session player for replaying recorded JSONL sessions.
2//!
3//! `SessionPlayer` reads events from JSONL files and replays them with
4//! configurable timing. Supports terminal output replay (with ANSI colors),
5//! plain text mode (ANSI stripped), and step-through debugging.
6
7use ralph_proto::{TerminalWrite, UxEvent};
8use std::io::{self, BufRead, Write};
9use std::time::Duration;
10
11use crate::session_recorder::Record;
12
/// Selects how recorded terminal bytes are rendered during playback.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReplayMode {
    /// Emit the recorded bytes unchanged, so ANSI colors and cursor
    /// sequences reach the output as they were captured.
    Terminal,
    /// Remove ANSI escape sequences first and emit only the plain text.
    Text,
}
21
22/// Configuration for session playback.
23#[derive(Debug, Clone)]
24pub struct PlayerConfig {
25    /// Replay speed multiplier (1.0 = original speed, 2.0 = 2x faster).
26    pub speed: f32,
27
28    /// If true, pause after each event and wait for Enter.
29    pub step_mode: bool,
30
31    /// Output mode for UX events.
32    pub replay_mode: ReplayMode,
33
34    /// Filter to specific event types (empty = all events).
35    pub event_filter: Vec<String>,
36}
37
38impl Default for PlayerConfig {
39    fn default() -> Self {
40        Self {
41            speed: 1.0,
42            step_mode: false,
43            replay_mode: ReplayMode::Terminal,
44            event_filter: Vec::new(),
45        }
46    }
47}
48
49impl PlayerConfig {
50    /// Creates a new config with terminal replay mode.
51    pub fn terminal() -> Self {
52        Self::default()
53    }
54
55    /// Creates a new config with text replay mode (ANSI stripped).
56    pub fn text() -> Self {
57        Self {
58            replay_mode: ReplayMode::Text,
59            ..Default::default()
60        }
61    }
62
63    /// Sets the speed multiplier.
64    pub fn with_speed(mut self, speed: f32) -> Self {
65        self.speed = speed.max(0.1); // Minimum 0.1x speed
66        self
67    }
68
69    /// Enables step-through mode.
70    pub fn with_step_mode(mut self) -> Self {
71        self.step_mode = true;
72        self
73    }
74
75    /// Filters to specific event types.
76    pub fn with_filter(mut self, events: Vec<String>) -> Self {
77        self.event_filter = events;
78        self
79    }
80}
81
82/// A parsed record with timing information for replay.
83#[derive(Debug, Clone)]
84pub struct TimestampedRecord {
85    /// The original record.
86    pub record: Record,
87
88    /// Offset from session start in milliseconds.
89    pub offset_ms: u64,
90}
91
92/// Plays back recorded sessions.
93///
94/// `SessionPlayer` reads JSONL records, extracts timing information,
95/// and replays events with configurable speed and output modes.
96///
97/// # Example
98///
99/// ```
100/// use ralph_core::{SessionPlayer, PlayerConfig};
101/// use std::io::Cursor;
102///
103/// let jsonl = r#"{"ts":1000,"event":"ux.terminal.write","data":{"bytes":"SGVsbG8=","stdout":true,"offset_ms":0}}
104/// {"ts":1100,"event":"ux.terminal.write","data":{"bytes":"V29ybGQ=","stdout":true,"offset_ms":100}}"#;
105///
106/// let reader = Cursor::new(jsonl);
107/// let player = SessionPlayer::from_reader(reader).unwrap();
108///
109/// assert_eq!(player.record_count(), 2);
110/// ```
111#[derive(Debug)]
112pub struct SessionPlayer {
113    /// Parsed records with timing.
114    records: Vec<TimestampedRecord>,
115
116    /// Playback configuration.
117    config: PlayerConfig,
118
119    /// Current playback position.
120    position: usize,
121}
122
123impl SessionPlayer {
124    /// Creates a player from a JSONL reader.
125    pub fn from_reader<R: BufRead>(reader: R) -> io::Result<Self> {
126        let mut records = Vec::new();
127        let mut first_ts: Option<u64> = None;
128
129        for line in reader.lines() {
130            let line = line?;
131            if line.trim().is_empty() {
132                continue;
133            }
134
135            let record: Record = serde_json::from_str(&line).map_err(|e| {
136                io::Error::new(
137                    io::ErrorKind::InvalidData,
138                    format!("Invalid JSON record: {}", e),
139                )
140            })?;
141
142            // Calculate offset from session start
143            let ts = record.ts;
144            let base_ts = *first_ts.get_or_insert(ts);
145            let offset_ms = ts.saturating_sub(base_ts);
146
147            records.push(TimestampedRecord { record, offset_ms });
148        }
149
150        Ok(Self {
151            records,
152            config: PlayerConfig::default(),
153            position: 0,
154        })
155    }
156
157    /// Creates a player from raw JSONL bytes.
158    pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
159        Self::from_reader(io::BufReader::new(bytes))
160    }
161
162    /// Sets the playback configuration.
163    pub fn with_config(mut self, config: PlayerConfig) -> Self {
164        self.config = config;
165        self
166    }
167
168    /// Returns the number of records.
169    pub fn record_count(&self) -> usize {
170        self.records.len()
171    }
172
173    /// Returns all records.
174    pub fn records(&self) -> &[TimestampedRecord] {
175        &self.records
176    }
177
178    /// Returns records filtered by event type.
179    pub fn filter_by_event(&self, event_prefix: &str) -> Vec<&TimestampedRecord> {
180        self.records
181            .iter()
182            .filter(|r| r.record.event.starts_with(event_prefix))
183            .collect()
184    }
185
186    /// Returns only UX terminal write events.
187    pub fn terminal_writes(&self) -> Vec<&TimestampedRecord> {
188        self.filter_by_event("ux.terminal.write")
189    }
190
191    /// Returns only metadata events.
192    pub fn metadata_events(&self) -> Vec<&TimestampedRecord> {
193        self.filter_by_event("_meta.")
194    }
195
196    /// Returns only bus events.
197    pub fn bus_events(&self) -> Vec<&TimestampedRecord> {
198        self.filter_by_event("bus.")
199    }
200
201    /// Resets playback to the beginning.
202    pub fn reset(&mut self) {
203        self.position = 0;
204    }
205
206    /// Replays all UX terminal events to the given writer.
207    ///
208    /// This is a synchronous replay that respects timing delays adjusted
209    /// by the speed multiplier. In step mode, it waits for Enter after
210    /// each event.
211    pub fn replay_terminal<W: Write>(&mut self, writer: &mut W) -> io::Result<()> {
212        self.reset();
213        let mut last_offset_ms: u64 = 0;
214
215        let terminal_writes = self.terminal_writes();
216        for record in terminal_writes {
217            // Calculate delay from previous event
218            let delay_ms = record.offset_ms.saturating_sub(last_offset_ms);
219            last_offset_ms = record.offset_ms;
220
221            // Apply speed multiplier
222            if !self.config.step_mode && delay_ms > 0 && self.config.speed > 0.0 {
223                let adjusted_delay = (delay_ms as f32 / self.config.speed) as u64;
224                if adjusted_delay > 0 {
225                    std::thread::sleep(Duration::from_millis(adjusted_delay));
226                }
227            }
228
229            // Parse and output the terminal write
230            if let Ok(UxEvent::TerminalWrite(write)) = Self::parse_ux_event(&record.record) {
231                self.output_terminal_write(writer, &write)?;
232            }
233
234            // Step mode: wait for Enter
235            if self.config.step_mode {
236                writer.flush()?;
237                let mut input = String::new();
238                io::stdin().read_line(&mut input)?;
239            }
240        }
241
242        writer.flush()
243    }
244
245    /// Outputs a terminal write event based on replay mode.
246    fn output_terminal_write<W: Write>(
247        &self,
248        writer: &mut W,
249        write: &TerminalWrite,
250    ) -> io::Result<()> {
251        let bytes = write.decode_bytes().map_err(|e| {
252            io::Error::new(
253                io::ErrorKind::InvalidData,
254                format!("Failed to decode base64: {}", e),
255            )
256        })?;
257
258        match self.config.replay_mode {
259            ReplayMode::Terminal => {
260                // Output raw bytes (preserves ANSI sequences)
261                writer.write_all(&bytes)?;
262            }
263            ReplayMode::Text => {
264                // Strip ANSI sequences
265                let stripped = strip_ansi(&bytes);
266                writer.write_all(&stripped)?;
267            }
268        }
269
270        Ok(())
271    }
272
273    /// Parses a Record's data field as a UxEvent.
274    fn parse_ux_event(record: &Record) -> Result<UxEvent, serde_json::Error> {
275        // The record stores data without the event tag, so we need to reconstruct
276        // the tagged format for UxEvent deserialization
277        let tagged = serde_json::json!({
278            "event": record.event,
279            "data": record.data,
280        });
281        serde_json::from_value(tagged)
282    }
283
284    /// Collects all terminal output as a single string (for snapshot testing).
285    pub fn collect_terminal_output(&self) -> io::Result<String> {
286        let mut output = Vec::new();
287
288        for record in self.terminal_writes() {
289            if let Ok(UxEvent::TerminalWrite(write)) = Self::parse_ux_event(&record.record) {
290                let bytes = write.decode_bytes().map_err(|e| {
291                    io::Error::new(
292                        io::ErrorKind::InvalidData,
293                        format!("Failed to decode base64: {}", e),
294                    )
295                })?;
296                output.extend_from_slice(&bytes);
297            }
298        }
299
300        String::from_utf8(output).map_err(|e| {
301            io::Error::new(
302                io::ErrorKind::InvalidData,
303                format!("Invalid UTF-8 in terminal output: {}", e),
304            )
305        })
306    }
307
308    /// Collects terminal output with ANSI codes stripped (for text snapshot testing).
309    pub fn collect_text_output(&self) -> io::Result<String> {
310        let raw = self.collect_terminal_output()?;
311        Ok(String::from_utf8_lossy(&strip_ansi(raw.as_bytes())).into_owned())
312    }
313
314    /// Collects terminal output with ANSI codes escaped (for ANSI snapshot testing).
315    pub fn collect_ansi_escaped(&self) -> io::Result<String> {
316        let raw = self.collect_terminal_output()?;
317        Ok(escape_ansi(&raw))
318    }
319}
320
/// Strips ANSI escape sequences from bytes.
///
/// Recognized sequences, all introduced by ESC (`0x1b`):
///
/// * CSI: `ESC [` followed by any bytes up to and including the first final
///   byte in `0x40..=0x7E` (for example `ESC [ 3 2 m`).
/// * OSC: `ESC ]` up to and including a BEL (`0x07`) or the string
///   terminator `ESC \`.
/// * Other escapes: `ESC`, zero or more intermediate bytes in
///   `0x20..=0x2F`, then one final byte in `0x30..=0x7E` (for example
///   `ESC c`, `ESC 7`, or the charset selection `ESC ( B`).
///
/// A byte after ESC that cannot belong to an escape sequence (another ESC, a
/// control byte such as `\n`, or a non-ASCII byte) is not consumed: only the
/// ESC is dropped and scanning resumes at that byte. Sequences cut off by
/// the end of the input are dropped entirely.
fn strip_ansi(bytes: &[u8]) -> Vec<u8> {
    let mut result = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        if bytes[i] != 0x1b {
            result.push(bytes[i]);
            i += 1;
            continue;
        }

        // ESC: drop it and classify the sequence by the byte that follows.
        i += 1;
        match bytes.get(i) {
            // Lone ESC at the very end of the input.
            None => break,
            Some(b'[') => {
                // CSI: skip parameter/intermediate bytes, then the final byte.
                i += 1;
                while i < bytes.len() && !(0x40..=0x7E).contains(&bytes[i]) {
                    i += 1;
                }
                if i < bytes.len() {
                    i += 1; // Skip final byte
                }
            }
            Some(b']') => {
                // OSC: runs until BEL or ST (ESC \).
                i += 1;
                while i < bytes.len() {
                    if bytes[i] == 0x07 {
                        i += 1;
                        break;
                    }
                    if bytes[i] == 0x1b && i + 1 < bytes.len() && bytes[i + 1] == b'\\' {
                        i += 2;
                        break;
                    }
                    i += 1;
                }
            }
            Some(_) => {
                // Generic escape: intermediates first, so that sequences like
                // `ESC ( B` do not leak their final byte into the text.
                while i < bytes.len() && (0x20..=0x2F).contains(&bytes[i]) {
                    i += 1;
                }
                // Consume the final byte only if it really is one; anything
                // else (ESC, control bytes, UTF-8 lead bytes) is left for the
                // main loop so real content is never swallowed.
                if i < bytes.len() && (0x30..=0x7E).contains(&bytes[i]) {
                    i += 1;
                }
            }
        }
    }

    result
}
376
/// Makes ANSI sequences visible in snapshots.
///
/// Each ESC character (`0x1b`) is rewritten as the four-character literal
/// text `\x1b`; all other characters are copied through unchanged, which
/// keeps snapshot diffs readable.
fn escape_ansi(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '\x1b' {
            escaped.push_str("\\x1b");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
383
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes one `ux.terminal.write` record as a JSONL line, stamped at
    /// `base_ts + offset_ms`.
    fn make_write_record(bytes: &[u8], stdout: bool, offset_ms: u64, base_ts: u64) -> String {
        let payload = TerminalWrite::new(bytes, stdout, offset_ms);
        let record = Record {
            ts: base_ts + offset_ms,
            event: "ux.terminal.write".to_string(),
            data: serde_json::to_value(&payload).unwrap(),
        };
        serde_json::to_string(&record).unwrap()
    }

    /// Builds a player from JSONL text, panicking on malformed input.
    fn load(jsonl: &str) -> SessionPlayer {
        SessionPlayer::from_bytes(jsonl.as_bytes()).unwrap()
    }

    #[test]
    fn test_player_from_reader() {
        let first = make_write_record(b"Hello", true, 0, 1000);
        let second = make_write_record(b"World", true, 100, 1000);

        let player = load(&format!("{}\n{}\n", first, second));

        assert_eq!(player.record_count(), 2);
        assert_eq!(player.records[0].offset_ms, 0);
        assert_eq!(player.records[1].offset_ms, 100);
    }

    #[test]
    fn test_filter_by_event() {
        let write = make_write_record(b"test", true, 0, 1000);
        let meta = r#"{"ts":1000,"event":"_meta.loop_start","data":{"prompt_file":"PROMPT.md"}}"#;
        let bus = r#"{"ts":1050,"event":"bus.publish","data":{"topic":"task.start"}}"#;

        let player = load(&format!("{}\n{}\n{}\n", write, meta, bus));

        assert_eq!(player.terminal_writes().len(), 1);
        assert_eq!(player.metadata_events().len(), 1);
        assert_eq!(player.bus_events().len(), 1);
    }

    #[test]
    fn test_collect_terminal_output() {
        let first = make_write_record(b"Hello, ", true, 0, 1000);
        let second = make_write_record(b"World!", true, 50, 1000);

        let player = load(&format!("{}\n{}\n", first, second));

        assert_eq!(player.collect_terminal_output().unwrap(), "Hello, World!");
    }

    #[test]
    fn test_strip_ansi() {
        assert_eq!(
            strip_ansi(b"Hello, \x1b[32mWorld\x1b[0m!"),
            b"Hello, World!"
        );
    }

    #[test]
    fn test_strip_ansi_complex() {
        // Two back-to-back CSI sequences followed by a reset.
        assert_eq!(
            strip_ansi(b"\x1b[1m\x1b[32mBold Green\x1b[0m Normal"),
            b"Bold Green Normal"
        );
    }

    #[test]
    fn test_escape_ansi() {
        assert_eq!(
            escape_ansi("Hello \x1b[32mWorld\x1b[0m"),
            "Hello \\x1b[32mWorld\\x1b[0m"
        );
    }

    #[test]
    fn test_collect_text_output() {
        let player = load(&make_write_record(
            b"Hello \x1b[32mWorld\x1b[0m",
            true,
            0,
            1000,
        ));

        assert_eq!(player.collect_text_output().unwrap(), "Hello World");
    }

    #[test]
    fn test_collect_ansi_escaped() {
        let player = load(&make_write_record(
            b"Hello \x1b[32mWorld\x1b[0m",
            true,
            0,
            1000,
        ));

        assert_eq!(
            player.collect_ansi_escaped().unwrap(),
            "Hello \\x1b[32mWorld\\x1b[0m"
        );
    }

    #[test]
    fn test_replay_terminal() {
        let first = make_write_record(b"Hello", true, 0, 1000);
        let second = make_write_record(b" World", true, 10, 1000);

        // 100x speed shrinks the 10 ms gap to zero, so the test never sleeps.
        let fast = PlayerConfig::terminal().with_speed(100.0);
        let mut player = load(&format!("{}\n{}\n", first, second)).with_config(fast);

        let mut sink = Vec::new();
        player.replay_terminal(&mut sink).unwrap();

        assert_eq!(String::from_utf8(sink).unwrap(), "Hello World");
    }

    #[test]
    fn test_replay_text_mode() {
        let line = make_write_record(b"\x1b[32mGreen\x1b[0m", true, 0, 1000);
        let mut player = load(&line).with_config(PlayerConfig::text());

        let mut sink = Vec::new();
        player.replay_terminal(&mut sink).unwrap();

        assert_eq!(String::from_utf8(sink).unwrap(), "Green");
    }

    #[test]
    fn test_player_config_builder() {
        let filter = vec!["ux.".to_string()];
        let config = PlayerConfig::terminal()
            .with_speed(2.0)
            .with_step_mode()
            .with_filter(filter);

        assert!((config.speed - 2.0).abs() < f32::EPSILON);
        assert!(config.step_mode);
        assert_eq!(config.event_filter, vec!["ux."]);
    }

    #[test]
    fn test_empty_input() {
        let player = SessionPlayer::from_bytes(b"").unwrap();
        assert_eq!(player.record_count(), 0);
    }

    #[test]
    fn test_whitespace_lines_skipped() {
        let line = make_write_record(b"test", true, 0, 1000);

        // Blank and whitespace-only lines surround the single real record.
        let player = load(&format!("\n  \n{}\n\n", line));

        assert_eq!(player.record_count(), 1);
    }
}
532}