1use ralph_proto::{TerminalWrite, UxEvent};
8use std::io::{self, BufRead, Write};
9use std::time::Duration;
10
11use crate::session_recorder::Record;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub enum ReplayMode {
16 Terminal,
18 Text,
20}
21
22#[derive(Debug, Clone)]
24pub struct PlayerConfig {
25 pub speed: f32,
27
28 pub step_mode: bool,
30
31 pub replay_mode: ReplayMode,
33
34 pub event_filter: Vec<String>,
36}
37
38impl Default for PlayerConfig {
39 fn default() -> Self {
40 Self {
41 speed: 1.0,
42 step_mode: false,
43 replay_mode: ReplayMode::Terminal,
44 event_filter: Vec::new(),
45 }
46 }
47}
48
49impl PlayerConfig {
50 pub fn terminal() -> Self {
52 Self::default()
53 }
54
55 pub fn text() -> Self {
57 Self {
58 replay_mode: ReplayMode::Text,
59 ..Default::default()
60 }
61 }
62
63 pub fn with_speed(mut self, speed: f32) -> Self {
65 self.speed = speed.max(0.1); self
67 }
68
69 pub fn with_step_mode(mut self) -> Self {
71 self.step_mode = true;
72 self
73 }
74
75 pub fn with_filter(mut self, events: Vec<String>) -> Self {
77 self.event_filter = events;
78 self
79 }
80}
81
82#[derive(Debug, Clone)]
84pub struct TimestampedRecord {
85 pub record: Record,
87
88 pub offset_ms: u64,
90}
91
92#[derive(Debug)]
112pub struct SessionPlayer {
113 records: Vec<TimestampedRecord>,
115
116 config: PlayerConfig,
118
119 position: usize,
121}
122
123impl SessionPlayer {
124 pub fn from_reader<R: BufRead>(reader: R) -> io::Result<Self> {
126 let mut records = Vec::new();
127 let mut first_ts: Option<u64> = None;
128
129 for line in reader.lines() {
130 let line = line?;
131 if line.trim().is_empty() {
132 continue;
133 }
134
135 let record: Record = serde_json::from_str(&line).map_err(|e| {
136 io::Error::new(
137 io::ErrorKind::InvalidData,
138 format!("Invalid JSON record: {}", e),
139 )
140 })?;
141
142 let ts = record.ts;
144 let base_ts = *first_ts.get_or_insert(ts);
145 let offset_ms = ts.saturating_sub(base_ts);
146
147 records.push(TimestampedRecord { record, offset_ms });
148 }
149
150 Ok(Self {
151 records,
152 config: PlayerConfig::default(),
153 position: 0,
154 })
155 }
156
157 pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
159 Self::from_reader(io::BufReader::new(bytes))
160 }
161
162 pub fn with_config(mut self, config: PlayerConfig) -> Self {
164 self.config = config;
165 self
166 }
167
168 pub fn record_count(&self) -> usize {
170 self.records.len()
171 }
172
173 pub fn records(&self) -> &[TimestampedRecord] {
175 &self.records
176 }
177
178 pub fn filter_by_event(&self, event_prefix: &str) -> Vec<&TimestampedRecord> {
180 self.records
181 .iter()
182 .filter(|r| r.record.event.starts_with(event_prefix))
183 .collect()
184 }
185
186 pub fn terminal_writes(&self) -> Vec<&TimestampedRecord> {
188 self.filter_by_event("ux.terminal.write")
189 }
190
191 pub fn metadata_events(&self) -> Vec<&TimestampedRecord> {
193 self.filter_by_event("_meta.")
194 }
195
196 pub fn bus_events(&self) -> Vec<&TimestampedRecord> {
198 self.filter_by_event("bus.")
199 }
200
201 pub fn reset(&mut self) {
203 self.position = 0;
204 }
205
206 pub fn replay_terminal<W: Write>(&mut self, writer: &mut W) -> io::Result<()> {
212 self.reset();
213 let mut last_offset_ms: u64 = 0;
214
215 let terminal_writes = self.terminal_writes();
216 for record in terminal_writes {
217 let delay_ms = record.offset_ms.saturating_sub(last_offset_ms);
219 last_offset_ms = record.offset_ms;
220
221 if !self.config.step_mode && delay_ms > 0 && self.config.speed > 0.0 {
223 let adjusted_delay = (delay_ms as f32 / self.config.speed) as u64;
224 if adjusted_delay > 0 {
225 std::thread::sleep(Duration::from_millis(adjusted_delay));
226 }
227 }
228
229 if let Ok(ux_event) = Self::parse_ux_event(&record.record) {
231 if let UxEvent::TerminalWrite(write) = ux_event {
232 self.output_terminal_write(writer, &write)?;
233 }
234 }
235
236 if self.config.step_mode {
238 writer.flush()?;
239 let mut input = String::new();
240 io::stdin().read_line(&mut input)?;
241 }
242 }
243
244 writer.flush()
245 }
246
247 fn output_terminal_write<W: Write>(
249 &self,
250 writer: &mut W,
251 write: &TerminalWrite,
252 ) -> io::Result<()> {
253 let bytes = write.decode_bytes().map_err(|e| {
254 io::Error::new(
255 io::ErrorKind::InvalidData,
256 format!("Failed to decode base64: {}", e),
257 )
258 })?;
259
260 match self.config.replay_mode {
261 ReplayMode::Terminal => {
262 writer.write_all(&bytes)?;
264 }
265 ReplayMode::Text => {
266 let stripped = strip_ansi(&bytes);
268 writer.write_all(&stripped)?;
269 }
270 }
271
272 Ok(())
273 }
274
275 fn parse_ux_event(record: &Record) -> Result<UxEvent, serde_json::Error> {
277 let tagged = serde_json::json!({
280 "event": record.event,
281 "data": record.data,
282 });
283 serde_json::from_value(tagged)
284 }
285
286 pub fn collect_terminal_output(&self) -> io::Result<String> {
288 let mut output = Vec::new();
289
290 for record in self.terminal_writes() {
291 if let Ok(ux_event) = Self::parse_ux_event(&record.record) {
292 if let UxEvent::TerminalWrite(write) = ux_event {
293 let bytes = write.decode_bytes().map_err(|e| {
294 io::Error::new(
295 io::ErrorKind::InvalidData,
296 format!("Failed to decode base64: {}", e),
297 )
298 })?;
299 output.extend_from_slice(&bytes);
300 }
301 }
302 }
303
304 String::from_utf8(output).map_err(|e| {
305 io::Error::new(
306 io::ErrorKind::InvalidData,
307 format!("Invalid UTF-8 in terminal output: {}", e),
308 )
309 })
310 }
311
312 pub fn collect_text_output(&self) -> io::Result<String> {
314 let raw = self.collect_terminal_output()?;
315 Ok(String::from_utf8_lossy(&strip_ansi(raw.as_bytes())).into_owned())
316 }
317
318 pub fn collect_ansi_escaped(&self) -> io::Result<String> {
320 let raw = self.collect_terminal_output()?;
321 Ok(escape_ansi(&raw))
322 }
323}
324
325fn strip_ansi(bytes: &[u8]) -> Vec<u8> {
330 let mut result = Vec::with_capacity(bytes.len());
331 let mut i = 0;
332
333 while i < bytes.len() {
334 if bytes[i] == 0x1b {
335 i += 1;
337 if i >= bytes.len() {
338 break;
339 }
340
341 match bytes[i] {
342 b'[' => {
343 i += 1;
345 while i < bytes.len() && !(0x40..=0x7E).contains(&bytes[i]) {
346 i += 1;
347 }
348 if i < bytes.len() {
349 i += 1; }
351 }
352 b']' => {
353 i += 1;
355 while i < bytes.len() {
356 if bytes[i] == 0x07 {
357 i += 1;
358 break;
359 }
360 if bytes[i] == 0x1b && i + 1 < bytes.len() && bytes[i + 1] == b'\\' {
361 i += 2;
362 break;
363 }
364 i += 1;
365 }
366 }
367 _ => {
368 i += 1;
370 }
371 }
372 } else {
373 result.push(bytes[i]);
374 i += 1;
375 }
376 }
377
378 result
379}
380
381fn escape_ansi(s: &str) -> String {
385 s.replace('\x1b', "\\x1b")
386}
387
388#[cfg(test)]
389mod tests {
390 use super::*;
391
392 fn make_write_record(bytes: &[u8], stdout: bool, offset_ms: u64, base_ts: u64) -> String {
393 let write = TerminalWrite::new(bytes, stdout, offset_ms);
394 let record = Record {
395 ts: base_ts + offset_ms,
396 event: "ux.terminal.write".to_string(),
397 data: serde_json::to_value(&write).unwrap(),
398 };
399 serde_json::to_string(&record).unwrap()
400 }
401
402 #[test]
403 fn test_player_from_reader() {
404 let line1 = make_write_record(b"Hello", true, 0, 1000);
405 let line2 = make_write_record(b"World", true, 100, 1000);
406 let jsonl = format!("{}\n{}\n", line1, line2);
407
408 let player = SessionPlayer::from_bytes(jsonl.as_bytes()).unwrap();
409
410 assert_eq!(player.record_count(), 2);
411 assert_eq!(player.records[0].offset_ms, 0);
412 assert_eq!(player.records[1].offset_ms, 100);
413 }
414
415 #[test]
416 fn test_filter_by_event() {
417 let write = make_write_record(b"test", true, 0, 1000);
418 let meta = r#"{"ts":1000,"event":"_meta.loop_start","data":{"prompt_file":"PROMPT.md"}}"#;
419 let bus = r#"{"ts":1050,"event":"bus.publish","data":{"topic":"task.start"}}"#;
420
421 let jsonl = format!("{}\n{}\n{}\n", write, meta, bus);
422 let player = SessionPlayer::from_bytes(jsonl.as_bytes()).unwrap();
423
424 assert_eq!(player.terminal_writes().len(), 1);
425 assert_eq!(player.metadata_events().len(), 1);
426 assert_eq!(player.bus_events().len(), 1);
427 }
428
429 #[test]
430 fn test_collect_terminal_output() {
431 let line1 = make_write_record(b"Hello, ", true, 0, 1000);
432 let line2 = make_write_record(b"World!", true, 50, 1000);
433 let jsonl = format!("{}\n{}\n", line1, line2);
434
435 let player = SessionPlayer::from_bytes(jsonl.as_bytes()).unwrap();
436 let output = player.collect_terminal_output().unwrap();
437
438 assert_eq!(output, "Hello, World!");
439 }
440
441 #[test]
442 fn test_strip_ansi() {
443 let input = b"Hello, \x1b[32mWorld\x1b[0m!";
444 let stripped = strip_ansi(input);
445 assert_eq!(stripped, b"Hello, World!");
446 }
447
448 #[test]
449 fn test_strip_ansi_complex() {
450 let input = b"\x1b[1m\x1b[32mBold Green\x1b[0m Normal";
452 let stripped = strip_ansi(input);
453 assert_eq!(stripped, b"Bold Green Normal");
454 }
455
456 #[test]
457 fn test_escape_ansi() {
458 let input = "Hello \x1b[32mWorld\x1b[0m";
459 let escaped = escape_ansi(input);
460 assert_eq!(escaped, "Hello \\x1b[32mWorld\\x1b[0m");
461 }
462
463 #[test]
464 fn test_collect_text_output() {
465 let line = make_write_record(b"Hello \x1b[32mWorld\x1b[0m", true, 0, 1000);
466 let player = SessionPlayer::from_bytes(line.as_bytes()).unwrap();
467
468 let text = player.collect_text_output().unwrap();
469 assert_eq!(text, "Hello World");
470 }
471
472 #[test]
473 fn test_collect_ansi_escaped() {
474 let line = make_write_record(b"Hello \x1b[32mWorld\x1b[0m", true, 0, 1000);
475 let player = SessionPlayer::from_bytes(line.as_bytes()).unwrap();
476
477 let escaped = player.collect_ansi_escaped().unwrap();
478 assert_eq!(escaped, "Hello \\x1b[32mWorld\\x1b[0m");
479 }
480
481 #[test]
482 fn test_replay_terminal() {
483 let line1 = make_write_record(b"Hello", true, 0, 1000);
484 let line2 = make_write_record(b" World", true, 10, 1000);
485 let jsonl = format!("{}\n{}\n", line1, line2);
486
487 let mut player = SessionPlayer::from_bytes(jsonl.as_bytes())
488 .unwrap()
489 .with_config(PlayerConfig::terminal().with_speed(100.0)); let mut output = Vec::new();
492 player.replay_terminal(&mut output).unwrap();
493
494 assert_eq!(String::from_utf8(output).unwrap(), "Hello World");
495 }
496
497 #[test]
498 fn test_replay_text_mode() {
499 let line = make_write_record(b"\x1b[32mGreen\x1b[0m", true, 0, 1000);
500 let mut player = SessionPlayer::from_bytes(line.as_bytes())
501 .unwrap()
502 .with_config(PlayerConfig::text());
503
504 let mut output = Vec::new();
505 player.replay_terminal(&mut output).unwrap();
506
507 assert_eq!(String::from_utf8(output).unwrap(), "Green");
508 }
509
510 #[test]
511 fn test_player_config_builder() {
512 let config = PlayerConfig::terminal()
513 .with_speed(2.0)
514 .with_step_mode()
515 .with_filter(vec!["ux.".to_string()]);
516
517 assert!((config.speed - 2.0).abs() < f32::EPSILON);
518 assert!(config.step_mode);
519 assert_eq!(config.event_filter, vec!["ux."]);
520 }
521
522 #[test]
523 fn test_empty_input() {
524 let player = SessionPlayer::from_bytes(b"").unwrap();
525 assert_eq!(player.record_count(), 0);
526 }
527
528 #[test]
529 fn test_whitespace_lines_skipped() {
530 let line = make_write_record(b"test", true, 0, 1000);
531 let jsonl = format!("\n \n{}\n\n", line);
532
533 let player = SessionPlayer::from_bytes(jsonl.as_bytes()).unwrap();
534 assert_eq!(player.record_count(), 1);
535 }
536}