Skip to main content

stream_rs/
incremental_json.rs

1//! Incremental scanning of concatenated / partial JSON.
2//!
3//! When consuming a streamed body you often receive JSON in arbitrary byte
4//! chunks and need to know *where* one top-level JSON value ends and the next
5//! begins — without pulling in a full serde dependency. [`JsonSplitter`] is a
6//! dependency-free, byte-level scanner that tracks structural depth, string
7//! state and escapes, and emits the byte ranges of complete top-level values
8//! as they arrive.
9//!
10//! It does **not** validate the full JSON grammar (e.g. it will not reject
11//! `{,}`); it tracks just enough state — brackets, braces, strings, escapes —
12//! to find correct value boundaries, which is exactly what streaming framing
13//! needs. Whitespace and separators (commas / newlines) between top-level
14//! values are skipped, so it works for both NDJSON and bare concatenation.
15//!
16//! Because the default splitter is non-validating, malformed *structure* is not
17//! flagged: a stray leading close bracket (e.g. `}` or `]` at depth 0, as in
18//! `}{"a":1}`) is emitted as a one-byte "value" rather than rejected — the
19//! depth counter saturates at zero rather than going negative. Feed it
20//! well-formed framing (which is what every LLM streaming API actually sends)
21//! and this never arises.
22//!
23//! If you must guard against adversarial framing, either validate each emitted
24//! value with a real JSON parser, or construct the splitter in **strict mode**
//! with [`JsonSplitter::strict`]. In strict mode a structural violation (a
//! close bracket with no matching open at depth 0) is recorded as a
//! [`MalformedJson`] error, observable via [`JsonSplitter::error`] and returned
//! from [`JsonSplitter::finish`], and the offending byte is dropped instead of
//! being emitted as a bogus value.
30//!
31//! # Example
32//!
33//! ```
34//! use stream_rs::incremental_json::JsonSplitter;
35//!
36//! let mut s = JsonSplitter::new();
37//! let mut out = Vec::new();
38//! s.feed(br#"{"a":1}{"b":"#, &mut out);
39//! assert_eq!(out, vec![r#"{"a":1}"#.to_string()]);
40//!
41//! out.clear();
42//! s.feed(br#"[1,2]}"#, &mut out); // completes the second object
43//! assert_eq!(out, vec![r#"{"b":[1,2]}"#.to_string()]);
44//! ```
45//!
46//! When the stream ends, call [`JsonSplitter::finish`]: it flushes a trailing
47//! bare scalar that no separator terminated (e.g. a final `42` or `true` with
48//! no newline), and returns a [`FinishError`] if structural data was left
49//! dangling inside an object, array or string, or — in strict mode — if a
50//! framing violation was seen earlier in the stream.
51
52use alloc::string::String;
53use alloc::vec::Vec;
54
55use crate::error::{MalformedJson, TruncatedJson};
56
/// Splits a byte stream into complete top-level JSON values.
///
/// Bytes are supplied through [`JsonSplitter::feed`] in chunks of any size;
/// the scanner keeps whatever belongs to a not-yet-complete value in an
/// internal buffer between calls. A value obtained through `Default` (and
/// therefore through [`JsonSplitter::new`]) has `strict == false`, i.e. it is
/// in the non-validating mode.
#[derive(Debug, Default)]
// The fields are independent scanner flags, not a hidden state machine; an
// enum would obscure rather than clarify the byte-level logic.
#[allow(clippy::struct_excessive_bools)]
pub struct JsonSplitter {
    /// Bytes of the value currently being assembled (empty between values).
    buf: Vec<u8>,
    /// Structural nesting depth (objects + arrays); `0` means top level.
    depth: usize,
    /// Whether the scanner is inside a string literal.
    in_string: bool,
    /// Whether the previous byte was a backslash inside a string, so the
    /// current byte must not be interpreted as a closing quote or a new escape.
    escaped: bool,
    /// True once we have seen the first structural/scalar byte of a value.
    in_value: bool,
    /// When `true`, structural framing violations are rejected instead of being
    /// emitted as bogus values (see [`JsonSplitter::strict`]).
    strict: bool,
    /// Count of complete top-level values emitted so far (for error context);
    /// copied into [`MalformedJson`] when a violation is recorded.
    values_emitted: usize,
    /// First framing violation seen in strict mode, surfaced from `finish`
    /// (and observable via [`JsonSplitter::error`]).
    error: Option<MalformedJson>,
}
82
83impl JsonSplitter {
84    /// Create a new splitter in the default, non-validating mode.
85    #[must_use]
86    pub fn new() -> Self {
87        Self::default()
88    }
89
90    /// Create a splitter in **strict mode**.
91    ///
92    /// Strict mode rejects structural framing violations — currently a closing
93    /// bracket (`}` or `]`) seen at depth 0 with no matching open, as in
94    /// `}{"a":1}`. The offending byte is dropped (never emitted as a bogus
95    /// value) and the first such violation is recorded as a [`MalformedJson`]
96    /// error, retrievable via [`error`](Self::error) and returned from
97    /// [`finish`](Self::finish). Well-formed values continue to be emitted as
98    /// usual; only the malformed framing is suppressed.
99    #[must_use]
100    pub fn strict() -> Self {
101        Self {
102            strict: true,
103            ..Self::default()
104        }
105    }
106
107    /// The first strict-mode framing violation seen so far, if any.
108    ///
109    /// Always `None` for a splitter created with [`new`](Self::new).
110    #[must_use]
111    pub fn error(&self) -> Option<&MalformedJson> {
112        self.error.as_ref()
113    }
114
115    /// Feed a chunk of bytes. Each completed top-level value is decoded
116    /// (lossy UTF-8) and pushed onto `out`.
117    pub fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) {
118        for &b in chunk {
119            self.push_byte(b, out);
120        }
121    }
122
123    /// True when there are buffered bytes of an unfinished value.
124    #[must_use]
125    pub fn has_partial(&self) -> bool {
126        !self.buf.is_empty()
127    }
128
129    /// Signal end of stream.
130    ///
131    /// A bare top-level scalar (number, `true`, `false`, `null`) is normally
132    /// only emitted once a following separator proves it complete. If the
133    /// stream ends right after such a scalar there is no separator, so this
134    /// method flushes it onto `out`.
135    ///
136    /// If the stream ends while still inside an object, array, or string
137    /// literal — i.e. the value was genuinely truncated mid-flight — the
138    /// buffered bytes are dropped and a [`TruncatedJson`] error is returned so
139    /// the caller can distinguish a clean end from a cut connection.
140    ///
141    /// In **strict mode**, a framing violation recorded earlier (see
142    /// [`strict`](Self::strict)) takes priority: it is returned as a
143    /// [`FinishError::Malformed`]. A clean truncation is
144    /// [`FinishError::Truncated`].
145    ///
146    /// Either way the splitter is reset and may be reused for a fresh stream
147    /// (the strict/non-strict mode itself is preserved).
148    pub fn finish(&mut self, out: &mut Vec<String>) -> Result<(), FinishError> {
149        // A recorded strict-mode framing violation outranks everything else.
150        if let Some(err) = self.error.take() {
151            self.reset();
152            return Err(FinishError::Malformed(err));
153        }
154
155        // A complete bare scalar sits at depth 0, outside any string, with
156        // buffered bytes. Anything else still buffered is incomplete.
157        if self.in_value && self.depth == 0 && !self.in_string {
158            self.emit(out);
159            return Ok(());
160        }
161
162        let buffered = self.buf.len();
163        self.reset();
164        if buffered > 0 {
165            Err(FinishError::Truncated(TruncatedJson { buffered }))
166        } else {
167            Ok(())
168        }
169    }
170
171    /// Reset per-stream scanning state, preserving the configured mode and any
172    /// not-yet-consumed strict-mode error.
173    fn reset(&mut self) {
174        self.buf.clear();
175        self.depth = 0;
176        self.in_string = false;
177        self.escaped = false;
178        self.in_value = false;
179    }
180
181    fn push_byte(&mut self, b: u8, out: &mut Vec<String>) {
182        // Inside a string literal, only `"`/escape transitions matter.
183        if self.in_value && self.in_string {
184            self.buf.push(b);
185            if self.escaped {
186                self.escaped = false;
187            } else if b == b'\\' {
188                self.escaped = true;
189            } else if b == b'"' {
190                self.in_string = false;
191            }
192            return;
193        }
194
195        let is_sep = b.is_ascii_whitespace() || b == b',';
196
197        // Between values: a separator terminates a pending bare scalar (number,
198        // `true`, `false`, `null`) that is at depth 0, otherwise it is skipped.
199        if !self.in_value {
200            if is_sep {
201                return;
202            }
203            self.in_value = true;
204        } else if self.depth == 0 && is_sep {
205            // We are accumulating a bare scalar and just hit a separator: the
206            // scalar is complete. Do not consume the separator into it.
207            self.emit(out);
208            return;
209        }
210
211        self.buf.push(b);
212
213        match b {
214            b'"' => self.in_string = true,
215            b'{' | b'[' => self.depth += 1,
216            b'}' | b']' => {
217                if self.depth == 0 {
218                    // A close bracket with nothing open: this only happens on
219                    // malformed framing. In strict mode, drop the byte and
220                    // record the violation instead of emitting a bogus value.
221                    if self.strict {
222                        if self.error.is_none() {
223                            self.error = Some(MalformedJson {
224                                byte: b,
225                                values_emitted: self.values_emitted,
226                            });
227                        }
228                        self.buf.pop();
229                        self.in_value = false;
230                        return;
231                    }
232                    // Non-strict: preserve the historical behaviour of emitting
233                    // the stray byte as a one-byte "value".
234                    self.emit(out);
235                    return;
236                }
237                self.depth -= 1;
238                if self.depth == 0 {
239                    self.emit(out);
240                }
241            }
242            _ => { /* scalar byte; flushed when a separator follows */ }
243        }
244    }
245
246    /// Emit the currently buffered value and reset per-value state.
247    fn emit(&mut self, out: &mut Vec<String>) {
248        let bytes = core::mem::take(&mut self.buf);
249        out.push(String::from_utf8_lossy(&bytes).into_owned());
250        self.values_emitted += 1;
251        self.reset();
252    }
253}
254
/// Error returned by [`JsonSplitter::finish`].
///
/// Marked `#[non_exhaustive]`, so code matching on it from another crate must
/// include a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum FinishError {
    /// The stream ended mid-value (inside an object, array, or string).
    /// The payload records how many buffered bytes were dropped.
    Truncated(TruncatedJson),
    /// A strict-mode framing violation was seen during the stream.
    /// The payload is the first violation that was recorded.
    Malformed(MalformedJson),
}
264
265impl core::fmt::Display for FinishError {
266    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
267        match self {
268            Self::Truncated(e) => core::fmt::Display::fmt(e, f),
269            Self::Malformed(e) => core::fmt::Display::fmt(e, f),
270        }
271    }
272}
273
274impl From<TruncatedJson> for FinishError {
275    fn from(e: TruncatedJson) -> Self {
276        Self::Truncated(e)
277    }
278}
279
280impl From<MalformedJson> for FinishError {
281    fn from(e: MalformedJson) -> Self {
282        Self::Malformed(e)
283    }
284}
285
#[cfg(feature = "std")]
impl std::error::Error for FinishError {
    /// Exposes the wrapped error as the source of this one.
    ///
    /// NOTE(review): `Display` for `FinishError` also forwards to the wrapped
    /// error, so a reporter that prints every link of the `source()` chain
    /// will likely show the same message twice — confirm this is intended.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let inner: &(dyn std::error::Error + 'static) = match self {
            Self::Truncated(truncated) => truncated,
            Self::Malformed(malformed) => malformed,
        };
        Some(inner)
    }
}
294}