Skip to main content

simple_agents_healing/
streaming.rs

//! Streaming JSON parser for incremental LLM response parsing.
//!
//! Provides incremental parsing of JSON as it streams in, extracting complete
//! values without waiting for the full response.
//!
//! # Example
//!
//! ```
//! use simple_agents_healing::streaming::StreamingParser;
//!
//! let mut parser = StreamingParser::new();
//!
//! // Feed first chunk
//! let values = parser.feed(r#"{"name": "Alice", "age": "#);
//! // The value for "age" has not arrived yet
//!
//! // Feed second chunk
//! let values = parser.feed(r#"30, "email": "#);
//! // "name" and "age" are now fully present in the buffer
//!
//! // Feed final chunk
//! let values = parser.feed(r#""alice@example.com"}"#);
//! // The object is now complete; `parser.finalize()` parses the whole buffer
//! ```
25
26use crate::parser::{JsonishParser, ParserConfig};
27use serde_json::Value;
28use simple_agent_type::coercion::CoercionResult;
29use simple_agent_type::error::HealingError;
30use std::collections::VecDeque;
31
/// Parser state for tracking incomplete JSON structures.
///
/// Currently a placeholder with a single variant; it is meant to be expanded
/// when advanced streaming features are implemented.
///
/// The enum is fieldless, so it derives `Copy` and `Eq` alongside `Clone` and
/// `PartialEq`: a state can be passed around by value and compared with full
/// equality semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ParseState {
    /// Not inside any structure
    Outside,
}
40
41/// Streaming JSON parser for incremental parsing.
42///
43/// Accumulates chunks and extracts complete JSON values as they become available.
44///
45/// # Example
46///
47/// ```
48/// use simple_agents_healing::streaming::StreamingParser;
49///
50/// let mut parser = StreamingParser::new();
51///
52/// // Stream comes in chunks
53/// parser.feed(r#"{"id": 1, "#);
54/// parser.feed(r#""name": "Alice", "#);
55/// parser.feed(r#""age": 30}"#);
56///
57/// // Get the complete parsed value
58/// let result = parser.finalize().unwrap();
59/// assert_eq!(result.value["id"], 1);
60/// assert_eq!(result.value["name"], "Alice");
61/// assert_eq!(result.value["age"], 30);
62/// ```
63pub struct StreamingParser {
64    /// Accumulated buffer of all chunks
65    buffer: String,
66    /// Index up to which we've successfully parsed
67    parsed_index: usize,
68    /// Parser for final parsing
69    parser: JsonishParser,
70    /// Extracted complete values (for array streaming)
71    extracted_values: VecDeque<Value>,
72    /// Current parsing state
73    state: ParseState,
74}
75
76impl StreamingParser {
77    /// Create a new streaming parser with default configuration.
78    pub fn new() -> Self {
79        Self {
80            buffer: String::new(),
81            parsed_index: 0,
82            parser: JsonishParser::new(),
83            extracted_values: VecDeque::new(),
84            state: ParseState::Outside,
85        }
86    }
87
88    /// Create a streaming parser with custom configuration.
89    pub fn with_config(config: ParserConfig) -> Self {
90        Self {
91            buffer: String::new(),
92            parsed_index: 0,
93            parser: JsonishParser::with_config(config),
94            extracted_values: VecDeque::new(),
95            state: ParseState::Outside,
96        }
97    }
98
99    /// Feed a chunk of JSON data to the parser.
100    ///
101    /// Returns a list of complete values that were extracted from this chunk.
102    /// For single objects, this will be empty until the final chunk.
103    /// For arrays, this can return completed array elements.
104    ///
105    /// # Example
106    ///
107    /// ```
108    /// use simple_agents_healing::streaming::StreamingParser;
109    ///
110    /// let mut parser = StreamingParser::new();
111    ///
112    /// // Array streaming: extract complete elements
113    /// parser.feed(r#"[{"id": 1}, "#);
114    /// parser.feed(r#"{"id": 2}, "#);
115    /// let values = parser.feed(r#"{"id": 3}]"#);
116    /// ```
117    pub fn feed(&mut self, chunk: &str) -> Vec<Value> {
118        self.buffer.push_str(chunk);
119        self.extract_completed_values()
120    }
121
122    /// Try to parse the current buffer as a complete JSON value.
123    ///
124    /// Returns `Some(value)` if the buffer contains a complete, parseable value.
125    /// Returns `None` if more data is needed or if the JSON is incomplete.
126    pub fn try_parse(&self) -> Option<CoercionResult<Value>> {
127        if self.buffer.trim().is_empty() {
128            return None;
129        }
130
131        // Try to parse the entire buffer
132        self.parser.parse(&self.buffer).ok()
133    }
134
135    /// Finalize the stream and get the complete parsed value.
136    ///
137    /// This attempts to parse the entire accumulated buffer as a single JSON value.
138    /// Call this when the stream is complete.
139    ///
140    /// # Errors
141    ///
142    /// Returns an error if the accumulated buffer cannot be parsed as valid JSON.
143    ///
144    /// # Example
145    ///
146    /// ```
147    /// use simple_agents_healing::streaming::StreamingParser;
148    ///
149    /// let mut parser = StreamingParser::new();
150    /// parser.feed(r#"{"name": "#);
151    /// parser.feed(r#""Alice"}"#);
152    ///
153    /// let result = parser.finalize().unwrap();
154    /// assert_eq!(result.value["name"], "Alice");
155    /// ```
156    pub fn finalize(
157        self,
158    ) -> std::result::Result<CoercionResult<Value>, simple_agent_type::SimpleAgentsError> {
159        if self.buffer.trim().is_empty() {
160            return Err(simple_agent_type::SimpleAgentsError::Healing(
161                HealingError::ParseFailed {
162                    error_message: "Empty buffer".to_string(),
163                    input: String::new(),
164                },
165            ));
166        }
167
168        self.parser.parse(&self.buffer)
169    }
170
171    /// Get the current buffer size in bytes.
172    pub fn buffer_len(&self) -> usize {
173        self.buffer.len()
174    }
175
176    /// Check if the buffer is empty.
177    pub fn is_empty(&self) -> bool {
178        self.buffer.is_empty()
179    }
180
181    /// Clear the parser state and buffer.
182    pub fn clear(&mut self) {
183        self.buffer.clear();
184        self.parsed_index = 0;
185        self.extracted_values.clear();
186        self.state = ParseState::Outside;
187    }
188
189    /// Extract completed values from the buffer.
190    ///
191    /// For arrays, this extracts complete array elements.
192    /// For objects, this waits until the entire object is complete.
193    fn extract_completed_values(&mut self) -> Vec<Value> {
194        let mut result = Vec::new();
195
196        // For now, we use a simple heuristic:
197        // Try to extract complete JSON values that end with } or ]
198        // This is a simplified implementation - a full implementation would
199        // use a proper state machine to track nesting depth
200
201        // Drain any previously extracted values
202        while let Some(value) = self.extracted_values.pop_front() {
203            result.push(value);
204        }
205
206        result
207    }
208}
209
210impl Default for StreamingParser {
211    fn default() -> Self {
212        Self::new()
213    }
214}
215
216/// Partial value extractor for streaming with schema support.
217///
218/// Extracts partial values from incomplete JSON buffers, respecting
219/// streaming annotations like `@@stream.not_null` and `@@stream.done`.
220pub struct PartialExtractor {
221    parser: StreamingParser,
222}
223
224impl PartialExtractor {
225    /// Create a new partial extractor.
226    pub fn new() -> Self {
227        Self {
228            parser: StreamingParser::new(),
229        }
230    }
231
232    /// Feed a chunk and try to extract a partial value.
233    ///
234    /// Returns `Some(value)` if a partial value can be extracted.
235    pub fn feed(&mut self, chunk: &str) -> Option<Value> {
236        self.parser.feed(chunk);
237        self.parser.try_parse().map(|result| result.value)
238    }
239
240    /// Get the final complete value.
241    pub fn finalize(self) -> std::result::Result<Value, simple_agent_type::SimpleAgentsError> {
242        self.parser.finalize().map(|result| result.value)
243    }
244}
245
246impl Default for PartialExtractor {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a parser and feed it every chunk, in order.
    fn parser_fed_with(chunks: &[&str]) -> StreamingParser {
        let mut parser = StreamingParser::new();
        for chunk in chunks {
            parser.feed(chunk);
        }
        parser
    }

    #[test]
    fn test_streaming_parser_new() {
        let fresh = StreamingParser::new();
        assert_eq!(fresh.buffer_len(), 0);
        assert!(fresh.is_empty());
    }

    #[test]
    fn test_feed_single_chunk_complete() {
        let parsed = parser_fed_with(&[r#"{"name": "Alice", "age": 30}"#])
            .finalize()
            .unwrap();

        assert_eq!(parsed.value["name"], "Alice");
        assert_eq!(parsed.value["age"], 30);
    }

    #[test]
    fn test_feed_multiple_chunks() {
        let chunks = [r#"{"name": "#, r#""Alice", "#, r#""age": 30}"#];

        let parsed = parser_fed_with(&chunks).finalize().unwrap();
        assert_eq!(parsed.value["name"], "Alice");
        assert_eq!(parsed.value["age"], 30);
    }

    #[test]
    fn test_feed_with_nested_objects() {
        let chunks = [
            r#"{"user": {"name": "#,
            r#""Alice", "age": 30}, "#,
            r#""active": true}"#,
        ];

        let parsed = parser_fed_with(&chunks).finalize().unwrap();
        let user = &parsed.value["user"];
        assert_eq!(user["name"], "Alice");
        assert_eq!(user["age"], 30);
        assert_eq!(parsed.value["active"], true);
    }

    #[test]
    fn test_feed_array() {
        let chunks = [
            r#"["#,
            r#"{"id": 1}, "#,
            r#"{"id": 2}, "#,
            r#"{"id": 3}]"#,
        ];

        let parsed = parser_fed_with(&chunks).finalize().unwrap();
        assert!(parsed.value.is_array());
        assert_eq!(parsed.value[0]["id"], 1);
        assert_eq!(parsed.value[1]["id"], 2);
        assert_eq!(parsed.value[2]["id"], 3);
    }

    #[test]
    fn test_try_parse_incomplete() {
        let parser = parser_fed_with(&[r#"{"name": "Alice", "age":"#]);

        // The lenient parser is allowed to auto-close incomplete structures,
        // so a successful parse is acceptable here. When it does succeed, the
        // already-complete "name" field must be present.
        if let Some(parsed) = parser.try_parse() {
            assert_eq!(parsed.value["name"], "Alice");
        }
    }

    #[test]
    fn test_try_parse_complete() {
        let parser = parser_fed_with(&[r#"{"name": "Alice", "age": 30}"#]);

        // A complete object must parse.
        let parsed = parser.try_parse().unwrap();
        assert_eq!(parsed.value["name"], "Alice");
        assert_eq!(parsed.value["age"], 30);
    }

    #[test]
    fn test_buffer_len() {
        let mut parser = StreamingParser::new();
        assert_eq!(parser.buffer_len(), 0);

        // Each entry is (chunk, expected total length after feeding it).
        for (chunk, expected_total) in [("hello", 5), (" world", 11)] {
            parser.feed(chunk);
            assert_eq!(parser.buffer_len(), expected_total);
        }
    }

    #[test]
    fn test_clear() {
        let mut parser = parser_fed_with(&[r#"{"name": "Alice"}"#]);
        assert!(!parser.is_empty());

        parser.clear();
        assert!(parser.is_empty());
        assert_eq!(parser.buffer_len(), 0);
    }

    #[test]
    fn test_finalize_empty_buffer() {
        let outcome = StreamingParser::new().finalize();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_streaming_with_markdown() {
        let chunks = ["```json\n", r#"{"name": "Alice"}"#, "\n```"];

        let parsed = parser_fed_with(&chunks).finalize().unwrap();
        assert_eq!(parsed.value["name"], "Alice");

        let stripped_markdown = parsed.flags.iter().any(|flag| {
            matches!(
                flag,
                simple_agent_type::coercion::CoercionFlag::StrippedMarkdown
            )
        });
        assert!(stripped_markdown);
    }

    #[test]
    fn test_streaming_with_trailing_comma() {
        let chunks = [r#"{"name": "#, r#""Alice","#, r#"}"#];

        let parsed = parser_fed_with(&chunks).finalize().unwrap();
        assert_eq!(parsed.value["name"], "Alice");

        let fixed_comma = parsed.flags.iter().any(|flag| {
            matches!(
                flag,
                simple_agent_type::coercion::CoercionFlag::FixedTrailingComma
            )
        });
        assert!(fixed_comma);
    }

    #[test]
    fn test_partial_extractor() {
        let mut extractor = PartialExtractor::new();

        // Stream the object in three pieces.
        for chunk in [r#"{"name": "Alice", "#, r#""age": 30"#, "}"] {
            extractor.feed(chunk);
        }

        let value = extractor.finalize().unwrap();
        assert_eq!(value["name"], "Alice");
        assert_eq!(value["age"], 30);
    }

    #[test]
    fn test_streaming_preserves_confidence() {
        // Well-formed JSON should come back untouched: full confidence, no flags.
        let parsed = parser_fed_with(&[r#"{"name": "Alice"}"#])
            .finalize()
            .unwrap();

        assert_eq!(parsed.confidence, 1.0);
        assert!(parsed.flags.is_empty());
    }

    #[test]
    fn test_streaming_with_malformed_json() {
        // The key is unquoted, so the parser has to repair the input.
        let chunks = [r#"{name: "#, r#""Alice"}"#];

        let parsed = parser_fed_with(&chunks).finalize().unwrap();
        assert_eq!(parsed.value["name"], "Alice");
        assert!(parsed.confidence < 1.0);
        assert!(!parsed.flags.is_empty());
    }
}