1use crate::session_player::SessionPlayer;
23use ralph_proto::UxEvent;
24use std::io::{self, BufRead, BufReader};
25use std::path::Path;
26use std::time::Duration;
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
30pub enum ReplayTimingMode {
31 #[default]
33 Instant,
34 Realistic,
36}
37
38#[derive(Debug)]
44pub struct ReplayBackend {
45 player: SessionPlayer,
47 position: usize,
49 timing_mode: ReplayTimingMode,
51 terminal_write_indices: Vec<usize>,
53 last_offset_ms: u64,
55}
56
57impl ReplayBackend {
58 pub fn from_file(path: impl AsRef<Path>) -> io::Result<Self> {
64 let file = std::fs::File::open(path.as_ref())?;
65 let reader = BufReader::new(file);
66 Self::from_reader(reader)
67 }
68
69 pub fn from_reader<R: BufRead>(reader: R) -> io::Result<Self> {
75 let player = SessionPlayer::from_reader(reader)?;
76
77 let terminal_write_indices: Vec<usize> = player
79 .records()
80 .iter()
81 .enumerate()
82 .filter(|(_, r)| r.record.event == "ux.terminal.write")
83 .map(|(i, _)| i)
84 .collect();
85
86 Ok(Self {
87 player,
88 position: 0,
89 timing_mode: ReplayTimingMode::default(),
90 terminal_write_indices,
91 last_offset_ms: 0,
92 })
93 }
94
95 pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
101 Self::from_reader(io::BufReader::new(bytes))
102 }
103
104 pub fn with_timing(mut self, mode: ReplayTimingMode) -> Self {
106 self.timing_mode = mode;
107 self
108 }
109
110 pub fn next_output(&mut self) -> Option<Vec<u8>> {
115 if self.position >= self.terminal_write_indices.len() {
116 return None;
117 }
118
119 let record_idx = self.terminal_write_indices[self.position];
120 let record = &self.player.records()[record_idx];
121
122 if self.timing_mode == ReplayTimingMode::Realistic && self.position > 0 {
124 let delay_ms = record.offset_ms.saturating_sub(self.last_offset_ms);
125 if delay_ms > 0 {
126 std::thread::sleep(Duration::from_millis(delay_ms));
127 }
128 }
129 self.last_offset_ms = record.offset_ms;
130
131 let bytes = self.parse_terminal_write(&record.record)?;
133 self.position += 1;
134 Some(bytes)
135 }
136
137 pub fn is_exhausted(&self) -> bool {
139 self.position >= self.terminal_write_indices.len()
140 }
141
142 pub fn output_count(&self) -> usize {
144 self.terminal_write_indices.len()
145 }
146
147 pub fn outputs_served(&self) -> usize {
149 self.position
150 }
151
152 pub fn reset(&mut self) {
154 self.position = 0;
155 self.last_offset_ms = 0;
156 }
157
158 pub fn collect_remaining(&mut self) -> Vec<u8> {
163 let mut result = Vec::new();
164 while let Some(chunk) = self.next_output() {
165 result.extend(chunk);
166 }
167 result
168 }
169
170 pub fn collect_all(&mut self) -> Vec<u8> {
174 self.reset();
175 self.collect_remaining()
176 }
177
178 fn parse_terminal_write(&self, record: &crate::session_recorder::Record) -> Option<Vec<u8>> {
180 let tagged = serde_json::json!({
182 "event": record.event,
183 "data": record.data,
184 });
185
186 let ux_event: UxEvent = serde_json::from_value(tagged).ok()?;
187
188 if let UxEvent::TerminalWrite(write) = ux_event {
189 write.decode_bytes().ok()
190 } else {
191 None
192 }
193 }
194}
195
196impl Iterator for ReplayBackend {
198 type Item = Vec<u8>;
199
200 fn next(&mut self) -> Option<Self::Item> {
201 self.next_output()
202 }
203}
204
205#[cfg(test)]
206mod tests {
207 use super::*;
208 use crate::session_recorder::Record;
209 use ralph_proto::TerminalWrite;
210
211 fn make_write_record(bytes: &[u8], stdout: bool, offset_ms: u64, base_ts: u64) -> String {
213 let write = TerminalWrite::new(bytes, stdout, offset_ms);
214 let record = Record {
215 ts: base_ts + offset_ms,
216 event: "ux.terminal.write".to_string(),
217 data: serde_json::to_value(&write).unwrap(),
218 };
219 serde_json::to_string(&record).unwrap()
220 }
221
222 #[test]
223 fn test_from_reader_loads_valid_jsonl() {
224 let line1 = make_write_record(b"Hello", true, 0, 1000);
225 let line2 = make_write_record(b" World", true, 100, 1000);
226 let jsonl = format!("{}\n{}\n", line1, line2);
227
228 let backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();
229
230 assert_eq!(backend.output_count(), 2);
231 assert!(!backend.is_exhausted());
232 }
233
234 #[test]
235 fn test_from_file_error_on_missing_file() {
236 let result = ReplayBackend::from_file("/nonexistent/path/to/file.jsonl");
237 assert!(result.is_err());
238
239 let err = result.unwrap_err();
240 assert_eq!(err.kind(), io::ErrorKind::NotFound);
241 }
242
243 #[test]
244 fn test_from_reader_empty_input() {
245 let backend = ReplayBackend::from_bytes(b"").unwrap();
246
247 assert_eq!(backend.output_count(), 0);
248 assert!(backend.is_exhausted());
249 }
250
251 #[test]
252 fn test_next_output_returns_bytes_in_order() {
253 let line1 = make_write_record(b"First", true, 0, 1000);
254 let line2 = make_write_record(b"Second", true, 50, 1000);
255 let line3 = make_write_record(b"Third", true, 100, 1000);
256 let jsonl = format!("{}\n{}\n{}\n", line1, line2, line3);
257
258 let mut backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();
259
260 assert_eq!(backend.next_output().unwrap(), b"First");
261 assert_eq!(backend.next_output().unwrap(), b"Second");
262 assert_eq!(backend.next_output().unwrap(), b"Third");
263 assert!(backend.next_output().is_none());
264 }
265
266 #[test]
267 fn test_is_exhausted_true_after_all_output() {
268 let line = make_write_record(b"Only", true, 0, 1000);
269 let mut backend = ReplayBackend::from_bytes(line.as_bytes()).unwrap();
270
271 assert!(!backend.is_exhausted());
272 assert_eq!(backend.outputs_served(), 0);
273
274 backend.next_output();
275
276 assert!(backend.is_exhausted());
277 assert_eq!(backend.outputs_served(), 1);
278 }
279
280 #[test]
281 fn test_instant_mode_serves_all_immediately() {
282 let line1 = make_write_record(b"A", true, 0, 1000);
283 let line2 = make_write_record(b"B", true, 1000, 1000); let jsonl = format!("{}\n{}\n", line1, line2);
285
286 let mut backend = ReplayBackend::from_bytes(jsonl.as_bytes())
287 .unwrap()
288 .with_timing(ReplayTimingMode::Instant);
289
290 let start = std::time::Instant::now();
292 backend.next_output();
293 backend.next_output();
294 let elapsed = start.elapsed();
295
296 assert!(elapsed.as_millis() < 100, "Should be instant, took {:?}", elapsed);
298 }
299
300 #[test]
301 fn test_iterator_yields_all_chunks() {
302 let line1 = make_write_record(b"One", true, 0, 1000);
303 let line2 = make_write_record(b"Two", true, 10, 1000);
304 let jsonl = format!("{}\n{}\n", line1, line2);
305
306 let backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();
307 let chunks: Vec<Vec<u8>> = backend.collect();
308
309 assert_eq!(chunks.len(), 2);
310 assert_eq!(chunks[0], b"One");
311 assert_eq!(chunks[1], b"Two");
312 }
313
314 #[test]
315 fn test_collect_all_concatenates_output() {
316 let line1 = make_write_record(b"Hello, ", true, 0, 1000);
317 let line2 = make_write_record(b"World!", true, 50, 1000);
318 let jsonl = format!("{}\n{}\n", line1, line2);
319
320 let mut backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();
321 let all = backend.collect_all();
322
323 assert_eq!(all, b"Hello, World!");
324 }
325
326 #[test]
327 fn test_reset_allows_replay() {
328 let line = make_write_record(b"Replay", true, 0, 1000);
329 let mut backend = ReplayBackend::from_bytes(line.as_bytes()).unwrap();
330
331 assert_eq!(backend.next_output().unwrap(), b"Replay");
333 assert!(backend.is_exhausted());
334
335 backend.reset();
337 assert!(!backend.is_exhausted());
338 assert_eq!(backend.next_output().unwrap(), b"Replay");
339 }
340
341 #[test]
342 fn test_filters_non_terminal_write_events() {
343 let write = make_write_record(b"output", true, 0, 1000);
344 let meta = r#"{"ts":1000,"event":"_meta.loop_start","data":{"prompt_file":"PROMPT.md"}}"#;
345 let bus = r#"{"ts":1050,"event":"bus.publish","data":{"topic":"task.start"}}"#;
346
347 let jsonl = format!("{}\n{}\n{}\n", write, meta, bus);
348 let backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();
349
350 assert_eq!(backend.output_count(), 1);
352 }
353
354 #[test]
355 fn test_handles_whitespace_lines() {
356 let line = make_write_record(b"data", true, 0, 1000);
357 let jsonl = format!("\n \n{}\n\n", line);
358
359 let backend = ReplayBackend::from_bytes(jsonl.as_bytes()).unwrap();
360 assert_eq!(backend.output_count(), 1);
361 }
362
363 #[test]
364 fn test_malformed_json_returns_error() {
365 let result = ReplayBackend::from_bytes(b"not valid json");
366 assert!(result.is_err());
367 }
368}