Skip to main content

rlx_runtime/
record_replay.rs

1// RLX — versatile ML compiler + runtime.
2// Copyright (C) 2026 Eugene Hauptmann, Nataliya Kosmyna.
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, version 3.
7//
8// This program is distributed in the hope that it will be useful,
9// but WITHOUT ANY WARRANTY; without even the implied warranty of
10// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11// GNU General Public License for more details.
12//
13// You should have received a copy of the GNU General Public License
14// along with this program. If not, see <https://www.gnu.org/licenses/>.
15
16//! Record/replay middleware (plan #63).
17//!
18//! Borrowed from MAX's `serve/recordreplay/{middleware, replay,
19//! jsonl, schema}.py`. Capture (request, response) pairs to a
20//! JSONL file as production traffic flows through; replay them
21//! against a new build to detect regressions.
22//!
23//! Pure data layer:
24//!   - [`RecordingWriter`] appends [`RecordedExchange`] rows to a
25//!     file (one JSON object per line).
//!   - [`ReplayReader`] reads them back in the order they were written (file order).
27//!   - The actual middleware that wraps a request handler is left
28//!     to the future serving crate — we ship the storage format +
29//!     reader/writer; the wiring is two function calls.
30
31use serde::{Deserialize, Serialize};
32use std::fs::{File, OpenOptions};
33use std::io::{BufRead, BufReader, BufWriter, Write};
34use std::path::Path;
35
36/// One captured request/response pair plus metadata.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct RecordedExchange {
39    /// Wall-clock nanoseconds since UNIX epoch when the request
40    /// was received. Replay tools use this to compute relative
41    /// timing if they want to reproduce arrival rates.
42    pub recv_ts_ns: u128,
43    /// Latency of the response in nanoseconds (from request
44    /// receive to response send). `None` if the request didn't
45    /// complete (timeout / cancel / crash).
46    pub latency_ns: Option<u64>,
47    /// Free-form protocol tag (e.g. `"openai.chat"`,
48    /// `"kserve.embedding"`). Replay readers can filter on this.
49    pub protocol: String,
50    /// Wire-format request as JSON.
51    pub request: serde_json::Value,
52    /// Wire-format response as JSON, or `None` for errored / open
53    /// exchanges captured mid-flight.
54    pub response: Option<serde_json::Value>,
55}
56
57impl RecordedExchange {
58    /// Convenience constructor with the current wall clock.
59    pub fn now(
60        protocol: impl Into<String>,
61        request: serde_json::Value,
62        response: Option<serde_json::Value>,
63        latency_ns: Option<u64>,
64    ) -> Self {
65        let recv_ts_ns = std::time::SystemTime::now()
66            .duration_since(std::time::UNIX_EPOCH)
67            .map(|d| d.as_nanos())
68            .unwrap_or(0);
69        Self {
70            recv_ts_ns,
71            latency_ns,
72            protocol: protocol.into(),
73            request,
74            response,
75        }
76    }
77}
78
/// Append-only JSONL writer. One file = one stream of exchanges.
///
/// Rows are buffered in memory. On drop the buffer is flushed best-effort,
/// and any error is ignored; call `flush` explicitly to surface write errors.
///
/// Note that `flush` only hands buffered bytes to the operating system. It
/// does not `fsync`, so on its own it does not guarantee the rows survive a
/// machine crash or power loss.
pub struct RecordingWriter {
    /// Buffered handle to the recording file, opened in append mode.
    out: BufWriter<File>,
    /// Rows successfully written through this writer since it was opened.
    written: usize,
}
86
87impl RecordingWriter {
88    /// Open `path`, creating it (or appending if it exists).
89    pub fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
90        let f = OpenOptions::new().create(true).append(true).open(path)?;
91        Ok(Self {
92            out: BufWriter::new(f),
93            written: 0,
94        })
95    }
96
97    /// Append one exchange. Errors propagate so callers can
98    /// downgrade to "log on failure, don't crash the request".
99    pub fn write(&mut self, ex: &RecordedExchange) -> std::io::Result<()> {
100        let line = serde_json::to_string(ex)
101            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
102        self.out.write_all(line.as_bytes())?;
103        self.out.write_all(b"\n")?;
104        self.written += 1;
105        Ok(())
106    }
107
108    pub fn flush(&mut self) -> std::io::Result<()> {
109        self.out.flush()
110    }
111
112    /// Number of exchanges written this session.
113    pub fn count(&self) -> usize {
114        self.written
115    }
116}
117
118impl Drop for RecordingWriter {
119    fn drop(&mut self) {
120        let _ = self.out.flush();
121    }
122}
123
/// Streaming reader over a JSONL recording. As an iterator it yields
/// exchanges one at a time, in the order they appear in the file.
pub struct ReplayReader {
    inner: BufReader<File>,
}

impl ReplayReader {
    /// Open an existing recording at `path` for replay.
    ///
    /// # Errors
    /// Propagates any I/O error from opening the file (e.g. it does not exist).
    pub fn open(path: impl AsRef<Path>) -> std::io::Result<Self> {
        File::open(path).map(|file| Self {
            inner: BufReader::new(file),
        })
    }
}
137
138impl Iterator for ReplayReader {
139    type Item = std::io::Result<RecordedExchange>;
140    fn next(&mut self) -> Option<Self::Item> {
141        let mut line = String::new();
142        match self.inner.read_line(&mut line) {
143            Ok(0) => None,
144            Ok(_) => {
145                if line.trim().is_empty() {
146                    return self.next();
147                }
148                Some(
149                    serde_json::from_str(line.trim())
150                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
151                )
152            }
153            Err(e) => Some(Err(e)),
154        }
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// A uniquely named path in the system temp dir that is deleted on drop,
    /// so files are cleaned up even when an assertion fails mid-test.
    struct TempFile(PathBuf);

    impl TempFile {
        /// Build the path; the file itself is not created.
        ///
        /// The name combines the label, the process id, a per-process counter
        /// and the wall clock. That way parallel tests and concurrent test
        /// binaries never share a file.
        fn new(label: &str) -> Self {
            static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
            let seq = NEXT_ID.fetch_add(1, Ordering::SeqCst);
            let nanos = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos();
            let name = format!(
                "rlx-rr-{label}-{}-{seq}-{nanos}.jsonl",
                std::process::id()
            );
            Self(std::env::temp_dir().join(name))
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            // The file may never have been created; ignore NotFound and friends.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Replay every exchange in `path`, failing the test on any I/O or parse error.
    fn read_all(path: &Path) -> Vec<RecordedExchange> {
        ReplayReader::open(path)
            .unwrap()
            .collect::<std::io::Result<Vec<_>>>()
            .unwrap()
    }

    #[test]
    fn round_trip_single_exchange() {
        let tmp = TempFile::new("single");
        {
            let mut w = RecordingWriter::open(tmp.path()).unwrap();
            let ex = RecordedExchange::now(
                "openai.chat",
                json!({ "model": "gpt-4o-mini", "messages": [] }),
                Some(json!({ "id": "abc", "choices": [] })),
                Some(123_456),
            );
            w.write(&ex).unwrap();
            assert_eq!(w.count(), 1);
            w.flush().unwrap();
        }

        let mut iter = ReplayReader::open(tmp.path()).unwrap();
        let got = iter.next().unwrap().unwrap();
        assert_eq!(got.protocol, "openai.chat");
        assert_eq!(got.latency_ns, Some(123_456));
        assert_eq!(got.request["model"], "gpt-4o-mini");
        assert_eq!(got.response, Some(json!({ "id": "abc", "choices": [] })));
        assert!(iter.next().is_none());
    }

    #[test]
    fn writes_and_reads_in_order() {
        let tmp = TempFile::new("order");
        {
            let mut w = RecordingWriter::open(tmp.path()).unwrap();
            for i in 0..5 {
                w.write(&RecordedExchange::now(
                    "test",
                    json!({ "i": i }),
                    Some(json!({ "i": i })),
                    Some(i as u64 * 1000),
                ))
                .unwrap();
            }
        }

        let exchanges = read_all(tmp.path());
        assert_eq!(exchanges.len(), 5);
        for (i, ex) in exchanges.iter().enumerate() {
            assert_eq!(ex.request["i"], i);
            assert_eq!(ex.latency_ns, Some(i as u64 * 1000));
        }
    }

    #[test]
    fn append_mode_preserves_existing() {
        let tmp = TempFile::new("append");
        {
            let mut w = RecordingWriter::open(tmp.path()).unwrap();
            w.write(&RecordedExchange::now(
                "first",
                json!({"a": 1}),
                Some(json!({})),
                Some(1),
            ))
            .unwrap();
        }
        {
            // Re-open: must append, not truncate.
            let mut w = RecordingWriter::open(tmp.path()).unwrap();
            w.write(&RecordedExchange::now(
                "second",
                json!({"a": 2}),
                Some(json!({})),
                Some(2),
            ))
            .unwrap();
        }

        let exchanges = read_all(tmp.path());
        assert_eq!(exchanges.len(), 2);
        assert_eq!(exchanges[0].protocol, "first");
        assert_eq!(exchanges[1].protocol, "second");
    }

    #[test]
    fn missing_response_round_trips() {
        let tmp = TempFile::new("missing-resp");
        {
            let mut w = RecordingWriter::open(tmp.path()).unwrap();
            w.write(&RecordedExchange::now(
                "abandoned",
                json!({"q": "hello"}),
                None,
                None,
            ))
            .unwrap();
        }
        let ex = ReplayReader::open(tmp.path())
            .unwrap()
            .next()
            .unwrap()
            .unwrap();
        assert!(ex.response.is_none());
        assert!(ex.latency_ns.is_none());
    }

    #[test]
    fn blank_lines_are_skipped() {
        let tmp = TempFile::new("blank");
        let row = serde_json::to_string(&RecordedExchange::now(
            "blank",
            json!({"i": 0}),
            None,
            None,
        ))
        .unwrap();
        std::fs::write(tmp.path(), format!("\n\n{row}\n   \n\r\n{row}\n\n")).unwrap();

        let exchanges = read_all(tmp.path());
        assert_eq!(exchanges.len(), 2);
        assert!(exchanges.iter().all(|ex| ex.protocol == "blank"));
    }

    #[test]
    fn malformed_line_yields_invalid_data() {
        let tmp = TempFile::new("malformed");
        std::fs::write(tmp.path(), "{not json}\n").unwrap();

        let err = ReplayReader::open(tmp.path())
            .unwrap()
            .next()
            .unwrap()
            .unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn opening_missing_file_fails() {
        let tmp = TempFile::new("never-created");
        assert!(ReplayReader::open(tmp.path()).is_err());
    }
}