use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::Mutex;
#[derive(Debug, Clone, PartialEq)]
pub enum RecordedEvent {
BeaconRecv {
ts: f64,
server: SocketAddr,
seq: u32,
version: u16,
},
ClientConnect { ts: f64, peer: SocketAddr },
ClientDisconnect { ts: f64, peer: SocketAddr },
}
impl RecordedEvent {
fn now_ts() -> f64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs_f64())
.unwrap_or(0.0)
}
pub fn beacon(server: SocketAddr, seq: u32, version: u16) -> Self {
Self::BeaconRecv {
ts: Self::now_ts(),
server,
seq,
version,
}
}
pub fn connect(peer: SocketAddr) -> Self {
Self::ClientConnect {
ts: Self::now_ts(),
peer,
}
}
pub fn disconnect(peer: SocketAddr) -> Self {
Self::ClientDisconnect {
ts: Self::now_ts(),
peer,
}
}
pub fn ts(&self) -> f64 {
match self {
Self::BeaconRecv { ts, .. }
| Self::ClientConnect { ts, .. }
| Self::ClientDisconnect { ts, .. } => *ts,
}
}
pub fn to_json(&self) -> String {
match self {
Self::BeaconRecv {
ts,
server,
seq,
version,
} => format!(
"{{\"ts\":{:.3},\"ev\":\"beacon_recv\",\"server\":\"{server}\",\"seq\":{seq},\"version\":{version}}}",
ts
),
Self::ClientConnect { ts, peer } => format!(
"{{\"ts\":{:.3},\"ev\":\"client_connect\",\"peer\":\"{peer}\"}}",
ts
),
Self::ClientDisconnect { ts, peer } => format!(
"{{\"ts\":{:.3},\"ev\":\"client_disconnect\",\"peer\":\"{peer}\"}}",
ts
),
}
}
pub fn from_json(line: &str) -> Option<Self> {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
return None;
}
let ts = json_field_f64(line, "ts")?;
let ev = json_field_str(line, "ev")?;
match ev.as_str() {
"beacon_recv" => Some(Self::BeaconRecv {
ts,
server: json_field_str(line, "server")?.parse().ok()?,
seq: json_field_u64(line, "seq")? as u32,
version: json_field_u64(line, "version")? as u16,
}),
"client_connect" => Some(Self::ClientConnect {
ts,
peer: json_field_str(line, "peer")?.parse().ok()?,
}),
"client_disconnect" => Some(Self::ClientDisconnect {
ts,
peer: json_field_str(line, "peer")?.parse().ok()?,
}),
_ => None,
}
}
}
#[derive(Clone)]
pub struct EventRecorder {
file: Arc<Mutex<tokio::fs::File>>,
}
impl EventRecorder {
pub async fn create(path: impl AsRef<Path>) -> std::io::Result<Self> {
let f = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await?;
Ok(Self {
file: Arc::new(Mutex::new(f)),
})
}
pub async fn record(&self, ev: &RecordedEvent) {
let line = ev.to_json();
let mut f = self.file.lock().await;
let _ = f.write_all(line.as_bytes()).await;
let _ = f.write_all(b"\n").await;
}
pub async fn flush(&self) {
let mut f = self.file.lock().await;
let _ = f.flush().await;
}
}
pub async fn replay(
path: impl AsRef<Path>,
paced: bool,
mut sink: impl FnMut(&RecordedEvent),
) -> std::io::Result<usize> {
let f = tokio::fs::File::open(path).await?;
let mut reader = BufReader::new(f);
let mut line = String::new();
let mut count = 0usize;
let mut prior_ts: Option<f64> = None;
let start = std::time::Instant::now();
let start_ts: Option<f64> = None;
let mut start_ts = start_ts;
loop {
line.clear();
let n = reader.read_line(&mut line).await?;
if n == 0 {
break;
}
let Some(ev) = RecordedEvent::from_json(&line) else {
continue;
};
if paced {
let st = *start_ts.get_or_insert(ev.ts());
let target = std::time::Duration::from_secs_f64((ev.ts() - st).max(0.0));
let elapsed = start.elapsed();
if target > elapsed {
tokio::time::sleep(target - elapsed).await;
}
prior_ts = Some(ev.ts());
} else {
let _ = prior_ts;
}
sink(&ev);
count += 1;
}
Ok(count)
}
fn json_field_str(line: &str, key: &str) -> Option<String> {
let needle = format!("\"{key}\":\"");
let start = line.find(&needle)? + needle.len();
let rest = &line[start..];
let end = rest.find('"')?;
Some(rest[..end].to_string())
}
fn json_field_f64(line: &str, key: &str) -> Option<f64> {
let needle = format!("\"{key}\":");
let start = line.find(&needle)? + needle.len();
let rest = &line[start..];
let end = rest
.find(|c: char| !c.is_ascii_digit() && c != '.' && c != '-' && c != '+')
.unwrap_or(rest.len());
rest[..end].parse().ok()
}
fn json_field_u64(line: &str, key: &str) -> Option<u64> {
let f = json_field_f64(line, key)?;
Some(f as u64)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn round_trip_beacon() {
let ev = RecordedEvent::BeaconRecv {
ts: 1234.567,
server: "10.0.0.5:5064".parse().unwrap(),
seq: 42,
version: 13,
};
let s = ev.to_json();
let back = RecordedEvent::from_json(&s).unwrap();
assert_eq!(ev, back);
}
#[test]
fn round_trip_connect() {
let ev = RecordedEvent::ClientConnect {
ts: 99.0,
peer: "10.0.0.6:54311".parse().unwrap(),
};
let back = RecordedEvent::from_json(&ev.to_json()).unwrap();
assert_eq!(ev, back);
}
#[test]
fn unknown_event_returns_none() {
let line = r#"{"ts":1.0,"ev":"unknown"}"#;
assert!(RecordedEvent::from_json(line).is_none());
}
#[tokio::test]
async fn record_then_replay_round_trip() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("rec.jsonl");
let rec = EventRecorder::create(&path).await.unwrap();
rec.record(&RecordedEvent::beacon(
"10.0.0.5:5064".parse().unwrap(),
1,
13,
))
.await;
rec.record(&RecordedEvent::connect("10.0.0.6:54311".parse().unwrap()))
.await;
rec.flush().await;
drop(rec);
let mut seen: Vec<RecordedEvent> = Vec::new();
let n = replay(&path, false, |ev| seen.push(ev.clone()))
.await
.unwrap();
assert_eq!(n, 2);
assert!(matches!(seen[0], RecordedEvent::BeaconRecv { .. }));
assert!(matches!(seen[1], RecordedEvent::ClientConnect { .. }));
}
}