stream-rs 0.1.0

Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE, incremental JSON, OpenAI/Anthropic delta accumulators).
Documentation
//! Incremental scanning of concatenated / partial JSON.
//!
//! When consuming a streamed body you often receive JSON in arbitrary byte
//! chunks and need to know *where* one top-level JSON value ends and the next
//! begins — without pulling in a full serde dependency. [`JsonSplitter`] is a
//! dependency-free, byte-level scanner that tracks structural depth, string
//! state and escapes, and emits the byte ranges of complete top-level values
//! as they arrive.
//!
//! It does **not** validate the full JSON grammar (e.g. it will not reject
//! `{,}`); it tracks just enough state — brackets, braces, strings, escapes —
//! to find correct value boundaries, which is exactly what streaming framing
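//! needs. Whitespace and commas between top-level values are skipped, so it
//! works for both NDJSON and bare concatenation of objects and arrays. A bare
//! top-level scalar (a number, `true`/`false`/`null`, or a string) is only
//! known to be complete once a separator follows it, so such values must be
//! separated from the next value (`1 2`, not `12`).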
//!
//! Because the default splitter is non-validating, malformed *structure* is not
//! flagged: a stray leading close bracket (e.g. `}` or `]` at depth 0, as in
//! `}{"a":1}`) is emitted as a one-byte "value" rather than rejected — the
//! depth counter saturates at zero rather than going negative. Feed it
//! well-formed framing (which is what every LLM streaming API actually sends)
//! and this never arises.
//!
//! If you must guard against adversarial framing, either validate each emitted
//! value with a real JSON parser, or construct the splitter in **strict mode**
//! with [`JsonSplitter::strict`]. In strict mode a structural violation (a
//! close bracket with no matching open at depth 0) is recorded as a
//! [`MalformedJson`] error — observable immediately via
//! [`JsonSplitter::error`] and returned from [`JsonSplitter::finish`] — and the
//! offending byte is dropped instead of being emitted as a bogus value.
//!
//! # Example
//!
//! ```
//! use stream_rs::incremental_json::JsonSplitter;
//!
//! let mut s = JsonSplitter::new();
//! let mut out = Vec::new();
//! s.feed(br#"{"a":1}{"b":"#, &mut out);
//! assert_eq!(out, vec![r#"{"a":1}"#.to_string()]);
//!
//! out.clear();
//! s.feed(br#"[1,2]}"#, &mut out); // completes the second object
//! assert_eq!(out, vec![r#"{"b":[1,2]}"#.to_string()]);
//! ```
//!
//! When the stream ends, call [`JsonSplitter::finish`]: it flushes a trailing
//! bare scalar that no separator terminated (e.g. a final `42` or `true` with
//! no newline), and returns a [`FinishError`] if structural data was left
//! dangling inside an object, array or string, or — in strict mode — if a
//! framing violation was seen earlier in the stream.

use alloc::string::String;
use alloc::vec::Vec;

use crate::error::{MalformedJson, TruncatedJson};

/// Splits a byte stream into complete top-level JSON values.
#[derive(Debug, Default)]
// The fields are independent scanner flags, not a hidden state machine; an
// enum would obscure rather than clarify the byte-level logic.
#[allow(clippy::struct_excessive_bools)]
pub struct JsonSplitter {
    /// Bytes of the value currently being assembled.
    buf: Vec<u8>,
    /// Structural nesting depth (objects + arrays).
    depth: usize,
    /// Whether the scanner is inside a string literal.
    in_string: bool,
    /// Whether the previous byte was a backslash inside a string.
    escaped: bool,
    /// True once we have seen the first structural/scalar byte of a value.
    in_value: bool,
    /// When `true`, structural framing violations are rejected instead of being
    /// emitted as bogus values (see [`JsonSplitter::strict`]).
    strict: bool,
    /// Count of complete top-level values emitted so far (for error context).
    values_emitted: usize,
    /// First framing violation seen in strict mode, surfaced from `finish`
    /// (and observable via [`JsonSplitter::error`]).
    error: Option<MalformedJson>,
}

impl JsonSplitter {
    /// Create a new splitter in the default, non-validating mode.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Create a splitter in **strict mode**.
    ///
    /// Strict mode rejects structural framing violations — currently a closing
    /// bracket (`}` or `]`) seen at depth 0 with no matching open, as in
    /// `}{"a":1}`. The offending byte is dropped (never emitted as a bogus
    /// value) and the first such violation is recorded as a [`MalformedJson`]
    /// error, retrievable via [`error`](Self::error) and returned from
    /// [`finish`](Self::finish). Well-formed values continue to be emitted as
    /// usual; only the malformed framing is suppressed.
    #[must_use]
    pub fn strict() -> Self {
        Self {
            strict: true,
            ..Self::default()
        }
    }

    /// The first strict-mode framing violation seen so far, if any.
    ///
    /// Always `None` for a splitter created with [`new`](Self::new).
    #[must_use]
    pub fn error(&self) -> Option<&MalformedJson> {
        self.error.as_ref()
    }

    /// Feed a chunk of bytes. Each completed top-level value is decoded
    /// (lossy UTF-8) and pushed onto `out`.
    pub fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) {
        for &b in chunk {
            self.push_byte(b, out);
        }
    }

    /// True when there are buffered bytes of an unfinished value.
    #[must_use]
    pub fn has_partial(&self) -> bool {
        !self.buf.is_empty()
    }

    /// Signal end of stream.
    ///
    /// A bare top-level scalar (number, `true`, `false`, `null`) is normally
    /// only emitted once a following separator proves it complete. If the
    /// stream ends right after such a scalar there is no separator, so this
    /// method flushes it onto `out`.
    ///
    /// If the stream ends while still inside an object, array, or string
    /// literal — i.e. the value was genuinely truncated mid-flight — the
    /// buffered bytes are dropped and a [`TruncatedJson`] error is returned so
    /// the caller can distinguish a clean end from a cut connection.
    ///
    /// In **strict mode**, a framing violation recorded earlier (see
    /// [`strict`](Self::strict)) takes priority: it is returned as a
    /// [`FinishError::Malformed`]. A clean truncation is
    /// [`FinishError::Truncated`].
    ///
    /// Either way the splitter is reset and may be reused for a fresh stream
    /// (the strict/non-strict mode itself is preserved).
    pub fn finish(&mut self, out: &mut Vec<String>) -> Result<(), FinishError> {
        // A recorded strict-mode framing violation outranks everything else.
        if let Some(err) = self.error.take() {
            self.reset();
            return Err(FinishError::Malformed(err));
        }

        // A complete bare scalar sits at depth 0, outside any string, with
        // buffered bytes. Anything else still buffered is incomplete.
        if self.in_value && self.depth == 0 && !self.in_string {
            self.emit(out);
            return Ok(());
        }

        let buffered = self.buf.len();
        self.reset();
        if buffered > 0 {
            Err(FinishError::Truncated(TruncatedJson { buffered }))
        } else {
            Ok(())
        }
    }

    /// Reset per-stream scanning state, preserving the configured mode and any
    /// not-yet-consumed strict-mode error.
    fn reset(&mut self) {
        self.buf.clear();
        self.depth = 0;
        self.in_string = false;
        self.escaped = false;
        self.in_value = false;
    }

    fn push_byte(&mut self, b: u8, out: &mut Vec<String>) {
        // Inside a string literal, only `"`/escape transitions matter.
        if self.in_value && self.in_string {
            self.buf.push(b);
            if self.escaped {
                self.escaped = false;
            } else if b == b'\\' {
                self.escaped = true;
            } else if b == b'"' {
                self.in_string = false;
            }
            return;
        }

        let is_sep = b.is_ascii_whitespace() || b == b',';

        // Between values: a separator terminates a pending bare scalar (number,
        // `true`, `false`, `null`) that is at depth 0, otherwise it is skipped.
        if !self.in_value {
            if is_sep {
                return;
            }
            self.in_value = true;
        } else if self.depth == 0 && is_sep {
            // We are accumulating a bare scalar and just hit a separator: the
            // scalar is complete. Do not consume the separator into it.
            self.emit(out);
            return;
        }

        self.buf.push(b);

        match b {
            b'"' => self.in_string = true,
            b'{' | b'[' => self.depth += 1,
            b'}' | b']' => {
                if self.depth == 0 {
                    // A close bracket with nothing open: this only happens on
                    // malformed framing. In strict mode, drop the byte and
                    // record the violation instead of emitting a bogus value.
                    if self.strict {
                        if self.error.is_none() {
                            self.error = Some(MalformedJson {
                                byte: b,
                                values_emitted: self.values_emitted,
                            });
                        }
                        self.buf.pop();
                        self.in_value = false;
                        return;
                    }
                    // Non-strict: preserve the historical behaviour of emitting
                    // the stray byte as a one-byte "value".
                    self.emit(out);
                    return;
                }
                self.depth -= 1;
                if self.depth == 0 {
                    self.emit(out);
                }
            }
            _ => { /* scalar byte; flushed when a separator follows */ }
        }
    }

    /// Emit the currently buffered value and reset per-value state.
    fn emit(&mut self, out: &mut Vec<String>) {
        let bytes = core::mem::take(&mut self.buf);
        out.push(String::from_utf8_lossy(&bytes).into_owned());
        self.values_emitted += 1;
        self.reset();
    }
}

/// Error returned by [`JsonSplitter::finish`].
///
/// This is a transparent wrapper: its `Display` output is exactly that of the
/// wrapped error, so callers can match on the variant to recover the details.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum FinishError {
    /// The stream ended mid-value (inside an object, array, or string).
    Truncated(TruncatedJson),
    /// A strict-mode framing violation was seen during the stream.
    Malformed(MalformedJson),
}

impl core::fmt::Display for FinishError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Forward verbatim: the wrapped error already carries the full message.
        match self {
            Self::Truncated(e) => core::fmt::Display::fmt(e, f),
            Self::Malformed(e) => core::fmt::Display::fmt(e, f),
        }
    }
}

impl From<TruncatedJson> for FinishError {
    fn from(e: TruncatedJson) -> Self {
        Self::Truncated(e)
    }
}

impl From<MalformedJson> for FinishError {
    fn from(e: MalformedJson) -> Self {
        Self::Malformed(e)
    }
}

#[cfg(feature = "std")]
impl std::error::Error for FinishError {
    // `Display` above already prints the wrapped error's message, so returning
    // the wrapped error itself from `source()` would make error-chain
    // reporters print the same message twice. As a transparent wrapper, forward
    // to the wrapped error's own source instead (the same semantics as
    // thiserror's `#[error(transparent)]`).
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Truncated(e) => std::error::Error::source(e),
            Self::Malformed(e) => std::error::Error::source(e),
        }
    }
}