ralph_core/testing/
replay_backend.rs

1//! Replay backend for deterministic testing using recorded JSONL sessions.
2//!
3//! `ReplayBackend` loads JSONL session files recorded by `SessionRecorder` and
4//! replays terminal output as mock CLI responses. This enables deterministic
//! smoke testing without live API calls.
6//!
7//! # Example
8//!
9//! ```
10//! use ralph_core::testing::ReplayBackend;
11//! use std::io::Cursor;
12//!
13//! // Create a simple JSONL fixture
14//! let jsonl = r#"{"ts":1000,"event":"ux.terminal.write","data":{"bytes":"SGVsbG8=","stdout":true,"offset_ms":0}}"#;
15//! let mut backend = ReplayBackend::from_reader(Cursor::new(jsonl)).unwrap();
16//!
17//! // Get output chunks in order
18//! let chunk = backend.next_output().unwrap();
19//! assert_eq!(chunk, b"Hello");
20//! ```
21
22use crate::session_player::SessionPlayer;
23use ralph_proto::UxEvent;
24use std::io::{self, BufRead, BufReader};
25use std::path::Path;
26use std::time::Duration;
27
/// Timing mode for replay.
///
/// Selects whether `ReplayBackend::next_output` returns each chunk right away
/// or first sleeps to reproduce the pacing of the recording.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReplayTimingMode {
    /// Serve all output immediately without timing delays.
    ///
    /// This is the default mode.
    #[default]
    Instant,
    /// Honor recorded timing delays between outputs.
    ///
    /// Before returning a chunk, the backend sleeps for the difference between
    /// that record's `offset_ms` and the previous one's. The chunk at position
    /// zero is never delayed.
    Realistic,
}
37
/// A backend that replays recorded JSONL session output.
///
/// Loads session recordings from `SessionRecorder` and serves terminal output
/// chunks in the order they were recorded. Supports configurable timing modes
/// for fast tests or realistic replay.
///
/// Only records whose event name is `"ux.terminal.write"` are replayed; every
/// other record in the session is ignored.
#[derive(Debug)]
pub struct ReplayBackend {
    /// The underlying session player with parsed records.
    player: SessionPlayer,
    /// Current position in the terminal writes.
    ///
    /// This is an index into `terminal_write_indices`, not into the player's
    /// full record list.
    position: usize,
    /// Timing mode for replay.
    timing_mode: ReplayTimingMode,
    /// Cached terminal write indices for efficient iteration.
    ///
    /// Each entry is an index into `player.records()` of a record whose event
    /// is `"ux.terminal.write"`, in recorded order.
    terminal_write_indices: Vec<usize>,
    /// Last offset for timing calculations.
    ///
    /// Holds the recorded `offset_ms` of the previous output; the delay in
    /// `Realistic` mode is the next record's offset minus this value.
    last_offset_ms: u64,
}
56
57impl ReplayBackend {
58    /// Creates a replay backend from a JSONL file.
59    ///
60    /// # Errors
61    ///
62    /// Returns an error if the file cannot be opened or contains invalid JSON.
63    pub fn from_file(path: impl AsRef<Path>) -> io::Result<Self> {
64        let file = std::fs::File::open(path.as_ref())?;
65        let reader = BufReader::new(file);
66        Self::from_reader(reader)
67    }
68
69    /// Creates a replay backend from a JSONL reader.
70    ///
71    /// # Errors
72    ///
73    /// Returns an error if the JSONL data is malformed.
74    pub fn from_reader<R: BufRead>(reader: R) -> io::Result<Self> {
75        let player = SessionPlayer::from_reader(reader)?;
76
77        // Pre-compute indices of terminal write records for efficient iteration
78        let terminal_write_indices: Vec<usize> = player
79            .records()
80            .iter()
81            .enumerate()
82            .filter(|(_, r)| r.record.event == "ux.terminal.write")
83            .map(|(i, _)| i)
84            .collect();
85
86        Ok(Self {
87            player,
88            position: 0,
89            timing_mode: ReplayTimingMode::default(),
90            terminal_write_indices,
91            last_offset_ms: 0,
92        })
93    }
94
95    /// Creates a replay backend from raw JSONL bytes.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the data is malformed.
100    pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
101        Self::from_reader(io::BufReader::new(bytes))
102    }
103
104    /// Sets the timing mode for replay.
105    pub fn with_timing(mut self, mode: ReplayTimingMode) -> Self {
106        self.timing_mode = mode;
107        self
108    }
109
110    /// Returns the next terminal output chunk, or `None` if exhausted.
111    ///
112    /// In `Realistic` timing mode, this will sleep for the recorded delay
113    /// between outputs.
114    pub fn next_output(&mut self) -> Option<Vec<u8>> {
115        if self.position >= self.terminal_write_indices.len() {
116            return None;
117        }
118
119        let record_idx = self.terminal_write_indices[self.position];
120        let record = &self.player.records()[record_idx];
121
122        // Handle timing in realistic mode
123        if self.timing_mode == ReplayTimingMode::Realistic && self.position > 0 {
124            let delay_ms = record.offset_ms.saturating_sub(self.last_offset_ms);
125            if delay_ms > 0 {
126                std::thread::sleep(Duration::from_millis(delay_ms));
127            }
128        }
129        self.last_offset_ms = record.offset_ms;
130
131        // Parse and decode the terminal write
132        let bytes = self.parse_terminal_write(&record.record)?;
133        self.position += 1;
134        Some(bytes)
135    }
136
137    /// Returns `true` if all output has been served.
138    pub fn is_exhausted(&self) -> bool {
139        self.position >= self.terminal_write_indices.len()
140    }
141
142    /// Returns the total number of terminal write events.
143    pub fn output_count(&self) -> usize {
144        self.terminal_write_indices.len()
145    }
146
147    /// Returns the number of outputs already served.
148    pub fn outputs_served(&self) -> usize {
149        self.position
150    }
151
152    /// Resets the replay to the beginning.
153    pub fn reset(&mut self) {
154        self.position = 0;
155        self.last_offset_ms = 0;
156    }
157
158    /// Collects all remaining output into a single byte vector.
159    ///
160    /// This consumes all remaining output chunks. The backend will be
161    /// exhausted after calling this method.
162    pub fn collect_remaining(&mut self) -> Vec<u8> {
163        let mut result = Vec::new();
164        while let Some(chunk) = self.next_output() {
165            result.extend(chunk);
166        }
167        result
168    }
169
170    /// Collects all output (from beginning) into a single byte vector.
171    ///
172    /// Resets position before collecting.
173    pub fn collect_all(&mut self) -> Vec<u8> {
174        self.reset();
175        self.collect_remaining()
176    }
177
178    /// Parses a Record's data field as terminal write bytes.
179    fn parse_terminal_write(&self, record: &crate::session_recorder::Record) -> Option<Vec<u8>> {
180        // Reconstruct the tagged format for UxEvent deserialization
181        let tagged = serde_json::json!({
182            "event": record.event,
183            "data": record.data,
184        });
185
186        let ux_event: UxEvent = serde_json::from_value(tagged).ok()?;
187
188        if let UxEvent::TerminalWrite(write) = ux_event {
189            write.decode_bytes().ok()
190        } else {
191            None
192        }
193    }
194}
195
196/// Iterator implementation for streaming output chunks.
197impl Iterator for ReplayBackend {
198    type Item = Vec<u8>;
199
200    fn next(&mut self) -> Option<Self::Item> {
201        self.next_output()
202    }
203}
204
// Unit tests for `ReplayBackend`. Fixtures are built in memory as JSONL
// strings; the only filesystem access is the missing-file test.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session_recorder::Record;
    use ralph_proto::TerminalWrite;

    /// Helper to create a JSONL line for a terminal write event.
    ///
    /// The record timestamp is `base_ts + offset_ms`, and the payload is the
    /// serialized `TerminalWrite` built from `bytes`, `stdout` and `offset_ms`.
    fn make_write_record(bytes: &[u8], stdout: bool, offset_ms: u64, base_ts: u64) -> String {
        let write = TerminalWrite::new(bytes, stdout, offset_ms);
        let record = Record {
            ts: base_ts + offset_ms,
            event: "ux.terminal.write".to_string(),
            data: serde_json::to_value(&write).unwrap(),
        };
        serde_json::to_string(&record).unwrap()
    }

    /// Two valid write records load as two pending outputs.
    #[test]
    fn test_from_reader_loads_valid_jsonl() {
        let line1 = make_write_record(b"Hello", true, 0, 1000);
        let line2 = make_write_record(b" World", true, 100, 1000);
        let jsonl = format!("{}\n{}\n", line1, line2);

        let backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();

        assert_eq!(backend.output_count(), 2);
        assert!(!backend.is_exhausted());
    }

    /// Opening a path that does not exist surfaces `NotFound`.
    #[test]
    fn test_from_file_error_on_missing_file() {
        let result = ReplayBackend::from_file("/nonexistent/path/to/file.jsonl");
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::NotFound);
    }

    /// Empty input is valid and yields a backend that is already exhausted.
    #[test]
    fn test_from_reader_empty_input() {
        let backend = ReplayBackend::from_bytes(b"").unwrap();

        assert_eq!(backend.output_count(), 0);
        assert!(backend.is_exhausted());
    }

    /// Chunks come back in recorded order, followed by `None`.
    #[test]
    fn test_next_output_returns_bytes_in_order() {
        let line1 = make_write_record(b"First", true, 0, 1000);
        let line2 = make_write_record(b"Second", true, 50, 1000);
        let line3 = make_write_record(b"Third", true, 100, 1000);
        let jsonl = format!("{}\n{}\n{}\n", line1, line2, line3);

        let mut backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();

        assert_eq!(backend.next_output().unwrap(), b"First");
        assert_eq!(backend.next_output().unwrap(), b"Second");
        assert_eq!(backend.next_output().unwrap(), b"Third");
        assert!(backend.next_output().is_none());
    }

    /// `is_exhausted` and `outputs_served` track consumption of a single chunk.
    #[test]
    fn test_is_exhausted_true_after_all_output() {
        let line = make_write_record(b"Only", true, 0, 1000);
        let mut backend = ReplayBackend::from_bytes(line.as_bytes()).unwrap();

        assert!(!backend.is_exhausted());
        assert_eq!(backend.outputs_served(), 0);

        backend.next_output();

        assert!(backend.is_exhausted());
        assert_eq!(backend.outputs_served(), 1);
    }

    /// `Instant` mode ignores the recorded one-second gap between chunks.
    #[test]
    fn test_instant_mode_serves_all_immediately() {
        let line1 = make_write_record(b"A", true, 0, 1000);
        let line2 = make_write_record(b"B", true, 1000, 1000); // 1 second delay
        let jsonl = format!("{}\n{}\n", line1, line2);

        let mut backend = ReplayBackend::from_bytes(jsonl.as_bytes())
            .unwrap()
            .with_timing(ReplayTimingMode::Instant);

        // Should be instant, not delayed
        let start = std::time::Instant::now();
        backend.next_output();
        backend.next_output();
        let elapsed = start.elapsed();

        // Should complete in well under 1 second (the recorded delay)
        assert!(elapsed.as_millis() < 100, "Should be instant, took {:?}", elapsed);
    }

    /// The `Iterator` impl yields the same chunks as repeated `next_output` calls.
    #[test]
    fn test_iterator_yields_all_chunks() {
        let line1 = make_write_record(b"One", true, 0, 1000);
        let line2 = make_write_record(b"Two", true, 10, 1000);
        let jsonl = format!("{}\n{}\n", line1, line2);

        let backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();
        let chunks: Vec<Vec<u8>> = backend.collect();

        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0], b"One");
        assert_eq!(chunks[1], b"Two");
    }

    /// `collect_all` joins every chunk into one contiguous byte vector.
    #[test]
    fn test_collect_all_concatenates_output() {
        let line1 = make_write_record(b"Hello, ", true, 0, 1000);
        let line2 = make_write_record(b"World!", true, 50, 1000);
        let jsonl = format!("{}\n{}\n", line1, line2);

        let mut backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();
        let all = backend.collect_all();

        assert_eq!(all, b"Hello, World!");
    }

    /// After `reset`, the same output can be served a second time.
    #[test]
    fn test_reset_allows_replay() {
        let line = make_write_record(b"Replay", true, 0, 1000);
        let mut backend = ReplayBackend::from_bytes(line.as_bytes()).unwrap();

        // First pass
        assert_eq!(backend.next_output().unwrap(), b"Replay");
        assert!(backend.is_exhausted());

        // Reset and replay
        backend.reset();
        assert!(!backend.is_exhausted());
        assert_eq!(backend.next_output().unwrap(), b"Replay");
    }

    /// Records with other event names are not counted as outputs.
    #[test]
    fn test_filters_non_terminal_write_events() {
        let write = make_write_record(b"output", true, 0, 1000);
        let meta = r#"{"ts":1000,"event":"_meta.loop_start","data":{"prompt_file":"PROMPT.md"}}"#;
        let bus = r#"{"ts":1050,"event":"bus.publish","data":{"topic":"task.start"}}"#;

        let jsonl = format!("{}\n{}\n{}\n", write, meta, bus);
        let backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();

        // Should only have the terminal write event
        assert_eq!(backend.output_count(), 1);
    }

    /// Blank and whitespace-only lines around a record are tolerated.
    #[test]
    fn test_handles_whitespace_lines() {
        let line = make_write_record(b"data", true, 0, 1000);
        let jsonl = format!("\n  \n{}\n\n", line);

        let backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();
        assert_eq!(backend.output_count(), 1);
    }

    /// A line that is not JSON makes construction fail.
    #[test]
    fn test_malformed_json_returns_error() {
        let result = ReplayBackend::from_bytes(b"not valid json");
        assert!(result.is_err());
    }
}