use std::path::PathBuf;
use std::time::Duration;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::mpsc;
use crate::daemon::client::DaemonEvent;
pub async fn run(path: PathBuf, speed: f64, tx: mpsc::UnboundedSender<DaemonEvent>) -> Result<()> {
let file = File::open(&path)
.await
.with_context(|| format!("opening replay file {}", path.display()))?;
let mut lines = BufReader::new(file).lines();
let mut anchor: Option<(DateTime<Utc>, std::time::Instant)> = None;
let mut count = 0usize;
while let Some(line) = lines.next_line().await.context("reading replay line")? {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let event: DaemonEvent = match serde_json::from_str(trimmed) {
Ok(e) => e,
Err(e) => {
tracing::warn!(error = %e, "replay: skipping malformed line");
continue;
}
};
let ts = event
.data
.get("timestamp")
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<DateTime<Utc>>().ok());
if let Some(ts) = ts {
match anchor {
None => {
anchor = Some((ts, std::time::Instant::now()));
}
Some((first_ts, started)) => {
let event_offset = (ts - first_ts).to_std().unwrap_or(Duration::ZERO);
let scaled = event_offset.div_f64(speed.max(0.01));
let target = started + scaled;
let now = std::time::Instant::now();
if target > now {
tokio::time::sleep(target - now).await;
}
}
}
}
if tx.send(event).is_err() {
tracing::debug!("replay: TUI disconnected, stopping");
return Ok(());
}
count += 1;
}
tracing::info!(count, "replay complete");
let final_event = DaemonEvent {
event: "run_event".into(),
data: serde_json::json!({
"type": "log",
"run_id": "replay",
"level": "info",
"message": format!("replay complete · {count} events replayed"),
"timestamp": Utc::now(),
}),
};
let _ = tx.send(final_event);
Ok(())
}