stream-rs
Zero-dependency, spec-compliant streaming toolkit for LLM responses — in Rust.
stream-rs is the low-level plumbing that sits underneath an LLM client. It
does not talk to any model, manage API keys, or open sockets. It does one
thing: turn a raw, chunked byte stream into usable values, correctly, with no
runtime dependencies.
It is the reliable foundation an LLM application is built on — the part most integrations re-implement by hand and get subtly wrong. By owning the byte-level streaming layer instead of treating the provider SDK as a black box, you get correct behaviour on the edge cases that only show up in production (split chunks, partial UTF-8, interleaved tool-call deltas) and you stay free of heavy dependencies.
It is a Rust companion to the TypeScript sse-stream toolkit: the same problem domain, in idiomatic Rust.
Why this matters for LLM apps
When you stream a completion, the provider sends Server-Sent Events over a
chunked HTTP body. The bytes you actually read() off the socket have no
relationship to event or token boundaries. A single read might give you:
ta: {"choices":[{"delta":{"content":"Par
…the tail of one event, half a JSON object, and a line terminator that will only
be completed by the next packet. The naive approach — split("\n\n") on each
chunk, or json.loads() per line — silently breaks here:
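# The bug in countless streaming integrations:
for chunk in stream:                               # raw bytes, cut wherever the network cut them
    for event in chunk.split(b"\n\n"):             # boundary may fall mid-event
        data = json.loads(event[len(b"data: "):])  # may be half a JSON object -> crash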
It works on localhost and fails under real network conditions, with multi-line
data fields, or when a tool call's arguments arrive across several deltas.
stream-rs is a stateful push parser: you feed it whatever bytes arrive,
and it buffers across calls so a value is only emitted once it is genuinely
complete. The same input split at any byte boundary yields identical output —
a property verified by tests and a fuzzer (see Testing and Fuzzing). End-to-end,
the flow is:
bytes -> SseParser -> event.data (JSON) -> your JSON parser -> Accumulator -> final message
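The buffering idea behind that first stage can be sketched in a few lines of std-only Rust. This is a hypothetical illustration of the technique, not stream-rs's internals (LineSplitter, feed and after_cr are made-up names): it treats \r, \n and \r\n as line terminators and remembers a trailing \r, so a CRLF split across two reads still counts as a single line ending.

```rust
/// Hypothetical sketch (not stream-rs's code): splits a chunked byte stream
/// into lines, treating "\r", "\n" and "\r\n" as terminators no matter where
/// the chunks are cut.
#[derive(Default)]
struct LineSplitter {
    buf: Vec<u8>,   // bytes of the line still in progress
    after_cr: bool, // previous byte was '\r', so a following '\n' belongs to it
}

impl LineSplitter {
    /// Feed any chunk; every completed line is pushed to `out` without its terminator.
    fn feed(&mut self, chunk: &[u8], out: &mut Vec<Vec<u8>>) {
        for &b in chunk {
            if self.after_cr {
                self.after_cr = false;
                if b == b'\n' {
                    continue; // second half of a CRLF, possibly from the previous chunk
                }
            }
            match b {
                b'\r' => {
                    // Emit immediately; only remember that a '\n' may follow.
                    out.push(std::mem::take(&mut self.buf));
                    self.after_cr = true;
                }
                b'\n' => out.push(std::mem::take(&mut self.buf)),
                _ => self.buf.push(b),
            }
        }
    }
}
```

Emitting the line as soon as \r arrives is what keeps the output independent of chunk boundaries: no decision ever waits on a byte that might belong to the next read. Keeping raw bytes until a line is complete also means a multibyte UTF-8 codepoint split across reads is decoded only once it is whole.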
Runnable, no-API-key demonstrations of the full pipeline live in
examples/: cargo run --example openai_stream,
cargo run --example anthropic_stream, and cargo run --example gemini_stream
(the last one frames a Gemini object stream with JsonSplitter instead of SSE).
Why
Every streaming LLM integration re-implements the same fiddly pieces by hand, and most get the edge cases wrong:
- SSE line endings split across TCP chunks (a \r at the end of a read).
- The leading UTF-8 BOM, lone \r terminators, multi-line data fields.
- Re-assembling provider deltas (choices[i].delta, content_block_delta).
- Finding where one JSON value ends and the next begins in a byte stream.
stream-rs solves exactly these, and nothing else.
Features
| Module | What it does |
|---|---|
| sse | WHATWG-compliant Server-Sent Events push parser (feed bytes, get events). |
| incremental_json | Byte-level splitter that finds complete top-level JSON values in a chunked stream (NDJSON or bare concatenation). |
| accumulators::openai | Fold chat.completion.chunk deltas (content + tool calls) into final choices. |
| accumulators::anthropic | Fold Messages API events (content_block_*, message_*) into final blocks, with ordering validation. |
| accumulators::gemini | Fold streamGenerateContent responses (text parts + function calls) into final candidates. |
| stream (feature) | Optional futures_core::Stream adapter for async byte sources. |
The core has no runtime dependencies. The optional stream feature pulls in
only futures-core.
Install
Not published to crates.io; depend on it straight from Git:
[dependencies]
stream-rs = { git = "https://github.com/ky2renzzz/stream-rs" }
# optional async Stream adapter:
# stream-rs = { git = "https://github.com/ky2renzzz/stream-rs", features = ["stream"] }
# no_std (embedded / wasm) — drops only the std::error::Error impls:
# stream-rs = { git = "https://github.com/ky2renzzz/stream-rs", default-features = false }
The crate is #![no_std] and needs only alloc. The default std feature
adds std::error::Error impls for the error types; everything else — both
parsers, all accumulators, and the optional stream adapter — works without
std.
Usage
SSE parsing (handles chunk boundaries)
use SseParser;
let mut parser = SseParser::new();
let mut events = Vec::new();
// A CRLF split across two chunks, and a multi-line data field:
parser.feed;
parser.feed;
assert_eq!;
assert_eq!;
assert_eq!;
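Once the byte stream has been cut into lines, the per-line rules listed under Design principles are small enough to sketch. The std-only Rust below is a hypothetical illustration of WHATWG field processing, not the crate's SseParser: Event, EventBuilder and line() are made-up names, the input is assumed to be complete, already-decoded lines, and retry handling is left out.

```rust
/// Hypothetical sketch of WHATWG event-stream field handling, applied to
/// complete lines. Names are illustrative, not stream-rs's API.
#[derive(Debug, PartialEq)]
struct Event {
    event: String, // "message" when no `event:` field was sent
    data: String,
    id: String,
}

#[derive(Default)]
struct EventBuilder {
    event: String,
    data: String,
    last_id: String, // persists across events, per the spec
}

impl EventBuilder {
    /// Process one line; returns an event when a blank line dispatches one.
    fn line(&mut self, line: &str) -> Option<Event> {
        if line.is_empty() {
            // Blank line: reset the per-event buffers, then maybe dispatch.
            let event_type = std::mem::take(&mut self.event);
            let mut data = std::mem::take(&mut self.data);
            if data.is_empty() {
                return None; // "empty data is not dispatched"
            }
            data.pop(); // drop the '\n' appended after the last data line
            let event = if event_type.is_empty() { "message".to_string() } else { event_type };
            return Some(Event { event, data, id: self.last_id.clone() });
        }
        if line.starts_with(':') {
            return None; // comment line
        }
        let (field, value) = match line.split_once(':') {
            // Remove at most one leading space from the value.
            Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
            None => (line, ""),
        };
        match field {
            "event" => self.event = value.to_string(),
            "data" => {
                self.data.push_str(value);
                self.data.push('\n'); // multi-line data is joined with '\n'
            }
            "id" if !value.contains('\0') => self.last_id = value.to_string(),
            _ => {} // `retry` and unknown fields are ignored in this sketch
        }
        None
    }
}
```

Keeping last_id outside the per-event reset follows the spec: an event that sends no id field still carries the most recent one.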
Splitting a JSON stream
use JsonSplitter;
let mut splitter = JsonSplitter::new();
let mut values = Vec::new();
splitter.feed;
assert_eq!;
values.clear();
splitter.feed; // completes the partial object
assert_eq!;
// At end of stream, finish() flushes a trailing bare scalar and reports an
// error if the stream was cut mid-value:
let mut values = Vec::new();
let mut s = JsonSplitter::new();
s.feed; // a bare scalar isn't emitted until proven complete
s.finish.unwrap; // ...so finish() flushes it
assert_eq!;
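The bracket, string and escape tracking behind that splitter can be sketched for objects and arrays. This is a hypothetical std-only illustration, not the crate's JsonSplitter: ValueFramer is a made-up name, it silently drops anything outside an object or array (bare scalars included, the case finish() exists for), and it has no end-of-stream error reporting.

```rust
/// Hypothetical sketch of structural JSON framing (not stream-rs's code):
/// finds where each top-level object or array ends in a chunked byte stream.
/// It tracks only brackets, strings and escapes; no grammar validation.
#[derive(Default)]
struct ValueFramer {
    buf: Vec<u8>,    // bytes of the value in progress
    depth: usize,    // current bracket nesting depth
    in_string: bool, // inside a JSON string literal
    escaped: bool,   // previous byte was a backslash inside a string
}

impl ValueFramer {
    fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) {
        for &b in chunk {
            if self.depth == 0 && !matches!(b, b'{' | b'[') {
                continue; // between values: newlines, whitespace, stray bytes
            }
            self.buf.push(b);
            if self.in_string {
                if self.escaped {
                    self.escaped = false;
                } else if b == b'\\' {
                    self.escaped = true;
                } else if b == b'"' {
                    self.in_string = false;
                }
                continue; // brackets inside strings do not count
            }
            match b {
                b'"' => self.in_string = true,
                b'{' | b'[' => self.depth += 1,
                b'}' | b']' => {
                    self.depth = self.depth.saturating_sub(1);
                    if self.depth == 0 {
                        // Decode only now that the value is complete, so a
                        // multibyte UTF-8 sequence split across chunks is whole.
                        let bytes = std::mem::take(&mut self.buf);
                        out.push(String::from_utf8_lossy(&bytes).into_owned());
                    }
                }
                _ => {}
            }
        }
    }
}
```

The escaped flag is what stops a quoted } or an escaped quote from closing a value early; everything finer-grained is left to the real JSON parser that receives each slice.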
Accumulating OpenAI deltas
use OpenAiAccumulator;
let mut acc = OpenAiAccumulator::new();
acc.push_role;
acc.push_content;
acc.push_content;
assert_eq!;
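Under the hood this kind of fold is mostly string concatenation keyed by index. A hypothetical std-only sketch, not the crate's OpenAiAccumulator (Accumulator, Choice, ToolCall and push_tool_call are made-up names), that folds content deltas and tool-call argument fragments into sparse BTreeMaps, taking already-parsed values as arguments:

```rust
use std::collections::BTreeMap;

/// Hypothetical sketch of folding OpenAI-style chat.completion.chunk deltas.
/// The caller has already parsed each chunk's JSON and passes in primitives.
#[derive(Default, Debug)]
struct ToolCall {
    id: String,
    name: String,
    arguments: String, // raw JSON text, concatenated exactly as streamed
}

#[derive(Default, Debug)]
struct Choice {
    content: String,
    tool_calls: BTreeMap<u32, ToolCall>, // keyed by the delta's tool call index
    finish_reason: Option<String>,
}

#[derive(Default)]
struct Accumulator {
    choices: BTreeMap<u32, Choice>, // sparse: only indices actually seen are stored
}

impl Accumulator {
    fn push_content(&mut self, choice: u32, text: &str) {
        self.choices.entry(choice).or_default().content.push_str(text);
    }

    /// Assumes id and name arrive on a call's first fragment only.
    fn push_tool_call(
        &mut self,
        choice: u32,
        index: u32,
        id: Option<&str>,
        name: Option<&str>,
        args: &str,
    ) {
        let call = self
            .choices
            .entry(choice)
            .or_default()
            .tool_calls
            .entry(index)
            .or_default();
        if let Some(id) = id {
            call.id = id.to_string();
        }
        if let Some(name) = name {
            call.name = name.to_string();
        }
        call.arguments.push_str(args);
    }

    fn finish(&mut self, choice: u32, reason: &str) {
        self.choices.entry(choice).or_default().finish_reason = Some(reason.to_string());
    }
}
```

It assumes the usual OpenAI shape, where a tool call's id and function name arrive on its first fragment and later fragments carry only more arguments text. The arguments string stays raw because, until finish_reason arrives, it is usually just a prefix of a JSON object.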
Accumulating Anthropic events (with ordering checks)
use ;
let mut acc = new;
acc.message_start;
acc.content_block_start.unwrap;
acc.text_delta.unwrap;
acc.content_block_stop.unwrap;
assert_eq!;
// A delta for a block that was never started is rejected:
assert!;
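The ordering check amounts to tracking which block indices are currently open. A hypothetical sketch (Blocks, Block and OrderError are made-up names, not the crate's types) that rejects a delta or stop for a block that was never started or has already stopped:

```rust
use std::collections::BTreeMap;

/// Hypothetical sketch of ordering checks for Anthropic Messages API
/// content-block events. Names and the error type are illustrative only.
#[derive(Debug, PartialEq)]
enum OrderError {
    AlreadyStarted(u32), // content_block_start repeated for the same index
    NotOpen(u32),        // delta or stop for a block never started, or already stopped
}

struct Block {
    text: String,
    open: bool,
}

#[derive(Default)]
struct Blocks {
    blocks: BTreeMap<u32, Block>, // keyed by the event's index field
}

impl Blocks {
    fn start(&mut self, index: u32) -> Result<(), OrderError> {
        if self.blocks.contains_key(&index) {
            return Err(OrderError::AlreadyStarted(index));
        }
        self.blocks.insert(index, Block { text: String::new(), open: true });
        Ok(())
    }

    fn text_delta(&mut self, index: u32, text: &str) -> Result<(), OrderError> {
        match self.blocks.get_mut(&index) {
            Some(block) if block.open => {
                block.text.push_str(text);
                Ok(())
            }
            _ => Err(OrderError::NotOpen(index)),
        }
    }

    fn stop(&mut self, index: u32) -> Result<(), OrderError> {
        match self.blocks.get_mut(&index) {
            Some(block) if block.open => {
                block.open = false;
                Ok(())
            }
            _ => Err(OrderError::NotOpen(index)),
        }
    }
}
```

Rejecting such events, rather than creating the block on the fly, surfaces a lost or reordered event instead of folding it into a plausible-looking but wrong message.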
Accumulating Gemini responses
use GeminiAccumulator;
let mut acc = GeminiAccumulator::new();
acc.push_text;
acc.push_text;
acc.push_function_call;
acc.set_finish_reason;
let c = acc.candidate.unwrap;
assert_eq!;
assert_eq!;
Async adapter (stream feature)
Design principles
- Spec first. The SSE parser follows the WHATWG event-stream algorithm: BOM stripping, \r/\n/\r\n terminators (including ones split across chunks), comment lines, single-leading-space removal, NUL-in-id rejection, non-numeric retry rejection, and the "empty data is not dispatched" rule.
- Push, not pull. Parsers accept arbitrary byte chunks, so they drop into any transport without assuming an async runtime.
- Zero core dependencies. Async support is strictly opt-in behind a feature flag.
- JSON-library-agnostic accumulators. They take already-parsed primitive pieces, so you can wire them to serde_json, simd-json, or anything else.
- Framing, not validation, for JSON. incremental_json is deliberately a structural splitter, not a parser: it tracks brackets, strings and escapes to find correct value boundaries, but does not enforce the full grammar ({,} passes). That is exactly the contract streaming framing needs: hand each emitted slice to a real JSON parser afterwards.
Security & limits
stream-rs is framing infrastructure, so its safety contract is worth stating
explicitly:
- No grammar validation or depth limit, by design. incremental_json is a structural splitter, not a parser: it tracks brackets, strings and escapes to find value boundaries, but it does not enforce the grammar ({,} passes) and does not impose a maximum nesting depth. The depth counter is a usize guarded by saturating_sub, so deeply nested adversarial input cannot panic or overflow, but it is not rejected either. If you frame untrusted input, cap chunk sizes and immediately hand each emitted slice to a real JSON parser that enforces a depth limit.
- Lossy UTF-8 decoding. Both parsers buffer raw bytes and decode only once a value is complete, so a multibyte codepoint split across chunks is reassembled correctly (covered by tests). Genuinely invalid UTF-8 is replaced with U+FFFD (matching browser SSE leniency) rather than rejected.
- No panics on arbitrary input. The chunk-boundary-invariance fuzzers assert that any bytes, split at any point, never panic — see Fuzzing.
- Bounded memory is the caller's responsibility. A push parser buffers an in-progress line or value until it completes; a peer that never sends a terminator can grow that buffer without bound. Enforce your own byte ceiling on the transport if peers are untrusted.
- Tool-call scope. Accumulators reassemble tool-call arguments as the raw string exactly as streamed; they do not parse or validate that JSON. Parse it yourself once finish_reason / content_block_stop arrives.
- Index-safe accumulators. The OpenAI, Anthropic and Gemini accumulators key their entries (choices, content blocks, candidates, tool calls) by the provider-supplied index in a sparse BTreeMap. A stream that reports a huge or non-contiguous index costs only the entries actually seen; it can never force a dense allocation up to that index.
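One way to enforce that ceiling without touching the parser is a small guard on the transport side. A hypothetical std-only sketch for line-oriented streams such as SSE or NDJSON (PendingLimit and TooLong are made-up names, not part of stream-rs). It bounds a single unterminated line only: a line completed within one chunk is bounded by your read size instead, and an SSE event assembled from many short data lines would need a separate per-event budget.

```rust
/// Hypothetical transport-side guard (not part of stream-rs): rejects a peer
/// that sends more than `max_pending` bytes without a line terminator. It only
/// looks at raw bytes, so it can sit in front of any line-oriented push parser.
#[derive(Debug, PartialEq)]
struct TooLong {
    pending: usize,
}

struct PendingLimit {
    max_pending: usize,
    pending: usize, // bytes received since the last '\r' or '\n'
}

impl PendingLimit {
    fn new(max_pending: usize) -> Self {
        Self { max_pending, pending: 0 }
    }

    /// Call on every chunk before passing it to the parser.
    fn check(&mut self, chunk: &[u8]) -> Result<(), TooLong> {
        match chunk.iter().rposition(|&b| b == b'\n' || b == b'\r') {
            // A terminator in this chunk completes the old line; only the
            // bytes after the last terminator are still pending.
            Some(pos) => self.pending = chunk.len() - pos - 1,
            None => self.pending = self.pending.saturating_add(chunk.len()),
        }
        if self.pending > self.max_pending {
            Err(TooLong { pending: self.pending })
        } else {
            Ok(())
        }
    }
}
```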
Testing
The parsers are verified against a suite of spec conformance tests, including a property that feeding one byte at a time yields the same output as feeding the whole input at once.
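That property is easy to state as a reusable test helper. A hypothetical sketch (assert_chunk_invariant is a made-up name; the crate's own tests may be structured differently) that takes a closure feeding a sequence of chunks to a fresh parser and compares the result against a single bulk feed:

```rust
/// Hypothetical test helper (not the crate's own harness): `run` feeds a
/// sequence of chunks to a fresh parser and returns everything it emitted.
/// The output must not depend on where the input was cut.
fn assert_chunk_invariant<T, F>(input: &[u8], run: F)
where
    T: PartialEq + std::fmt::Debug,
    F: Fn(&[&[u8]]) -> T,
{
    let whole = run(&[input]);

    // Every two-way split, including an empty head and an empty tail.
    for cut in 0..=input.len() {
        let (head, tail) = input.split_at(cut);
        assert_eq!(run(&[head, tail]), whole, "output differs when split at byte {cut}");
    }

    // The worst case: one byte per chunk.
    let bytes: Vec<&[u8]> = input.chunks(1).collect();
    assert_eq!(run(&bytes), whole, "output differs when fed one byte at a time");
}
```

A fuzzer generalises this by choosing both the bytes and the cut point at random.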
Benchmarks
Throughput is measured with Criterion; each benchmark reports MB/s for both a
single bulk feed and realistic 64-byte ragged chunks.
Fuzzing
The chunk-boundary invariance property is also fuzzed with
cargo-fuzz: the same input split at
an arbitrary point must produce identical output to feeding it whole, for any
bytes, without panicking.
Contributing
Contributions are welcome. Please read CONTRIBUTING.md for the
local check commands (fmt, clippy, test, fuzzing) that CI enforces.
License
MIT © ky2renzzz