# stream-rs
> Zero-dependency, spec-compliant streaming toolkit for LLM responses, in Rust.

`stream-rs` is the low-level plumbing that sits *underneath* an LLM client. It
does **not** talk to any model, manage API keys, or open sockets. It does one
thing: turn a raw, chunked byte stream into usable values, correctly, with no
runtime dependencies.

It is the reliable foundation an LLM application is built on: the part most
integrations re-implement by hand and get subtly wrong. By owning the byte-level
streaming layer instead of treating the provider SDK as a black box, you get
correct behaviour on the edge cases that only show up in production (split
chunks, partial UTF-8, interleaved tool-call deltas) and you stay free of heavy
dependencies.

It is a Rust companion to the TypeScript
[`sse-stream`](https://github.com/ky2renzzz/sse-stream) toolkit: the same
problem domain, in idiomatic Rust.
## Why this matters for LLM apps
When you stream a completion, the provider sends Server-Sent Events over a
chunked HTTP body. The bytes you actually `read()` off the socket have **no
relationship** to event or token boundaries. A single `read` might give you:
```text
ta: {"choices":[{"delta":{"content":"Par
```
…a read that starts mid-field (`ta:` is the end of `data:`), carries only half a
JSON object, and stops before the line terminator, which arrives with the *next*
packet. The naive approach (`split("\n\n")` on each chunk, or `json.loads()` per
line) breaks here, either by crashing or by silently dropping fragments:
```python
import json

# The bug in countless streaming integrations.
# `response` is a streaming HTTP response, e.g. requests.get(url, stream=True).
for chunk in response.iter_content(chunk_size=None):
    for line in chunk.split(b"\n\n"):  # boundary may fall mid-event
        if line.startswith(b"data: "):
            event = json.loads(line[6:])  # may be half a JSON object -> crash
```
It works on localhost and fails under real network conditions, with multi-line
`data` fields, or when a tool call's arguments arrive across several deltas.

`stream-rs` is a **stateful push parser**: you feed it whatever bytes arrive,
and it buffers across calls so a value is only emitted once it is genuinely
complete. The same input split at *any* byte boundary yields identical output,
a property checked by tests and fuzzers (see [Testing](#testing)). End-to-end,
the flow is:
```text
bytes -> SseParser -> event.data (JSON) -> your JSON parser -> Accumulator -> final message
```
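To see why buffering across calls matters, the sketch below contrasts per-chunk splitting with a tiny stateful framer. It is a toy written for illustration, not `stream-rs`'s implementation: `naive_events` and `EventFramer` are hypothetical names, and the framer assumes LF-only line endings and a single `data:` line per event. Fed the same two chunks, the naive version emits the truncated fragment `{"a":`, while the framer keeps the unfinished bytes and emits `{"a":1}` once the blank line arrives.

```rust
/// Naive approach (hypothetical): split each chunk on its own. Nothing is
/// remembered between reads, so an event that straddles a chunk boundary
/// comes out as a fragment (or is dropped if the prefix is missing).
fn naive_events(chunks: &[&[u8]]) -> Vec<String> {
    let mut out = Vec::new();
    for chunk in chunks {
        let text = String::from_utf8_lossy(chunk);
        for piece in text.split("\n\n") {
            if let Some(data) = piece.strip_prefix("data: ") {
                out.push(data.to_string());
            }
        }
    }
    out
}

/// Buffered approach (hypothetical toy): keep unconsumed bytes between `feed`
/// calls and emit an event only after its terminating blank line (`\n\n`).
/// Assumes LF-only line endings and one `data:` line per event.
#[derive(Default)]
struct EventFramer {
    buf: Vec<u8>,
}

impl EventFramer {
    fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) {
        self.buf.extend_from_slice(chunk);
        // Drain every complete event currently sitting in the buffer.
        while let Some(pos) = self.buf.windows(2).position(|w| w == b"\n\n") {
            let event: Vec<u8> = self.buf.drain(..pos + 2).collect();
            let text = String::from_utf8_lossy(&event[..pos]);
            if let Some(data) = text.strip_prefix("data: ") {
                out.push(data.to_string());
            }
        }
    }
}
```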
Runnable, no-API-key demonstrations of the full pipeline live in
[`examples/`](examples): `cargo run --example openai_stream`,
`cargo run --example anthropic_stream`, and `cargo run --example gemini_stream`
(the last one frames a Gemini object stream with `JsonSplitter` instead of SSE).
## Why
Every streaming LLM integration re-implements the same fiddly pieces by hand,
and most get the edge cases wrong:
- SSE line endings split across TCP chunks (`\r` at the end of a `read`).
- The leading UTF-8 BOM, lone `\r` terminators, multi-line `data` fields.
- Re-assembling provider deltas (`choices[i].delta`, `content_block_delta`).
- Finding where one JSON value ends and the next begins in a byte stream.

`stream-rs` solves exactly these, and nothing else.
## Features
| Module | What it does |
| --- | --- |
| [`sse`] | WHATWG-compliant Server-Sent Events **push parser** (feed bytes, get events). |
| [`incremental_json`] | Byte-level splitter that finds complete top-level JSON values in a chunked stream (NDJSON or bare concatenation). |
| [`accumulators::openai`] | Fold `chat.completion.chunk` deltas (content + tool calls) into final choices. |
| [`accumulators::anthropic`] | Fold Messages API events (`content_block_*`, `message_*`) into final blocks, with ordering validation. |
| [`accumulators::gemini`] | Fold `streamGenerateContent` responses (text parts + function calls) into final candidates. |
| `stream` *(feature)* | Optional `futures_core::Stream` adapter for async byte sources. |

The core has **no runtime dependencies**. The optional `stream` feature pulls in
only `futures-core`.
## Install
Not published to crates.io; depend on it straight from Git:
```toml
[dependencies]
stream-rs = { git = "https://github.com/ky2renzzz/stream-rs" }
# optional async Stream adapter:
# stream-rs = { git = "https://github.com/ky2renzzz/stream-rs", features = ["stream"] }
# no_std (embedded / wasm) — drops only the std::error::Error impls:
# stream-rs = { git = "https://github.com/ky2renzzz/stream-rs", default-features = false }
```
The crate is `#![no_std]` and needs only `alloc`. The default `std` feature
adds `std::error::Error` impls for the error types; everything else — both
parsers, all accumulators, and the optional `stream` adapter — works without
`std`.
## Usage
### SSE parsing (handles chunk boundaries)
```rust
use stream_rs::sse::SseParser;
let mut parser = SseParser::new();
let mut events = Vec::new();
// A CRLF split across two chunks, and a multi-line data field:
parser.feed(b"event: greeting\r\ndata: hello\r", &mut events);
parser.feed(b"\ndata: world\r\n\r\n", &mut events);
assert_eq!(events.len(), 1);
assert_eq!(events[0].event.as_deref(), Some("greeting"));
assert_eq!(events[0].data, "hello\nworld");
```
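The first `feed` above ends in `\r`, so at that moment the parser cannot tell whether the `\r` is a complete terminator or the first half of `\r\n`. One way to carry that decision across calls is a single pending-CR flag, sketched here as a hypothetical `LineSplitter` (an illustration, not the crate's source). It emits a line as soon as `\r` arrives and swallows a `\n` that immediately follows it, even when that `\n` starts the next chunk.

```rust
/// Hypothetical toy line splitter accepting `\n`, `\r\n` and a lone `\r` as
/// terminators, including a `\r\n` pair split across two `feed` calls.
#[derive(Default)]
struct LineSplitter {
    line: Vec<u8>,
    // True if the previous byte was `\r` (possibly the last byte of the
    // previous chunk): a `\n` right after it belongs to the same terminator.
    pending_cr: bool,
}

impl LineSplitter {
    fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) {
        for &b in chunk {
            if self.pending_cr {
                self.pending_cr = false;
                if b == b'\n' {
                    continue; // second half of a CRLF, already handled
                }
            }
            match b {
                b'\r' => {
                    out.push(String::from_utf8_lossy(&self.line).into_owned());
                    self.line.clear();
                    self.pending_cr = true;
                }
                b'\n' => {
                    out.push(String::from_utf8_lossy(&self.line).into_owned());
                    self.line.clear();
                }
                _ => self.line.push(b),
            }
        }
    }
}
```

Without the flag, the leading `\n` of the second chunk would be read as its own terminator, producing a spurious empty line, which in SSE means a premature dispatch.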
### Splitting a JSON stream
```rust
use stream_rs::incremental_json::JsonSplitter;
let mut splitter = JsonSplitter::new();
let mut values = Vec::new();
splitter.feed(br#"{"a":1}{"b":"#, &mut values);
assert_eq!(values, vec![r#"{"a":1}"#]);
values.clear();
splitter.feed(br#"[1,2]}"#, &mut values); // completes the partial object
assert_eq!(values, vec![r#"{"b":[1,2]}"#]);
// At end of stream, finish() flushes a trailing bare scalar and reports an
// error if the stream was cut mid-value:
let mut values = Vec::new();
let mut s = JsonSplitter::new();
s.feed(b"42", &mut values); // a bare scalar isn't emitted until proven complete
s.finish(&mut values).unwrap(); // ...so finish() flushes it
assert_eq!(values, vec!["42"]);
```
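The framing idea behind a structural splitter fits in a few dozen lines: track nesting depth, whether the cursor is inside a string, and whether the previous byte was a backslash. The `JsonFramer` below is hypothetical and deliberately narrower than `JsonSplitter`: it only frames top-level objects and arrays and skips anything between values (including bare scalars, which is exactly why a complete splitter needs an end-of-stream step like `finish()`). Like the crate's splitter, it does not validate grammar, so `{,}` is emitted as-is.

```rust
/// Hypothetical structural JSON framer for top-level objects and arrays.
/// Brackets inside strings are ignored; escapes are honoured; the grammar
/// itself is never checked.
#[derive(Default)]
struct JsonFramer {
    buf: Vec<u8>,
    depth: usize,
    in_string: bool,
    escaped: bool,
}

impl JsonFramer {
    fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) {
        for &b in chunk {
            if self.depth == 0 {
                // Between values: skip whitespace/newlines and wait for an opener.
                if b == b'{' || b == b'[' {
                    self.depth = 1;
                    self.buf.clear();
                    self.buf.push(b);
                }
                continue;
            }
            self.buf.push(b);
            if self.in_string {
                if self.escaped {
                    self.escaped = false; // this byte was escaped, e.g. `\"`
                } else if b == b'\\' {
                    self.escaped = true;
                } else if b == b'"' {
                    self.in_string = false;
                }
                continue;
            }
            match b {
                b'"' => self.in_string = true,
                b'{' | b'[' => self.depth += 1,
                b'}' | b']' => {
                    self.depth -= 1; // depth >= 1 here, so this cannot underflow
                    if self.depth == 0 {
                        out.push(String::from_utf8_lossy(&self.buf).into_owned());
                        self.buf.clear();
                    }
                }
                _ => {}
            }
        }
    }
}
```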
### Accumulating OpenAI deltas
```rust
use stream_rs::accumulators::openai::OpenAiAccumulator;
let mut acc = OpenAiAccumulator::new();
acc.push_role(0, "assistant");
acc.push_content(0, "Hel");
acc.push_content(0, "lo");
assert_eq!(acc.choice(0).unwrap().content, "Hello");
```
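Tool calls are where delta folding gets fiddly. In OpenAI's `chat.completion.chunk` stream, each `tool_calls[]` entry carries an `index`; the `id` and `function.name` typically arrive in the first delta for that index, and `function.arguments` arrives as string fragments afterwards. A minimal sketch of folding those deltas, keyed by index in a sparse `BTreeMap` as described under Security & limits, might look like this; `ChoiceAcc` and `push_tool_call` are hypothetical names, not the crate's API.

```rust
use std::collections::BTreeMap;

/// One tool call being reassembled (hypothetical type).
#[derive(Debug, Default, Clone, PartialEq)]
struct ToolCall {
    id: String,
    name: String,
    arguments: String, // raw JSON text: concatenated, never parsed here
}

/// Hypothetical accumulator for a single choice.
#[derive(Debug, Default)]
struct ChoiceAcc {
    content: String,
    tool_calls: BTreeMap<usize, ToolCall>,
}

impl ChoiceAcc {
    fn push_content(&mut self, fragment: &str) {
        self.content.push_str(fragment);
    }

    /// Apply one tool-call delta; any field may be absent from a given delta.
    /// Fields are appended, so a field sent only once is simply copied.
    fn push_tool_call(
        &mut self,
        index: usize,
        id: Option<&str>,
        name: Option<&str>,
        arguments: Option<&str>,
    ) {
        // Sparse map: a huge index creates one entry, not `index` entries.
        let call = self.tool_calls.entry(index).or_default();
        if let Some(id) = id {
            call.id.push_str(id);
        }
        if let Some(name) = name {
            call.name.push_str(name);
        }
        if let Some(arguments) = arguments {
            call.arguments.push_str(arguments);
        }
    }
}
```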
### Accumulating Anthropic events (with ordering checks)
```rust
use stream_rs::accumulators::anthropic::{AnthropicAccumulator, BlockKind};
let mut acc = AnthropicAccumulator::new();
acc.message_start();
acc.content_block_start(0, BlockKind::Text).unwrap();
acc.text_delta(0, "Hi").unwrap();
acc.content_block_stop(0).unwrap();
assert_eq!(acc.block(0).unwrap().text, "Hi");
// A delta for a block that was never started is rejected:
assert!(acc.text_delta(5, "x").is_err());
```
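An ordering check like this can be thought of as a small per-block state machine: a block must be started before it receives deltas, and receives nothing after its `content_block_stop`. The sketch below is a hypothetical `BlockTracker` that also assumes a block index may not be started twice; the exact rules `AnthropicAccumulator` enforces may differ.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum BlockState {
    Open,
    Closed,
}

#[derive(Debug)]
struct Block {
    state: BlockState,
    text: String,
}

#[derive(Debug, PartialEq)]
enum OrderError {
    AlreadyStarted(usize),
    NotOpen(usize),
}

/// Hypothetical lifecycle tracker for content blocks, keyed by index.
#[derive(Default)]
struct BlockTracker {
    blocks: BTreeMap<usize, Block>,
}

impl BlockTracker {
    fn start(&mut self, index: usize) -> Result<(), OrderError> {
        if self.blocks.contains_key(&index) {
            return Err(OrderError::AlreadyStarted(index));
        }
        let block = Block { state: BlockState::Open, text: String::new() };
        self.blocks.insert(index, block);
        Ok(())
    }

    /// Returns the block only if it was started and not yet stopped.
    fn open_block(&mut self, index: usize) -> Result<&mut Block, OrderError> {
        match self.blocks.get_mut(&index) {
            Some(block) if block.state == BlockState::Open => Ok(block),
            _ => Err(OrderError::NotOpen(index)),
        }
    }

    fn text_delta(&mut self, index: usize, text: &str) -> Result<(), OrderError> {
        self.open_block(index)?.text.push_str(text);
        Ok(())
    }

    fn stop(&mut self, index: usize) -> Result<(), OrderError> {
        self.open_block(index)?.state = BlockState::Closed;
        Ok(())
    }
}
```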
### Accumulating Gemini responses
```rust
use stream_rs::accumulators::gemini::GeminiAccumulator;
let mut acc = GeminiAccumulator::new();
acc.push_text(0, "Hel");
acc.push_text(0, "lo");
acc.push_function_call(0, "get_weather", r#"{"city":"Paris"}"#);
acc.set_finish_reason(0, "STOP");
let c = acc.candidate(0).unwrap();
assert_eq!(c.text, "Hello");
assert_eq!(c.function_calls[0].name, "get_weather");
```
### Async adapter (`stream` feature)
```rust,no_run
# async fn run() {
use futures::stream::{self, StreamExt};
use stream_rs::stream::SseStream;
let body = stream::iter(vec![
Ok::<_, std::io::Error>(b"data: hello\n".to_vec()),
Ok(b"\n".to_vec()),
]);
let mut events = SseStream::new(body);
while let Some(event) = events.next().await {
println!("{}", event.unwrap().data);
}
# }
```
## Design principles
- **Spec first.** The SSE parser follows the WHATWG event-stream algorithm:
BOM stripping, `\r` / `\n` / `\r\n` terminators (including split across
chunks), comment lines, single-leading-space removal, NUL-in-`id` rejection,
non-numeric `retry` rejection, and the "empty data is not dispatched" rule.
- **Push, not pull.** Parsers accept arbitrary byte chunks, so they drop into
any transport without assuming an async runtime.
- **Zero core dependencies.** Async support is strictly opt-in behind a feature
flag.
- **JSON-library agnostic accumulators.** They take already-parsed primitive
pieces, so you wire them to `serde_json`, `simd-json`, or anything else.
- **Framing, not validation, for JSON.** `incremental_json` is *deliberately* a
structural splitter, not a parser: it tracks brackets, strings and escapes to
find correct value boundaries, but does not enforce the full grammar (`{,}`
passes). That is exactly the contract streaming framing needs — hand each
emitted slice to a real JSON parser afterwards.
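
The WHATWG rules listed under **Spec first** amount to a short per-line procedure once lines have been split. The sketch below is a hypothetical `EventState`, written for illustration and assuming the caller has already split lines and stripped the BOM. It applies comment skipping, first-colon splitting with single-leading-space removal, the NUL-in-`id` and digits-only `retry` checks, and the empty-data rule. It reports an absent event type as `None`; per the spec, such an event is dispatched with the type `message`.

```rust
/// Hypothetical per-connection buffers from the WHATWG event-stream algorithm.
#[derive(Default)]
struct EventState {
    data: String,
    event_type: String,
    last_event_id: String,
    retry_ms: Option<u64>,
}

#[derive(Debug, PartialEq)]
struct Event {
    event: Option<String>,
    data: String,
    id: String,
}

impl EventState {
    /// Process one already-split line; a blank line may dispatch an event.
    fn line(&mut self, line: &str) -> Option<Event> {
        if line.is_empty() {
            return self.dispatch();
        }
        if line.starts_with(':') {
            return None; // comment line
        }
        // Split at the first colon and drop a single leading space from the value.
        let (field, value) = match line.split_once(':') {
            Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
            None => (line, ""),
        };
        match field {
            "data" => {
                self.data.push_str(value);
                self.data.push('\n');
            }
            "event" => self.event_type = value.to_string(),
            // An `id` whose value contains NUL is ignored.
            "id" if !value.contains('\0') => self.last_event_id = value.to_string(),
            // `retry` must be ASCII digits only.
            "retry" if !value.is_empty() && value.bytes().all(|b| b.is_ascii_digit()) => {
                if let Ok(ms) = value.parse::<u64>() {
                    self.retry_ms = Some(ms);
                }
            }
            _ => {} // unknown fields and rejected id/retry values are ignored
        }
        None
    }

    fn dispatch(&mut self) -> Option<Event> {
        // The event type buffer is reset whether or not anything is dispatched.
        let event_type = std::mem::take(&mut self.event_type);
        if self.data.is_empty() {
            return None; // empty data is not dispatched
        }
        let mut data = std::mem::take(&mut self.data);
        data.pop(); // drop the '\n' appended after the last data line
        Some(Event {
            event: if event_type.is_empty() { None } else { Some(event_type) },
            data,
            id: self.last_event_id.clone(), // the id buffer persists across events
        })
    }
}
```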
## Security & limits
`stream-rs` is framing infrastructure, so its safety contract is worth stating
explicitly:
- **No grammar or depth validation, by design.** `incremental_json` frames
values without enforcing the grammar (`{,}` passes; see Design principles), and
it imposes **no** maximum nesting depth: `depth` is a `usize` guarded by
`saturating_sub`, so deeply nested adversarial input cannot panic or overflow,
but it is not rejected either. If you frame untrusted input, bound the total
bytes you feed it and immediately hand each emitted slice to a real JSON
parser that enforces a nesting limit.
- **Lossy UTF-8 decoding.** Both parsers buffer **raw bytes** and only decode
once a value is complete, so a multibyte codepoint split across chunks is
reassembled correctly (covered by tests). Genuinely invalid UTF-8 is replaced
with U+FFFD (matching browser SSE leniency) rather than rejected.
- **No panics on arbitrary input.** The chunk-boundary-invariance fuzzers assert
that any bytes, split at any point, never panic — see [Fuzzing](#fuzzing).
- **Bounded memory is the caller's responsibility.** A push parser buffers an
in-progress line / value until it completes; a peer that never sends a
terminator can grow that buffer. Enforce your own byte ceiling on the
transport if peers are untrusted.
- **Tool-call scope.** Accumulators reassemble tool-call *arguments* as the raw
string exactly as streamed; they do not parse or validate that JSON. Parse it
yourself once `finish_reason` / `content_block_stop` arrives.
- **Index-safe accumulators.** The OpenAI, Anthropic and Gemini accumulators key
their entries (choices, content blocks, candidates, tool calls) by the
provider-supplied index in a sparse `BTreeMap`. A stream that reports a huge or
non-contiguous index costs only the entries actually seen — it can never force
a dense allocation up to that index.
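
Two of the points above, decoding only complete values and bounding what a silent peer can make you buffer, fit in one small sketch. `BoundedLines` and `LineTooLong` are hypothetical names, and the per-line cap is just one possible policy for the byte ceiling that is left to the caller; it is not something `stream-rs` provides.

```rust
/// Hypothetical LF-terminated line buffer: raw bytes are kept until a line is
/// complete, and an unterminated line may not grow past `max_pending` bytes.
struct BoundedLines {
    pending: Vec<u8>,
    max_pending: usize,
}

#[derive(Debug, PartialEq)]
struct LineTooLong;

impl BoundedLines {
    fn new(max_pending: usize) -> Self {
        Self { pending: Vec::new(), max_pending }
    }

    fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) -> Result<(), LineTooLong> {
        for &b in chunk {
            if b == b'\n' {
                // Decode only complete lines; invalid UTF-8 becomes U+FFFD.
                out.push(String::from_utf8_lossy(&self.pending).into_owned());
                self.pending.clear();
            } else {
                if self.pending.len() >= self.max_pending {
                    return Err(LineTooLong); // peer never terminated its line
                }
                self.pending.push(b);
            }
        }
        Ok(())
    }
}
```

Splitting `é` (`0xC3 0xA9`) across two chunks still yields `café`, whereas decoding each chunk on its own would produce two U+FFFD replacement characters instead.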
## Testing
The parsers are verified against a suite of spec conformance tests, including a
property that **feeding one byte at a time yields the same output as feeding the
whole input at once**.
```sh
cargo test --all-features
cargo clippy --all-features --all-targets -- -D warnings
```
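The invariance property is easy to state as a reusable check. The sketch below is a hypothetical harness, not the crate's actual test code: any parser implementing the illustrative `PushParser` trait is run on the whole input, one byte at a time, and split at every single position (the case the fuzz targets exercise), and all outputs must match. The trivial `Lines` parser exists only to exercise it.

```rust
/// Illustrative trait for a push parser (hypothetical, not the crate's API).
trait PushParser: Default {
    type Item: PartialEq + std::fmt::Debug;
    fn feed(&mut self, chunk: &[u8], out: &mut Vec<Self::Item>);
}

/// Feed the chunks in order to a fresh parser and collect its output.
fn run<P: PushParser>(chunks: &[&[u8]]) -> Vec<P::Item> {
    let mut parser = P::default();
    let mut out = Vec::new();
    for chunk in chunks {
        parser.feed(chunk, &mut out);
    }
    out
}

/// Panics if any chunking of `input` changes the parser's output.
fn assert_chunk_invariant<P: PushParser>(input: &[u8]) {
    let whole = run::<P>(&[input]);
    let bytes: Vec<&[u8]> = input.chunks(1).collect();
    assert_eq!(run::<P>(&bytes), whole, "byte-at-a-time differs");
    for i in 0..=input.len() {
        let (a, b) = input.split_at(i);
        assert_eq!(run::<P>(&[a, b]), whole, "split at {i} differs");
    }
}

/// Toy parser: emits each `\n`-terminated line as raw bytes.
#[derive(Default)]
struct Lines(Vec<u8>);

impl PushParser for Lines {
    type Item = Vec<u8>;
    fn feed(&mut self, chunk: &[u8], out: &mut Vec<Self::Item>) {
        for &b in chunk {
            if b == b'\n' {
                out.push(std::mem::take(&mut self.0));
            } else {
                self.0.push(b);
            }
        }
    }
}
```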
### Benchmarks
Throughput is measured with [Criterion]; each benchmark reports MB/s for both a
single bulk `feed` and realistic 64-byte ragged chunks:
```sh
cargo bench
```
### Fuzzing
The chunk-boundary invariance property is also fuzzed with
[`cargo-fuzz`](https://github.com/rust-fuzz/cargo-fuzz): the same input split at
an arbitrary point must produce identical output to feeding it whole, for any
bytes, without panicking.
```sh
cargo +nightly fuzz run sse_parser
cargo +nightly fuzz run json_splitter
```
[Criterion]: https://github.com/bheisler/criterion.rs
## Contributing
Contributions are welcome. Please read [CONTRIBUTING.md](CONTRIBUTING.md) for the
local check commands (`fmt`, `clippy`, `test`, fuzzing) that CI enforces.
## License
MIT © ky2renzzz

[`sse`]: src/sse.rs
[`incremental_json`]: src/incremental_json.rs
[`accumulators::openai`]: src/accumulators/openai.rs
[`accumulators::anthropic`]: src/accumulators/anthropic.rs
[`accumulators::gemini`]: src/accumulators/gemini.rs