1use std::io::{BufRead, Read};
12
13use thiserror::Error;
14
15use crate::agentlog::Record;
16
/// Default cap, in bytes, on a single JSONL line: 16 MiB.
///
/// The count includes the line terminator. Override it per parser with
/// `Parser::with_max_line_bytes`.
pub const DEFAULT_MAX_LINE_BYTES: usize = 16 << 20;
26
/// Default cap, in bytes, on the total size of one trace: 1 GiB.
///
/// Every byte read counts toward this budget, including blank lines and line
/// terminators. Override it per parser with `Parser::with_max_total_bytes`.
pub const DEFAULT_MAX_TOTAL_BYTES: usize = 1 << 30;
32
33#[derive(Debug, Error)]
35pub enum ParseError {
36 #[error("io error on line {line}: {source}\nhint: check that the file is readable and not truncated mid-line")]
38 Io {
39 line: usize,
41 #[source]
43 source: std::io::Error,
44 },
45
46 #[error("parse error on line {line}: {source}\nhint: verify the record matches the envelope schema (SPEC §3)")]
48 Json {
49 line: usize,
51 #[source]
53 source: serde_json::Error,
54 },
55
56 #[error("line {line} exceeds byte limit ({bytes} > {limit})\nhint: raise via Parser::with_max_line_bytes or check for corrupt input")]
58 LineTooLarge {
59 line: usize,
61 bytes: usize,
63 limit: usize,
65 },
66
67 #[error("trace exceeds total byte limit ({bytes} > {limit})\nhint: raise via Parser::with_max_total_bytes or split the trace")]
69 TraceTooLarge {
70 bytes: usize,
72 limit: usize,
74 },
75}
76
/// Streaming, size-capped reader that yields one record per non-blank JSONL
/// line.
///
/// Build one with `Parser::new` and adjust the caps with the `with_*`
/// builders. An I/O failure or a size-cap violation ends iteration. A line
/// that fails to decode as JSON is reported, and parsing continues with the
/// next line.
pub struct Parser<R> {
    /// Source of newline-delimited JSON.
    reader: R,
    /// Scratch storage for the current line; its allocation is reused.
    buffer: String,
    /// Number of the most recently read line (0 before the first read).
    line: usize,
    /// Running count of bytes consumed so far.
    total_bytes: usize,
    /// Set once iteration is over, at end of input or after a fatal error.
    done: bool,
    /// Per-line byte cap, line terminator included.
    max_line_bytes: usize,
    /// Whole-trace byte cap.
    max_total_bytes: usize,
}
99
100impl<R: BufRead> Parser<R> {
101 pub fn new(reader: R) -> Self {
103 Self {
104 reader,
105 line: 0,
106 buffer: String::new(),
107 done: false,
108 max_line_bytes: DEFAULT_MAX_LINE_BYTES,
109 max_total_bytes: DEFAULT_MAX_TOTAL_BYTES,
110 total_bytes: 0,
111 }
112 }
113
114 pub fn with_max_line_bytes(mut self, n: usize) -> Self {
116 self.max_line_bytes = n;
117 self
118 }
119
120 pub fn with_max_total_bytes(mut self, n: usize) -> Self {
122 self.max_total_bytes = n;
123 self
124 }
125}
126
127impl<R: BufRead> Iterator for Parser<R> {
128 type Item = Result<Record, ParseError>;
129
130 fn next(&mut self) -> Option<Self::Item> {
131 if self.done {
132 return None;
133 }
134 loop {
135 self.buffer.clear();
136 self.line += 1;
137 let reader_ref: &mut R = &mut self.reader;
146 let mut bounded: std::io::Take<&mut R> =
147 Read::take(reader_ref, self.max_line_bytes as u64 + 1);
148 match bounded.read_line(&mut self.buffer) {
149 Ok(0) => {
150 self.done = true;
151 return None;
152 }
153 Ok(bytes) => {
154 self.total_bytes = self.total_bytes.saturating_add(bytes);
155 if bytes > self.max_line_bytes {
156 self.done = true;
157 return Some(Err(ParseError::LineTooLarge {
158 line: self.line,
159 bytes,
160 limit: self.max_line_bytes,
161 }));
162 }
163 if self.total_bytes > self.max_total_bytes {
164 self.done = true;
165 return Some(Err(ParseError::TraceTooLarge {
166 bytes: self.total_bytes,
167 limit: self.max_total_bytes,
168 }));
169 }
170 let trimmed = self.buffer.trim_end_matches(['\r', '\n']);
174 if trimmed.is_empty() {
175 continue;
176 }
177 let parsed = serde_json::from_str::<Record>(trimmed);
178 return Some(parsed.map_err(|e| ParseError::Json {
179 line: self.line,
180 source: e,
181 }));
182 }
183 Err(e) => {
184 self.done = true;
185 return Some(Err(ParseError::Io {
186 line: self.line,
187 source: e,
188 }));
189 }
190 }
191 }
192 }
193}
194
195pub fn parse_all<R: BufRead>(reader: R) -> Result<Vec<Record>, ParseError> {
199 Parser::new(reader).collect()
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agentlog::{Kind, Record};
    use serde_json::json;
    use std::io::Cursor;

    /// Builds a record of `kind` carrying `payload` and a fixed timestamp.
    fn make_record(kind: Kind, payload: serde_json::Value) -> Record {
        Record::new(kind, payload, "2026-04-21T10:00:00Z", None)
    }

    /// Serializes `records` as JSONL, one `\n`-terminated line per record.
    fn to_jsonl(records: &[Record]) -> String {
        let mut out = String::new();
        for r in records {
            out.push_str(&serde_json::to_string(r).unwrap());
            out.push('\n');
        }
        out
    }

    #[test]
    fn parses_a_single_record() {
        let r = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let wire = to_jsonl(std::slice::from_ref(&r));
        let parsed: Vec<Record> = parse_all(Cursor::new(wire)).unwrap();
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].id, r.id);
    }

    #[test]
    fn parses_multiple_records_in_order() {
        let records = vec![
            make_record(Kind::Metadata, json!({"sdk": {"name": "shadow"}})),
            make_record(Kind::ChatRequest, json!({"model": "a"})),
            make_record(
                Kind::ChatResponse,
                json!({"model": "a", "stop_reason": "end_turn"}),
            ),
        ];
        let wire = to_jsonl(&records);
        let parsed = parse_all(Cursor::new(wire)).unwrap();
        assert_eq!(parsed.len(), 3);
        for (a, b) in records.iter().zip(parsed.iter()) {
            assert_eq!(a.id, b.id);
            assert_eq!(a.kind, b.kind);
        }
    }

    #[test]
    fn blank_lines_are_skipped() {
        let r = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let wire = format!("\n{}\n\n", serde_json::to_string(&r).unwrap());
        let parsed = parse_all(Cursor::new(wire)).unwrap();
        assert_eq!(parsed.len(), 1);
    }

    #[test]
    fn rejects_a_line_longer_than_the_configured_limit() {
        let big = "x".repeat(2048);
        let r = make_record(Kind::ChatRequest, json!({"model": "a", "pad": big}));
        let wire = to_jsonl(std::slice::from_ref(&r));

        let mut it = Parser::new(Cursor::new(wire)).with_max_line_bytes(1024);
        match it.next().unwrap() {
            Err(ParseError::LineTooLarge { limit, bytes, .. }) => {
                assert_eq!(limit, 1024);
                assert!(bytes > 1024);
            }
            other => panic!("expected LineTooLarge, got {other:?}"),
        }
        // A size-cap violation is fatal: nothing more is yielded.
        assert!(it.next().is_none());
    }

    #[test]
    fn accepts_a_line_exactly_at_the_limit() {
        let r = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let wire = to_jsonl(std::slice::from_ref(&r));
        // The cap counts the trailing newline, so the full wire length fits.
        let limit = wire.len();
        let parsed: Vec<Record> = Parser::new(Cursor::new(wire))
            .with_max_line_bytes(limit)
            .collect::<Result<_, _>>()
            .unwrap();
        assert_eq!(parsed.len(), 1);
    }

    #[test]
    fn rejects_total_trace_exceeding_byte_cap() {
        // Each line is ~750 bytes, so the first fits under 1500 and the second
        // pushes the running total over it.
        let pad = "y".repeat(700);
        let r = make_record(Kind::ChatRequest, json!({"model": "a", "pad": pad}));
        let wire = to_jsonl(&[r.clone(), r.clone(), r.clone()]);

        let mut it = Parser::new(Cursor::new(wire)).with_max_total_bytes(1500);
        assert!(it.next().unwrap().is_ok());
        match it.next().unwrap() {
            Err(ParseError::TraceTooLarge { limit, bytes }) => {
                assert_eq!(limit, 1500);
                assert!(bytes > 1500);
            }
            other => panic!("expected TraceTooLarge, got {other:?}"),
        }
        assert!(it.next().is_none());
    }

    #[test]
    fn reports_line_number_on_malformed_line() {
        let r = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let wire = format!(
            "{}\nnot-valid-json\n{}\n",
            serde_json::to_string(&r).unwrap(),
            serde_json::to_string(&r).unwrap()
        );
        let mut it = Parser::new(Cursor::new(wire));
        assert!(it.next().unwrap().is_ok());
        match it.next().unwrap() {
            Err(ParseError::Json { line, .. }) => assert_eq!(line, 2),
            other => panic!("expected Json error on line 2, got {other:?}"),
        }
        // A JSON error is not fatal: the following line is still parsed.
        assert!(it.next().unwrap().is_ok());
        assert!(it.next().is_none());
    }

    #[test]
    fn reports_line_number_on_schema_mismatch() {
        let wire = r#"{"not_a_record": true}"#.to_string() + "\n";
        let mut it = Parser::new(Cursor::new(wire));
        match it.next().unwrap() {
            Err(ParseError::Json { line, .. }) => assert_eq!(line, 1),
            other => panic!("expected Json error on line 1, got {other:?}"),
        }
    }

    #[test]
    fn invalid_utf8_is_reported_as_io_error() {
        let wire: Vec<u8> = vec![0xff, 0xfe, b'\n'];
        let mut it = Parser::new(Cursor::new(wire));
        match it.next().unwrap() {
            Err(ParseError::Io { line, source }) => {
                assert_eq!(line, 1);
                assert_eq!(source.kind(), std::io::ErrorKind::InvalidData);
            }
            other => panic!("expected Io error on line 1, got {other:?}"),
        }
        assert!(it.next().is_none());
    }

    #[test]
    fn empty_input_yields_empty_vec() {
        let parsed = parse_all(Cursor::new(String::new())).unwrap();
        assert_eq!(parsed.len(), 0);
    }

    #[test]
    fn handles_trailing_newline() {
        let r = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let wire = serde_json::to_string(&r).unwrap() + "\n";
        let parsed = parse_all(Cursor::new(wire)).unwrap();
        assert_eq!(parsed.len(), 1);
    }

    #[test]
    fn handles_crlf_line_endings() {
        let r = make_record(Kind::ChatRequest, json!({"model": "a"}));
        let wire = format!("{}\r\n", serde_json::to_string(&r).unwrap());
        let parsed = parse_all(Cursor::new(wire)).unwrap();
        assert_eq!(parsed.len(), 1);
    }
}