Skip to main content

zeph_core/agent/speculative/
partial_json.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Streaming partial-JSON parser for speculative tool-call dispatch.
5//!
6//! The Anthropic SSE tool-use stream emits a single JSON object across many
7//! `input_json_delta` events. Standard JSON parsers (including
8//! `serde_json::StreamDeserializer`) wait for the closing brace before yielding
9//! any value — providing no benefit for speculative dispatch.
10//!
//! `PartialJsonParser` is a small brace/string/escape state machine that
//! accumulates delta strings and extracts the top-level keys whose values have
//! been **fully closed** (primitives, or nested objects/arrays once their closing
//! bracket has arrived).
14//! When all required fields of a tool's `input_schema` are present, the engine
15//! can speculatively dispatch the tool call without waiting for `ToolUseStop`.
16//!
17//! ## Invariants
18//!
19//! - **Escape state**: the escape flag is set after a literal `\` inside a string
20//!   and cleared after the following character, regardless of what that character is.
21//!   Multi-byte escape sequences (e.g. `\uXXXX`) are not individually validated;
22//!   the parser only tracks structural JSON tokens.
//! - **Nested values are opaque**: a top-level key whose value is an object or array
//!   is surfaced only once that whole value has closed, and then as a single value;
//!   keys and elements inside nested values never appear individually in
//!   `known_leaves`. Only top-level keys (depth == 1) are reported.
//! - **Fail fast on malformed input**: `Malformed` is returned as soon as a structural
//!   invariant is violated; the parser does not attempt recovery. A delta that would
//!   push the buffer past the size cap is rejected without being appended.
28
29#![allow(dead_code)]
30
31use serde_json::Map;
32
33#[non_exhaustive]
34/// Result of feeding accumulated JSON delta bytes to [`PartialJsonParser::push`].
35#[derive(Debug, Clone, PartialEq)]
36pub enum PrefixState {
37    /// Input is still inside an unterminated string, imbalanced braces, or ends mid-escape.
38    Incomplete,
39    /// A valid prefix has been parsed: the `known_leaves` map contains fully closed top-level
40    /// key-value pairs. `missing_required` lists required schema keys not yet seen.
41    ValidPrefix {
42        /// Top-level keys whose values are fully closed.
43        known_leaves: Map<String, serde_json::Value>,
44        /// Required tool input schema keys not yet present in the buffer.
45        missing_required: Vec<String>,
46    },
47    /// The buffer contains a sequence that cannot be a valid JSON object prefix.
48    Malformed,
49}
50
/// Position of a scanner within a JSON object, expressed as a state-machine context.
///
/// NOTE(review): nothing in this file constructs or matches on `Ctx` — `scan` tracks
/// its position with plain byte indices instead — so this looks like a leftover from
/// an earlier design; the `#![allow(dead_code)]` at the top of this file keeps it from
/// triggering a warning. The per-variant descriptions below are inferred from the
/// names only; confirm before building on them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Ctx {
    /// Presumably: directly inside the top-level object, between members.
    TopObject,
    /// Presumably: inside a key string.
    InKey,
    /// Presumably: after a key, before its `:`.
    AfterKey,
    /// Presumably: after `:`, before the value starts.
    InValue,
    /// Presumably: inside a string value.
    InStringValue,
    /// Presumably: inside a nested object or array value.
    InNestedValue {
        /// Presumably the current bracket nesting depth.
        depth: u32,
    },
}
60
61/// Streaming structural parser for partial Anthropic SSE tool-input JSON.
62///
63/// Feed each `InputJsonDelta` string via [`push`](Self::push). When the result is
64/// [`PrefixState::ValidPrefix`] with an empty `missing_required`, the engine may
65/// synthesize a `ToolCall` and speculatively dispatch it.
66///
67/// # Examples
68///
69/// ```rust
70/// use zeph_core::agent::speculative::partial_json::{PartialJsonParser, PrefixState};
71///
72/// let mut p = PartialJsonParser::new();
73/// // Simulate incremental SSE deltas
74/// p.push(r#"{"command": "ls "#);
75/// let state = p.push(r#"-la"}"#);
76/// assert!(matches!(state, PrefixState::ValidPrefix { .. }));
77/// ```
78pub struct PartialJsonParser {
79    buf: String,
80    required: Vec<String>,
81    /// Cached known leaves from the last successful scan.
82    known_cache: Map<String, serde_json::Value>,
83    /// Byte offset in `buf` up to which the cache is valid.
84    /// When `buf.len() > scan_watermark` we re-scan only the tail, rebuilding from the
85    /// cached state (H2: avoids O(N²) full-buffer rescan on every push).
86    scan_watermark: usize,
87}
88
89impl PartialJsonParser {
90    /// Create a new parser. Call [`set_required`](Self::set_required) to configure
91    /// the list of required schema keys before the first [`push`](Self::push).
92    #[must_use]
93    pub fn new() -> Self {
94        Self {
95            buf: String::new(),
96            required: Vec::new(),
97            known_cache: Map::new(),
98            scan_watermark: 0,
99        }
100    }
101
102    /// Set the list of keys required by the tool's `input_schema`.
103    ///
104    /// Keys in this list that are absent from the accumulated buffer are reported as
105    /// `missing_required` in [`PrefixState::ValidPrefix`].
106    pub fn set_required(&mut self, required: Vec<String>) {
107        self.required = required;
108    }
109
110    /// Append `delta` and re-scan the buffer.
111    ///
112    /// Returns the current [`PrefixState`]. May be called repeatedly; each call
113    /// replaces the previous result. Returns [`PrefixState::Malformed`] when the
114    /// accumulated buffer would exceed 512 KiB (protection against oversized inputs).
115    pub fn push(&mut self, delta: &str) -> PrefixState {
116        const MAX_TOOL_INPUT_BYTES: usize = 512 * 1024;
117        if self.buf.len() + delta.len() > MAX_TOOL_INPUT_BYTES {
118            tracing::warn!(
119                buf_len = self.buf.len(),
120                delta_len = delta.len(),
121                "PartialJsonParser: tool input exceeded 512 KiB cap; treating as malformed"
122            );
123            return PrefixState::Malformed;
124        }
125        self.buf.push_str(delta);
126        self.scan()
127    }
128
129    /// Reset the parser for reuse after a commit or cancel.
130    pub fn reset(&mut self) {
131        self.buf.clear();
132        self.known_cache.clear();
133        self.scan_watermark = 0;
134    }
135
136    /// Scan `self.buf` and extract fully closed top-level key-value pairs.
137    ///
138    /// Uses a watermark cursor to avoid replaying already-scanned bytes (H2).
139    /// Previously confirmed key-value pairs are carried in `known_cache`; only the
140    /// bytes after `scan_watermark` are newly examined.
141    fn scan(&mut self) -> PrefixState {
142        let bytes = self.buf.as_bytes();
143        let len = bytes.len();
144
145        // On the first call, consume the opening '{'.
146        let mut i = if self.scan_watermark == 0 {
147            let start = skip_ws(bytes, 0);
148            if start >= len || bytes[start] != b'{' {
149                return if self.buf.trim().is_empty() {
150                    PrefixState::Incomplete
151                } else {
152                    PrefixState::Malformed
153                };
154            }
155            start + 1 // consume '{'
156        } else {
157            self.scan_watermark
158        };
159
160        // Start with previously confirmed pairs; we may add more this call.
161        let mut known = self.known_cache.clone();
162
163        loop {
164            i = skip_ws(bytes, i);
165            if i >= len {
166                break; // still incomplete
167            }
168
169            // End of object
170            if bytes[i] == b'}' {
171                self.scan_watermark = i + 1;
172                let missing = self.missing(&known);
173                self.known_cache.clone_from(&known);
174                return PrefixState::ValidPrefix {
175                    known_leaves: known,
176                    missing_required: missing,
177                };
178            }
179
180            // Comma between pairs
181            if bytes[i] == b',' {
182                i += 1;
183                i = skip_ws(bytes, i);
184                if i >= len {
185                    break;
186                }
187            }
188
189            // Key string
190            if bytes[i] != b'"' {
191                return PrefixState::Malformed;
192            }
193            let Some((key, after_key)) = read_string(bytes, i) else {
194                break; // incomplete string
195            };
196            i = after_key;
197            i = skip_ws(bytes, i);
198
199            if i >= len {
200                break;
201            }
202            if bytes[i] != b':' {
203                return PrefixState::Malformed;
204            }
205            i += 1; // consume ':'
206            i = skip_ws(bytes, i);
207
208            if i >= len {
209                break; // value not yet arrived
210            }
211
212            // Try to read a fully closed value
213            match read_value(bytes, i) {
214                ReadValue::Complete(value, end) => {
215                    known.insert(key, value);
216                    // Advance watermark: this pair is confirmed; next call starts here.
217                    self.scan_watermark = end;
218                    self.known_cache.clone_from(&known);
219                    i = end;
220                }
221                ReadValue::Incomplete => break,
222                ReadValue::Malformed => return PrefixState::Malformed,
223            }
224        }
225
226        // Buffer ended mid-object — partial but valid so far
227        let missing = self.missing(&known);
228        PrefixState::ValidPrefix {
229            known_leaves: known,
230            missing_required: missing,
231        }
232    }
233
234    fn missing(&self, known: &Map<String, serde_json::Value>) -> Vec<String> {
235        self.required
236            .iter()
237            .filter(|k| !known.contains_key(k.as_str()))
238            .cloned()
239            .collect()
240    }
241}
242
243impl Default for PartialJsonParser {
244    fn default() -> Self {
245        Self::new()
246    }
247}
248
249// --- Internal helpers -------------------------------------------------------
250
/// Return the index of the first non-whitespace byte at or after `i`, or
/// `bytes.len()` if only whitespace remains.
///
/// Only the four JSON whitespace bytes (space, tab, CR, LF) are skipped. An `i` at or
/// past the end of `bytes` is returned unchanged.
fn skip_ws(bytes: &[u8], i: usize) -> usize {
    let rest = bytes.get(i..).unwrap_or_default();
    let skipped = rest
        .iter()
        .take_while(|&&b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
        .count();
    i + skipped
}
257
258/// Read a JSON string starting at `i` (which must point at `"`).
259/// Returns `(string_content, index_after_closing_quote)` or `None` if incomplete.
260///
261/// Operates on raw bytes to avoid the Latin-1 `b as char` cast that corrupts non-ASCII.
262/// The closed string slice is validated as UTF-8 and decoded at object boundary.
263fn read_string(bytes: &[u8], start: usize) -> Option<(String, usize)> {
264    debug_assert_eq!(bytes[start], b'"');
265    let mut i = start + 1;
266    let mut escape = false;
267    while i < bytes.len() {
268        let b = bytes[i];
269        if escape {
270            escape = false;
271        } else if b == b'\\' {
272            escape = true;
273        } else if b == b'"' {
274            // Decode the full slice [start+1..i] as UTF-8 in one shot.
275            let content = std::str::from_utf8(&bytes[start + 1..i]).ok()?;
276            // Re-process escape sequences by delegating to serde_json, which handles
277            // \uXXXX, \n, \t, etc. correctly without re-implementing the JSON string spec.
278            let json_str = [b"\"", &bytes[start + 1..i], b"\""].concat();
279            let decoded: String =
280                serde_json::from_slice(&json_str).unwrap_or_else(|_| content.to_owned());
281            return Some((decoded, i + 1));
282        }
283        i += 1;
284    }
285    None // unterminated string
286}
287
288enum ReadValue {
289    Complete(serde_json::Value, usize),
290    Incomplete,
291    Malformed,
292}
293
294/// Attempt to read a fully closed JSON value starting at `i`.
295fn read_value(bytes: &[u8], i: usize) -> ReadValue {
296    if i >= bytes.len() {
297        return ReadValue::Incomplete;
298    }
299    match bytes[i] {
300        b'"' => match read_string(bytes, i) {
301            Some((s, end)) => ReadValue::Complete(serde_json::Value::String(s), end),
302            None => ReadValue::Incomplete,
303        },
304        b'{' | b'[' => read_nested(bytes, i),
305        b't' => read_literal(bytes, i, b"true", serde_json::Value::Bool(true)),
306        b'f' => read_literal(bytes, i, b"false", serde_json::Value::Bool(false)),
307        b'n' => read_literal(bytes, i, b"null", serde_json::Value::Null),
308        b'-' | b'0'..=b'9' => read_number(bytes, i),
309        _ => ReadValue::Malformed,
310    }
311}
312
313fn read_literal(bytes: &[u8], i: usize, lit: &[u8], val: serde_json::Value) -> ReadValue {
314    if bytes.len() < i + lit.len() {
315        return ReadValue::Incomplete;
316    }
317    if &bytes[i..i + lit.len()] == lit {
318        ReadValue::Complete(val, i + lit.len())
319    } else {
320        ReadValue::Malformed
321    }
322}
323
324fn read_number(bytes: &[u8], mut i: usize) -> ReadValue {
325    let start = i;
326    if i < bytes.len() && bytes[i] == b'-' {
327        i += 1;
328    }
329    while i < bytes.len() && (bytes[i].is_ascii_digit() || bytes[i] == b'.') {
330        i += 1;
331    }
332    // Exponent
333    if i < bytes.len() && matches!(bytes[i], b'e' | b'E') {
334        i += 1;
335        if i < bytes.len() && matches!(bytes[i], b'+' | b'-') {
336            i += 1;
337        }
338        while i < bytes.len() && bytes[i].is_ascii_digit() {
339            i += 1;
340        }
341    }
342    if i == start {
343        return ReadValue::Malformed;
344    }
345    // Must be followed by structural char or end
346    if i < bytes.len() && !matches!(bytes[i], b',' | b'}' | b']' | b' ' | b'\t' | b'\r' | b'\n') {
347        return ReadValue::Incomplete;
348    }
349    let s = std::str::from_utf8(&bytes[start..i]).unwrap_or("");
350    match serde_json::from_str::<serde_json::Value>(s) {
351        Ok(v) => ReadValue::Complete(v, i),
352        Err(_) => ReadValue::Malformed,
353    }
354}
355
356/// Read a nested `{...}` or `[...]`, tracking depth and string escapes.
357fn read_nested(bytes: &[u8], start: usize) -> ReadValue {
358    let open = bytes[start];
359    let close = if open == b'{' { b'}' } else { b']' };
360    let mut depth = 1u32;
361    let mut i = start + 1;
362    let mut in_string = false;
363    let mut escape = false;
364
365    while i < bytes.len() {
366        let b = bytes[i];
367        if escape {
368            escape = false;
369        } else if in_string {
370            if b == b'\\' {
371                escape = true;
372            } else if b == b'"' {
373                in_string = false;
374            }
375        } else if b == b'"' {
376            in_string = true;
377        } else if b == open {
378            depth += 1;
379        } else if b == close {
380            depth -= 1;
381            if depth == 0 {
382                // Re-parse just the slice [start..=i] as JSON
383                let parsed = std::str::from_utf8(&bytes[start..=i])
384                    .ok()
385                    .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok());
386                return match parsed {
387                    Some(v) => ReadValue::Complete(v, i + 1),
388                    None => ReadValue::Malformed,
389                };
390            }
391        }
392        i += 1;
393    }
394    ReadValue::Incomplete
395}
396
397// ---------------------------------------------------------------------------
398// Unit tests
399// ---------------------------------------------------------------------------
400
401#[cfg(test)]
402mod tests {
403    use super::*;
404
405    fn push_all(p: &mut PartialJsonParser, parts: &[&str]) -> PrefixState {
406        let mut state = PrefixState::Incomplete;
407        for part in parts {
408            state = p.push(part);
409        }
410        state
411    }
412
413    /// Fixture 1: simple single-field bash command arriving in two deltas.
414    #[test]
415    fn fixture_simple_command_two_deltas() {
416        let mut p = PartialJsonParser::new();
417        p.set_required(vec!["command".into()]);
418        p.push(r#"{"command": "ls "#);
419        let state = p.push(r#"-la"}"#);
420        match state {
421            PrefixState::ValidPrefix {
422                known_leaves,
423                missing_required,
424            } => {
425                assert!(missing_required.is_empty());
426                let v = known_leaves["command"].as_str().unwrap();
427                assert!(v.contains("ls") && v.contains("la"), "got: {v}");
428            }
429            other => panic!("expected ValidPrefix, got {other:?}"),
430        }
431    }
432
433    /// Fixture 2: multi-field tool call with nested object arrives incrementally.
434    #[test]
435    fn fixture_multi_field_incremental() {
436        let mut p = PartialJsonParser::new();
437        p.set_required(vec!["path".into(), "content".into()]);
438        let state = push_all(
439            &mut p,
440            &[
441                r#"{"path": "/tmp/f"#,
442                r#"oo.txt", "content": "hel"#,
443                r#"lo world"}"#,
444            ],
445        );
446        match state {
447            PrefixState::ValidPrefix {
448                known_leaves,
449                missing_required,
450            } => {
451                assert!(missing_required.is_empty(), "missing: {missing_required:?}");
452                assert!(known_leaves.contains_key("path"));
453                assert!(known_leaves.contains_key("content"));
454            }
455            other => panic!("expected ValidPrefix, got {other:?}"),
456        }
457    }
458
459    /// Fixture 3: escape sequence inside string value does not break parser.
460    #[test]
461    fn fixture_escape_in_string() {
462        let mut p = PartialJsonParser::new();
463        p.set_required(vec!["msg".into()]);
464        let state = p.push(r#"{"msg": "say \"hello\""}"#);
465        match state {
466            PrefixState::ValidPrefix {
467                known_leaves,
468                missing_required,
469            } => {
470                assert!(missing_required.is_empty());
471                let v = known_leaves["msg"].as_str().unwrap();
472                assert!(v.contains("hello"), "got: {v}");
473            }
474            other => panic!("expected ValidPrefix, got {other:?}"),
475        }
476    }
477
478    /// Fixture 4: mid-delta truncation → Incomplete, then resolved on next delta.
479    #[test]
480    fn fixture_incomplete_then_resolved() {
481        let mut p = PartialJsonParser::new();
482        p.set_required(vec!["x".into()]);
483        let mid = p.push(r#"{"x": 42"#);
484        // No closing brace yet; key present but object not closed → ValidPrefix with x known
485        match &mid {
486            PrefixState::ValidPrefix {
487                known_leaves,
488                missing_required,
489            } => {
490                assert!(missing_required.is_empty());
491                assert_eq!(known_leaves["x"], 42);
492            }
493            PrefixState::Incomplete => {} // also acceptable
494            other @ PrefixState::Malformed => panic!("unexpected: {other:?}"),
495        }
496        let done = p.push("}");
497        assert!(matches!(done, PrefixState::ValidPrefix { .. }));
498    }
499
500    /// Fixture 5: malformed input returns Malformed.
501    #[test]
502    fn fixture_malformed_input() {
503        let mut p = PartialJsonParser::new();
504        let state = p.push("not-json");
505        assert!(matches!(state, PrefixState::Malformed));
506    }
507
508    /// Fixture 6: mid-array value at top level is opaque (depth > 1 skipped).
509    #[test]
510    fn fixture_top_level_array_value() {
511        let mut p = PartialJsonParser::new();
512        p.set_required(vec!["items".into()]);
513        let state = p.push(r#"{"items": [1, 2, 3]}"#);
514        match state {
515            PrefixState::ValidPrefix {
516                known_leaves,
517                missing_required,
518            } => {
519                assert!(missing_required.is_empty());
520                assert!(known_leaves["items"].is_array());
521            }
522            other => panic!("expected ValidPrefix, got {other:?}"),
523        }
524    }
525
526    #[test]
527    fn reset_clears_buffer() {
528        let mut p = PartialJsonParser::new();
529        p.push(r#"{"x": 1}"#);
530        p.reset();
531        let state = p.push(r#"{"y": 2}"#);
532        match state {
533            PrefixState::ValidPrefix { known_leaves, .. } => {
534                assert!(
535                    !known_leaves.contains_key("x"),
536                    "should be cleared after reset"
537                );
538            }
539            other => panic!("{other:?}"),
540        }
541    }
542
543    /// Fixture 7: Unicode / non-ASCII bytes must not be corrupted (C5 regression).
544    #[test]
545    fn fixture_unicode_filename() {
546        let mut p = PartialJsonParser::new();
547        p.set_required(vec!["path".into()]);
548        let state = p.push(r#"{"path": "/tmp/Привет.txt"}"#);
549        match state {
550            PrefixState::ValidPrefix {
551                known_leaves,
552                missing_required,
553            } => {
554                assert!(missing_required.is_empty());
555                let v = known_leaves["path"].as_str().unwrap();
556                assert!(v.contains("Привет"), "non-ASCII corrupted: {v}");
557            }
558            other => panic!("expected ValidPrefix, got {other:?}"),
559        }
560    }
561
562    /// Fixture 8: incremental watermark — second push does not re-parse completed pairs (H2).
563    #[test]
564    fn fixture_incremental_watermark() {
565        let mut p = PartialJsonParser::new();
566        p.set_required(vec!["a".into(), "b".into()]);
567        // First push: 'a' complete, 'b' incomplete
568        let s1 = p.push(r#"{"a": 1, "b": "#);
569        match &s1 {
570            PrefixState::ValidPrefix {
571                known_leaves,
572                missing_required,
573            } => {
574                assert!(known_leaves.contains_key("a"));
575                assert!(missing_required.contains(&"b".to_string()));
576            }
577            PrefixState::Incomplete => {} // also acceptable before 'b' is fully parsed
578            other @ PrefixState::Malformed => panic!("unexpected s1: {other:?}"),
579        }
580        // Second push: completes 'b'
581        let s2 = p.push("2}");
582        match s2 {
583            PrefixState::ValidPrefix {
584                known_leaves,
585                missing_required,
586            } => {
587                assert!(
588                    missing_required.is_empty(),
589                    "still missing: {missing_required:?}"
590                );
591                assert_eq!(known_leaves["a"], 1);
592                assert_eq!(known_leaves["b"], 2);
593            }
594            other => panic!("expected ValidPrefix, got {other:?}"),
595        }
596    }
597}