Skip to main content

zeph_core/agent/speculative/
partial_json.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Streaming partial-JSON parser for speculative tool-call dispatch.
5//!
6//! The Anthropic SSE tool-use stream emits a single JSON object across many
7//! `input_json_delta` events. Standard JSON parsers (including
8//! `serde_json::StreamDeserializer`) wait for the closing brace before yielding
9//! any value — providing no benefit for speculative dispatch.
10//!
11//! `PartialJsonParser` is a ~120-line brace/string/escape state machine that
12//! accumulates delta strings and extracts top-level leaf keys whose values have
13//! been **fully closed** (primitives, fully closed nested objects/arrays).
14//! When all required fields of a tool's `input_schema` are present, the engine
15//! can speculatively dispatch the tool call without waiting for `ToolUseStop`.
16//!
17//! ## Invariants
18//!
19//! - **Escape state**: the escape flag is set after a literal `\` inside a string
20//!   and cleared after the following character, regardless of what that character is.
21//!   Multi-byte escape sequences (e.g. `\uXXXX`) are not individually validated;
22//!   the parser only tracks structural JSON tokens.
23//! - **Mid-array edge case**: array values at depth > 1 are treated as opaque; their
24//!   contents are never surfaced as `known_leaves`. Only top-level keys (depth == 1)
25//!   whose values close cleanly are included.
26//! - **No allocation on malformed input**: `Malformed` is returned immediately when
27//!   a structural invariant is violated; the buffer is not reallocated.
28
29#![allow(dead_code)]
30
31use serde_json::Map;
32
/// Result of feeding accumulated JSON delta bytes to [`PartialJsonParser::push`].
///
/// NOTE(review): the scanner in this file reports a buffer that ends inside an
/// unterminated string, value, or object as [`PrefixState::ValidPrefix`] (carrying the
/// pairs closed so far). [`PrefixState::Incomplete`] is only produced while the buffer is
/// empty or holds nothing but whitespace. Confirm which of the two contracts callers
/// rely on before tightening either the docs or the scanner.
#[derive(Debug, Clone, PartialEq)]
pub enum PrefixState {
    /// Input is still inside an unterminated string, imbalanced braces, or ends mid-escape.
    Incomplete,
    /// A valid prefix has been parsed: the `known_leaves` map contains fully closed top-level
    /// key-value pairs. `missing_required` lists required schema keys not yet seen.
    ValidPrefix {
        /// Top-level keys whose values are fully closed.
        known_leaves: Map<String, serde_json::Value>,
        /// Required tool input schema keys not yet present in the buffer.
        missing_required: Vec<String>,
    },
    /// The buffer contains a sequence that cannot be a valid JSON object prefix.
    Malformed,
}
49
/// Scanner context labels.
///
/// No code visible in this file constructs or matches on this enum; it compiles only
/// because the module carries `#![allow(dead_code)]`.
///
/// NOTE(review): the variant names suggest positions inside a top-level object (key,
/// separator, value, nested value with a depth counter), but no visible use establishes
/// those semantics — confirm whether this is reserved for a future state machine or can
/// be removed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Ctx {
    TopObject,
    InKey,
    AfterKey,
    InValue,
    InStringValue,
    InNestedValue { depth: u32 },
}
59
60/// Streaming structural parser for partial Anthropic SSE tool-input JSON.
61///
62/// Feed each `InputJsonDelta` string via [`push`](Self::push). When the result is
63/// [`PrefixState::ValidPrefix`] with an empty `missing_required`, the engine may
64/// synthesize a `ToolCall` and speculatively dispatch it.
65///
66/// # Examples
67///
68/// ```rust
69/// use zeph_core::agent::speculative::partial_json::{PartialJsonParser, PrefixState};
70///
71/// let mut p = PartialJsonParser::new();
72/// // Simulate incremental SSE deltas
73/// p.push(r#"{"command": "ls "#);
74/// let state = p.push(r#"-la"}"#);
75/// assert!(matches!(state, PrefixState::ValidPrefix { .. }));
76/// ```
77pub struct PartialJsonParser {
78    buf: String,
79    required: Vec<String>,
80    /// Cached known leaves from the last successful scan.
81    known_cache: Map<String, serde_json::Value>,
82    /// Byte offset in `buf` up to which the cache is valid.
83    /// When `buf.len() > scan_watermark` we re-scan only the tail, rebuilding from the
84    /// cached state (H2: avoids O(N²) full-buffer rescan on every push).
85    scan_watermark: usize,
86}
87
88impl PartialJsonParser {
89    /// Create a new parser. Call [`set_required`](Self::set_required) to configure
90    /// the list of required schema keys before the first [`push`](Self::push).
91    #[must_use]
92    pub fn new() -> Self {
93        Self {
94            buf: String::new(),
95            required: Vec::new(),
96            known_cache: Map::new(),
97            scan_watermark: 0,
98        }
99    }
100
101    /// Set the list of keys required by the tool's `input_schema`.
102    ///
103    /// Keys in this list that are absent from the accumulated buffer are reported as
104    /// `missing_required` in [`PrefixState::ValidPrefix`].
105    pub fn set_required(&mut self, required: Vec<String>) {
106        self.required = required;
107    }
108
109    /// Append `delta` and re-scan the buffer.
110    ///
111    /// Returns the current [`PrefixState`]. May be called repeatedly; each call
112    /// replaces the previous result. Returns [`PrefixState::Malformed`] when the
113    /// accumulated buffer would exceed 512 KiB (protection against oversized inputs).
114    pub fn push(&mut self, delta: &str) -> PrefixState {
115        const MAX_TOOL_INPUT_BYTES: usize = 512 * 1024;
116        if self.buf.len() + delta.len() > MAX_TOOL_INPUT_BYTES {
117            tracing::warn!(
118                buf_len = self.buf.len(),
119                delta_len = delta.len(),
120                "PartialJsonParser: tool input exceeded 512 KiB cap; treating as malformed"
121            );
122            return PrefixState::Malformed;
123        }
124        self.buf.push_str(delta);
125        self.scan()
126    }
127
128    /// Reset the parser for reuse after a commit or cancel.
129    pub fn reset(&mut self) {
130        self.buf.clear();
131        self.known_cache.clear();
132        self.scan_watermark = 0;
133    }
134
135    /// Scan `self.buf` and extract fully closed top-level key-value pairs.
136    ///
137    /// Uses a watermark cursor to avoid replaying already-scanned bytes (H2).
138    /// Previously confirmed key-value pairs are carried in `known_cache`; only the
139    /// bytes after `scan_watermark` are newly examined.
140    fn scan(&mut self) -> PrefixState {
141        let bytes = self.buf.as_bytes();
142        let len = bytes.len();
143
144        // On the first call, consume the opening '{'.
145        let mut i = if self.scan_watermark == 0 {
146            let start = skip_ws(bytes, 0);
147            if start >= len || bytes[start] != b'{' {
148                return if self.buf.trim().is_empty() {
149                    PrefixState::Incomplete
150                } else {
151                    PrefixState::Malformed
152                };
153            }
154            start + 1 // consume '{'
155        } else {
156            self.scan_watermark
157        };
158
159        // Start with previously confirmed pairs; we may add more this call.
160        let mut known = self.known_cache.clone();
161
162        loop {
163            i = skip_ws(bytes, i);
164            if i >= len {
165                break; // still incomplete
166            }
167
168            // End of object
169            if bytes[i] == b'}' {
170                self.scan_watermark = i + 1;
171                let missing = self.missing(&known);
172                self.known_cache.clone_from(&known);
173                return PrefixState::ValidPrefix {
174                    known_leaves: known,
175                    missing_required: missing,
176                };
177            }
178
179            // Comma between pairs
180            if bytes[i] == b',' {
181                i += 1;
182                i = skip_ws(bytes, i);
183                if i >= len {
184                    break;
185                }
186            }
187
188            // Key string
189            if bytes[i] != b'"' {
190                return PrefixState::Malformed;
191            }
192            let Some((key, after_key)) = read_string(bytes, i) else {
193                break; // incomplete string
194            };
195            i = after_key;
196            i = skip_ws(bytes, i);
197
198            if i >= len {
199                break;
200            }
201            if bytes[i] != b':' {
202                return PrefixState::Malformed;
203            }
204            i += 1; // consume ':'
205            i = skip_ws(bytes, i);
206
207            if i >= len {
208                break; // value not yet arrived
209            }
210
211            // Try to read a fully closed value
212            match read_value(bytes, i) {
213                ReadValue::Complete(value, end) => {
214                    known.insert(key, value);
215                    // Advance watermark: this pair is confirmed; next call starts here.
216                    self.scan_watermark = end;
217                    self.known_cache.clone_from(&known);
218                    i = end;
219                }
220                ReadValue::Incomplete => break,
221                ReadValue::Malformed => return PrefixState::Malformed,
222            }
223        }
224
225        // Buffer ended mid-object — partial but valid so far
226        let missing = self.missing(&known);
227        PrefixState::ValidPrefix {
228            known_leaves: known,
229            missing_required: missing,
230        }
231    }
232
233    fn missing(&self, known: &Map<String, serde_json::Value>) -> Vec<String> {
234        self.required
235            .iter()
236            .filter(|k| !known.contains_key(k.as_str()))
237            .cloned()
238            .collect()
239    }
240}
241
242impl Default for PartialJsonParser {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
248// --- Internal helpers -------------------------------------------------------
249
/// Return the index of the first byte at or after `i` that is not JSON whitespace
/// (space, tab, carriage return, line feed). Returns `i` unchanged when it is at or
/// beyond the end of `bytes`.
fn skip_ws(bytes: &[u8], i: usize) -> usize {
    let Some(tail) = bytes.get(i..) else {
        return i;
    };
    let blanks = tail
        .iter()
        .take_while(|b| matches!(**b, b' ' | b'\t' | b'\r' | b'\n'))
        .count();
    i + blanks
}
256
257/// Read a JSON string starting at `i` (which must point at `"`).
258/// Returns `(string_content, index_after_closing_quote)` or `None` if incomplete.
259///
260/// Operates on raw bytes to avoid the Latin-1 `b as char` cast that corrupts non-ASCII.
261/// The closed string slice is validated as UTF-8 and decoded at object boundary.
262fn read_string(bytes: &[u8], start: usize) -> Option<(String, usize)> {
263    debug_assert_eq!(bytes[start], b'"');
264    let mut i = start + 1;
265    let mut escape = false;
266    while i < bytes.len() {
267        let b = bytes[i];
268        if escape {
269            escape = false;
270        } else if b == b'\\' {
271            escape = true;
272        } else if b == b'"' {
273            // Decode the full slice [start+1..i] as UTF-8 in one shot.
274            let content = std::str::from_utf8(&bytes[start + 1..i]).ok()?;
275            // Re-process escape sequences by delegating to serde_json, which handles
276            // \uXXXX, \n, \t, etc. correctly without re-implementing the JSON string spec.
277            let json_str = [b"\"", &bytes[start + 1..i], b"\""].concat();
278            let decoded: String =
279                serde_json::from_slice(&json_str).unwrap_or_else(|_| content.to_owned());
280            return Some((decoded, i + 1));
281        }
282        i += 1;
283    }
284    None // unterminated string
285}
286
/// Outcome of trying to read one JSON value from the buffer.
enum ReadValue {
    /// The value is fully closed: the parsed value and the index one past its last byte.
    Complete(serde_json::Value, usize),
    /// The buffer ends before the value is closed; more input is needed.
    Incomplete,
    /// The bytes at this position cannot form a JSON value.
    Malformed,
}
292
293/// Attempt to read a fully closed JSON value starting at `i`.
294fn read_value(bytes: &[u8], i: usize) -> ReadValue {
295    if i >= bytes.len() {
296        return ReadValue::Incomplete;
297    }
298    match bytes[i] {
299        b'"' => match read_string(bytes, i) {
300            Some((s, end)) => ReadValue::Complete(serde_json::Value::String(s), end),
301            None => ReadValue::Incomplete,
302        },
303        b'{' | b'[' => read_nested(bytes, i),
304        b't' => read_literal(bytes, i, b"true", serde_json::Value::Bool(true)),
305        b'f' => read_literal(bytes, i, b"false", serde_json::Value::Bool(false)),
306        b'n' => read_literal(bytes, i, b"null", serde_json::Value::Null),
307        b'-' | b'0'..=b'9' => read_number(bytes, i),
308        _ => ReadValue::Malformed,
309    }
310}
311
312fn read_literal(bytes: &[u8], i: usize, lit: &[u8], val: serde_json::Value) -> ReadValue {
313    if bytes.len() < i + lit.len() {
314        return ReadValue::Incomplete;
315    }
316    if &bytes[i..i + lit.len()] == lit {
317        ReadValue::Complete(val, i + lit.len())
318    } else {
319        ReadValue::Malformed
320    }
321}
322
323fn read_number(bytes: &[u8], mut i: usize) -> ReadValue {
324    let start = i;
325    if i < bytes.len() && bytes[i] == b'-' {
326        i += 1;
327    }
328    while i < bytes.len() && (bytes[i].is_ascii_digit() || bytes[i] == b'.') {
329        i += 1;
330    }
331    // Exponent
332    if i < bytes.len() && matches!(bytes[i], b'e' | b'E') {
333        i += 1;
334        if i < bytes.len() && matches!(bytes[i], b'+' | b'-') {
335            i += 1;
336        }
337        while i < bytes.len() && bytes[i].is_ascii_digit() {
338            i += 1;
339        }
340    }
341    if i == start {
342        return ReadValue::Malformed;
343    }
344    // Must be followed by structural char or end
345    if i < bytes.len() && !matches!(bytes[i], b',' | b'}' | b']' | b' ' | b'\t' | b'\r' | b'\n') {
346        return ReadValue::Incomplete;
347    }
348    let s = std::str::from_utf8(&bytes[start..i]).unwrap_or("");
349    match serde_json::from_str::<serde_json::Value>(s) {
350        Ok(v) => ReadValue::Complete(v, i),
351        Err(_) => ReadValue::Malformed,
352    }
353}
354
355/// Read a nested `{...}` or `[...]`, tracking depth and string escapes.
356fn read_nested(bytes: &[u8], start: usize) -> ReadValue {
357    let open = bytes[start];
358    let close = if open == b'{' { b'}' } else { b']' };
359    let mut depth = 1u32;
360    let mut i = start + 1;
361    let mut in_string = false;
362    let mut escape = false;
363
364    while i < bytes.len() {
365        let b = bytes[i];
366        if escape {
367            escape = false;
368        } else if in_string {
369            if b == b'\\' {
370                escape = true;
371            } else if b == b'"' {
372                in_string = false;
373            }
374        } else if b == b'"' {
375            in_string = true;
376        } else if b == open {
377            depth += 1;
378        } else if b == close {
379            depth -= 1;
380            if depth == 0 {
381                // Re-parse just the slice [start..=i] as JSON
382                let parsed = std::str::from_utf8(&bytes[start..=i])
383                    .ok()
384                    .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok());
385                return match parsed {
386                    Some(v) => ReadValue::Complete(v, i + 1),
387                    None => ReadValue::Malformed,
388                };
389            }
390        }
391        i += 1;
392    }
393    ReadValue::Incomplete
394}
395
396// ---------------------------------------------------------------------------
397// Unit tests
398// ---------------------------------------------------------------------------
399
#[cfg(test)]
mod tests {
    use super::*;

    /// Push every delta in `parts` in order and return the state of the last push
    /// (`Incomplete` when `parts` is empty).
    fn push_all(p: &mut PartialJsonParser, parts: &[&str]) -> PrefixState {
        let mut state = PrefixState::Incomplete;
        for part in parts {
            state = p.push(part);
        }
        state
    }

    /// Fixture 1: simple single-field bash command arriving in two deltas, split inside
    /// the string value.
    #[test]
    fn fixture_simple_command_two_deltas() {
        let mut p = PartialJsonParser::new();
        p.set_required(vec!["command".into()]);
        p.push(r#"{"command": "ls "#);
        let state = p.push(r#"-la"}"#);
        match state {
            PrefixState::ValidPrefix {
                known_leaves,
                missing_required,
            } => {
                assert!(missing_required.is_empty());
                let v = known_leaves["command"].as_str().unwrap();
                assert!(v.contains("ls") && v.contains("la"), "got: {v}");
            }
            other => panic!("expected ValidPrefix, got {other:?}"),
        }
    }

    /// Fixture 2: two string fields arriving over three deltas, each split inside a
    /// string value.
    #[test]
    fn fixture_multi_field_incremental() {
        let mut p = PartialJsonParser::new();
        p.set_required(vec!["path".into(), "content".into()]);
        let state = push_all(
            &mut p,
            &[
                r#"{"path": "/tmp/f"#,
                r#"oo.txt", "content": "hel"#,
                r#"lo world"}"#,
            ],
        );
        match state {
            PrefixState::ValidPrefix {
                known_leaves,
                missing_required,
            } => {
                assert!(missing_required.is_empty(), "missing: {missing_required:?}");
                assert!(known_leaves.contains_key("path"));
                assert!(known_leaves.contains_key("content"));
            }
            other => panic!("expected ValidPrefix, got {other:?}"),
        }
    }

    /// Fixture 3: escaped quotes inside a string value do not terminate the string.
    #[test]
    fn fixture_escape_in_string() {
        let mut p = PartialJsonParser::new();
        p.set_required(vec!["msg".into()]);
        let state = p.push(r#"{"msg": "say \"hello\""}"#);
        match state {
            PrefixState::ValidPrefix {
                known_leaves,
                missing_required,
            } => {
                assert!(missing_required.is_empty());
                let v = known_leaves["msg"].as_str().unwrap();
                assert!(v.contains("hello"), "got: {v}");
            }
            other => panic!("expected ValidPrefix, got {other:?}"),
        }
    }

    /// Fixture 4: the buffer ends right after a number, and the closing brace arrives in
    /// the next delta. The intermediate state may be `ValidPrefix` (with `x` known) or
    /// `Incomplete`; only `Malformed` is rejected.
    #[test]
    fn fixture_incomplete_then_resolved() {
        let mut p = PartialJsonParser::new();
        p.set_required(vec!["x".into()]);
        let mid = p.push(r#"{"x": 42"#);
        // No closing brace yet; key present but object not closed → ValidPrefix with x known
        match &mid {
            PrefixState::ValidPrefix {
                known_leaves,
                missing_required,
            } => {
                assert!(missing_required.is_empty());
                assert_eq!(known_leaves["x"], 42);
            }
            PrefixState::Incomplete => {} // also acceptable
            other @ PrefixState::Malformed => panic!("unexpected: {other:?}"),
        }
        let done = p.push("}");
        assert!(matches!(done, PrefixState::ValidPrefix { .. }));
    }

    /// Fixture 5: input that does not start with `{` returns Malformed.
    #[test]
    fn fixture_malformed_input() {
        let mut p = PartialJsonParser::new();
        let state = p.push("not-json");
        assert!(matches!(state, PrefixState::Malformed));
    }

    /// Fixture 6: a fully closed array as a top-level value is surfaced as a single
    /// array leaf; its elements are not reported individually.
    #[test]
    fn fixture_top_level_array_value() {
        let mut p = PartialJsonParser::new();
        p.set_required(vec!["items".into()]);
        let state = p.push(r#"{"items": [1, 2, 3]}"#);
        match state {
            PrefixState::ValidPrefix {
                known_leaves,
                missing_required,
            } => {
                assert!(missing_required.is_empty());
                assert!(known_leaves["items"].is_array());
            }
            other => panic!("expected ValidPrefix, got {other:?}"),
        }
    }

    /// After `reset`, keys from the previous object must not leak into the next one.
    #[test]
    fn reset_clears_buffer() {
        let mut p = PartialJsonParser::new();
        p.push(r#"{"x": 1}"#);
        p.reset();
        let state = p.push(r#"{"y": 2}"#);
        match state {
            PrefixState::ValidPrefix { known_leaves, .. } => {
                assert!(
                    !known_leaves.contains_key("x"),
                    "should be cleared after reset"
                );
            }
            other => panic!("{other:?}"),
        }
    }

    /// Fixture 7: Unicode / non-ASCII bytes must not be corrupted (C5 regression).
    #[test]
    fn fixture_unicode_filename() {
        let mut p = PartialJsonParser::new();
        p.set_required(vec!["path".into()]);
        let state = p.push(r#"{"path": "/tmp/Привет.txt"}"#);
        match state {
            PrefixState::ValidPrefix {
                known_leaves,
                missing_required,
            } => {
                assert!(missing_required.is_empty());
                let v = known_leaves["path"].as_str().unwrap();
                assert!(v.contains("Привет"), "non-ASCII corrupted: {v}");
            }
            other => panic!("expected ValidPrefix, got {other:?}"),
        }
    }

    /// Fixture 8: resume from the watermark (H2) — a pair confirmed by the first push is
    /// still reported after the second push completes the remaining pair. This checks the
    /// results only; it does not observe whether bytes were re-parsed.
    #[test]
    fn fixture_incremental_watermark() {
        let mut p = PartialJsonParser::new();
        p.set_required(vec!["a".into(), "b".into()]);
        // First push: 'a' complete, 'b' incomplete
        let s1 = p.push(r#"{"a": 1, "b": "#);
        match &s1 {
            PrefixState::ValidPrefix {
                known_leaves,
                missing_required,
            } => {
                assert!(known_leaves.contains_key("a"));
                assert!(missing_required.contains(&"b".to_string()));
            }
            PrefixState::Incomplete => {} // also acceptable before 'b' is fully parsed
            other @ PrefixState::Malformed => panic!("unexpected s1: {other:?}"),
        }
        // Second push: completes 'b'
        let s2 = p.push("2}");
        match s2 {
            PrefixState::ValidPrefix {
                known_leaves,
                missing_required,
            } => {
                assert!(
                    missing_required.is_empty(),
                    "still missing: {missing_required:?}"
                );
                assert_eq!(known_leaves["a"], 1);
                assert_eq!(known_leaves["b"], 2);
            }
            other => panic!("expected ValidPrefix, got {other:?}"),
        }
    }
}