// ralph_workflow/json_parser/incremental_parser.rs
/// Resumable parser state for a byte stream of newline-delimited JSON.
///
/// Input arrives one byte (or one slice) at a time; everything needed to pick
/// up where the previous call stopped lives in these fields. Values that have
/// been completed are queued in `results` until the caller collects them.
pub struct IncrementalNdjsonParser {
    /// Bytes collected for the value currently in progress. `finish` reports
    /// whatever is still in here when the stream ends.
    buffer: Vec<u8>,
    /// Nesting counter. NOTE(review): presumably the current brace depth,
    /// maintained by the byte handler, which is not defined in this file.
    depth: usize,
    /// NOTE(review): presumably set while the cursor is inside a JSON string
    /// literal so that braces in text are not counted — confirm in the byte handler.
    in_string: bool,
    /// NOTE(review): presumably set right after a backslash inside a string so
    /// the next byte is taken literally — confirm in the byte handler.
    escape_next: bool,
    /// Reported by `is_parsing`; reset to `false` by `clear`.
    started: bool,
    /// Completed values waiting to be handed out by `drain_results`,
    /// `get_results` or `feed_and_get_events`.
    results: Vec<String>,
}

/// Upper bound on JSON nesting depth.
///
/// The enforcement itself is not in this file. The depth-limit unit test
/// expects that feeding more than this many opening braces produces no events
/// and leaves the parser reporting `is_parsing() == false`.
const MAX_JSON_DEPTH: usize = 1000;

64impl IncrementalNdjsonParser {
65 pub const fn new() -> Self {
67 Self {
68 buffer: Vec::new(),
69 depth: 0,
70 in_string: false,
71 escape_next: false,
72 started: false,
73 results: Vec::new(),
74 }
75 }
76
77 pub fn feed(self, byte: u8) -> Self {
90 self.process_byte(byte)
91 }
92
93 pub fn feed_and_get_events(self, data: &[u8]) -> (Self, Vec<String>) {
94 let parser = data.iter().fold(self, |acc, &byte| acc.process_byte(byte));
95 let (results, empty_parser) = extract_results(parser);
96 (empty_parser, results)
97 }
98
99 pub fn drain_results(&mut self) -> Vec<String> {
100 std::mem::take(&mut self.results)
101 }
102
103 #[must_use]
105 pub fn get_results(&self) -> Vec<String> {
106 self.results.clone()
107 }
108
109 #[cfg(test)]
113 pub fn clear(&mut self) {
114 self.buffer = Vec::new();
115 self.depth = 0;
116 self.in_string = false;
117 self.escape_next = false;
118 self.started = false;
119 }
120
121 #[must_use]
123 pub const fn is_parsing(&self) -> bool {
124 self.started
125 }
126
127 #[must_use]
149 pub fn finish(self) -> Option<String> {
150 String::from_utf8(self.buffer)
151 .ok()
152 .map(|s| s.trim().to_string())
153 .filter(|s| !s.is_empty())
154 }
155}
156
157include!("incremental_parser/io.rs");
159
160fn extract_results(parser: IncrementalNdjsonParser) -> (Vec<String>, IncrementalNdjsonParser) {
161 let IncrementalNdjsonParser {
162 buffer,
163 depth,
164 in_string,
165 escape_next,
166 started,
167 results,
168 } = parser;
169 let empty = IncrementalNdjsonParser {
170 buffer,
171 depth,
172 in_string,
173 escape_next,
174 started,
175 results: Vec::new(),
176 };
177 (results, empty)
178}
179
180impl Default for IncrementalNdjsonParser {
181 fn default() -> Self {
182 Self::new()
183 }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `input` to a fresh parser in one call and returns the events it produced.
    fn collect_events(input: &[u8]) -> Vec<String> {
        let (_, events) = IncrementalNdjsonParser::new().feed_and_get_events(input);
        events
    }

    #[test]
    fn test_incremental_parser_single_json() {
        let events = collect_events(b"{\"type\": \"delta\"}\n");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0], "{\"type\": \"delta\"}");
    }

    #[test]
    fn test_incremental_parser_split_json() {
        // First half: nothing can be emitted yet.
        let (parser, first) =
            IncrementalNdjsonParser::new().feed_and_get_events(b"{\"type\": \"de");
        assert!(first.is_empty());

        // Second half completes the object started in the previous call.
        let (_, second) = parser.feed_and_get_events(b"lta\"}\n");
        assert_eq!(second.len(), 1);
        assert_eq!(second[0], "{\"type\": \"delta\"}");
    }

    #[test]
    fn test_incremental_parser_multiple_jsons() {
        let events = collect_events(b"{\"type\": \"delta\"}\n{\"type\": \"done\"}\n");
        assert_eq!(events.len(), 2);
        assert_eq!(events[0], "{\"type\": \"delta\"}");
        assert_eq!(events[1], "{\"type\": \"done\"}");
    }

    #[test]
    fn test_incremental_parser_nested_json() {
        let events = collect_events(b"{\"type\": \"delta\", \"data\": {\"nested\": true}}\n");
        assert_eq!(events.len(), 1);
        assert!(events[0].contains("\"nested\": true"));
    }

    #[test]
    fn test_incremental_parser_json_with_strings_containing_braces() {
        let events = collect_events(b"{\"text\": \"hello {world}\"}\n");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0], "{\"text\": \"hello {world}\"}");
    }

    #[test]
    fn test_incremental_parser_json_with_escaped_quotes() {
        let events = collect_events(b"{\"text\": \"hello \\\"world\\\"\"}\n");
        assert_eq!(events.len(), 1);
        assert!(events[0].contains("\\\""));
    }

    #[test]
    fn test_incremental_parser_empty_input() {
        assert!(collect_events(b"").is_empty());
    }

    #[test]
    fn test_incremental_parser_whitespace_only() {
        assert!(collect_events(b"  \n  \n").is_empty());
    }

    #[test]
    fn test_incremental_parser_ignores_preamble_before_json() {
        let events = collect_events(b"[i] Joined existing CLIProxy\n{\"type\":\"delta\"}\n");
        assert_eq!(events, vec!["{\"type\":\"delta\"}".to_string()]);
    }

    #[test]
    fn test_incremental_parser_clear() {
        let (mut parser, _) = IncrementalNdjsonParser::new().feed_and_get_events(b"{\"type\":");
        assert!(parser.is_parsing());

        parser.clear();
        assert!(!parser.is_parsing());

        // After the reset a complete object parses as if on a fresh parser.
        let (_, events) = parser.feed_and_get_events(b"{\"type\": \"delta\"}\n");
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn test_incremental_parser_byte_by_byte() {
        let input = b"{\"type\": \"delta\"}\n";
        let mut parser = IncrementalNdjsonParser::new();
        let mut collected = Vec::new();

        for &byte in input {
            parser = parser.feed(byte);
            collected.extend(parser.drain_results());
        }

        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0], "{\"type\": \"delta\"}");
    }

    #[test]
    fn test_incremental_parser_multiline_json() {
        let events = collect_events(b"{\n  \"type\": \"delta\",\n  \"value\": 123\n}\n");
        assert_eq!(events.len(), 1);
        assert!(events[0].contains("\"type\": \"delta\""));
        assert!(events[0].contains("\"value\": 123"));
    }

    #[test]
    fn test_incremental_parser_depth_limit() {
        let too_deep = "{".repeat(MAX_JSON_DEPTH + 1);
        let (parser, events) =
            IncrementalNdjsonParser::new().feed_and_get_events(too_deep.as_bytes());
        assert!(events.is_empty());
        assert!(!parser.is_parsing());
    }

    #[test]
    fn test_incremental_parser_finish_returns_buffered_data() {
        let (parser, events) =
            IncrementalNdjsonParser::new().feed_and_get_events(b"{\"type\": \"incomplete\"");
        assert!(events.is_empty());

        let remaining = parser.finish();
        assert_eq!(remaining, Some("{\"type\": \"incomplete\"".to_string()));
    }

    #[test]
    fn test_incremental_parser_finish_returns_none_for_empty_buffer() {
        assert_eq!(IncrementalNdjsonParser::new().finish(), None);
    }

    #[test]
    fn test_incremental_parser_finish_returns_none_for_complete_json() {
        let (parser, events) =
            IncrementalNdjsonParser::new().feed_and_get_events(b"{\"type\": \"delta\"}\n");
        assert_eq!(events.len(), 1);

        assert_eq!(parser.finish(), None);
    }

    #[test]
    fn test_incremental_parser_finish_with_complete_json_no_newline() {
        // The object is emitted as soon as it closes; no trailing newline needed.
        let (parser, events) =
            IncrementalNdjsonParser::new().feed_and_get_events(b"{\"type\": \"delta\"}");
        assert_eq!(events.len(), 1);
        assert_eq!(events[0], "{\"type\": \"delta\"}");

        assert_eq!(parser.finish(), None);
    }

    #[test]
    fn test_incremental_parser_finish_with_incomplete_json_missing_brace() {
        let (parser, events) =
            IncrementalNdjsonParser::new().feed_and_get_events(b"{\"type\": \"delta\"");
        assert!(events.is_empty());

        let remaining = parser.finish();
        assert_eq!(remaining, Some("{\"type\": \"delta\"".to_string()));
    }
}