use ralph_proto::{TerminalWrite, UxEvent};
use std::io::{self, BufRead, Write};
use std::time::Duration;
use crate::session_recorder::Record;
/// Selects how decoded terminal bytes are emitted during replay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplayMode {
/// Write the decoded bytes to the output writer unchanged.
Terminal,
/// Run the decoded bytes through `strip_ansi` before writing them.
Text,
}
/// Playback settings used by `SessionPlayer`.
#[derive(Debug, Clone)]
pub struct PlayerConfig {
/// Speed factor: `replay_terminal` divides each recorded delay by this value.
/// `with_speed` clamps it to at least 0.1; the default is 1.0.
pub speed: f32,
/// When true, `replay_terminal` skips timed delays and instead flushes the
/// writer and waits for a line on stdin after every record.
pub step_mode: bool,
/// Whether output is written raw (`Terminal`) or ANSI-stripped (`Text`).
pub replay_mode: ReplayMode,
/// Event-name filters set via `with_filter`.
/// NOTE(review): no code visible in this file reads this field — confirm
/// where (or whether) it is applied.
pub event_filter: Vec<String>,
}
impl Default for PlayerConfig {
fn default() -> Self {
Self {
speed: 1.0,
step_mode: false,
replay_mode: ReplayMode::Terminal,
event_filter: Vec::new(),
}
}
}
impl PlayerConfig {
    /// Configuration for raw terminal replay; identical to the default.
    pub fn terminal() -> Self {
        Default::default()
    }

    /// Configuration for text replay: the default settings with
    /// `replay_mode` switched to [`ReplayMode::Text`].
    pub fn text() -> Self {
        Self {
            replay_mode: ReplayMode::Text,
            ..Self::terminal()
        }
    }

    /// Sets the playback speed factor.
    ///
    /// Values below 0.1 are raised to 0.1. A NaN input also becomes 0.1,
    /// because `f32::max` returns the non-NaN operand.
    pub fn with_speed(self, speed: f32) -> Self {
        Self {
            speed: f32::max(speed, 0.1),
            ..self
        }
    }

    /// Turns on step mode.
    pub fn with_step_mode(self) -> Self {
        Self {
            step_mode: true,
            ..self
        }
    }

    /// Replaces the stored event filter list with `events`.
    pub fn with_filter(self, events: Vec<String>) -> Self {
        Self {
            event_filter: events,
            ..self
        }
    }
}
/// A parsed session record paired with its position on the replay timeline.
#[derive(Debug, Clone)]
pub struct TimestampedRecord {
/// The record as deserialized from one JSONL line.
pub record: Record,
/// `record.ts` minus the `ts` of the first record read (saturating at 0),
/// as computed by `SessionPlayer::from_reader`.
pub offset_ms: u64,
}
/// Holds the records of a recorded session and replays their terminal output.
#[derive(Debug)]
pub struct SessionPlayer {
// Records in the order they were read from the input.
records: Vec<TimestampedRecord>,
// Playback settings; starts as `PlayerConfig::default()` and is replaced by
// `with_config`.
config: PlayerConfig,
// Set to 0 on construction and by `reset`.
// NOTE(review): nothing visible in this file advances it — confirm intent.
position: usize,
}
impl SessionPlayer {
pub fn from_reader<R: BufRead>(reader: R) -> io::Result<Self> {
let mut records = Vec::new();
let mut first_ts: Option<u64> = None;
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let record: Record = serde_json::from_str(&line).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid JSON record: {}", e),
)
})?;
let ts = record.ts;
let base_ts = *first_ts.get_or_insert(ts);
let offset_ms = ts.saturating_sub(base_ts);
records.push(TimestampedRecord { record, offset_ms });
}
Ok(Self {
records,
config: PlayerConfig::default(),
position: 0,
})
}
/// Builds a player from an in-memory JSONL buffer.
///
/// # Errors
///
/// Same as `from_reader`.
pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
    let reader = io::BufReader::new(bytes);
    Self::from_reader(reader)
}
/// Replaces the playback configuration and returns the player.
pub fn with_config(self, config: PlayerConfig) -> Self {
    let mut player = self;
    player.config = config;
    player
}
/// Number of records loaded, across all event types.
pub fn record_count(&self) -> usize {
    self.records().len()
}
/// All loaded records, in the order they were read.
pub fn records(&self) -> &[TimestampedRecord] {
    self.records.as_slice()
}
/// Returns, in load order, the records whose event name starts with
/// `event_prefix`.
pub fn filter_by_event(&self, event_prefix: &str) -> Vec<&TimestampedRecord> {
    let mut matches = Vec::new();
    for entry in &self.records {
        if entry.record.event.starts_with(event_prefix) {
            matches.push(entry);
        }
    }
    matches
}
/// Records whose event name starts with `ux.terminal.write`.
pub fn terminal_writes(&self) -> Vec<&TimestampedRecord> {
    const PREFIX: &str = "ux.terminal.write";
    self.filter_by_event(PREFIX)
}
/// Records whose event name starts with `_meta.`.
pub fn metadata_events(&self) -> Vec<&TimestampedRecord> {
    const PREFIX: &str = "_meta.";
    self.filter_by_event(PREFIX)
}
/// Records whose event name starts with `bus.`.
pub fn bus_events(&self) -> Vec<&TimestampedRecord> {
    const PREFIX: &str = "bus.";
    self.filter_by_event(PREFIX)
}
/// Sets the stored playback position back to 0.
pub fn reset(&mut self) {
self.position = 0;
}
/// Replays every `ux.terminal.write` record into `writer`.
///
/// The position is reset first. In timed mode (step mode off) the gap
/// between consecutive records is divided by the configured speed and slept
/// before the record is written. In step mode no delay is applied; after
/// each record the writer is flushed and a line is read from stdin before
/// continuing. Records that do not parse as `UxEvent::TerminalWrite` are
/// skipped silently. The writer is flushed once more at the end.
///
/// # Errors
///
/// Returns any error from decoding a write's payload, from writing to or
/// flushing `writer`, or from reading stdin in step mode.
pub fn replay_terminal<W: Write>(&mut self, writer: &mut W) -> io::Result<()> {
    self.reset();

    let stepping = self.config.step_mode;
    let speed = self.config.speed;
    let mut previous_ms: u64 = 0;

    for entry in self.terminal_writes() {
        let gap_ms = entry.offset_ms.saturating_sub(previous_ms);
        previous_ms = entry.offset_ms;

        let timed = !stepping && gap_ms > 0 && speed > 0.0;
        if timed {
            // The float-to-integer cast truncates, so sub-millisecond
            // results become 0 and are not slept.
            let scaled_ms = (gap_ms as f32 / speed) as u64;
            if scaled_ms > 0 {
                std::thread::sleep(Duration::from_millis(scaled_ms));
            }
        }

        if let Ok(UxEvent::TerminalWrite(write)) = Self::parse_ux_event(&entry.record) {
            self.output_terminal_write(writer, &write)?;
        }

        if stepping {
            // Make the output visible before blocking on the user.
            writer.flush()?;
            let mut discard = String::new();
            io::stdin().read_line(&mut discard)?;
        }
    }

    writer.flush()
}
fn output_terminal_write<W: Write>(
&self,
writer: &mut W,
write: &TerminalWrite,
) -> io::Result<()> {
let bytes = write.decode_bytes().map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Failed to decode base64: {}", e),
)
})?;
match self.config.replay_mode {
ReplayMode::Terminal => {
writer.write_all(&bytes)?;
}
ReplayMode::Text => {
let stripped = strip_ansi(&bytes);
writer.write_all(&stripped)?;
}
}
Ok(())
}
fn parse_ux_event(record: &Record) -> Result<UxEvent, serde_json::Error> {
let tagged = serde_json::json!({
"event": record.event,
"data": record.data,
});
serde_json::from_value(tagged)
}
pub fn collect_terminal_output(&self) -> io::Result<String> {
let mut output = Vec::new();
for record in self.terminal_writes() {
if let Ok(UxEvent::TerminalWrite(write)) = Self::parse_ux_event(&record.record) {
let bytes = write.decode_bytes().map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Failed to decode base64: {}", e),
)
})?;
output.extend_from_slice(&bytes);
}
}
String::from_utf8(output).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid UTF-8 in terminal output: {}", e),
)
})
}
/// Returns the collected terminal output with escape sequences removed by
/// `strip_ansi`. Any invalid UTF-8 left after stripping is replaced lossily.
///
/// # Errors
///
/// Same as `collect_terminal_output`.
pub fn collect_text_output(&self) -> io::Result<String> {
    let raw = self.collect_terminal_output()?;
    let stripped = strip_ansi(raw.as_bytes());
    let text = String::from_utf8_lossy(&stripped);
    Ok(text.into_owned())
}
/// Returns the collected terminal output with every ESC byte rendered as the
/// literal text `\x1b`, so the escape sequences are visible.
///
/// # Errors
///
/// Same as `collect_terminal_output`.
pub fn collect_ansi_escaped(&self) -> io::Result<String> {
    self.collect_terminal_output()
        .map(|raw| escape_ansi(&raw))
}
}
/// Removes terminal escape sequences from `bytes` and returns the remaining
/// bytes.
///
/// Recognized forms, each introduced by ESC (0x1b):
/// - CSI (`ESC [` ... final byte in 0x40..=0x7E), e.g. colors and cursor moves.
/// - OSC (`ESC ]` ... terminated by BEL or by `ESC \`), e.g. window titles.
/// - "nF" escapes (`ESC`, one or more intermediate bytes in 0x20..=0x2F, then
///   a final byte in 0x30..=0x7E), e.g. `ESC ( B` charset selection. These
///   used to leak their final byte into the output.
/// - Any other `ESC x` pair is dropped as a two-byte escape.
///
/// A sequence cut off by the end of input is dropped up to the end of input.
fn strip_ansi(bytes: &[u8]) -> Vec<u8> {
    const ESC: u8 = 0x1b;
    const BEL: u8 = 0x07;

    let mut result = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] != ESC {
            result.push(bytes[i]);
            i += 1;
            continue;
        }

        // Skip the ESC itself; a trailing lone ESC is simply dropped.
        i += 1;
        let kind = match bytes.get(i) {
            Some(&b) => b,
            None => break,
        };
        i += 1;

        match kind {
            b'[' => {
                // Parameter/intermediate bytes run until the final byte.
                while i < bytes.len() && !(0x40..=0x7E).contains(&bytes[i]) {
                    i += 1;
                }
                if i < bytes.len() {
                    i += 1;
                }
            }
            b']' => {
                while i < bytes.len() {
                    if bytes[i] == BEL {
                        i += 1;
                        break;
                    }
                    // String terminator: ESC followed by a backslash.
                    if bytes[i] == ESC && bytes.get(i + 1) == Some(&b'\\') {
                        i += 2;
                        break;
                    }
                    i += 1;
                }
            }
            0x20..=0x2F => {
                // nF escape: any further intermediates, then one final byte.
                while i < bytes.len() && (0x20..=0x2F).contains(&bytes[i]) {
                    i += 1;
                }
                if i < bytes.len() && (0x30..=0x7E).contains(&bytes[i]) {
                    i += 1;
                }
            }
            // Plain two-byte escape: the byte after ESC was already consumed.
            _ => {}
        }
    }
    result
}
/// Makes escape sequences visible by replacing each ESC character (0x1b)
/// with the four-character literal text `\x1b`.
fn escape_ansi(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '\x1b' {
            escaped.push_str("\\x1b");
        } else {
            escaped.push(ch);
        }
    }
    escaped
}
// Unit tests for session loading, filtering, output collection, replay, and
// the ANSI helper functions.
#[cfg(test)]
mod tests {
use super::*;
// Builds one JSONL line: a `ux.terminal.write` record whose `ts` is
// `base_ts + offset_ms` and whose `data` is the serialized `TerminalWrite`.
fn make_write_record(bytes: &[u8], stdout: bool, offset_ms: u64, base_ts: u64) -> String {
let write = TerminalWrite::new(bytes, stdout, offset_ms);
let record = Record {
ts: base_ts + offset_ms,
event: "ux.terminal.write".to_string(),
data: serde_json::to_value(&write).unwrap(),
};
serde_json::to_string(&record).unwrap()
}
// Two records load, and offsets are relative to the first record's timestamp.
#[test]
fn test_player_from_reader() {
let line1 = make_write_record(b"Hello", true, 0, 1000);
let line2 = make_write_record(b"World", true, 100, 1000);
let jsonl = format!("{}\n{}\n", line1, line2);
let player = SessionPlayer::from_bytes(jsonl.as_bytes()).unwrap();
assert_eq!(player.record_count(), 2);
assert_eq!(player.records[0].offset_ms, 0);
assert_eq!(player.records[1].offset_ms, 100);
}
// Each prefix helper selects exactly the record with the matching event name.
#[test]
fn test_filter_by_event() {
let write = make_write_record(b"test", true, 0, 1000);
let meta = r#"{"ts":1000,"event":"_meta.loop_start","data":{"prompt_file":"PROMPT.md"}}"#;
let bus = r#"{"ts":1050,"event":"bus.publish","data":{"topic":"task.start"}}"#;
let jsonl = format!("{}\n{}\n{}\n", write, meta, bus);
let player = SessionPlayer::from_bytes(jsonl.as_bytes()).unwrap();
assert_eq!(player.terminal_writes().len(), 1);
assert_eq!(player.metadata_events().len(), 1);
assert_eq!(player.bus_events().len(), 1);
}
// Decoded payloads of consecutive writes are concatenated in order.
#[test]
fn test_collect_terminal_output() {
let line1 = make_write_record(b"Hello, ", true, 0, 1000);
let line2 = make_write_record(b"World!", true, 50, 1000);
let jsonl = format!("{}\n{}\n", line1, line2);
let player = SessionPlayer::from_bytes(jsonl.as_bytes()).unwrap();
let output = player.collect_terminal_output().unwrap();
assert_eq!(output, "Hello, World!");
}
// CSI color sequences are removed, leaving the surrounding text.
#[test]
fn test_strip_ansi() {
let input = b"Hello, \x1b[32mWorld\x1b[0m!";
let stripped = strip_ansi(input);
assert_eq!(stripped, b"Hello, World!");
}
// Back-to-back CSI sequences are all removed.
#[test]
fn test_strip_ansi_complex() {
let input = b"\x1b[1m\x1b[32mBold Green\x1b[0m Normal";
let stripped = strip_ansi(input);
assert_eq!(stripped, b"Bold Green Normal");
}
// ESC characters become the literal text `\x1b`; everything else is kept.
#[test]
fn test_escape_ansi() {
let input = "Hello \x1b[32mWorld\x1b[0m";
let escaped = escape_ansi(input);
assert_eq!(escaped, "Hello \\x1b[32mWorld\\x1b[0m");
}
// Collected text output has its escape sequences stripped.
#[test]
fn test_collect_text_output() {
let line = make_write_record(b"Hello \x1b[32mWorld\x1b[0m", true, 0, 1000);
let player = SessionPlayer::from_bytes(line.as_bytes()).unwrap();
let text = player.collect_text_output().unwrap();
assert_eq!(text, "Hello World");
}
// Collected escaped output shows escape sequences as visible text.
#[test]
fn test_collect_ansi_escaped() {
let line = make_write_record(b"Hello \x1b[32mWorld\x1b[0m", true, 0, 1000);
let player = SessionPlayer::from_bytes(line.as_bytes()).unwrap();
let escaped = player.collect_ansi_escaped().unwrap();
assert_eq!(escaped, "Hello \\x1b[32mWorld\\x1b[0m");
}
// Terminal-mode replay writes the raw bytes; speed 100 keeps the delay short.
#[test]
fn test_replay_terminal() {
let line1 = make_write_record(b"Hello", true, 0, 1000);
let line2 = make_write_record(b" World", true, 10, 1000);
let jsonl = format!("{}\n{}\n", line1, line2);
let mut player = SessionPlayer::from_bytes(jsonl.as_bytes())
.unwrap()
.with_config(PlayerConfig::terminal().with_speed(100.0));
let mut output = Vec::new();
player.replay_terminal(&mut output).unwrap();
assert_eq!(String::from_utf8(output).unwrap(), "Hello World");
}
// Text-mode replay strips escape sequences before writing.
#[test]
fn test_replay_text_mode() {
let line = make_write_record(b"\x1b[32mGreen\x1b[0m", true, 0, 1000);
let mut player = SessionPlayer::from_bytes(line.as_bytes())
.unwrap()
.with_config(PlayerConfig::text());
let mut output = Vec::new();
player.replay_terminal(&mut output).unwrap();
assert_eq!(String::from_utf8(output).unwrap(), "Green");
}
// Builder methods set speed, step mode, and the event filter.
#[test]
fn test_player_config_builder() {
let config = PlayerConfig::terminal()
.with_speed(2.0)
.with_step_mode()
.with_filter(vec!["ux.".to_string()]);
assert!((config.speed - 2.0).abs() < f32::EPSILON);
assert!(config.step_mode);
assert_eq!(config.event_filter, vec!["ux."]);
}
// Empty input yields a player with no records.
#[test]
fn test_empty_input() {
let player = SessionPlayer::from_bytes(b"").unwrap();
assert_eq!(player.record_count(), 0);
}
// Blank and whitespace-only lines are skipped when loading.
#[test]
fn test_whitespace_lines_skipped() {
let line = make_write_record(b"test", true, 0, 1000);
let jsonl = format!("\n \n{}\n\n", line);
let player = SessionPlayer::from_bytes(jsonl.as_bytes()).unwrap();
assert_eq!(player.record_count(), 1);
}
}