simple_agents_healing/
streaming.rs1use crate::parser::{JsonishParser, ParserConfig};
27use serde_json::Value;
28use simple_agent_type::coercion::CoercionResult;
29use simple_agent_type::error::HealingError;
30use std::collections::VecDeque;
31
/// Scanner state tracked by `StreamingParser`.
///
/// `Outside` is the only state defined; it is the value a parser is
/// constructed with and the value `clear` restores.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ParseState {
    /// Initial state of a freshly constructed or cleared parser.
    Outside,
}
40
41pub struct StreamingParser {
64 buffer: String,
66 parsed_index: usize,
68 parser: JsonishParser,
70 extracted_values: VecDeque<Value>,
72 state: ParseState,
74}
75
76impl StreamingParser {
77 pub fn new() -> Self {
79 Self {
80 buffer: String::new(),
81 parsed_index: 0,
82 parser: JsonishParser::new(),
83 extracted_values: VecDeque::new(),
84 state: ParseState::Outside,
85 }
86 }
87
88 pub fn with_config(config: ParserConfig) -> Self {
90 Self {
91 buffer: String::new(),
92 parsed_index: 0,
93 parser: JsonishParser::with_config(config),
94 extracted_values: VecDeque::new(),
95 state: ParseState::Outside,
96 }
97 }
98
99 pub fn feed(&mut self, chunk: &str) -> Vec<Value> {
118 self.buffer.push_str(chunk);
119 self.extract_completed_values()
120 }
121
122 pub fn try_parse(&self) -> Option<CoercionResult<Value>> {
127 if self.buffer.trim().is_empty() {
128 return None;
129 }
130
131 self.parser.parse(&self.buffer).ok()
133 }
134
135 pub fn finalize(
157 self,
158 ) -> std::result::Result<CoercionResult<Value>, simple_agent_type::SimpleAgentsError> {
159 if self.buffer.trim().is_empty() {
160 return Err(simple_agent_type::SimpleAgentsError::Healing(
161 HealingError::ParseFailed {
162 error_message: "Empty buffer".to_string(),
163 input: String::new(),
164 },
165 ));
166 }
167
168 self.parser.parse(&self.buffer)
169 }
170
171 pub fn buffer_len(&self) -> usize {
173 self.buffer.len()
174 }
175
176 pub fn is_empty(&self) -> bool {
178 self.buffer.is_empty()
179 }
180
181 pub fn clear(&mut self) {
183 self.buffer.clear();
184 self.parsed_index = 0;
185 self.extracted_values.clear();
186 self.state = ParseState::Outside;
187 }
188
189 fn extract_completed_values(&mut self) -> Vec<Value> {
194 let mut result = Vec::new();
195
196 while let Some(value) = self.extracted_values.pop_front() {
203 result.push(value);
204 }
205
206 result
207 }
208}
209
210impl Default for StreamingParser {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216pub struct PartialExtractor {
221 parser: StreamingParser,
222}
223
224impl PartialExtractor {
225 pub fn new() -> Self {
227 Self {
228 parser: StreamingParser::new(),
229 }
230 }
231
232 pub fn feed(&mut self, chunk: &str) -> Option<Value> {
236 self.parser.feed(chunk);
237 self.parser.try_parse().map(|result| result.value)
238 }
239
240 pub fn finalize(self) -> std::result::Result<Value, simple_agent_type::SimpleAgentsError> {
242 self.parser.finalize().map(|result| result.value)
243 }
244}
245
246impl Default for PartialExtractor {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
#[cfg(test)]
mod tests {
    use super::*;
    use simple_agent_type::coercion::CoercionFlag;

    /// Builds a default parser and feeds it `chunks` in order.
    fn fed(chunks: &[&str]) -> StreamingParser {
        let mut parser = StreamingParser::new();
        for chunk in chunks {
            parser.feed(chunk);
        }
        parser
    }

    #[test]
    fn test_streaming_parser_new() {
        let parser = StreamingParser::new();
        assert!(parser.is_empty());
        assert_eq!(parser.buffer_len(), 0);
    }

    #[test]
    fn test_feed_single_chunk_complete() {
        let parsed = fed(&[r#"{"name": "Alice", "age": 30}"#])
            .finalize()
            .unwrap();

        assert_eq!(parsed.value["name"], "Alice");
        assert_eq!(parsed.value["age"], 30);
    }

    #[test]
    fn test_feed_multiple_chunks() {
        let parser = fed(&[r#"{"name": "#, r#""Alice", "#, r#""age": 30}"#]);

        let parsed = parser.finalize().unwrap();
        assert_eq!(parsed.value["name"], "Alice");
        assert_eq!(parsed.value["age"], 30);
    }

    #[test]
    fn test_feed_with_nested_objects() {
        let parser = fed(&[
            r#"{"user": {"name": "#,
            r#""Alice", "age": 30}, "#,
            r#""active": true}"#,
        ]);

        let parsed = parser.finalize().unwrap();
        let user = &parsed.value["user"];
        assert_eq!(user["name"], "Alice");
        assert_eq!(user["age"], 30);
        assert_eq!(parsed.value["active"], true);
    }

    #[test]
    fn test_feed_array() {
        let parser = fed(&[
            r#"["#,
            r#"{"id": 1}, "#,
            r#"{"id": 2}, "#,
            r#"{"id": 3}]"#,
        ]);

        let parsed = parser.finalize().unwrap();
        assert!(parsed.value.is_array());
        assert_eq!(parsed.value[0]["id"], 1);
        assert_eq!(parsed.value[1]["id"], 2);
        assert_eq!(parsed.value[2]["id"], 3);
    }

    #[test]
    fn test_try_parse_incomplete() {
        let parser = fed(&[r#"{"name": "Alice", "age":"#]);

        // A truncated document may or may not parse; only check the value
        // when the parser managed to produce one.
        if let Some(parsed) = parser.try_parse() {
            assert_eq!(parsed.value["name"], "Alice");
        }
    }

    #[test]
    fn test_try_parse_complete() {
        let parser = fed(&[r#"{"name": "Alice", "age": 30}"#]);

        let parsed = parser.try_parse().unwrap();
        assert_eq!(parsed.value["name"], "Alice");
        assert_eq!(parsed.value["age"], 30);
    }

    #[test]
    fn test_buffer_len() {
        let mut parser = StreamingParser::new();
        assert_eq!(parser.buffer_len(), 0);

        parser.feed("hello");
        assert_eq!(parser.buffer_len(), 5);

        parser.feed(" world");
        assert_eq!(parser.buffer_len(), 11);
    }

    #[test]
    fn test_clear() {
        let mut parser = fed(&[r#"{"name": "Alice"}"#]);
        assert!(!parser.is_empty());

        parser.clear();
        assert!(parser.is_empty());
        assert_eq!(parser.buffer_len(), 0);
    }

    #[test]
    fn test_finalize_empty_buffer() {
        let outcome = StreamingParser::new().finalize();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_streaming_with_markdown() {
        let parser = fed(&["```json\n", r#"{"name": "Alice"}"#, "\n```"]);

        let parsed = parser.finalize().unwrap();
        assert_eq!(parsed.value["name"], "Alice");
        assert!(parsed
            .flags
            .iter()
            .any(|flag| matches!(flag, CoercionFlag::StrippedMarkdown)));
    }

    #[test]
    fn test_streaming_with_trailing_comma() {
        let parser = fed(&[r#"{"name": "#, r#""Alice","#, r#"}"#]);

        let parsed = parser.finalize().unwrap();
        assert_eq!(parsed.value["name"], "Alice");
        assert!(parsed
            .flags
            .iter()
            .any(|flag| matches!(flag, CoercionFlag::FixedTrailingComma)));
    }

    #[test]
    fn test_partial_extractor() {
        let mut extractor = PartialExtractor::new();
        for chunk in [r#"{"name": "Alice", "#, r#""age": 30"#, "}"] {
            extractor.feed(chunk);
        }

        let value = extractor.finalize().unwrap();
        assert_eq!(value["name"], "Alice");
        assert_eq!(value["age"], 30);
    }

    #[test]
    fn test_streaming_preserves_confidence() {
        let parser = fed(&[r#"{"name": "Alice"}"#]);

        let parsed = parser.finalize().unwrap();
        assert_eq!(parsed.confidence, 1.0);
        assert!(parsed.flags.is_empty());
    }

    #[test]
    fn test_streaming_with_malformed_json() {
        let parser = fed(&[r#"{name: "#, r#""Alice"}"#]);

        let parsed = parser.finalize().unwrap();
        assert_eq!(parsed.value["name"], "Alice");
        assert!(parsed.confidence < 1.0);
        assert!(!parsed.flags.is_empty());
    }
}