use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::Path;
/// One recorded exchange: a request, its optional response, and timing
/// metadata.
///
/// The type derives `Serialize`/`Deserialize`; within this file it is stored as
/// a single JSON object per line by `RecordingWriter` and parsed back by
/// `ReplayReader`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecordedExchange {
/// Wall-clock timestamp in nanoseconds since the Unix epoch.
/// `RecordedExchange::now` fills this from `SystemTime::now()` and stores 0
/// if the system clock reports a time before the epoch.
pub recv_ts_ns: u128,
/// Caller-supplied latency in nanoseconds, or `None` when it was not measured.
/// NOTE(review): presumably request-to-response latency — confirm with callers.
pub latency_ns: Option<u64>,
/// Free-form label chosen by the caller (the tests use e.g. "openai.chat").
pub protocol: String,
/// Request payload as arbitrary JSON.
pub request: serde_json::Value,
/// Response payload as arbitrary JSON, or `None` when no response was recorded.
pub response: Option<serde_json::Value>,
}
impl RecordedExchange {
    /// Builds an exchange stamped with the current wall-clock time.
    ///
    /// `recv_ts_ns` is taken from `SystemTime::now()` as nanoseconds since the
    /// Unix epoch. The remaining fields are stored exactly as passed, with
    /// `protocol` converted into an owned `String`.
    pub fn now(
        protocol: impl Into<String>,
        request: serde_json::Value,
        response: Option<serde_json::Value>,
        latency_ns: Option<u64>,
    ) -> Self {
        let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
        // `duration_since` fails when the clock is set before the epoch; record 0
        // in that case instead of panicking.
        let recv_ts_ns = match since_epoch {
            Ok(elapsed) => elapsed.as_nanos(),
            Err(_) => 0,
        };
        let protocol: String = protocol.into();

        Self {
            recv_ts_ns,
            latency_ns,
            protocol,
            request,
            response,
        }
    }
}
/// Writes `RecordedExchange` values to a file, one JSON document per line.
pub struct RecordingWriter {
/// Buffered handle to the output file.
out: BufWriter<File>,
/// Number of exchanges written through this writer since it was opened.
/// It starts at 0 on `open`, so records already present in the file are not counted.
written: usize,
}
impl RecordingWriter {
    /// Opens `path` for appending, creating the file if it does not exist.
    ///
    /// # Errors
    /// Returns the I/O error reported when the file cannot be opened.
    pub fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
        let mut options = OpenOptions::new();
        options.create(true).append(true);
        let file = options.open(path)?;
        let out = BufWriter::new(file);
        Ok(Self { out, written: 0 })
    }

    /// Serializes `ex` as JSON and appends it as one newline-terminated line.
    ///
    /// The record counter is bumped only after both the payload and the
    /// newline were accepted by the buffered writer.
    ///
    /// # Errors
    /// A serialization failure is reported as `ErrorKind::InvalidData`;
    /// write failures are returned unchanged.
    pub fn write(&mut self, ex: &RecordedExchange) -> std::io::Result<()> {
        let encoded = match serde_json::to_string(ex) {
            Ok(text) => text,
            Err(err) => {
                return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, err));
            }
        };
        self.out.write_all(encoded.as_bytes())?;
        self.out.write_all(b"\n")?;
        self.written += 1;
        Ok(())
    }

    /// Flushes buffered output to the underlying file.
    pub fn flush(&mut self) -> std::io::Result<()> {
        self.out.flush()
    }

    /// Number of exchanges successfully written through this writer.
    pub fn count(&self) -> usize {
        self.written
    }
}
impl Drop for RecordingWriter {
    /// Makes a best-effort attempt to flush buffered records when the writer
    /// goes out of scope.
    fn drop(&mut self) {
        // `drop` has no way to report failure, so a flush error is discarded.
        // Callers that need to observe it should call `flush` first.
        self.out.flush().ok();
    }
}
/// Reads `RecordedExchange` values back from a line-delimited JSON file.
///
/// Records are produced through the type's `Iterator` implementation.
pub struct ReplayReader {
/// Buffered reader over the recording file.
inner: BufReader<File>,
}
impl ReplayReader {
    /// Opens the recording at `path` for reading.
    ///
    /// # Errors
    /// Returns the I/O error from `File::open`, for example when the file
    /// does not exist.
    pub fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
        File::open(path).map(|file| Self {
            inner: BufReader::new(file),
        })
    }
}
/// Yields one `RecordedExchange` per non-blank line of the recording.
impl Iterator for ReplayReader {
    type Item = std::io::Result<RecordedExchange>;

    /// Reads lines until a non-blank one is found and parses it as JSON.
    ///
    /// Returns `None` at end of file. A read failure is yielded as
    /// `Some(Err(_))` unchanged; a line that is not a valid record is yielded
    /// as an `ErrorKind::InvalidData` error, and the following call continues
    /// with the next line.
    fn next(&mut self) -> Option<Self::Item> {
        let mut line = String::new();
        // Blank lines are skipped with a loop rather than by recursing: Rust
        // does not guarantee tail-call elimination, so a long run of blank
        // lines could otherwise exhaust the stack. The loop also lets one
        // buffer be reused for every skipped line.
        loop {
            line.clear();
            match self.inner.read_line(&mut line) {
                Ok(0) => return None,
                Ok(_) => {}
                Err(e) => return Some(Err(e)),
            }

            let record = line.trim();
            if record.is_empty() {
                continue;
            }

            return Some(
                serde_json::from_str(record)
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
            );
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a scratch file path in the OS temp directory. The label and a
    /// nanosecond timestamp keep the paths of different tests apart.
    fn temp_path(label: &str) -> std::path::PathBuf {
        let stamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("rlx-rr-{label}-{stamp}.jsonl"))
    }

    /// Opens a writer on `path`, appends `exchange`, and drops the writer so
    /// the record reaches the file.
    fn record_one(path: &std::path::Path, exchange: RecordedExchange) {
        let mut writer = RecordingWriter::open(path).unwrap();
        writer.write(&exchange).unwrap();
    }

    /// Replays every record in `path`, panicking on the first error.
    fn replay_all(path: &std::path::Path) -> Vec<RecordedExchange> {
        let reader = ReplayReader::open(path).unwrap();
        reader.collect::<std::io::Result<Vec<_>>>().unwrap()
    }

    #[test]
    fn round_trip_single_exchange() {
        let path = temp_path("single");

        let mut writer = RecordingWriter::open(&path).unwrap();
        let exchange = RecordedExchange::now(
            "openai.chat",
            json!({ "model": "gpt-4o-mini", "messages": [] }),
            Some(json!({ "id": "abc", "choices": [] })),
            Some(123_456),
        );
        writer.write(&exchange).unwrap();
        assert_eq!(writer.count(), 1);
        writer.flush().unwrap();
        drop(writer);

        let mut reader = ReplayReader::open(&path).unwrap();
        let replayed = reader.next().unwrap().unwrap();
        assert_eq!(replayed.protocol, "openai.chat");
        assert_eq!(replayed.latency_ns, Some(123_456));
        assert_eq!(replayed.request["model"], "gpt-4o-mini");
        assert!(replayed.response.is_some());
        // Exactly one record was written, so the reader must now be exhausted.
        assert!(reader.next().is_none());

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn writes_and_reads_in_order() {
        let path = temp_path("order");

        {
            let mut writer = RecordingWriter::open(&path).unwrap();
            for i in 0..5 {
                let exchange = RecordedExchange::now(
                    "test",
                    json!({ "i": i }),
                    Some(json!({ "i": i })),
                    Some(i as u64 * 1000),
                );
                writer.write(&exchange).unwrap();
            }
        }

        let exchanges: Vec<RecordedExchange> = replay_all(&path);
        assert_eq!(exchanges.len(), 5);
        for (position, exchange) in exchanges.iter().enumerate() {
            assert_eq!(exchange.request["i"], position);
        }

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn append_mode_preserves_existing() {
        let path = temp_path("append");

        // Two separate writer lifetimes: the second must append to, not
        // truncate, what the first one wrote.
        record_one(
            &path,
            RecordedExchange::now("first", json!({"a": 1}), Some(json!({})), Some(1)),
        );
        record_one(
            &path,
            RecordedExchange::now("second", json!({"a": 2}), Some(json!({})), Some(2)),
        );

        let exchanges = replay_all(&path);
        assert_eq!(exchanges.len(), 2);
        assert_eq!(exchanges[0].protocol, "first");
        assert_eq!(exchanges[1].protocol, "second");

        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn missing_response_round_trips() {
        let path = temp_path("missing-resp");

        record_one(
            &path,
            RecordedExchange::now("abandoned", json!({"q": "hello"}), None, None),
        );

        let mut reader = ReplayReader::open(&path).unwrap();
        let replayed = reader.next().unwrap().unwrap();
        assert!(replayed.response.is_none());
        assert!(replayed.latency_ns.is_none());

        let _ = std::fs::remove_file(&path);
    }
}