1use ralph_proto::{TerminalWrite, UxEvent};
8use std::io::{self, BufRead, Write};
9use std::time::Duration;
10
11use crate::session_recorder::Record;
12
/// Selects how recorded terminal bytes are emitted during replay.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReplayMode {
    /// Write the recorded bytes unchanged, escape sequences included.
    Terminal,
    /// Remove ANSI escape sequences before writing.
    Text,
}
21
22#[derive(Debug, Clone)]
24pub struct PlayerConfig {
25 pub speed: f32,
27
28 pub step_mode: bool,
30
31 pub replay_mode: ReplayMode,
33
34 pub event_filter: Vec<String>,
36}
37
38impl Default for PlayerConfig {
39 fn default() -> Self {
40 Self {
41 speed: 1.0,
42 step_mode: false,
43 replay_mode: ReplayMode::Terminal,
44 event_filter: Vec::new(),
45 }
46 }
47}
48
49impl PlayerConfig {
50 pub fn terminal() -> Self {
52 Self::default()
53 }
54
55 pub fn text() -> Self {
57 Self {
58 replay_mode: ReplayMode::Text,
59 ..Default::default()
60 }
61 }
62
63 pub fn with_speed(mut self, speed: f32) -> Self {
65 self.speed = speed.max(0.1); self
67 }
68
69 pub fn with_step_mode(mut self) -> Self {
71 self.step_mode = true;
72 self
73 }
74
75 pub fn with_filter(mut self, events: Vec<String>) -> Self {
77 self.event_filter = events;
78 self
79 }
80}
81
82#[derive(Debug, Clone)]
84pub struct TimestampedRecord {
85 pub record: Record,
87
88 pub offset_ms: u64,
90}
91
92#[derive(Debug)]
112pub struct SessionPlayer {
113 records: Vec<TimestampedRecord>,
115
116 config: PlayerConfig,
118
119 position: usize,
121}
122
123impl SessionPlayer {
124 pub fn from_reader<R: BufRead>(reader: R) -> io::Result<Self> {
126 let mut records = Vec::new();
127 let mut first_ts: Option<u64> = None;
128
129 for line in reader.lines() {
130 let line = line?;
131 if line.trim().is_empty() {
132 continue;
133 }
134
135 let record: Record = serde_json::from_str(&line).map_err(|e| {
136 io::Error::new(
137 io::ErrorKind::InvalidData,
138 format!("Invalid JSON record: {}", e),
139 )
140 })?;
141
142 let ts = record.ts;
144 let base_ts = *first_ts.get_or_insert(ts);
145 let offset_ms = ts.saturating_sub(base_ts);
146
147 records.push(TimestampedRecord { record, offset_ms });
148 }
149
150 Ok(Self {
151 records,
152 config: PlayerConfig::default(),
153 position: 0,
154 })
155 }
156
157 pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
159 Self::from_reader(io::BufReader::new(bytes))
160 }
161
162 pub fn with_config(mut self, config: PlayerConfig) -> Self {
164 self.config = config;
165 self
166 }
167
168 pub fn record_count(&self) -> usize {
170 self.records.len()
171 }
172
173 pub fn records(&self) -> &[TimestampedRecord] {
175 &self.records
176 }
177
178 pub fn filter_by_event(&self, event_prefix: &str) -> Vec<&TimestampedRecord> {
180 self.records
181 .iter()
182 .filter(|r| r.record.event.starts_with(event_prefix))
183 .collect()
184 }
185
186 pub fn terminal_writes(&self) -> Vec<&TimestampedRecord> {
188 self.filter_by_event("ux.terminal.write")
189 }
190
191 pub fn metadata_events(&self) -> Vec<&TimestampedRecord> {
193 self.filter_by_event("_meta.")
194 }
195
196 pub fn bus_events(&self) -> Vec<&TimestampedRecord> {
198 self.filter_by_event("bus.")
199 }
200
201 pub fn reset(&mut self) {
203 self.position = 0;
204 }
205
206 pub fn replay_terminal<W: Write>(&mut self, writer: &mut W) -> io::Result<()> {
212 self.reset();
213 let mut last_offset_ms: u64 = 0;
214
215 let terminal_writes = self.terminal_writes();
216 for record in terminal_writes {
217 let delay_ms = record.offset_ms.saturating_sub(last_offset_ms);
219 last_offset_ms = record.offset_ms;
220
221 if !self.config.step_mode && delay_ms > 0 && self.config.speed > 0.0 {
223 let adjusted_delay = (delay_ms as f32 / self.config.speed) as u64;
224 if adjusted_delay > 0 {
225 std::thread::sleep(Duration::from_millis(adjusted_delay));
226 }
227 }
228
229 if let Ok(UxEvent::TerminalWrite(write)) = Self::parse_ux_event(&record.record) {
231 self.output_terminal_write(writer, &write)?;
232 }
233
234 if self.config.step_mode {
236 writer.flush()?;
237 let mut input = String::new();
238 io::stdin().read_line(&mut input)?;
239 }
240 }
241
242 writer.flush()
243 }
244
245 fn output_terminal_write<W: Write>(
247 &self,
248 writer: &mut W,
249 write: &TerminalWrite,
250 ) -> io::Result<()> {
251 let bytes = write.decode_bytes().map_err(|e| {
252 io::Error::new(
253 io::ErrorKind::InvalidData,
254 format!("Failed to decode base64: {}", e),
255 )
256 })?;
257
258 match self.config.replay_mode {
259 ReplayMode::Terminal => {
260 writer.write_all(&bytes)?;
262 }
263 ReplayMode::Text => {
264 let stripped = strip_ansi(&bytes);
266 writer.write_all(&stripped)?;
267 }
268 }
269
270 Ok(())
271 }
272
273 fn parse_ux_event(record: &Record) -> Result<UxEvent, serde_json::Error> {
275 let tagged = serde_json::json!({
278 "event": record.event,
279 "data": record.data,
280 });
281 serde_json::from_value(tagged)
282 }
283
284 pub fn collect_terminal_output(&self) -> io::Result<String> {
286 let mut output = Vec::new();
287
288 for record in self.terminal_writes() {
289 if let Ok(UxEvent::TerminalWrite(write)) = Self::parse_ux_event(&record.record) {
290 let bytes = write.decode_bytes().map_err(|e| {
291 io::Error::new(
292 io::ErrorKind::InvalidData,
293 format!("Failed to decode base64: {}", e),
294 )
295 })?;
296 output.extend_from_slice(&bytes);
297 }
298 }
299
300 String::from_utf8(output).map_err(|e| {
301 io::Error::new(
302 io::ErrorKind::InvalidData,
303 format!("Invalid UTF-8 in terminal output: {}", e),
304 )
305 })
306 }
307
308 pub fn collect_text_output(&self) -> io::Result<String> {
310 let raw = self.collect_terminal_output()?;
311 Ok(String::from_utf8_lossy(&strip_ansi(raw.as_bytes())).into_owned())
312 }
313
314 pub fn collect_ansi_escaped(&self) -> io::Result<String> {
316 let raw = self.collect_terminal_output()?;
317 Ok(escape_ansi(&raw))
318 }
319}
320
/// Removes ANSI escape sequences from `bytes` and returns the remaining bytes.
///
/// Every sequence starts with ESC (0x1b). The recognized forms are:
/// - CSI (`ESC [`): skipped up to and including its final byte (0x40..=0x7E).
/// - OSC (`ESC ]`): skipped up to and including BEL (0x07) or ST (`ESC \`).
/// - DCS, SOS, PM and APC strings (`ESC P`, `ESC X`, `ESC ^`, `ESC _`): skipped
///   up to and including ST (`ESC \`).
/// - Escapes with intermediate bytes (0x20..=0x2F), such as the charset
///   designation `ESC ( B`: skipped including the final byte.
/// - Any other escape: ESC and the single byte after it are dropped.
///
/// An unterminated CSI or control string swallows the rest of the input, and a
/// lone ESC at the very end is dropped.
fn strip_ansi(bytes: &[u8]) -> Vec<u8> {
    const ESC: u8 = 0x1b;

    let mut result = Vec::with_capacity(bytes.len());
    let mut i = 0;

    while i < bytes.len() {
        if bytes[i] != ESC {
            result.push(bytes[i]);
            i += 1;
            continue;
        }

        // Step past ESC and read the byte that selects the kind of sequence.
        i += 1;
        let introducer = match bytes.get(i) {
            Some(&byte) => byte,
            None => break,
        };
        i += 1;

        match introducer {
            b'[' => {
                // Parameter and intermediate bytes run until the final byte.
                while i < bytes.len() && !(0x40..=0x7E).contains(&bytes[i]) {
                    i += 1;
                }
                if i < bytes.len() {
                    i += 1;
                }
            }
            b']' => i = skip_control_string(bytes, i, true),
            b'P' | b'X' | b'^' | b'_' => i = skip_control_string(bytes, i, false),
            0x20..=0x2F => {
                // Intermediate bytes are followed by exactly one final byte; without
                // this, `ESC ( B` would leak a stray `B` into the text.
                while i < bytes.len() && (0x20..=0x2F).contains(&bytes[i]) {
                    i += 1;
                }
                if i < bytes.len() && (0x30..=0x7E).contains(&bytes[i]) {
                    i += 1;
                }
            }
            // Plain two-byte escape: the introducer was already consumed above.
            _ => {}
        }
    }

    result
}

/// Returns the index just past the terminator of a control string whose payload
/// begins at `start`, or `bytes.len()` when no terminator is found.
///
/// ST (`ESC \`) always terminates; BEL (0x07) terminates only if `bel_terminates`.
fn skip_control_string(bytes: &[u8], start: usize, bel_terminates: bool) -> usize {
    let mut i = start;
    while i < bytes.len() {
        match bytes[i] {
            0x07 if bel_terminates => return i + 1,
            0x1b if bytes.get(i + 1) == Some(&b'\\') => return i + 2,
            _ => i += 1,
        }
    }
    i
}
376
/// Makes escape sequences visible by replacing every ESC (0x1b) character in `s`
/// with the four-character literal text `\x1b`.
fn escape_ansi(s: &str) -> String {
    let mut visible = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '\x1b' {
            visible.push_str("\\x1b");
        } else {
            visible.push(ch);
        }
    }
    visible
}
383
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes one `ux.terminal.write` record as a single JSON line (no trailing newline).
    fn make_write_record(bytes: &[u8], stdout: bool, offset_ms: u64, base_ts: u64) -> String {
        let payload = TerminalWrite::new(bytes, stdout, offset_ms);
        let data = serde_json::to_value(&payload).unwrap();
        let record = Record {
            ts: base_ts + offset_ms,
            event: "ux.terminal.write".to_string(),
            data,
        };
        serde_json::to_string(&record).unwrap()
    }

    /// Joins records into JSON-lines text, ending every record with a newline.
    fn jsonl(lines: &[String]) -> String {
        lines.iter().map(|line| format!("{}\n", line)).collect()
    }

    /// Parses `input` into a player, panicking if it is malformed.
    fn load(input: &str) -> SessionPlayer {
        SessionPlayer::from_bytes(input.as_bytes()).unwrap()
    }

    #[test]
    fn test_player_from_reader() {
        let input = jsonl(&[
            make_write_record(b"Hello", true, 0, 1000),
            make_write_record(b"World", true, 100, 1000),
        ]);

        let player = load(&input);

        assert_eq!(player.record_count(), 2);
        assert_eq!(player.records[0].offset_ms, 0);
        assert_eq!(player.records[1].offset_ms, 100);
    }

    #[test]
    fn test_filter_by_event() {
        let meta = r#"{"ts":1000,"event":"_meta.loop_start","data":{"prompt_file":"PROMPT.md"}}"#;
        let bus = r#"{"ts":1050,"event":"bus.publish","data":{"topic":"task.start"}}"#;
        let input = jsonl(&[
            make_write_record(b"test", true, 0, 1000),
            meta.to_string(),
            bus.to_string(),
        ]);

        let player = load(&input);

        assert_eq!(player.terminal_writes().len(), 1);
        assert_eq!(player.metadata_events().len(), 1);
        assert_eq!(player.bus_events().len(), 1);
    }

    #[test]
    fn test_collect_terminal_output() {
        let input = jsonl(&[
            make_write_record(b"Hello, ", true, 0, 1000),
            make_write_record(b"World!", true, 50, 1000),
        ]);

        let collected = load(&input).collect_terminal_output().unwrap();

        assert_eq!(collected, "Hello, World!");
    }

    #[test]
    fn test_strip_ansi() {
        let plain = strip_ansi(b"Hello, \x1b[32mWorld\x1b[0m!");
        assert_eq!(plain, b"Hello, World!");
    }

    #[test]
    fn test_strip_ansi_complex() {
        let plain = strip_ansi(b"\x1b[1m\x1b[32mBold Green\x1b[0m Normal");
        assert_eq!(plain, b"Bold Green Normal");
    }

    #[test]
    fn test_escape_ansi() {
        let shown = escape_ansi("Hello \x1b[32mWorld\x1b[0m");
        assert_eq!(shown, "Hello \\x1b[32mWorld\\x1b[0m");
    }

    #[test]
    fn test_collect_text_output() {
        let input = make_write_record(b"Hello \x1b[32mWorld\x1b[0m", true, 0, 1000);

        let text = load(&input).collect_text_output().unwrap();

        assert_eq!(text, "Hello World");
    }

    #[test]
    fn test_collect_ansi_escaped() {
        let input = make_write_record(b"Hello \x1b[32mWorld\x1b[0m", true, 0, 1000);

        let shown = load(&input).collect_ansi_escaped().unwrap();

        assert_eq!(shown, "Hello \\x1b[32mWorld\\x1b[0m");
    }

    #[test]
    fn test_replay_terminal() {
        let input = jsonl(&[
            make_write_record(b"Hello", true, 0, 1000),
            make_write_record(b" World", true, 10, 1000),
        ]);

        // A 100x speed-up keeps the 10 ms gap from slowing the test down.
        let mut player = load(&input).with_config(PlayerConfig::terminal().with_speed(100.0));
        let mut sink = Vec::new();
        player.replay_terminal(&mut sink).unwrap();

        assert_eq!(String::from_utf8(sink).unwrap(), "Hello World");
    }

    #[test]
    fn test_replay_text_mode() {
        let input = make_write_record(b"\x1b[32mGreen\x1b[0m", true, 0, 1000);

        let mut player = load(&input).with_config(PlayerConfig::text());
        let mut sink = Vec::new();
        player.replay_terminal(&mut sink).unwrap();

        assert_eq!(String::from_utf8(sink).unwrap(), "Green");
    }

    #[test]
    fn test_player_config_builder() {
        let built = PlayerConfig::terminal()
            .with_speed(2.0)
            .with_step_mode()
            .with_filter(vec!["ux.".to_string()]);

        assert!((built.speed - 2.0).abs() < f32::EPSILON);
        assert!(built.step_mode);
        assert_eq!(built.event_filter, vec!["ux."]);
    }

    #[test]
    fn test_empty_input() {
        assert_eq!(load("").record_count(), 0);
    }

    #[test]
    fn test_whitespace_lines_skipped() {
        let record = make_write_record(b"test", true, 0, 1000);
        let input = format!("\n \n{}\n\n", record);

        assert_eq!(load(&input).record_count(), 1);
    }
}