Skip to main content

zeph_core/agent/speculative/
partial_json.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Streaming partial-JSON parser for speculative tool-call dispatch.
5//!
6//! The Anthropic SSE tool-use stream emits a single JSON object across many
7//! `input_json_delta` events. Standard JSON parsers (including
8//! `serde_json::StreamDeserializer`) wait for the closing brace before yielding
9//! any value — providing no benefit for speculative dispatch.
10//!
11//! `PartialJsonParser` is a ~120-line brace/string/escape state machine that
12//! accumulates delta strings and extracts top-level leaf keys whose values have
13//! been **fully closed** (primitives, fully closed nested objects/arrays).
14//! When all required fields of a tool's `input_schema` are present, the engine
15//! can speculatively dispatch the tool call without waiting for `ToolUseStop`.
16//!
17//! ## Invariants
18//!
19//! - **Escape state**: the escape flag is set after a literal `\` inside a string
20//!   and cleared after the following character, regardless of what that character is.
21//!   Multi-byte escape sequences (e.g. `\uXXXX`) are not individually validated;
22//!   the parser only tracks structural JSON tokens.
23//! - **Mid-array edge case**: array values at depth > 1 are treated as opaque; their
24//!   contents are never surfaced as `known_leaves`. Only top-level keys (depth == 1)
25//!   whose values close cleanly are included.
26//! - **No allocation on malformed input**: `Malformed` is returned immediately when
27//!   a structural invariant is violated; the buffer is not reallocated.
28
29#![allow(dead_code)]
30
31use serde_json::Map;
32
33/// Result of feeding accumulated JSON delta bytes to [`PartialJsonParser::push`].
34#[derive(Debug, Clone, PartialEq)]
35pub enum PrefixState {
36    /// Input is still inside an unterminated string, imbalanced braces, or ends mid-escape.
37    Incomplete,
38    /// A valid prefix has been parsed: the `known_leaves` map contains fully closed top-level
39    /// key-value pairs. `missing_required` lists required schema keys not yet seen.
40    ValidPrefix {
41        /// Top-level keys whose values are fully closed.
42        known_leaves: Map<String, serde_json::Value>,
43        /// Required tool input schema keys not yet present in the buffer.
44        missing_required: Vec<String>,
45    },
46    /// The buffer contains a sequence that cannot be a valid JSON object prefix.
47    Malformed,
48}
49
/// Coarse positions a structural JSON scanner can be in.
///
/// NOTE(review): nothing in this file references `Ctx` — `scan` tracks its
/// position with a byte cursor instead — and the module-level
/// `#![allow(dead_code)]` keeps it from warning. Confirm whether it is still needed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Ctx {
    /// Directly inside the top-level object, between pairs.
    TopObject,
    /// Inside a key string.
    InKey,
    /// After a key, before its value.
    AfterKey,
    /// At the start of a value.
    InValue,
    /// Inside a string value.
    InStringValue,
    /// Inside a nested object/array, `depth` levels deep.
    InNestedValue { depth: u32 },
}
59
60/// Streaming structural parser for partial Anthropic SSE tool-input JSON.
61///
62/// Feed each `InputJsonDelta` string via [`push`](Self::push). When the result is
63/// [`PrefixState::ValidPrefix`] with an empty `missing_required`, the engine may
64/// synthesize a `ToolCall` and speculatively dispatch it.
65///
66/// # Examples
67///
68/// ```rust
69/// use zeph_core::agent::speculative::partial_json::{PartialJsonParser, PrefixState};
70///
71/// let mut p = PartialJsonParser::new();
72/// // Simulate incremental SSE deltas
73/// p.push(r#"{"command": "ls "#);
74/// let state = p.push(r#"-la"}"#);
75/// assert!(matches!(state, PrefixState::ValidPrefix { .. }));
76/// ```
77pub struct PartialJsonParser {
78    buf: String,
79    required: Vec<String>,
80    /// Cached known leaves from the last successful scan.
81    known_cache: Map<String, serde_json::Value>,
82    /// Byte offset in `buf` up to which the cache is valid.
83    /// When `buf.len() > scan_watermark` we re-scan only the tail, rebuilding from the
84    /// cached state (H2: avoids O(N²) full-buffer rescan on every push).
85    scan_watermark: usize,
86}
87
88impl PartialJsonParser {
89    /// Create a new parser. Call [`set_required`](Self::set_required) to configure
90    /// the list of required schema keys before the first [`push`](Self::push).
91    #[must_use]
92    pub fn new() -> Self {
93        Self {
94            buf: String::new(),
95            required: Vec::new(),
96            known_cache: Map::new(),
97            scan_watermark: 0,
98        }
99    }
100
101    /// Set the list of keys required by the tool's `input_schema`.
102    ///
103    /// Keys in this list that are absent from the accumulated buffer are reported as
104    /// `missing_required` in [`PrefixState::ValidPrefix`].
105    pub fn set_required(&mut self, required: Vec<String>) {
106        self.required = required;
107    }
108
109    /// Append `delta` and re-scan the buffer.
110    ///
111    /// Returns the current [`PrefixState`]. May be called repeatedly; each call
112    /// replaces the previous result.
113    pub fn push(&mut self, delta: &str) -> PrefixState {
114        self.buf.push_str(delta);
115        self.scan()
116    }
117
118    /// Reset the parser for reuse after a commit or cancel.
119    pub fn reset(&mut self) {
120        self.buf.clear();
121        self.known_cache.clear();
122        self.scan_watermark = 0;
123    }
124
125    /// Scan `self.buf` and extract fully closed top-level key-value pairs.
126    ///
127    /// Uses a watermark cursor to avoid replaying already-scanned bytes (H2).
128    /// Previously confirmed key-value pairs are carried in `known_cache`; only the
129    /// bytes after `scan_watermark` are newly examined.
130    fn scan(&mut self) -> PrefixState {
131        let bytes = self.buf.as_bytes();
132        let len = bytes.len();
133
134        // On the first call, consume the opening '{'.
135        let mut i = if self.scan_watermark == 0 {
136            let start = skip_ws(bytes, 0);
137            if start >= len || bytes[start] != b'{' {
138                return if self.buf.trim().is_empty() {
139                    PrefixState::Incomplete
140                } else {
141                    PrefixState::Malformed
142                };
143            }
144            start + 1 // consume '{'
145        } else {
146            self.scan_watermark
147        };
148
149        // Start with previously confirmed pairs; we may add more this call.
150        let mut known = self.known_cache.clone();
151
152        loop {
153            i = skip_ws(bytes, i);
154            if i >= len {
155                break; // still incomplete
156            }
157
158            // End of object
159            if bytes[i] == b'}' {
160                self.scan_watermark = i + 1;
161                let missing = self.missing(&known);
162                self.known_cache.clone_from(&known);
163                return PrefixState::ValidPrefix {
164                    known_leaves: known,
165                    missing_required: missing,
166                };
167            }
168
169            // Comma between pairs
170            if bytes[i] == b',' {
171                i += 1;
172                i = skip_ws(bytes, i);
173                if i >= len {
174                    break;
175                }
176            }
177
178            // Key string
179            if bytes[i] != b'"' {
180                return PrefixState::Malformed;
181            }
182            let Some((key, after_key)) = read_string(bytes, i) else {
183                break; // incomplete string
184            };
185            i = after_key;
186            i = skip_ws(bytes, i);
187
188            if i >= len {
189                break;
190            }
191            if bytes[i] != b':' {
192                return PrefixState::Malformed;
193            }
194            i += 1; // consume ':'
195            i = skip_ws(bytes, i);
196
197            if i >= len {
198                break; // value not yet arrived
199            }
200
201            // Try to read a fully closed value
202            match read_value(bytes, i) {
203                ReadValue::Complete(value, end) => {
204                    known.insert(key, value);
205                    // Advance watermark: this pair is confirmed; next call starts here.
206                    self.scan_watermark = end;
207                    self.known_cache.clone_from(&known);
208                    i = end;
209                }
210                ReadValue::Incomplete => break,
211                ReadValue::Malformed => return PrefixState::Malformed,
212            }
213        }
214
215        // Buffer ended mid-object — partial but valid so far
216        let missing = self.missing(&known);
217        PrefixState::ValidPrefix {
218            known_leaves: known,
219            missing_required: missing,
220        }
221    }
222
223    fn missing(&self, known: &Map<String, serde_json::Value>) -> Vec<String> {
224        self.required
225            .iter()
226            .filter(|k| !known.contains_key(k.as_str()))
227            .cloned()
228            .collect()
229    }
230}
231
232impl Default for PartialJsonParser {
233    fn default() -> Self {
234        Self::new()
235    }
236}
237
238// --- Internal helpers -------------------------------------------------------
239
/// Advance past JSON insignificant whitespace (space, tab, CR, LF) starting at `i`.
///
/// Returns the first index at or after `i` that is not whitespace, or `i` itself
/// when `i` is already at or beyond the end of `bytes`.
fn skip_ws(bytes: &[u8], i: usize) -> usize {
    let Some(rest) = bytes.get(i..) else {
        return i;
    };
    let skipped = rest
        .iter()
        .take_while(|&&b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
        .count();
    i + skipped
}
246
247/// Read a JSON string starting at `i` (which must point at `"`).
248/// Returns `(string_content, index_after_closing_quote)` or `None` if incomplete.
249///
250/// Operates on raw bytes to avoid the Latin-1 `b as char` cast that corrupts non-ASCII.
251/// The closed string slice is validated as UTF-8 and decoded at object boundary.
252fn read_string(bytes: &[u8], start: usize) -> Option<(String, usize)> {
253    debug_assert_eq!(bytes[start], b'"');
254    let mut i = start + 1;
255    let mut escape = false;
256    while i < bytes.len() {
257        let b = bytes[i];
258        if escape {
259            escape = false;
260        } else if b == b'\\' {
261            escape = true;
262        } else if b == b'"' {
263            // Decode the full slice [start+1..i] as UTF-8 in one shot.
264            let content = std::str::from_utf8(&bytes[start + 1..i]).ok()?;
265            // Re-process escape sequences by delegating to serde_json, which handles
266            // \uXXXX, \n, \t, etc. correctly without re-implementing the JSON string spec.
267            let json_str = [b"\"", &bytes[start + 1..i], b"\""].concat();
268            let decoded: String =
269                serde_json::from_slice(&json_str).unwrap_or_else(|_| content.to_owned());
270            return Some((decoded, i + 1));
271        }
272        i += 1;
273    }
274    None // unterminated string
275}
276
277enum ReadValue {
278    Complete(serde_json::Value, usize),
279    Incomplete,
280    Malformed,
281}
282
283/// Attempt to read a fully closed JSON value starting at `i`.
284fn read_value(bytes: &[u8], i: usize) -> ReadValue {
285    if i >= bytes.len() {
286        return ReadValue::Incomplete;
287    }
288    match bytes[i] {
289        b'"' => match read_string(bytes, i) {
290            Some((s, end)) => ReadValue::Complete(serde_json::Value::String(s), end),
291            None => ReadValue::Incomplete,
292        },
293        b'{' | b'[' => read_nested(bytes, i),
294        b't' => read_literal(bytes, i, b"true", serde_json::Value::Bool(true)),
295        b'f' => read_literal(bytes, i, b"false", serde_json::Value::Bool(false)),
296        b'n' => read_literal(bytes, i, b"null", serde_json::Value::Null),
297        b'-' | b'0'..=b'9' => read_number(bytes, i),
298        _ => ReadValue::Malformed,
299    }
300}
301
302fn read_literal(bytes: &[u8], i: usize, lit: &[u8], val: serde_json::Value) -> ReadValue {
303    if bytes.len() < i + lit.len() {
304        return ReadValue::Incomplete;
305    }
306    if &bytes[i..i + lit.len()] == lit {
307        ReadValue::Complete(val, i + lit.len())
308    } else {
309        ReadValue::Malformed
310    }
311}
312
313fn read_number(bytes: &[u8], mut i: usize) -> ReadValue {
314    let start = i;
315    if i < bytes.len() && bytes[i] == b'-' {
316        i += 1;
317    }
318    while i < bytes.len() && (bytes[i].is_ascii_digit() || bytes[i] == b'.') {
319        i += 1;
320    }
321    // Exponent
322    if i < bytes.len() && matches!(bytes[i], b'e' | b'E') {
323        i += 1;
324        if i < bytes.len() && matches!(bytes[i], b'+' | b'-') {
325            i += 1;
326        }
327        while i < bytes.len() && bytes[i].is_ascii_digit() {
328            i += 1;
329        }
330    }
331    if i == start {
332        return ReadValue::Malformed;
333    }
334    // Must be followed by structural char or end
335    if i < bytes.len() && !matches!(bytes[i], b',' | b'}' | b']' | b' ' | b'\t' | b'\r' | b'\n') {
336        return ReadValue::Incomplete;
337    }
338    let s = std::str::from_utf8(&bytes[start..i]).unwrap_or("");
339    match serde_json::from_str::<serde_json::Value>(s) {
340        Ok(v) => ReadValue::Complete(v, i),
341        Err(_) => ReadValue::Malformed,
342    }
343}
344
345/// Read a nested `{...}` or `[...]`, tracking depth and string escapes.
346fn read_nested(bytes: &[u8], start: usize) -> ReadValue {
347    let open = bytes[start];
348    let close = if open == b'{' { b'}' } else { b']' };
349    let mut depth = 1u32;
350    let mut i = start + 1;
351    let mut in_string = false;
352    let mut escape = false;
353
354    while i < bytes.len() {
355        let b = bytes[i];
356        if escape {
357            escape = false;
358        } else if in_string {
359            if b == b'\\' {
360                escape = true;
361            } else if b == b'"' {
362                in_string = false;
363            }
364        } else if b == b'"' {
365            in_string = true;
366        } else if b == open {
367            depth += 1;
368        } else if b == close {
369            depth -= 1;
370            if depth == 0 {
371                // Re-parse just the slice [start..=i] as JSON
372                let parsed = std::str::from_utf8(&bytes[start..=i])
373                    .ok()
374                    .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok());
375                return match parsed {
376                    Some(v) => ReadValue::Complete(v, i + 1),
377                    None => ReadValue::Malformed,
378                };
379            }
380        }
381        i += 1;
382    }
383    ReadValue::Incomplete
384}
385
386// ---------------------------------------------------------------------------
387// Unit tests
388// ---------------------------------------------------------------------------
389
390#[cfg(test)]
391mod tests {
392    use super::*;
393
394    fn push_all(p: &mut PartialJsonParser, parts: &[&str]) -> PrefixState {
395        let mut state = PrefixState::Incomplete;
396        for part in parts {
397            state = p.push(part);
398        }
399        state
400    }
401
402    /// Fixture 1: simple single-field bash command arriving in two deltas.
403    #[test]
404    fn fixture_simple_command_two_deltas() {
405        let mut p = PartialJsonParser::new();
406        p.set_required(vec!["command".into()]);
407        p.push(r#"{"command": "ls "#);
408        let state = p.push(r#"-la"}"#);
409        match state {
410            PrefixState::ValidPrefix {
411                known_leaves,
412                missing_required,
413            } => {
414                assert!(missing_required.is_empty());
415                let v = known_leaves["command"].as_str().unwrap();
416                assert!(v.contains("ls") && v.contains("la"), "got: {v}");
417            }
418            other => panic!("expected ValidPrefix, got {other:?}"),
419        }
420    }
421
422    /// Fixture 2: multi-field tool call with nested object arrives incrementally.
423    #[test]
424    fn fixture_multi_field_incremental() {
425        let mut p = PartialJsonParser::new();
426        p.set_required(vec!["path".into(), "content".into()]);
427        let state = push_all(
428            &mut p,
429            &[
430                r#"{"path": "/tmp/f"#,
431                r#"oo.txt", "content": "hel"#,
432                r#"lo world"}"#,
433            ],
434        );
435        match state {
436            PrefixState::ValidPrefix {
437                known_leaves,
438                missing_required,
439            } => {
440                assert!(missing_required.is_empty(), "missing: {missing_required:?}");
441                assert!(known_leaves.contains_key("path"));
442                assert!(known_leaves.contains_key("content"));
443            }
444            other => panic!("expected ValidPrefix, got {other:?}"),
445        }
446    }
447
448    /// Fixture 3: escape sequence inside string value does not break parser.
449    #[test]
450    fn fixture_escape_in_string() {
451        let mut p = PartialJsonParser::new();
452        p.set_required(vec!["msg".into()]);
453        let state = p.push(r#"{"msg": "say \"hello\""}"#);
454        match state {
455            PrefixState::ValidPrefix {
456                known_leaves,
457                missing_required,
458            } => {
459                assert!(missing_required.is_empty());
460                let v = known_leaves["msg"].as_str().unwrap();
461                assert!(v.contains("hello"), "got: {v}");
462            }
463            other => panic!("expected ValidPrefix, got {other:?}"),
464        }
465    }
466
467    /// Fixture 4: mid-delta truncation → Incomplete, then resolved on next delta.
468    #[test]
469    fn fixture_incomplete_then_resolved() {
470        let mut p = PartialJsonParser::new();
471        p.set_required(vec!["x".into()]);
472        let mid = p.push(r#"{"x": 42"#);
473        // No closing brace yet; key present but object not closed → ValidPrefix with x known
474        match &mid {
475            PrefixState::ValidPrefix {
476                known_leaves,
477                missing_required,
478            } => {
479                assert!(missing_required.is_empty());
480                assert_eq!(known_leaves["x"], 42);
481            }
482            PrefixState::Incomplete => {} // also acceptable
483            other @ PrefixState::Malformed => panic!("unexpected: {other:?}"),
484        }
485        let done = p.push("}");
486        assert!(matches!(done, PrefixState::ValidPrefix { .. }));
487    }
488
489    /// Fixture 5: malformed input returns Malformed.
490    #[test]
491    fn fixture_malformed_input() {
492        let mut p = PartialJsonParser::new();
493        let state = p.push("not-json");
494        assert!(matches!(state, PrefixState::Malformed));
495    }
496
497    /// Fixture 6: mid-array value at top level is opaque (depth > 1 skipped).
498    #[test]
499    fn fixture_top_level_array_value() {
500        let mut p = PartialJsonParser::new();
501        p.set_required(vec!["items".into()]);
502        let state = p.push(r#"{"items": [1, 2, 3]}"#);
503        match state {
504            PrefixState::ValidPrefix {
505                known_leaves,
506                missing_required,
507            } => {
508                assert!(missing_required.is_empty());
509                assert!(known_leaves["items"].is_array());
510            }
511            other => panic!("expected ValidPrefix, got {other:?}"),
512        }
513    }
514
515    #[test]
516    fn reset_clears_buffer() {
517        let mut p = PartialJsonParser::new();
518        p.push(r#"{"x": 1}"#);
519        p.reset();
520        let state = p.push(r#"{"y": 2}"#);
521        match state {
522            PrefixState::ValidPrefix { known_leaves, .. } => {
523                assert!(
524                    !known_leaves.contains_key("x"),
525                    "should be cleared after reset"
526                );
527            }
528            other => panic!("{other:?}"),
529        }
530    }
531
532    /// Fixture 7: Unicode / non-ASCII bytes must not be corrupted (C5 regression).
533    #[test]
534    fn fixture_unicode_filename() {
535        let mut p = PartialJsonParser::new();
536        p.set_required(vec!["path".into()]);
537        let state = p.push(r#"{"path": "/tmp/Привет.txt"}"#);
538        match state {
539            PrefixState::ValidPrefix {
540                known_leaves,
541                missing_required,
542            } => {
543                assert!(missing_required.is_empty());
544                let v = known_leaves["path"].as_str().unwrap();
545                assert!(v.contains("Привет"), "non-ASCII corrupted: {v}");
546            }
547            other => panic!("expected ValidPrefix, got {other:?}"),
548        }
549    }
550
551    /// Fixture 8: incremental watermark — second push does not re-parse completed pairs (H2).
552    #[test]
553    fn fixture_incremental_watermark() {
554        let mut p = PartialJsonParser::new();
555        p.set_required(vec!["a".into(), "b".into()]);
556        // First push: 'a' complete, 'b' incomplete
557        let s1 = p.push(r#"{"a": 1, "b": "#);
558        match &s1 {
559            PrefixState::ValidPrefix {
560                known_leaves,
561                missing_required,
562            } => {
563                assert!(known_leaves.contains_key("a"));
564                assert!(missing_required.contains(&"b".to_string()));
565            }
566            PrefixState::Incomplete => {} // also acceptable before 'b' is fully parsed
567            other @ PrefixState::Malformed => panic!("unexpected s1: {other:?}"),
568        }
569        // Second push: completes 'b'
570        let s2 = p.push("2}");
571        match s2 {
572            PrefixState::ValidPrefix {
573                known_leaves,
574                missing_required,
575            } => {
576                assert!(
577                    missing_required.is_empty(),
578                    "still missing: {missing_required:?}"
579                );
580                assert_eq!(known_leaves["a"], 1);
581                assert_eq!(known_leaves["b"], 2);
582            }
583            other => panic!("expected ValidPrefix, got {other:?}"),
584        }
585    }
586}