stream-rs 0.1.0

Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE, incremental JSON, OpenAI/Anthropic delta accumulators).

stream-rs

Zero-dependency, spec-compliant streaming toolkit for LLM responses — in Rust.

stream-rs is the low-level plumbing that sits underneath an LLM client. It does not talk to any model, manage API keys, or open sockets. It does one thing: turn a raw, chunked byte stream into usable values, correctly, with no runtime dependencies.

It is the reliable foundation an LLM application is built on — the part most integrations re-implement by hand and get subtly wrong. By owning the byte-level streaming layer instead of treating the provider SDK as a black box, you get correct behaviour on the edge cases that only show up in production (split chunks, partial UTF-8, interleaved tool-call deltas) and you stay free of heavy dependencies.

It is a Rust companion to the TypeScript sse-stream toolkit — the same problem domain, idiomatic Rust.

Why this matters for LLM apps

When you stream a completion, the provider sends Server-Sent Events over a chunked HTTP body. The bytes you actually read() off the socket have no relationship to event or token boundaries. A single read might give you:

ta: {"choices":[{"delta":{"content":"Par

…the tail of a data: field name and half a JSON object, with the line terminator still to come in a later packet. The naive approach (split("\n\n") on each chunk, or json.loads() per line) silently breaks here:

# The bug in countless streaming integrations:
for chunk in response.iter_content():
    for line in chunk.split(b"\n\n"):        # boundary may fall mid-event
        if line.startswith(b"data: "):
            event = json.loads(line[6:])     # may be half a JSON object -> crash

It works on localhost and fails under real network conditions, with multi-line data fields, or when a tool call's arguments arrive across several deltas.

stream-rs is a stateful push parser: you feed it whatever bytes arrive, and it buffers across calls so a value is only emitted once it is genuinely complete. The same input split at any byte boundary yields identical output, a property verified by tests and fuzzers (see Testing and Fuzzing). End-to-end, the flow is:

bytes  ->  SseParser  ->  event.data (JSON)  ->  your JSON parser  ->  Accumulator  ->  final message
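
The buffering idea behind the first stage can be sketched in a few lines of standard-library Rust. This is a toy under stated assumptions, not the stream-rs implementation: BlockBuffer is a hypothetical name, and it only recognises "\n\n" as a terminator, whereas a real SSE parser also has to handle "\r" and "\r\n".

```rust
/// Hypothetical toy, not the stream-rs implementation: holds bytes until a
/// blank line ("\n\n") proves that a block is complete.
#[derive(Default)]
pub struct BlockBuffer {
    pending: Vec<u8>,
}

impl BlockBuffer {
    /// Append a chunk and move every block that is now complete into `out`.
    pub fn feed(&mut self, chunk: &[u8], out: &mut Vec<Vec<u8>>) {
        self.pending.extend_from_slice(chunk);
        // Search the whole buffer, so a terminator split across two reads
        // is still found once its second half arrives.
        while let Some(pos) = self.pending.windows(2).position(|w| w == b"\n\n") {
            let mut block: Vec<u8> = self.pending.drain(..pos + 2).collect();
            block.truncate(pos); // drop the terminator itself
            out.push(block);
        }
    }
}
```

Unlike the per-chunk split shown earlier, nothing is emitted for a half-received block: the bytes simply wait in pending until a later feed completes them.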

Runnable, no-API-key demonstrations of the full pipeline live in examples/: cargo run --example openai_stream, cargo run --example anthropic_stream, and cargo run --example gemini_stream (the last one frames a Gemini object stream with JsonSplitter instead of SSE).

Why

Every streaming LLM integration re-implements the same fiddly pieces by hand, and most get the edge cases wrong:

  • SSE line endings split across TCP chunks (\r at the end of a read).
  • The leading UTF-8 BOM, lone \r terminators, multi-line data fields.
  • Re-assembling provider deltas (choices[i].delta, content_block_delta).
  • Finding where one JSON value ends and the next begins in a byte stream.

stream-rs solves exactly these, and nothing else.
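
To make the first bullet concrete, here is a minimal sketch of a line splitter that treats "\n", "\r" and "\r\n" as terminators even when a "\r\n" pair straddles two reads. LineSplitter and its after_cr field are hypothetical names chosen for illustration; the crate's own parser may be structured differently.

```rust
/// Hypothetical toy line splitter (not the stream-rs API). It accepts "\n",
/// "\r" and "\r\n" as terminators, even when "\r\n" straddles two chunks.
#[derive(Default)]
pub struct LineSplitter {
    line: Vec<u8>,
    /// True when the previous byte was '\r': a following '\n' belongs to the
    /// same terminator and must not produce an extra empty line.
    after_cr: bool,
}

impl LineSplitter {
    pub fn feed(&mut self, chunk: &[u8], out: &mut Vec<Vec<u8>>) {
        for &byte in chunk {
            // Consume the flag; it only matters for the byte right after '\r'.
            let after_cr = std::mem::replace(&mut self.after_cr, false);
            match byte {
                b'\n' if after_cr => {} // second half of "\r\n"
                b'\n' => out.push(std::mem::take(&mut self.line)),
                b'\r' => {
                    // A lone '\r' is already a full terminator, so emit now.
                    out.push(std::mem::take(&mut self.line));
                    self.after_cr = true;
                }
                other => self.line.push(other),
            }
        }
    }
}
```

Because the flag lives in the struct rather than in a local variable, a read that ends in "\r" leaves enough state behind for the next read to swallow the matching "\n".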

Features

Module                    What it does
sse                       WHATWG-compliant Server-Sent Events push parser (feed bytes, get events).
incremental_json          Byte-level splitter that finds complete top-level JSON values in a chunked stream (NDJSON or bare concatenation).
accumulators::openai      Fold chat.completion.chunk deltas (content + tool calls) into final choices.
accumulators::anthropic   Fold Messages API events (content_block_*, message_*) into final blocks, with ordering validation.
accumulators::gemini      Fold streamGenerateContent responses (text parts + function calls) into final candidates.
stream (feature)          Optional futures_core::Stream adapter for async byte sources.

The core has no runtime dependencies. The optional stream feature pulls in only futures-core.

Install

Not published to crates.io; depend on it straight from Git:

[dependencies]
stream-rs = { git = "https://github.com/ky2renzzz/stream-rs" }

# optional async Stream adapter:
# stream-rs = { git = "https://github.com/ky2renzzz/stream-rs", features = ["stream"] }

# no_std (embedded / wasm) — drops only the std::error::Error impls:
# stream-rs = { git = "https://github.com/ky2renzzz/stream-rs", default-features = false }

The crate is #![no_std] and needs only alloc. The default std feature adds std::error::Error impls for the error types; everything else — both parsers, all accumulators, and the optional stream adapter — works without std.

Usage

SSE parsing (handles chunk boundaries)

use stream_rs::sse::SseParser;

let mut parser = SseParser::new();
let mut events = Vec::new();

// A CRLF split across two chunks, and a multi-line data field:
parser.feed(b"event: greeting\r\ndata: hello\r", &mut events);
parser.feed(b"\ndata: world\r\n\r\n", &mut events);

assert_eq!(events.len(), 1);
assert_eq!(events[0].event.as_deref(), Some("greeting"));
assert_eq!(events[0].data, "hello\nworld");
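
Chunk boundaries can also fall inside a multibyte UTF-8 sequence. The following standard-library sketch shows why it matters to buffer raw bytes and decode only once a value is complete; decode_per_chunk and decode_after_buffering are hypothetical helper names, not part of stream-rs.

```rust
/// Decode each chunk on its own: a codepoint cut by a chunk boundary is
/// damaged, because each half is invalid UTF-8 in isolation.
pub fn decode_per_chunk(chunks: &[&[u8]]) -> String {
    chunks.iter().map(|c| String::from_utf8_lossy(c)).collect()
}

/// Buffer the raw bytes first and decode once the value is complete.
pub fn decode_after_buffering(chunks: &[&[u8]]) -> String {
    let bytes: Vec<u8> = chunks.concat();
    String::from_utf8_lossy(&bytes).into_owned()
}
```

With the two-byte encoding of "é" (0xC3 0xA9) split across reads, the first helper produces replacement characters while the second returns the intended text.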

Splitting a JSON stream

use stream_rs::incremental_json::JsonSplitter;

let mut splitter = JsonSplitter::new();
let mut values = Vec::new();

splitter.feed(br#"{"a":1}{"b":"#, &mut values);
assert_eq!(values, vec![r#"{"a":1}"#]);

values.clear();
splitter.feed(br#"[1,2]}"#, &mut values); // completes the partial object
assert_eq!(values, vec![r#"{"b":[1,2]}"#]);

// At end of stream, finish() flushes a trailing bare scalar and reports an
// error if the stream was cut mid-value:
let mut values = Vec::new();
let mut s = JsonSplitter::new();
s.feed(b"42", &mut values);          // a bare scalar isn't emitted until proven complete
s.finish(&mut values).unwrap();      // ...so finish() flushes it
assert_eq!(values, vec!["42"]);
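
For intuition, boundary finding of this kind can be sketched with three pieces of state: nesting depth, whether the cursor is inside a string, and whether the previous byte was a backslash. ToySplitter below is a hypothetical illustration, not the JsonSplitter source, and it deliberately skips bare scalars, which the real splitter handles via finish().

```rust
/// Hypothetical structural splitter for top-level JSON objects and arrays.
/// A sketch of the bracket/string/escape tracking idea only; bytes outside
/// any object or array (whitespace, bare scalars) are skipped.
#[derive(Default)]
pub struct ToySplitter {
    buf: Vec<u8>,
    depth: usize,
    in_string: bool,
    escaped: bool,
}

impl ToySplitter {
    pub fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) {
        for &b in chunk {
            if self.depth == 0 && !matches!(b, b'{' | b'[') {
                continue; // between values: nothing to frame yet
            }
            self.buf.push(b);
            if self.in_string {
                match b {
                    _ if self.escaped => self.escaped = false, // escaped byte
                    b'\\' => self.escaped = true,
                    b'"' => self.in_string = false,
                    _ => {}
                }
                continue; // brackets inside strings do not count
            }
            match b {
                b'"' => self.in_string = true,
                b'{' | b'[' => self.depth += 1,
                b'}' | b']' => {
                    // depth >= 1 here, because depth-0 closers are skipped above.
                    self.depth -= 1;
                    if self.depth == 0 {
                        let bytes = std::mem::take(&mut self.buf);
                        out.push(String::from_utf8_lossy(&bytes).into_owned());
                    }
                }
                _ => {}
            }
        }
    }
}
```

Note that the sketch never checks whether "}" closes a "{" or that commas and colons are well placed. It only finds where a value ends, which is why each emitted slice still needs a real JSON parser.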

Accumulating OpenAI deltas

use stream_rs::accumulators::openai::OpenAiAccumulator;

let mut acc = OpenAiAccumulator::new();
acc.push_role(0, "assistant");
acc.push_content(0, "Hel");
acc.push_content(0, "lo");

assert_eq!(acc.choice(0).unwrap().content, "Hello");
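
The folding itself is simple to picture. The sketch below assumes a sparse map keyed by the provider-supplied index and concatenates tool-call argument fragments as raw text; ToyAccumulator and its methods are hypothetical names for illustration, not the OpenAiAccumulator API.

```rust
use std::collections::BTreeMap;

/// Hypothetical toy accumulator (not the stream-rs types): entries are keyed
/// by the provider-supplied index, so only indices actually seen use memory.
#[derive(Default)]
pub struct ToyAccumulator {
    content: BTreeMap<u32, String>,
    /// (choice index, tool-call index) -> raw argument text as streamed.
    tool_args: BTreeMap<(u32, u32), String>,
}

impl ToyAccumulator {
    pub fn push_content(&mut self, choice: u32, piece: &str) {
        self.content.entry(choice).or_default().push_str(piece);
    }

    /// Fragments are appended verbatim; no JSON parsing happens here.
    pub fn push_tool_args(&mut self, choice: u32, call: u32, piece: &str) {
        self.tool_args.entry((choice, call)).or_default().push_str(piece);
    }

    pub fn content(&self, choice: u32) -> Option<&str> {
        self.content.get(&choice).map(String::as_str)
    }

    pub fn tool_args(&self, choice: u32, call: u32) -> Option<&str> {
        self.tool_args.get(&(choice, call)).map(String::as_str)
    }

    /// Number of choices that have received content so far.
    pub fn choice_count(&self) -> usize {
        self.content.len()
    }
}
```

A delta for index u32::MAX adds one map entry rather than a vector of that length, and the argument string only becomes valid JSON once the last fragment has arrived.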

Accumulating Anthropic events (with ordering checks)

use stream_rs::accumulators::anthropic::{AnthropicAccumulator, BlockKind};

let mut acc = AnthropicAccumulator::new();
acc.message_start();
acc.content_block_start(0, BlockKind::Text).unwrap();
acc.text_delta(0, "Hi").unwrap();
acc.content_block_stop(0).unwrap();

assert_eq!(acc.block(0).unwrap().text, "Hi");

// A delta for a block that was never started is rejected:
assert!(acc.text_delta(5, "x").is_err());
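
Ordering validation of this sort amounts to a small per-block state machine. The following is a sketch under assumptions: ToyBlocks, ToyBlock and OrderError are hypothetical names, and the rules shown (no delta before start, no delta after stop, no double start) are one plausible reading of "ordering checks", not a statement of what AnthropicAccumulator enforces.

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
pub enum OrderError {
    AlreadyStarted(u32),
    NotOpen(u32),
}

struct ToyBlock {
    text: String,
    open: bool,
}

/// Hypothetical toy: tracks which content blocks are currently open.
#[derive(Default)]
pub struct ToyBlocks {
    blocks: BTreeMap<u32, ToyBlock>,
}

impl ToyBlocks {
    pub fn start(&mut self, index: u32) -> Result<(), OrderError> {
        if self.blocks.contains_key(&index) {
            return Err(OrderError::AlreadyStarted(index));
        }
        self.blocks.insert(index, ToyBlock { text: String::new(), open: true });
        Ok(())
    }

    /// Accepts text only for a block that was started and not yet stopped.
    pub fn text_delta(&mut self, index: u32, piece: &str) -> Result<(), OrderError> {
        match self.blocks.get_mut(&index) {
            Some(block) if block.open => {
                block.text.push_str(piece);
                Ok(())
            }
            _ => Err(OrderError::NotOpen(index)),
        }
    }

    pub fn stop(&mut self, index: u32) -> Result<(), OrderError> {
        match self.blocks.get_mut(&index) {
            Some(block) if block.open => {
                block.open = false;
                Ok(())
            }
            _ => Err(OrderError::NotOpen(index)),
        }
    }

    pub fn text(&self, index: u32) -> Option<&str> {
        self.blocks.get(&index).map(|b| b.text.as_str())
    }
}
```

Returning a Result from every event method lets the caller decide whether an out-of-order stream is fatal or merely logged.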

Accumulating Gemini responses

use stream_rs::accumulators::gemini::GeminiAccumulator;

let mut acc = GeminiAccumulator::new();
acc.push_text(0, "Hel");
acc.push_text(0, "lo");
acc.push_function_call(0, "get_weather", r#"{"city":"Paris"}"#);
acc.set_finish_reason(0, "STOP");

let c = acc.candidate(0).unwrap();
assert_eq!(c.text, "Hello");
assert_eq!(c.function_calls[0].name, "get_weather");

Async adapter (stream feature)

// Needs the `stream` feature, plus the `futures` crate for `stream::iter` and `StreamExt`.
use futures::stream::{self, StreamExt};
use stream_rs::stream::SseStream;

async fn run() {
    let body = stream::iter(vec![
        Ok::<_, std::io::Error>(b"data: hello\n".to_vec()),
        Ok(b"\n".to_vec()),
    ]);

    let mut events = SseStream::new(body);
    while let Some(event) = events.next().await {
        println!("{}", event.unwrap().data);
    }
}

Design principles

  • Spec first. The SSE parser follows the WHATWG event-stream algorithm: BOM stripping, \r / \n / \r\n terminators (including split across chunks), comment lines, single-leading-space removal, NUL-in-id rejection, non-numeric retry rejection, and the "empty data is not dispatched" rule.
  • Push, not pull. Parsers accept arbitrary byte chunks, so they drop into any transport without assuming an async runtime.
  • Zero core dependencies. Async support is strictly opt-in behind a feature flag.
  • JSON-library agnostic accumulators. They take already-parsed primitive pieces, so you wire them to serde_json, simd-json, or anything else.
  • Framing, not validation, for JSON. incremental_json is deliberately a structural splitter, not a parser: it tracks brackets, strings and escapes to find correct value boundaries, but does not enforce the full grammar ({,} passes). That is exactly the contract streaming framing needs — hand each emitted slice to a real JSON parser afterwards.
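
The per-line rules named under "Spec first" can be illustrated with a small classifier over one already-decoded, terminator-free line. This is a sketch of the WHATWG field rules as summarised above, with hypothetical names (Line, classify, join_data); treating an empty retry value as ignored is an assumption of this sketch.

```rust
/// What one decoded line means under the event-stream rules.
#[derive(Debug, PartialEq)]
pub enum Line<'a> {
    Dispatch, // empty line: dispatch the pending event
    Comment,  // line starting with ':'
    Event(&'a str),
    Data(&'a str),
    Id(&'a str),
    Retry(u64),
    Ignored, // unknown field, NUL in id, or non-numeric retry
}

pub fn classify(line: &str) -> Line<'_> {
    if line.is_empty() {
        return Line::Dispatch;
    }
    if line.starts_with(':') {
        return Line::Comment;
    }
    // Split on the first colon; a line without one is a field with an empty value.
    let (field, value) = line.split_once(':').unwrap_or((line, ""));
    // Remove at most one leading space from the value.
    let value = value.strip_prefix(' ').unwrap_or(value);
    match field {
        "event" => Line::Event(value),
        "data" => Line::Data(value),
        "id" if !value.contains('\0') => Line::Id(value),
        "retry" if !value.is_empty() && value.bytes().all(|b| b.is_ascii_digit()) => {
            // Digits only, but the number may still overflow u64.
            value.parse::<u64>().map(Line::Retry).unwrap_or(Line::Ignored)
        }
        _ => Line::Ignored,
    }
}

/// Join collected data lines for dispatch. `None` models the rule that an
/// event with no data lines at all is not dispatched.
pub fn join_data(lines: &[&str]) -> Option<String> {
    if lines.is_empty() {
        None
    } else {
        Some(lines.join("\n"))
    }
}
```

Keeping classification separate from line splitting means the terminator handling and the field semantics can be tested independently.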

Security & limits

stream-rs is framing infrastructure, so its safety contract is worth stating explicitly:

  • No grammar validation or depth limit, by design. incremental_json is a structural splitter, not a parser: it tracks brackets, strings and escapes to find value boundaries but does not enforce the grammar ({,} passes) and does not impose a maximum nesting depth. Depth is a usize guarded by saturating_sub, so deeply nested adversarial input cannot panic or overflow, but it is not rejected either. If you frame untrusted input, cap chunk sizes and hand each emitted slice to a real JSON parser (which enforces depth) immediately.
  • Lossy UTF-8 decoding. Both parsers buffer raw bytes and only decode once a value is complete, so a multibyte codepoint split across chunks is reassembled correctly (covered by tests). Genuinely invalid UTF-8 is replaced with U+FFFD (matching browser SSE leniency) rather than rejected.
  • No panics on arbitrary input. The chunk-boundary-invariance fuzzers assert that any bytes, split at any point, never panic — see Fuzzing.
  • Bounded memory is the caller's responsibility. A push parser buffers an in-progress line / value until it completes; a peer that never sends a terminator can grow that buffer. Enforce your own byte ceiling on the transport if peers are untrusted.
  • Tool-call scope. Accumulators reassemble tool-call arguments as the raw string exactly as streamed; they do not parse or validate that JSON. Parse it yourself once finish_reason / content_block_stop arrives.
  • Index-safe accumulators. The OpenAI, Anthropic and Gemini accumulators key their entries (choices, content blocks, candidates, tool calls) by the provider-supplied index in a sparse BTreeMap. A stream that reports a huge or non-contiguous index costs only the entries actually seen — it can never force a dense allocation up to that index.
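
One way to honour the bounded-memory point is a small counter placed between the transport and the parser. The sketch below is an assumption-laden illustration: ByteCeiling, admit and reset are hypothetical names, and stream-rs itself does not ship such a guard according to the text above.

```rust
#[derive(Debug, PartialEq)]
pub struct LimitExceeded {
    pub limit: usize,
}

/// Hypothetical guard that sits between the transport and a push parser.
pub struct ByteCeiling {
    limit: usize,
    seen: usize,
}

impl ByteCeiling {
    pub fn new(limit: usize) -> Self {
        Self { limit, seen: 0 }
    }

    /// Call before feeding `chunk` to the parser; stop reading on `Err`.
    pub fn admit(&mut self, chunk: &[u8]) -> Result<(), LimitExceeded> {
        self.seen = self.seen.saturating_add(chunk.len());
        if self.seen > self.limit {
            Err(LimitExceeded { limit: self.limit })
        } else {
            Ok(())
        }
    }

    /// Optionally call when the parser emits a value, to turn the total
    /// ceiling into a rough per-value one. This is approximate: bytes of the
    /// current chunk that follow the emitted value are no longer counted.
    pub fn reset(&mut self) {
        self.seen = 0;
    }
}
```

Without reset the guard is a plain ceiling on the whole response, which is the simplest policy for untrusted peers.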

Testing

The parsers are verified against a suite of spec conformance tests, including a property that feeding one byte at a time yields the same output as feeding the whole input at once.

cargo test --all-features
cargo clippy --all-features --all-targets -- -D warnings
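
The split-invariance property is easy to state as code. The sketch below checks it against a stand-in parser so that it is self-contained; NewlineSplitter, run and is_split_invariant are hypothetical names, and the crate's actual tests may be organised differently.

```rust
/// Hypothetical stand-in for a push parser: splits on '\n' and keeps the
/// unfinished tail between calls.
#[derive(Default)]
pub struct NewlineSplitter {
    tail: Vec<u8>,
}

impl NewlineSplitter {
    pub fn feed(&mut self, chunk: &[u8], out: &mut Vec<Vec<u8>>) {
        for &b in chunk {
            if b == b'\n' {
                out.push(std::mem::take(&mut self.tail));
            } else {
                self.tail.push(b);
            }
        }
    }
}

/// Feed the given chunks to a fresh parser and collect everything emitted.
pub fn run(chunks: &[&[u8]]) -> Vec<Vec<u8>> {
    let mut parser = NewlineSplitter::default();
    let mut out = Vec::new();
    for chunk in chunks {
        parser.feed(chunk, &mut out);
    }
    out
}

/// True if every two-way split, and the byte-at-a-time feed, produce the
/// same output as feeding the whole input at once.
pub fn is_split_invariant(input: &[u8]) -> bool {
    let whole = run(&[input]);
    let bytewise: Vec<&[u8]> = input.chunks(1).collect();
    (0..=input.len()).all(|i| run(&[&input[..i], &input[i..]]) == whole)
        && run(&bytewise) == whole
}
```

A fuzzer can drive the same check by supplying the input bytes and the split point, which is the shape of the property described under Fuzzing.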

Benchmarks

Throughput is measured with Criterion; each benchmark reports MB/s for both a single bulk feed and realistic 64-byte ragged chunks:

cargo bench

Fuzzing

The chunk-boundary invariance property is also fuzzed with cargo-fuzz: the same input split at an arbitrary point must produce identical output to feeding it whole, for any bytes, without panicking.

cargo +nightly fuzz run sse_parser
cargo +nightly fuzz run json_splitter

Contributing

Contributions are welcome. Please read CONTRIBUTING.md for the local check commands (fmt, clippy, test, fuzzing) that CI enforces.

License

MIT © ky2renzzz