// shadow_core/agentlog/parser.rs
1//! Streaming JSONL parser for `.agentlog` files.
2//!
3//! Each line of an `.agentlog` file is one JSON object conforming to the
4//! envelope schema from SPEC §3. This module provides a zero-copy-ish
5//! iterator that yields one [`Record`] per line (or a typed error with the
6//! 1-based line number where the failure occurred). It does NOT enforce
7//! trace-level invariants like "first record is metadata" or "parents
8//! point backward" — those are enforced by callers that actually need
9//! them (the replay engine, the differ) rather than at parse time.
10
11use std::io::{BufRead, Read};
12
13use thiserror::Error;
14
15use crate::agentlog::Record;
16
/// Per-line byte budget for the JSONL reader (16 MiB).
///
/// Sized for real agent traces with long tool outputs plus
/// conversational context (observed p99 is ~50 KB per record); the
/// ceiling rejects runaway input early instead of letting `read_line`
/// grow the buffer until the process OOMs.
///
/// Override with [`Parser::with_max_line_bytes`] when ingesting
/// legitimately larger records (multimodal payloads, massive tool
/// results).
pub const DEFAULT_MAX_LINE_BYTES: usize = 16 << 20;
26
/// Whole-trace byte ceiling (1 GiB).
///
/// Hard cap on cumulative bytes consumed from one stream, so a
/// malicious or truncated file cannot exhaust memory during a
/// `parse_all` collect. Covers months of production traffic for most
/// agents while still refusing obvious denial-of-service payloads.
pub const DEFAULT_MAX_TOTAL_BYTES: usize = 1 << 30;
32
/// Errors produced by the streaming parser.
///
/// Every variant's `Display` output includes a trailing `hint:` line
/// with a suggested remediation; variants that can be tied to a
/// specific input line carry a 1-based `line` field.
#[derive(Debug, Error)]
pub enum ParseError {
    /// The underlying reader failed.
    #[error("io error on line {line}: {source}\nhint: check that the file is readable and not truncated mid-line")]
    Io {
        /// 1-based line number where the error surfaced.
        line: usize,
        /// The underlying I/O error.
        #[source]
        source: std::io::Error,
    },

    /// A line was not valid JSON or did not match the [`Record`] schema.
    /// Both failure modes come from the same `serde_json` deserialize
    /// call, so they are not distinguished here.
    #[error("parse error on line {line}: {source}\nhint: verify the record matches the envelope schema (SPEC §3)")]
    Json {
        /// 1-based line number where the error surfaced.
        line: usize,
        /// The underlying serde_json error.
        #[source]
        source: serde_json::Error,
    },

    /// A single JSONL line exceeded [`Parser`]'s per-line byte budget.
    #[error("line {line} exceeds byte limit ({bytes} > {limit})\nhint: raise via Parser::with_max_line_bytes or check for corrupt input")]
    LineTooLarge {
        /// 1-based line number where the limit was hit.
        line: usize,
        /// Actual byte count observed.
        bytes: usize,
        /// Configured limit.
        limit: usize,
    },

    /// The trace's total byte count exceeded the configured ceiling.
    /// Unlike the other variants this carries no line number — the
    /// cap is a property of the whole stream, not one line.
    #[error("trace exceeds total byte limit ({bytes} > {limit})\nhint: raise via Parser::with_max_total_bytes or split the trace")]
    TraceTooLarge {
        /// Accumulated byte count at the point of failure.
        bytes: usize,
        /// Configured limit.
        limit: usize,
    },
}
76
/// Streaming parser. One instance processes one `.agentlog` stream.
///
/// Usage:
/// ```no_run
/// # use std::io::BufReader;
/// # use std::fs::File;
/// # use shadow_core::agentlog::parser::Parser;
/// let file = File::open("trace.agentlog").unwrap();
/// for result in Parser::new(BufReader::new(file)) {
///     let record = result.unwrap();
///     println!("{}", record.id);
/// }
/// ```
pub struct Parser<R> {
    /// Underlying buffered reader, consumed one line at a time.
    reader: R,
    /// 1-based number of the line most recently read (0 before any read).
    line: usize,
    /// Reusable line buffer; cleared before each read so one
    /// allocation is amortized across the whole stream.
    buffer: String,
    /// Set at EOF or on any fatal error; once set, the iterator
    /// yields `None` forever.
    done: bool,
    /// Per-line byte cap; defaults to [`DEFAULT_MAX_LINE_BYTES`].
    max_line_bytes: usize,
    /// Whole-trace byte cap; defaults to [`DEFAULT_MAX_TOTAL_BYTES`].
    max_total_bytes: usize,
    /// Bytes consumed so far, accumulated with `saturating_add`.
    total_bytes: usize,
}
99
100impl<R: BufRead> Parser<R> {
101    /// Wrap a buffered reader with default resource bounds.
102    pub fn new(reader: R) -> Self {
103        Self {
104            reader,
105            line: 0,
106            buffer: String::new(),
107            done: false,
108            max_line_bytes: DEFAULT_MAX_LINE_BYTES,
109            max_total_bytes: DEFAULT_MAX_TOTAL_BYTES,
110            total_bytes: 0,
111        }
112    }
113
114    /// Override the per-line byte cap. Default: [`DEFAULT_MAX_LINE_BYTES`].
115    pub fn with_max_line_bytes(mut self, n: usize) -> Self {
116        self.max_line_bytes = n;
117        self
118    }
119
120    /// Override the whole-trace byte cap. Default: [`DEFAULT_MAX_TOTAL_BYTES`].
121    pub fn with_max_total_bytes(mut self, n: usize) -> Self {
122        self.max_total_bytes = n;
123        self
124    }
125}
126
impl<R: BufRead> Iterator for Parser<R> {
    type Item = Result<Record, ParseError>;

    /// Yield the next record, skipping blank lines; after EOF or any
    /// error, all subsequent calls return `None` (fused behavior via
    /// the `done` flag).
    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        // Loop only to skip blank lines; every other path returns.
        loop {
            self.buffer.clear();
            self.line += 1;
            // Bound the bytes read per line to protect against a
            // malformed stream with no newline that would otherwise
            // grow `buffer` unbounded. We pass a `&mut R` explicitly
            // to `Read::take` (rather than using the inherent
            // auto-deref method) so the reborrow — not the underlying
            // reader — is what moves into the Take adapter. The
            // resulting `Take<&mut R>` implements BufRead through the
            // blanket impl, so `read_line` still works.
            //
            // The limit is max_line_bytes + 1 so a line of exactly
            // max_line_bytes (newline included) still passes, while
            // anything longer reads one extra byte and trips the
            // `bytes > max_line_bytes` check below.
            //
            // NOTE(review): if the cap happens to split a multi-byte
            // UTF-8 character, `read_line` presumably returns an
            // InvalidData error and we surface it as `Io` rather than
            // `LineTooLarge` — confirm whether that misclassification
            // is acceptable.
            let reader_ref: &mut R = &mut self.reader;
            let mut bounded: std::io::Take<&mut R> =
                Read::take(reader_ref, self.max_line_bytes as u64 + 1);
            match bounded.read_line(&mut self.buffer) {
                // Zero bytes read means clean EOF.
                Ok(0) => {
                    self.done = true;
                    return None;
                }
                Ok(bytes) => {
                    // Count bytes before any limit check so TraceTooLarge
                    // reports the true accumulated total.
                    self.total_bytes = self.total_bytes.saturating_add(bytes);
                    if bytes > self.max_line_bytes {
                        self.done = true;
                        return Some(Err(ParseError::LineTooLarge {
                            line: self.line,
                            bytes,
                            limit: self.max_line_bytes,
                        }));
                    }
                    if self.total_bytes > self.max_total_bytes {
                        self.done = true;
                        return Some(Err(ParseError::TraceTooLarge {
                            bytes: self.total_bytes,
                            limit: self.max_total_bytes,
                        }));
                    }
                    // Skip blank lines (defensive — valid `.agentlog` files
                    // don't have them, but we'd rather tolerate a stray
                    // trailing newline than error-spam on it).
                    let trimmed = self.buffer.trim_end_matches(['\r', '\n']);
                    if trimmed.is_empty() {
                        continue;
                    }
                    // Note: a JSON failure does NOT set `done`; the
                    // caller may keep iterating past a bad line.
                    let parsed = serde_json::from_str::<Record>(trimmed);
                    return Some(parsed.map_err(|e| ParseError::Json {
                        line: self.line,
                        source: e,
                    }));
                }
                Err(e) => {
                    self.done = true;
                    return Some(Err(ParseError::Io {
                        line: self.line,
                        source: e,
                    }));
                }
            }
        }
    }
}
194
195/// Parse an entire `.agentlog` stream into a `Vec<Record>`.
196///
197/// Collects into memory — use [`Parser`] directly for large files.
198pub fn parse_all<R: BufRead>(reader: R) -> Result<Vec<Record>, ParseError> {
199    Parser::new(reader).collect()
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agentlog::{Kind, Record};
    use serde_json::json;
    use std::io::Cursor;

    /// Build a record with a fixed timestamp and no parent.
    fn make_record(kind: Kind, payload: serde_json::Value) -> Record {
        Record::new(kind, payload, "2026-04-21T10:00:00Z", None)
    }

    /// Serialize records to newline-delimited JSON wire format.
    fn to_jsonl(records: &[Record]) -> String {
        records
            .iter()
            .map(|r| serde_json::to_string(r).unwrap() + "\n")
            .collect()
    }

    #[test]
    fn parses_a_single_record() {
        let record = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let wire = to_jsonl(std::slice::from_ref(&record));
        let parsed: Vec<Record> = parse_all(Cursor::new(wire)).unwrap();
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].id, record.id);
    }

    #[test]
    fn parses_multiple_records_in_order() {
        let originals = vec![
            make_record(Kind::Metadata, json!({"sdk": {"name": "shadow"}})),
            make_record(Kind::ChatRequest, json!({"model": "a"})),
            make_record(
                Kind::ChatResponse,
                json!({"model": "a", "stop_reason": "end_turn"}),
            ),
        ];
        let parsed = parse_all(Cursor::new(to_jsonl(&originals))).unwrap();
        assert_eq!(parsed.len(), 3);
        for (expected, got) in originals.iter().zip(parsed.iter()) {
            assert_eq!(expected.id, got.id);
            assert_eq!(expected.kind, got.kind);
        }
    }

    #[test]
    fn blank_lines_are_skipped() {
        let record = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let encoded = serde_json::to_string(&record).unwrap();
        let wire = format!("\n{}\n\n", encoded);
        assert_eq!(parse_all(Cursor::new(wire)).unwrap().len(), 1);
    }

    #[test]
    fn rejects_a_line_longer_than_the_configured_limit() {
        // One valid record padded with a ~2kB field.
        let padding = "x".repeat(2048);
        let record = make_record(Kind::ChatRequest, json!({"model": "a", "pad": padding}));
        let wire = to_jsonl(std::slice::from_ref(&record));

        // Against a 1024-byte limit, the parser must surface
        // `LineTooLarge` instead of passing the record through.
        let mut parser = Parser::new(Cursor::new(wire)).with_max_line_bytes(1024);
        match parser.next().unwrap() {
            Err(ParseError::LineTooLarge { limit, bytes, .. }) => {
                assert_eq!(limit, 1024);
                assert!(bytes > 1024);
            }
            other => panic!("expected LineTooLarge, got {:?}", other),
        }
        // The error is fatal: the iterator is exhausted afterwards.
        assert!(parser.next().is_none());
    }

    #[test]
    fn rejects_total_trace_exceeding_byte_cap() {
        // Three identical ~800-byte records against a 1500-byte cap:
        // the first fits, the second trips TraceTooLarge.
        let padding = "y".repeat(700);
        let record = make_record(Kind::ChatRequest, json!({"model": "a", "pad": padding}));
        let wire = to_jsonl(&[record.clone(), record.clone(), record]);

        let mut parser = Parser::new(Cursor::new(wire)).with_max_total_bytes(1500);
        // First record stays under the cumulative cap.
        assert!(parser.next().unwrap().is_ok());
        // Second record pushes the total over it.
        match parser.next().unwrap() {
            Err(ParseError::TraceTooLarge { limit, bytes }) => {
                assert_eq!(limit, 1500);
                assert!(bytes > 1500);
            }
            other => panic!("expected TraceTooLarge, got {:?}", other),
        }
        assert!(parser.next().is_none());
    }

    #[test]
    fn reports_line_number_on_malformed_line() {
        // Valid, garbage, valid — the error must point at line 2.
        let record = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let encoded = serde_json::to_string(&record).unwrap();
        let wire = format!("{}\nnot-valid-json\n{}\n", encoded, encoded);
        let mut parser = Parser::new(Cursor::new(wire));
        assert!(parser.next().unwrap().is_ok()); // line 1 ok
        match parser.next().unwrap() {
            Err(ParseError::Json { line, .. }) => assert_eq!(line, 2),
            other => panic!("expected Json error on line 2, got {other:?}"),
        }
    }

    #[test]
    fn reports_line_number_on_schema_mismatch() {
        // Well-formed JSON that is not a valid envelope record.
        let wire = r#"{"not_a_record": true}"#.to_string() + "\n";
        let mut parser = Parser::new(Cursor::new(wire));
        match parser.next().unwrap() {
            Err(ParseError::Json { line, .. }) => assert_eq!(line, 1),
            other => panic!("expected Json error on line 1, got {other:?}"),
        }
    }

    #[test]
    fn empty_input_yields_empty_vec() {
        let parsed = parse_all(Cursor::new(String::new())).unwrap();
        assert!(parsed.is_empty());
    }

    #[test]
    fn handles_trailing_newline() {
        let record = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let wire = serde_json::to_string(&record).unwrap() + "\n";
        assert_eq!(parse_all(Cursor::new(wire)).unwrap().len(), 1);
    }

    #[test]
    fn handles_crlf_line_endings() {
        let record = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let wire = format!("{}\r\n", serde_json::to_string(&record).unwrap());
        assert_eq!(parse_all(Cursor::new(wire)).unwrap().len(), 1);
    }
}