stream_rs/incremental_json.rs
1//! Incremental scanning of concatenated / partial JSON.
2//!
3//! When consuming a streamed body you often receive JSON in arbitrary byte
4//! chunks and need to know *where* one top-level JSON value ends and the next
5//! begins — without pulling in a full serde dependency. [`JsonSplitter`] is a
6//! dependency-free, byte-level scanner that tracks structural depth, string
7//! state and escapes, and emits the byte ranges of complete top-level values
8//! as they arrive.
9//!
10//! It does **not** validate the full JSON grammar (e.g. it will not reject
11//! `{,}`); it tracks just enough state — brackets, braces, strings, escapes —
12//! to find correct value boundaries, which is exactly what streaming framing
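//! needs. Whitespace and separators (commas / newlines) between top-level
//! values are skipped, so it works for both NDJSON and bare concatenation of
//! objects and arrays. A bare top-level number or literal (`true`, `false`,
//! `null`) has no closing delimiter of its own, so follow it with a separator
//! (or call [`JsonSplitter::finish`] at end of stream) to guarantee it is
//! emitted.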
15//!
16//! Because the default splitter is non-validating, malformed *structure* is not
17//! flagged: a stray leading close bracket (e.g. `}` or `]` at depth 0, as in
18//! `}{"a":1}`) is emitted as a one-byte "value" rather than rejected — the
19//! depth counter saturates at zero rather than going negative. Feed it
20//! well-formed framing (which is what every LLM streaming API actually sends)
21//! and this never arises.
22//!
//! If you must guard against adversarial framing, either validate each emitted
//! value with a real JSON parser, or construct the splitter in **strict mode**
//! with [`JsonSplitter::strict`]. In strict mode a structural violation (a
//! close bracket with no matching open at depth 0) is recorded as a
//! [`MalformedJson`] error — observable mid-stream via [`JsonSplitter::error`]
//! and returned from [`JsonSplitter::finish`] — and the offending byte is
//! dropped instead of being emitted as a bogus value. ([`JsonSplitter::feed`]
//! itself returns nothing, so poll [`JsonSplitter::error`] if you need to stop
//! early.)
30//!
31//! # Example
32//!
33//! ```
34//! use stream_rs::incremental_json::JsonSplitter;
35//!
36//! let mut s = JsonSplitter::new();
37//! let mut out = Vec::new();
38//! s.feed(br#"{"a":1}{"b":"#, &mut out);
39//! assert_eq!(out, vec![r#"{"a":1}"#.to_string()]);
40//!
41//! out.clear();
42//! s.feed(br#"[1,2]}"#, &mut out); // completes the second object
43//! assert_eq!(out, vec![r#"{"b":[1,2]}"#.to_string()]);
44//! ```
45//!
46//! When the stream ends, call [`JsonSplitter::finish`]: it flushes a trailing
47//! bare scalar that no separator terminated (e.g. a final `42` or `true` with
48//! no newline), and returns a [`FinishError`] if structural data was left
49//! dangling inside an object, array or string, or — in strict mode — if a
50//! framing violation was seen earlier in the stream.
51
52use alloc::string::String;
53use alloc::vec::Vec;
54
55use crate::error::{MalformedJson, TruncatedJson};
56
/// Splits a byte stream into complete top-level JSON values.
///
/// Construct with [`JsonSplitter::new`] (non-validating) or
/// [`JsonSplitter::strict`], push bytes through [`JsonSplitter::feed`], and
/// call [`JsonSplitter::finish`] when the stream ends.
#[derive(Debug, Default)]
// The fields are independent scanner flags, not a hidden state machine; an
// enum would obscure rather than clarify the byte-level logic.
#[allow(clippy::struct_excessive_bools)]
pub struct JsonSplitter {
    /// Raw (not yet UTF-8 decoded) bytes of the value currently being assembled.
    buf: Vec<u8>,
    /// Structural nesting depth (objects + arrays); 0 means no container is
    /// open. Never goes below zero: a close bracket seen at depth 0 is handled
    /// as a framing anomaly instead of decrementing.
    depth: usize,
    /// Whether the scanner is inside a string literal, where brackets and
    /// separators are ordinary content.
    in_string: bool,
    /// Whether the previous byte was an unescaped backslash inside a string,
    /// so the current byte (e.g. the quote in `\"`) cannot close the string.
    escaped: bool,
    /// True once the first byte of a value has been seen; separators are only
    /// skipped while this is false.
    in_value: bool,
    /// When `true`, structural framing violations are rejected instead of being
    /// emitted as bogus values (see [`JsonSplitter::strict`]).
    strict: bool,
    /// Count of complete top-level values emitted so far; copied into
    /// [`MalformedJson`] to locate a violation within the stream.
    values_emitted: usize,
    /// First framing violation seen in strict mode, surfaced from `finish`
    /// (and observable via [`JsonSplitter::error`]).
    error: Option<MalformedJson>,
}
82
83impl JsonSplitter {
84 /// Create a new splitter in the default, non-validating mode.
85 #[must_use]
86 pub fn new() -> Self {
87 Self::default()
88 }
89
90 /// Create a splitter in **strict mode**.
91 ///
92 /// Strict mode rejects structural framing violations — currently a closing
93 /// bracket (`}` or `]`) seen at depth 0 with no matching open, as in
94 /// `}{"a":1}`. The offending byte is dropped (never emitted as a bogus
95 /// value) and the first such violation is recorded as a [`MalformedJson`]
96 /// error, retrievable via [`error`](Self::error) and returned from
97 /// [`finish`](Self::finish). Well-formed values continue to be emitted as
98 /// usual; only the malformed framing is suppressed.
99 #[must_use]
100 pub fn strict() -> Self {
101 Self {
102 strict: true,
103 ..Self::default()
104 }
105 }
106
107 /// The first strict-mode framing violation seen so far, if any.
108 ///
109 /// Always `None` for a splitter created with [`new`](Self::new).
110 #[must_use]
111 pub fn error(&self) -> Option<&MalformedJson> {
112 self.error.as_ref()
113 }
114
115 /// Feed a chunk of bytes. Each completed top-level value is decoded
116 /// (lossy UTF-8) and pushed onto `out`.
117 pub fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) {
118 for &b in chunk {
119 self.push_byte(b, out);
120 }
121 }
122
123 /// True when there are buffered bytes of an unfinished value.
124 #[must_use]
125 pub fn has_partial(&self) -> bool {
126 !self.buf.is_empty()
127 }
128
129 /// Signal end of stream.
130 ///
131 /// A bare top-level scalar (number, `true`, `false`, `null`) is normally
132 /// only emitted once a following separator proves it complete. If the
133 /// stream ends right after such a scalar there is no separator, so this
134 /// method flushes it onto `out`.
135 ///
136 /// If the stream ends while still inside an object, array, or string
137 /// literal — i.e. the value was genuinely truncated mid-flight — the
138 /// buffered bytes are dropped and a [`TruncatedJson`] error is returned so
139 /// the caller can distinguish a clean end from a cut connection.
140 ///
141 /// In **strict mode**, a framing violation recorded earlier (see
142 /// [`strict`](Self::strict)) takes priority: it is returned as a
143 /// [`FinishError::Malformed`]. A clean truncation is
144 /// [`FinishError::Truncated`].
145 ///
146 /// Either way the splitter is reset and may be reused for a fresh stream
147 /// (the strict/non-strict mode itself is preserved).
148 pub fn finish(&mut self, out: &mut Vec<String>) -> Result<(), FinishError> {
149 // A recorded strict-mode framing violation outranks everything else.
150 if let Some(err) = self.error.take() {
151 self.reset();
152 return Err(FinishError::Malformed(err));
153 }
154
155 // A complete bare scalar sits at depth 0, outside any string, with
156 // buffered bytes. Anything else still buffered is incomplete.
157 if self.in_value && self.depth == 0 && !self.in_string {
158 self.emit(out);
159 return Ok(());
160 }
161
162 let buffered = self.buf.len();
163 self.reset();
164 if buffered > 0 {
165 Err(FinishError::Truncated(TruncatedJson { buffered }))
166 } else {
167 Ok(())
168 }
169 }
170
171 /// Reset per-stream scanning state, preserving the configured mode and any
172 /// not-yet-consumed strict-mode error.
173 fn reset(&mut self) {
174 self.buf.clear();
175 self.depth = 0;
176 self.in_string = false;
177 self.escaped = false;
178 self.in_value = false;
179 }
180
181 fn push_byte(&mut self, b: u8, out: &mut Vec<String>) {
182 // Inside a string literal, only `"`/escape transitions matter.
183 if self.in_value && self.in_string {
184 self.buf.push(b);
185 if self.escaped {
186 self.escaped = false;
187 } else if b == b'\\' {
188 self.escaped = true;
189 } else if b == b'"' {
190 self.in_string = false;
191 }
192 return;
193 }
194
195 let is_sep = b.is_ascii_whitespace() || b == b',';
196
197 // Between values: a separator terminates a pending bare scalar (number,
198 // `true`, `false`, `null`) that is at depth 0, otherwise it is skipped.
199 if !self.in_value {
200 if is_sep {
201 return;
202 }
203 self.in_value = true;
204 } else if self.depth == 0 && is_sep {
205 // We are accumulating a bare scalar and just hit a separator: the
206 // scalar is complete. Do not consume the separator into it.
207 self.emit(out);
208 return;
209 }
210
211 self.buf.push(b);
212
213 match b {
214 b'"' => self.in_string = true,
215 b'{' | b'[' => self.depth += 1,
216 b'}' | b']' => {
217 if self.depth == 0 {
218 // A close bracket with nothing open: this only happens on
219 // malformed framing. In strict mode, drop the byte and
220 // record the violation instead of emitting a bogus value.
221 if self.strict {
222 if self.error.is_none() {
223 self.error = Some(MalformedJson {
224 byte: b,
225 values_emitted: self.values_emitted,
226 });
227 }
228 self.buf.pop();
229 self.in_value = false;
230 return;
231 }
232 // Non-strict: preserve the historical behaviour of emitting
233 // the stray byte as a one-byte "value".
234 self.emit(out);
235 return;
236 }
237 self.depth -= 1;
238 if self.depth == 0 {
239 self.emit(out);
240 }
241 }
242 _ => { /* scalar byte; flushed when a separator follows */ }
243 }
244 }
245
246 /// Emit the currently buffered value and reset per-value state.
247 fn emit(&mut self, out: &mut Vec<String>) {
248 let bytes = core::mem::take(&mut self.buf);
249 out.push(String::from_utf8_lossy(&bytes).into_owned());
250 self.values_emitted += 1;
251 self.reset();
252 }
253}
254
/// Error returned by [`JsonSplitter::finish`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum FinishError {
    /// The stream ended mid-value (inside an object, array, or string); the
    /// buffered bytes of that value were discarded.
    Truncated(TruncatedJson),
    /// A strict-mode framing violation was seen during the stream.
    Malformed(MalformedJson),
}
264
265impl core::fmt::Display for FinishError {
266 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
267 match self {
268 Self::Truncated(e) => core::fmt::Display::fmt(e, f),
269 Self::Malformed(e) => core::fmt::Display::fmt(e, f),
270 }
271 }
272}
273
274impl From<TruncatedJson> for FinishError {
275 fn from(e: TruncatedJson) -> Self {
276 Self::Truncated(e)
277 }
278}
279
280impl From<MalformedJson> for FinishError {
281 fn from(e: MalformedJson) -> Self {
282 Self::Malformed(e)
283 }
284}
285
#[cfg(feature = "std")]
impl std::error::Error for FinishError {
    /// Exposes the wrapped [`TruncatedJson`] or [`MalformedJson`] as the
    /// underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = match self {
            Self::Truncated(truncated) => truncated,
            Self::Malformed(malformed) => malformed,
        };
        Some(cause)
    }
}