rlx_runtime/
record_replay.rs1use serde::{Deserialize, Serialize};
32use std::fs::{File, OpenOptions};
33use std::io::{BufRead, BufReader, BufWriter, Write};
34use std::path::Path;
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct RecordedExchange {
39 pub recv_ts_ns: u128,
43 pub latency_ns: Option<u64>,
47 pub protocol: String,
50 pub request: serde_json::Value,
52 pub response: Option<serde_json::Value>,
55}
56
57impl RecordedExchange {
58 pub fn now(
60 protocol: impl Into<String>,
61 request: serde_json::Value,
62 response: Option<serde_json::Value>,
63 latency_ns: Option<u64>,
64 ) -> Self {
65 let recv_ts_ns = std::time::SystemTime::now()
66 .duration_since(std::time::UNIX_EPOCH)
67 .map(|d| d.as_nanos())
68 .unwrap_or(0);
69 Self {
70 recv_ts_ns,
71 latency_ns,
72 protocol: protocol.into(),
73 request,
74 response,
75 }
76 }
77}
78
79pub struct RecordingWriter {
83 out: BufWriter<File>,
84 written: usize,
85}
86
87impl RecordingWriter {
88 pub fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
90 let f = OpenOptions::new().create(true).append(true).open(path)?;
91 Ok(Self {
92 out: BufWriter::new(f),
93 written: 0,
94 })
95 }
96
97 pub fn write(&mut self, ex: &RecordedExchange) -> std::io::Result<()> {
100 let line = serde_json::to_string(ex)
101 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
102 self.out.write_all(line.as_bytes())?;
103 self.out.write_all(b"\n")?;
104 self.written += 1;
105 Ok(())
106 }
107
108 pub fn flush(&mut self) -> std::io::Result<()> {
109 self.out.flush()
110 }
111
112 pub fn count(&self) -> usize {
114 self.written
115 }
116}
117
118impl Drop for RecordingWriter {
119 fn drop(&mut self) {
120 let _ = self.out.flush();
121 }
122}
123
124pub struct ReplayReader {
126 inner: BufReader<File>,
127}
128
129impl ReplayReader {
130 pub fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
131 let f = File::open(path)?;
132 Ok(Self {
133 inner: BufReader::new(f),
134 })
135 }
136}
137
138impl Iterator for ReplayReader {
139 type Item = std::io::Result<RecordedExchange>;
140 fn next(&mut self) -> Option<Self::Item> {
141 let mut line = String::new();
142 match self.inner.read_line(&mut line) {
143 Ok(0) => None,
144 Ok(_) => {
145 if line.trim().is_empty() {
146 return self.next();
147 }
148 Some(
149 serde_json::from_str(line.trim())
150 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
151 )
152 }
153 Err(e) => Some(Err(e)),
154 }
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161 use serde_json::json;
162
163 fn temp_path(label: &str) -> std::path::PathBuf {
164 let dir = std::env::temp_dir();
165 dir.join(format!(
166 "rlx-rr-{label}-{}.jsonl",
167 std::time::SystemTime::now()
168 .duration_since(std::time::UNIX_EPOCH)
169 .unwrap()
170 .as_nanos()
171 ))
172 }
173
174 #[test]
175 fn round_trip_single_exchange() {
176 let path = temp_path("single");
177 {
178 let mut w = RecordingWriter::open(&path).unwrap();
179 let ex = RecordedExchange::now(
180 "openai.chat",
181 json!({ "model": "gpt-4o-mini", "messages": [] }),
182 Some(json!({ "id": "abc", "choices": [] })),
183 Some(123_456),
184 );
185 w.write(&ex).unwrap();
186 assert_eq!(w.count(), 1);
187 w.flush().unwrap();
188 }
189
190 let mut iter = ReplayReader::open(&path).unwrap();
191 let got = iter.next().unwrap().unwrap();
192 assert_eq!(got.protocol, "openai.chat");
193 assert_eq!(got.latency_ns, Some(123_456));
194 assert_eq!(got.request["model"], "gpt-4o-mini");
195 assert!(got.response.is_some());
196 assert!(iter.next().is_none());
197 let _ = std::fs::remove_file(&path);
198 }
199
200 #[test]
201 fn writes_and_reads_in_order() {
202 let path = temp_path("order");
203 {
204 let mut w = RecordingWriter::open(&path).unwrap();
205 for i in 0..5 {
206 w.write(&RecordedExchange::now(
207 "test",
208 json!({ "i": i }),
209 Some(json!({ "i": i })),
210 Some(i as u64 * 1000),
211 ))
212 .unwrap();
213 }
214 }
215
216 let exchanges: Vec<RecordedExchange> = ReplayReader::open(&path)
217 .unwrap()
218 .collect::<std::io::Result<Vec<_>>>()
219 .unwrap();
220 assert_eq!(exchanges.len(), 5);
221 for (i, ex) in exchanges.iter().enumerate() {
222 assert_eq!(ex.request["i"], i);
223 }
224 let _ = std::fs::remove_file(&path);
225 }
226
227 #[test]
228 fn append_mode_preserves_existing() {
229 let path = temp_path("append");
230 {
231 let mut w = RecordingWriter::open(&path).unwrap();
232 w.write(&RecordedExchange::now(
233 "first",
234 json!({"a": 1}),
235 Some(json!({})),
236 Some(1),
237 ))
238 .unwrap();
239 }
240 {
241 let mut w = RecordingWriter::open(&path).unwrap();
243 w.write(&RecordedExchange::now(
244 "second",
245 json!({"a": 2}),
246 Some(json!({})),
247 Some(2),
248 ))
249 .unwrap();
250 }
251
252 let exchanges: Vec<_> = ReplayReader::open(&path)
253 .unwrap()
254 .collect::<std::io::Result<Vec<_>>>()
255 .unwrap();
256 assert_eq!(exchanges.len(), 2);
257 assert_eq!(exchanges[0].protocol, "first");
258 assert_eq!(exchanges[1].protocol, "second");
259 let _ = std::fs::remove_file(&path);
260 }
261
262 #[test]
263 fn missing_response_round_trips() {
264 let path = temp_path("missing-resp");
265 {
266 let mut w = RecordingWriter::open(&path).unwrap();
267 w.write(&RecordedExchange::now(
268 "abandoned",
269 json!({"q": "hello"}),
270 None,
271 None,
272 ))
273 .unwrap();
274 }
275 let ex = ReplayReader::open(&path).unwrap().next().unwrap().unwrap();
276 assert!(ex.response.is_none());
277 assert!(ex.latency_ns.is_none());
278 let _ = std::fs::remove_file(&path);
279 }
280}