Skip to main content

ralph_workflow/json_parser/
incremental_parser.rs

1//! Incremental NDJSON parser for real-time streaming.
2//!
3//! This module provides a parser that can process NDJSON (newline-delimited JSON)
4//! incrementally, yielding complete JSON objects as soon as they're received,
5//! without waiting for newlines.
6//!
7//! # Why Incremental Parsing?
8//!
9//! The standard approach of using `reader.lines()` blocks until a newline is received.
10//! For AI agents that buffer their output (like Codex), this causes all output to appear
11//! at once instead of streaming character-by-character.
12//!
13//! This parser detects complete JSON objects by tracking brace nesting depth,
14//! allowing true real-time streaming like `ChatGPT`.
15
/// Incremental NDJSON parser that yields complete JSON objects as they arrive.
///
/// # How It Works
///
/// The parser maintains a buffer of received bytes and tracks brace nesting depth.
/// When the depth returns to zero after seeing a closing brace, we have a complete
/// JSON object that can be handed to the caller as a string.
///
/// Braces inside string literals are not counted, which is why the parser also
/// tracks whether it is inside a string and whether the next character is escaped.
///
/// # Depth Limit
///
/// The parser enforces a maximum nesting depth (`MAX_JSON_DEPTH`) to guard against
/// malicious input with extremely deep nesting. No error value is surfaced when the
/// limit is exceeded: the feeding methods only ever return the parser (and its
/// results). Instead, the over-deep input produces no object and the parser stops
/// reporting that it is inside one (`is_parsing()` returns `false`).
///
/// # Example
///
/// ```ignore
/// let parser = IncrementalNdjsonParser::new();
///
/// // Feed first half of JSON
/// let (parser, events1) = parser.feed_and_get_events(b"{\"type\": \"de");
/// assert_eq!(events1.len(), 0);  // Not complete yet
///
/// // Feed second half
/// let (_, events2) = parser.feed_and_get_events(b"lta\"}\n");
/// assert_eq!(events2.len(), 1);  // Complete!
/// assert_eq!(events2[0], "{\"type\": \"delta\"}");
/// ```
pub struct IncrementalNdjsonParser {
    /// Buffer of received bytes that haven't been parsed yet
    buffer: Vec<u8>,
    /// Current brace nesting depth (0 means top-level)
    depth: usize,
    /// Whether we're inside a string literal
    in_string: bool,
    /// Whether the next character is escaped
    escape_next: bool,
    /// Whether we've seen at least one opening brace (started parsing JSON)
    started: bool,
    /// Complete JSON objects extracted so far and not yet handed to the caller
    results: Vec<String>,
}
58
/// Upper bound on how deeply JSON objects may nest while being parsed.
///
/// Capping the depth protects the nesting counter against overflow when hostile
/// input opens an enormous number of objects. Well-formed JSON rarely nests more
/// than about 20 levels, so a cap of 1000 is deliberately generous.
const MAX_JSON_DEPTH: usize = 1_000;
63
64impl IncrementalNdjsonParser {
65    /// Create a new incremental NDJSON parser.
66    pub const fn new() -> Self {
67        Self {
68            buffer: Vec::new(),
69            depth: 0,
70            in_string: false,
71            escape_next: false,
72            started: false,
73            results: Vec::new(),
74        }
75    }
76
77    /// Feed bytes into the parser, returning any complete JSON objects found.
78    ///
79    /// This method processes the input bytes and extracts complete JSON objects.
80    /// Multiple JSON objects may be returned from a single call if they're all complete.
81    ///
82    /// # Arguments
83    ///
84    /// * `data` - Bytes to feed into the parser
85    ///
86    /// # Returns
87    ///
88    /// A tuple of (updated parser, vector of complete JSON strings), in the order they were completed.
89    pub fn feed(self, byte: u8) -> Self {
90        self.process_byte(byte)
91    }
92
93    pub fn feed_and_get_events(self, data: &[u8]) -> (Self, Vec<String>) {
94        let parser = data.iter().fold(self, |acc, &byte| acc.process_byte(byte));
95        let (results, empty_parser) = extract_results(parser);
96        (empty_parser, results)
97    }
98
99    pub fn drain_results(&mut self) -> Vec<String> {
100        std::mem::take(&mut self.results)
101    }
102
103    /// Get any complete JSON objects extracted so far.
104    #[must_use]
105    pub fn get_results(&self) -> Vec<String> {
106        self.results.clone()
107    }
108
109    /// Clear the internal buffer.
110    ///
111    /// This can be useful for error recovery when invalid data is encountered.
112    #[cfg(test)]
113    pub fn clear(&mut self) {
114        self.buffer = Vec::new();
115        self.depth = 0;
116        self.in_string = false;
117        self.escape_next = false;
118        self.started = false;
119    }
120
121    /// Check if the parser is currently inside a JSON object.
122    #[must_use]
123    pub const fn is_parsing(&self) -> bool {
124        self.started
125    }
126
127    /// Finalize parsing and return any remaining buffered data.
128    ///
129    /// This method should be called when the input stream ends to retrieve
130    /// any incomplete JSON that was buffered. This is important for handling
131    /// cases where the last line of a file doesn't have a trailing newline
132    /// or where a complete JSON object was received but not yet extracted.
133    ///
134    /// # Returns
135    ///
136    /// Any remaining buffered data as a string if non-empty, or None if buffer is empty.
137    ///
138    /// # Example
139    ///
140    /// ```ignore
141    /// let mut parser = IncrementalNdjsonParser::new();
142    /// let (_, _) = parser.feed_and_get_events(b"{\"type\": \"delta\"}\n{\"type\": \"incomplete\"");
143    /// // When stream ends, get any remaining buffered data
144    /// if let Some(remaining) = parser.finish() {
145    ///     println!("Remaining: {}", remaining);
146    /// }
147    /// ```
148    #[must_use]
149    pub fn finish(self) -> Option<String> {
150        String::from_utf8(self.buffer)
151            .ok()
152            .map(|s| s.trim().to_string())
153            .filter(|s| !s.is_empty())
154    }
155}
156
157// Include boundary module for mutable byte processing.
158include!("incremental_parser/io.rs");
159
160fn extract_results(parser: IncrementalNdjsonParser) -> (Vec<String>, IncrementalNdjsonParser) {
161    let IncrementalNdjsonParser {
162        buffer,
163        depth,
164        in_string,
165        escape_next,
166        started,
167        results,
168    } = parser;
169    let empty = IncrementalNdjsonParser {
170        buffer,
171        depth,
172        in_string,
173        escape_next,
174        started,
175        results: Vec::new(),
176    };
177    (results, empty)
178}
179
180impl Default for IncrementalNdjsonParser {
181    fn default() -> Self {
182        Self::new()
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Run `input` through a fresh parser in one call, returning the parser and its events.
    fn parse_fresh(input: &[u8]) -> (IncrementalNdjsonParser, Vec<String>) {
        IncrementalNdjsonParser::new().feed_and_get_events(input)
    }

    // A single newline-terminated object is emitted verbatim.
    #[test]
    fn test_incremental_parser_single_json() {
        let (_, events) = parse_fresh(b"{\"type\": \"delta\"}\n");
        assert_eq!(events, ["{\"type\": \"delta\"}"]);
    }

    // An object split across two feeds is only emitted once its closing brace arrives.
    #[test]
    fn test_incremental_parser_split_json() {
        let (parser, first) = parse_fresh(b"{\"type\": \"de");
        assert!(first.is_empty());

        let (_, second) = parser.feed_and_get_events(b"lta\"}\n");
        assert_eq!(second, ["{\"type\": \"delta\"}"]);
    }

    // Several objects in one feed come back in arrival order.
    #[test]
    fn test_incremental_parser_multiple_jsons() {
        let (_, events) = parse_fresh(b"{\"type\": \"delta\"}\n{\"type\": \"done\"}\n");
        assert_eq!(events, ["{\"type\": \"delta\"}", "{\"type\": \"done\"}"]);
    }

    // Nested objects do not end the outer object early.
    #[test]
    fn test_incremental_parser_nested_json() {
        let (_, events) = parse_fresh(b"{\"type\": \"delta\", \"data\": {\"nested\": true}}\n");
        assert_eq!(events.len(), 1);
        assert!(events[0].contains("\"nested\": true"));
    }

    // Braces inside string values are not counted as nesting.
    #[test]
    fn test_incremental_parser_json_with_strings_containing_braces() {
        let (_, events) = parse_fresh(b"{\"text\": \"hello {world}\"}\n");
        assert_eq!(events, ["{\"text\": \"hello {world}\"}"]);
    }

    // Escaped quotes do not terminate the string they appear in.
    #[test]
    fn test_incremental_parser_json_with_escaped_quotes() {
        let (_, events) = parse_fresh(b"{\"text\": \"hello \\\"world\\\"\"}\n");
        assert_eq!(events.len(), 1);
        assert!(events[0].contains("\\\""));
    }

    #[test]
    fn test_incremental_parser_empty_input() {
        let (_, events) = parse_fresh(b"");
        assert!(events.is_empty());
    }

    #[test]
    fn test_incremental_parser_whitespace_only() {
        let (_, events) = parse_fresh(b"   \n  \n");
        assert!(events.is_empty());
    }

    // Non-JSON text ahead of the first object is skipped.
    #[test]
    fn test_incremental_parser_ignores_preamble_before_json() {
        let (_, events) = parse_fresh(b"[i] Joined existing CLIProxy\n{\"type\":\"delta\"}\n");
        assert_eq!(events, vec!["{\"type\":\"delta\"}".to_string()]);
    }

    // `clear` abandons a half-read object so the next object parses cleanly.
    #[test]
    fn test_incremental_parser_clear() {
        let (mut parser, _) = parse_fresh(b"{\"type\":");
        assert!(parser.is_parsing());

        parser.clear();
        assert!(!parser.is_parsing());

        let (_, events) = parser.feed_and_get_events(b"{\"type\": \"delta\"}\n");
        assert_eq!(events.len(), 1);
    }

    // Feeding one byte at a time and draining after each byte yields the same object.
    #[test]
    fn test_incremental_parser_byte_by_byte() {
        let input = b"{\"type\": \"delta\"}\n";
        let start = (IncrementalNdjsonParser::new(), Vec::new());

        let (_, collected) = input.iter().fold(start, |(parser, mut seen), &byte| {
            let mut parser = parser.feed(byte);
            seen.extend(parser.drain_results());
            (parser, seen)
        });

        assert_eq!(collected, ["{\"type\": \"delta\"}"]);
    }

    // Newlines inside an object do not split it.
    #[test]
    fn test_incremental_parser_multiline_json() {
        let (_, events) = parse_fresh(b"{\n  \"type\": \"delta\",\n  \"value\": 123\n}\n");
        assert_eq!(events.len(), 1);
        let object = &events[0];
        assert!(object.contains("\"type\": \"delta\""));
        assert!(object.contains("\"value\": 123"));
    }

    // Exceeding the depth limit emits nothing and leaves the parser outside any object.
    #[test]
    fn test_incremental_parser_depth_limit() {
        let too_deep = "{".repeat(MAX_JSON_DEPTH + 1);
        let (parser, events) = parse_fresh(too_deep.as_bytes());
        assert!(events.is_empty());
        assert!(!parser.is_parsing());
    }

    #[test]
    fn test_incremental_parser_finish_returns_buffered_data() {
        let (parser, events) = parse_fresh(b"{\"type\": \"incomplete\"");
        assert_eq!(events, Vec::<String>::new());

        assert_eq!(
            parser.finish(),
            Some("{\"type\": \"incomplete\"".to_string())
        );
    }

    #[test]
    fn test_incremental_parser_finish_returns_none_for_empty_buffer() {
        assert_eq!(IncrementalNdjsonParser::new().finish(), None);
    }

    #[test]
    fn test_incremental_parser_finish_returns_none_for_complete_json() {
        let (parser, events) = parse_fresh(b"{\"type\": \"delta\"}\n");
        assert_eq!(events.len(), 1);

        assert_eq!(parser.finish(), None);
    }

    // A complete object is emitted even without a trailing newline, leaving nothing behind.
    #[test]
    fn test_incremental_parser_finish_with_complete_json_no_newline() {
        let (parser, events) = parse_fresh(b"{\"type\": \"delta\"}");
        assert_eq!(events, ["{\"type\": \"delta\"}"]);

        assert_eq!(parser.finish(), None);
    }

    // An object missing its closing brace is never emitted but is returned by `finish`.
    #[test]
    fn test_incremental_parser_finish_with_incomplete_json_missing_brace() {
        let (parser, events) = parse_fresh(b"{\"type\": \"delta\"");
        assert!(events.is_empty());

        assert_eq!(parser.finish(), Some("{\"type\": \"delta\"".to_string()));
    }
}