atomr_agents_parser/
streaming.rs1use atomr_agents_core::{Result, Value};
8
/// Parser state for JSON text that arrives in pieces.
///
/// The only state is the raw text received so far; the value is re-derived
/// from that text on demand rather than being cached.
#[derive(Debug, Clone, Default)]
pub struct StreamingPartialJsonParser {
    /// Every chunk received so far, concatenated in arrival order.
    buffer: String,
}
13
14impl StreamingPartialJsonParser {
15 pub fn new() -> Self {
16 Self::default()
17 }
18
19 pub fn feed(&mut self, chunk: &str) -> Result<Option<Value>> {
23 self.buffer.push_str(chunk);
24 Ok(try_parse_partial(&self.buffer))
25 }
26
27 pub fn finish(self) -> Result<Value> {
28 match try_parse_partial(&self.buffer) {
29 Some(v) => Ok(v),
30 None => Err(atomr_agents_core::AgentError::Tool(
31 "streaming json: no parseable content".into(),
32 )),
33 }
34 }
35}
36
37fn try_parse_partial(buf: &str) -> Option<Value> {
40 let trimmed = buf.trim();
41 if trimmed.is_empty() {
42 return None;
43 }
44 if let Ok(v) = serde_json::from_str(trimmed) {
46 return Some(v);
47 }
48 let mut depth_obj = 0i32;
50 let mut depth_arr = 0i32;
51 let mut in_string = false;
52 let mut last_close = None;
53 let bytes = trimmed.as_bytes();
54 let mut prev = 0u8;
55 for (i, c) in bytes.iter().enumerate() {
56 if in_string {
57 if *c == b'"' && prev != b'\\' {
58 in_string = false;
59 }
60 prev = *c;
61 continue;
62 }
63 match *c {
64 b'"' => in_string = true,
65 b'{' => depth_obj += 1,
66 b'}' => {
67 depth_obj -= 1;
68 if depth_obj == 0 && depth_arr == 0 {
69 last_close = Some(i + 1);
70 }
71 }
72 b'[' => depth_arr += 1,
73 b']' => {
74 depth_arr -= 1;
75 if depth_obj == 0 && depth_arr == 0 {
76 last_close = Some(i + 1);
77 }
78 }
79 _ => {}
80 }
81 prev = *c;
82 }
83 if let Some(end) = last_close {
84 if let Ok(v) = serde_json::from_str(&trimmed[..end]) {
85 return Some(v);
86 }
87 }
88 let mut repaired = trimmed.to_string();
90 while depth_obj > 0 {
91 repaired.push('}');
92 depth_obj -= 1;
93 }
94 while depth_arr > 0 {
95 repaired.push(']');
96 depth_arr -= 1;
97 }
98 serde_json::from_str(&repaired).ok()
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// An object whose closing brace has not arrived yet still exposes the
    /// field that is already complete.
    #[test]
    fn emits_partial_object_after_first_field() {
        let mut parser = StreamingPartialJsonParser::new();
        let fed = parser.feed(r#"{"name": "Alice""#);
        let partial = fed.unwrap().unwrap();
        assert_eq!(partial["name"], "Alice");
    }

    /// The first chunk leaves both the array and the object open; the second
    /// chunk completes the document, and the result reflects all three items.
    #[test]
    fn refines_value_as_more_arrives() {
        let mut parser = StreamingPartialJsonParser::new();
        let _interim = parser.feed(r#"{"items": [1, 2"#).unwrap();
        let fed = parser.feed(r#", 3]}"#);
        let complete = fed.unwrap().unwrap();
        let items = complete["items"].as_array().unwrap();
        assert_eq!(items.len(), 3);
    }

    /// `finish` hands back the value parsed from a complete document.
    #[test]
    fn finish_returns_final_value() {
        let mut parser = StreamingPartialJsonParser::new();
        let _interim = parser.feed(r#"{"k":"v"}"#).unwrap();
        let last = parser.finish().unwrap();
        assert_eq!(last["k"], "v");
    }
}