Skip to main content

atomr_agents_parser/
streaming.rs

1//! Streaming partial-JSON parser.
2//!
3//! Feed token deltas; the parser emits the most-recent best-effort
4//! `Value` parse after each chunk. Useful when a model is mid-stream
5//! and the caller wants to render fields as they finalize.
6
7use atomr_agents_core::{Result, Value};
8
/// Incremental parser that accumulates raw model output and re-parses the
/// whole accumulated text (best-effort) after every chunk.
#[derive(Debug, Clone, Default)]
pub struct StreamingPartialJsonParser {
    /// Every chunk fed so far, concatenated in arrival order.
    buffer: String,
}
13
14impl StreamingPartialJsonParser {
15    pub fn new() -> Self {
16        Self::default()
17    }
18
19    /// Feed a chunk of raw output. Returns the most-recent partial
20    /// parse if one can be produced; `Ok(None)` when not enough has
21    /// arrived yet.
22    pub fn feed(&mut self, chunk: &str) -> Result<Option<Value>> {
23        self.buffer.push_str(chunk);
24        Ok(try_parse_partial(&self.buffer))
25    }
26
27    pub fn finish(self) -> Result<Value> {
28        match try_parse_partial(&self.buffer) {
29            Some(v) => Ok(v),
30            None => Err(atomr_agents_core::AgentError::Tool(
31                "streaming json: no parseable content".into(),
32            )),
33        }
34    }
35}
36
37/// Best-effort partial parser. Walks the buffer balancing braces
38/// and brackets; returns the longest prefix that parses cleanly.
39fn try_parse_partial(buf: &str) -> Option<Value> {
40    let trimmed = buf.trim();
41    if trimmed.is_empty() {
42        return None;
43    }
44    // Try the full buffer first.
45    if let Ok(v) = serde_json::from_str(trimmed) {
46        return Some(v);
47    }
48    // Walk back, attempting to close at the last balanced point.
49    let mut depth_obj = 0i32;
50    let mut depth_arr = 0i32;
51    let mut in_string = false;
52    let mut last_close = None;
53    let bytes = trimmed.as_bytes();
54    let mut prev = 0u8;
55    for (i, c) in bytes.iter().enumerate() {
56        if in_string {
57            if *c == b'"' && prev != b'\\' {
58                in_string = false;
59            }
60            prev = *c;
61            continue;
62        }
63        match *c {
64            b'"' => in_string = true,
65            b'{' => depth_obj += 1,
66            b'}' => {
67                depth_obj -= 1;
68                if depth_obj == 0 && depth_arr == 0 {
69                    last_close = Some(i + 1);
70                }
71            }
72            b'[' => depth_arr += 1,
73            b']' => {
74                depth_arr -= 1;
75                if depth_obj == 0 && depth_arr == 0 {
76                    last_close = Some(i + 1);
77                }
78            }
79            _ => {}
80        }
81        prev = *c;
82    }
83    if let Some(end) = last_close {
84        if let Ok(v) = serde_json::from_str(&trimmed[..end]) {
85            return Some(v);
86        }
87    }
88    // Try repairing by closing open structures at the current point.
89    let mut repaired = trimmed.to_string();
90    while depth_obj > 0 {
91        repaired.push('}');
92        depth_obj -= 1;
93    }
94    while depth_arr > 0 {
95        repaired.push(']');
96        depth_arr -= 1;
97    }
98    serde_json::from_str(&repaired).ok()
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn emits_partial_object_after_first_field() {
        let mut parser = StreamingPartialJsonParser::new();
        let value = parser
            .feed(r#"{"name": "Alice""#)
            .expect("feed does not fail")
            .expect("partial object should be recoverable");
        assert_eq!(value["name"], "Alice");
    }

    #[test]
    fn refines_value_as_more_arrives() {
        let mut parser = StreamingPartialJsonParser::new();
        let mut latest = None;
        for chunk in [r#"{"items": [1, 2"#, r#", 3]}"#] {
            latest = parser.feed(chunk).expect("feed does not fail");
        }
        let value = latest.expect("complete object should parse");
        let items = value["items"].as_array().expect("items is an array");
        assert_eq!(items.len(), 3);
    }

    #[test]
    fn finish_returns_final_value() {
        let mut parser = StreamingPartialJsonParser::new();
        let _ = parser.feed(r#"{"k":"v"}"#).expect("feed does not fail");
        let value = parser.finish().expect("complete object should parse");
        assert_eq!(value["k"], "v");
    }
}