stream-rs 0.1.0

Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE, incremental JSON, OpenAI/Anthropic delta accumulators).
# stream-rs

[![CI](https://github.com/ky2renzzz/stream-rs/actions/workflows/ci.yml/badge.svg)](https://github.com/ky2renzzz/stream-rs/actions/workflows/ci.yml)
[![crates.io](https://img.shields.io/crates/v/stream-rs.svg)](https://crates.io/crates/stream-rs)
[![docs.rs](https://img.shields.io/docsrs/stream-rs)](https://docs.rs/stream-rs)
[![MSRV](https://img.shields.io/badge/MSRV-1.70-blue.svg)](#install)
[![License: MIT](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)

> Zero-dependency, spec-compliant streaming toolkit for LLM responses — in Rust.

`stream-rs` is the low-level plumbing that sits *underneath* an LLM client. It
does **not** talk to any model, manage API keys, or open sockets. It does one
thing: turn a raw, chunked byte stream into usable values, correctly, with no
runtime dependencies.

It is the reliable foundation an LLM application is built on — the part most
integrations re-implement by hand and get subtly wrong. By owning the byte-level
streaming layer instead of treating the provider SDK as a black box, you get
correct behaviour on the edge cases that only show up in production (split
chunks, partial UTF-8, interleaved tool-call deltas) and you stay free of heavy
dependencies.

It is a Rust companion to the TypeScript
[`sse-stream`](https://github.com/ky2renzzz/sse-stream) toolkit — the same
problem domain, idiomatic Rust.

## Why this matters for LLM apps

When you stream a completion, the provider sends Server-Sent Events over a
chunked HTTP body. The bytes you actually `read()` off the socket have **no
relationship** to event or token boundaries. A single `read` might give you:

```text
ta: {"choices":[{"delta":{"content":"Par
```

…the tail of a `data:` field name and half a JSON object, with no line
terminator until the *next* packet arrives. The naive approach (`split("\n\n")`
on each chunk, or `json.loads()` per line) breaks here:

```python
# The bug in countless streaming integrations:
for chunk in response.iter_content():
    for line in chunk.split(b"\n\n"):        # boundary may fall mid-event
        if line.startswith(b"data: "):
            event = json.loads(line[6:])     # may be half a JSON object -> crash
```

It works on localhost and fails under real network conditions, with multi-line
`data` fields, or when a tool call's arguments arrive across several deltas.

`stream-rs` is a **stateful push parser**: you feed it whatever bytes arrive,
and it buffers across calls so a value is only emitted once it is genuinely
complete. The same input split at *any* byte boundary yields identical output —
a property verified by tests and a fuzzer (see [Testing](#testing)). End-to-end,
the flow is:

```text
bytes  ->  SseParser  ->  event.data (JSON)  ->  your JSON parser  ->  Accumulator  ->  final message
```
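To make the "buffers across calls" idea concrete, here is a minimal sketch of a push-style line splitter that carries its state between `feed` calls. It is not the crate's implementation: `LineSplitter` and `split_all` are hypothetical names used only for illustration, and it covers line framing alone (`\r`, `\n`, `\r\n`), not SSE field parsing.

```rust
/// Illustrative push-style line splitter (hypothetical, not stream-rs code).
#[derive(Default)]
struct LineSplitter {
    buf: Vec<u8>,     // bytes of the line currently being assembled
    pending_cr: bool, // previous byte was '\r'; a following '\n' belongs to it
}

impl LineSplitter {
    fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) {
        for &b in chunk {
            if self.pending_cr {
                self.pending_cr = false;
                if b == b'\n' {
                    continue; // second half of a CRLF; the line was already emitted
                }
            }
            match b {
                b'\r' => {
                    self.emit(out);
                    self.pending_cr = true; // remembered even if the chunk ends here
                }
                b'\n' => self.emit(out),
                _ => self.buf.push(b),
            }
        }
    }

    fn emit(&mut self, out: &mut Vec<String>) {
        out.push(String::from_utf8_lossy(&self.buf).into_owned());
        self.buf.clear();
    }
}

/// Feed a sequence of chunks through a fresh splitter and collect the lines.
fn split_all(chunks: &[&[u8]]) -> Vec<String> {
    let mut splitter = LineSplitter::default();
    let mut out = Vec::new();
    for chunk in chunks {
        splitter.feed(chunk, &mut out);
    }
    out
}

fn main() {
    let input: &[u8] = b"data: hello\r\ndata: world\r\n\r\n";
    let whole = split_all(&[input]);
    assert_eq!(whole, vec!["data: hello", "data: world", ""]);

    // Splitting the same bytes at every possible boundary changes nothing.
    for i in 0..=input.len() {
        let (a, b) = input.split_at(i);
        assert_eq!(split_all(&[a, b]), whole);
    }
}
```

The `pending_cr` flag is the whole trick: a `\r` at the very end of one chunk ends the line immediately, and the flag lets the next `feed` swallow a leading `\n` instead of reporting a spurious empty line.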

Runnable, no-API-key demonstrations of the full pipeline live in
[`examples/`](examples): `cargo run --example openai_stream`,
`cargo run --example anthropic_stream`, and `cargo run --example gemini_stream`
(the last one frames a Gemini object stream with `JsonSplitter` instead of SSE).

## Why

Every streaming LLM integration re-implements the same fiddly pieces by hand,
and most get the edge cases wrong:

- SSE line endings split across TCP chunks (`\r` at the end of a `read`).
- The leading UTF-8 BOM, lone `\r` terminators, multi-line `data` fields.
- Re-assembling provider deltas (`choices[i].delta`, `content_block_delta`).
- Finding where one JSON value ends and the next begins in a byte stream.

`stream-rs` solves exactly these, and nothing else.

## Features

| Module | What it does |
| --- | --- |
| [`sse`] | WHATWG-compliant Server-Sent Events **push parser** (feed bytes, get events). |
| [`incremental_json`] | Byte-level splitter that finds complete top-level JSON values in a chunked stream (NDJSON or bare concatenation). |
| [`accumulators::openai`] | Fold `chat.completion.chunk` deltas (content + tool calls) into final choices. |
| [`accumulators::anthropic`] | Fold Messages API events (`content_block_*`, `message_*`) into final blocks, with ordering validation. |
| [`accumulators::gemini`] | Fold `streamGenerateContent` responses (text parts + function calls) into final candidates. |
| `stream` *(feature)* | Optional `futures_core::Stream` adapter for async byte sources. |

The core has **no runtime dependencies**. The optional `stream` feature pulls in
only `futures-core`.

## Install

Not published to crates.io; depend on it straight from Git:

```toml
[dependencies]
stream-rs = { git = "https://github.com/ky2renzzz/stream-rs" }

# optional async Stream adapter:
# stream-rs = { git = "https://github.com/ky2renzzz/stream-rs", features = ["stream"] }

# no_std (embedded / wasm) — drops only the std::error::Error impls:
# stream-rs = { git = "https://github.com/ky2renzzz/stream-rs", default-features = false }
```

The crate is `#![no_std]` and needs only `alloc`. The default `std` feature
adds `std::error::Error` impls for the error types; everything else — both
parsers, all accumulators, and the optional `stream` adapter — works without
`std`.

## Usage

### SSE parsing (handles chunk boundaries)

```rust
use stream_rs::sse::SseParser;

let mut parser = SseParser::new();
let mut events = Vec::new();

// A CRLF split across two chunks, and a multi-line data field:
parser.feed(b"event: greeting\r\ndata: hello\r", &mut events);
parser.feed(b"\ndata: world\r\n\r\n", &mut events);

assert_eq!(events.len(), 1);
assert_eq!(events[0].event.as_deref(), Some("greeting"));
assert_eq!(events[0].data, "hello\nworld");
```

### Splitting a JSON stream

```rust
use stream_rs::incremental_json::JsonSplitter;

let mut splitter = JsonSplitter::new();
let mut values = Vec::new();

splitter.feed(br#"{"a":1}{"b":"#, &mut values);
assert_eq!(values, vec![r#"{"a":1}"#]);

values.clear();
splitter.feed(br#"[1,2]}"#, &mut values); // completes the partial object
assert_eq!(values, vec![r#"{"b":[1,2]}"#]);

// At end of stream, finish() flushes a trailing bare scalar and reports an
// error if the stream was cut mid-value:
let mut values = Vec::new();
let mut s = JsonSplitter::new();
s.feed(b"42", &mut values);          // a bare scalar isn't emitted until proven complete
s.finish(&mut values).unwrap();      // ...so finish() flushes it
assert_eq!(values, vec!["42"]);
```

### Accumulating OpenAI deltas

```rust
use stream_rs::accumulators::openai::OpenAiAccumulator;

let mut acc = OpenAiAccumulator::new();
acc.push_role(0, "assistant");
acc.push_content(0, "Hel");
acc.push_content(0, "lo");

assert_eq!(acc.choice(0).unwrap().content, "Hello");
```

### Accumulating Anthropic events (with ordering checks)

```rust
use stream_rs::accumulators::anthropic::{AnthropicAccumulator, BlockKind};

let mut acc = AnthropicAccumulator::new();
acc.message_start();
acc.content_block_start(0, BlockKind::Text).unwrap();
acc.text_delta(0, "Hi").unwrap();
acc.content_block_stop(0).unwrap();

assert_eq!(acc.block(0).unwrap().text, "Hi");

// A delta for a block that was never started is rejected:
assert!(acc.text_delta(5, "x").is_err());
```

### Accumulating Gemini responses

```rust
use stream_rs::accumulators::gemini::GeminiAccumulator;

let mut acc = GeminiAccumulator::new();
acc.push_text(0, "Hel");
acc.push_text(0, "lo");
acc.push_function_call(0, "get_weather", r#"{"city":"Paris"}"#);
acc.set_finish_reason(0, "STOP");

let c = acc.candidate(0).unwrap();
assert_eq!(c.text, "Hello");
assert_eq!(c.function_calls[0].name, "get_weather");
```

### Async adapter (`stream` feature)

```rust,no_run
# async fn run() {
use futures::stream::{self, StreamExt};
use stream_rs::stream::SseStream;

let body = stream::iter(vec![
    Ok::<_, std::io::Error>(b"data: hello\n".to_vec()),
    Ok(b"\n".to_vec()),
]);

let mut events = SseStream::new(body);
while let Some(event) = events.next().await {
    println!("{}", event.unwrap().data);
}
# }
```

## Design principles

- **Spec first.** The SSE parser follows the WHATWG event-stream algorithm:
  BOM stripping, `\r` / `\n` / `\r\n` terminators (including split across
  chunks), comment lines, single-leading-space removal, NUL-in-`id` rejection,
  non-numeric `retry` rejection, and the "empty data is not dispatched" rule.
- **Push, not pull.** Parsers accept arbitrary byte chunks, so they drop into
  any transport without assuming an async runtime.
- **Zero core dependencies.** Async support is strictly opt-in behind a feature
  flag.
- **JSON-library agnostic accumulators.** They take already-parsed primitive
  pieces, so you wire them to `serde_json`, `simd-json`, or anything else.
- **Framing, not validation, for JSON.** `incremental_json` is *deliberately* a
  structural splitter, not a parser: it tracks brackets, strings and escapes to
  find correct value boundaries, but does not enforce the full grammar (`{,}`
  passes). That is exactly the contract streaming framing needs — hand each
  emitted slice to a real JSON parser afterwards.
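The "framing, not validation" contract can be sketched in a few lines. The code below is an assumption-laden illustration, not the crate's `JsonSplitter`: `Framer` is a hypothetical type that frames only top-level objects and arrays (bare scalars are skipped for brevity) by tracking nesting depth, string state and escapes.

```rust
/// Illustrative structural framer (hypothetical, not stream-rs code).
/// It finds where top-level objects/arrays end; it does not validate them.
#[derive(Default)]
struct Framer {
    buf: Vec<u8>,    // bytes of the value currently being assembled
    depth: usize,    // current bracket/brace nesting depth
    in_string: bool, // inside a JSON string literal
    escaped: bool,   // previous byte inside the string was a backslash
}

impl Framer {
    fn feed(&mut self, chunk: &[u8], out: &mut Vec<String>) {
        for &b in chunk {
            // Between values, skip everything that does not open a new one.
            if self.depth == 0 && !matches!(b, b'{' | b'[') {
                continue;
            }
            self.buf.push(b);

            if self.in_string {
                if self.escaped {
                    self.escaped = false; // this byte is escaped; ignore it
                } else if b == b'\\' {
                    self.escaped = true;
                } else if b == b'"' {
                    self.in_string = false;
                }
                continue; // brackets inside strings never change depth
            }

            match b {
                b'"' => self.in_string = true,
                b'{' | b'[' => self.depth += 1,
                b'}' | b']' => {
                    self.depth -= 1; // depth is at least 1 here
                    if self.depth == 0 {
                        out.push(String::from_utf8_lossy(&self.buf).into_owned());
                        self.buf.clear();
                    }
                }
                _ => {}
            }
        }
    }
}

fn main() {
    let mut framer = Framer::default();
    let mut out = Vec::new();

    // Braces inside a string do not end the value; the second object is partial.
    framer.feed(br#"{"a":"}{"} {"b":"#, &mut out);
    assert_eq!(out, vec![r#"{"a":"}{"}"#]);

    // The next chunk completes it; `{,}` is framed even though it is not valid JSON.
    framer.feed(br#"[1,2]}{,}"#, &mut out);
    assert_eq!(out[1], r#"{"b":[1,2]}"#);
    assert_eq!(out[2], "{,}");
}
```

Because the framer never inspects the grammar, each emitted slice still has to go through a real JSON parser before it is trusted.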

## Security & limits

`stream-rs` is framing infrastructure, so its safety contract is worth stating
explicitly:

- **No grammar validation or depth limit, by design.** `incremental_json` is a
  structural splitter, not a parser: it tracks brackets, strings and escapes to
  find value boundaries but does **not** enforce the grammar (`{,}` passes) and
  does **not** impose a maximum nesting depth. `depth` is a `usize` guarded by
  `saturating_sub`, so deeply nested adversarial input cannot panic or overflow,
  but it is not rejected either. If you frame untrusted input, cap chunk sizes
  and immediately hand each emitted slice to a real JSON parser (which enforces
  depth).
- **Lossy UTF-8 decoding.** Both parsers buffer **raw bytes** and only decode
  once a value is complete, so a multibyte codepoint split across chunks is
  reassembled correctly (covered by tests). Genuinely invalid UTF-8 is replaced
  with U+FFFD (matching browser SSE leniency) rather than rejected.
- **No panics on arbitrary input.** The chunk-boundary-invariance fuzzers assert
  that any bytes, split at any point, never panic; see [Fuzzing](#fuzzing).
- **Bounded memory is the caller's responsibility.** A push parser buffers an
  in-progress line / value until it completes; a peer that never sends a
  terminator can grow that buffer. Enforce your own byte ceiling on the
  transport if peers are untrusted.
- **Tool-call scope.** Accumulators reassemble tool-call *arguments* as the raw
  string exactly as streamed; they do not parse or validate that JSON. Parse it
  yourself once `finish_reason` / `content_block_stop` arrives.
- **Index-safe accumulators.** The OpenAI, Anthropic and Gemini accumulators key
  their entries (choices, content blocks, candidates, tool calls) by the
  provider-supplied index in a sparse `BTreeMap`. A stream that reports a huge or
  non-contiguous index costs only the entries actually seen — it can never force
  a dense allocation up to that index.
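As a rough illustration of the last two points, the sketch below keys tool-call entries by index in a `BTreeMap` and concatenates argument fragments as raw text. `ToolCall`, `ToolCalls` and their methods are hypothetical names chosen for this example; the crate's actual types and signatures may differ.

```rust
use std::collections::BTreeMap;

/// Hypothetical tool-call entry; field names are illustrative only.
#[derive(Default, Debug)]
struct ToolCall {
    name: String,
    arguments: String, // raw JSON text, concatenated exactly as streamed
}

/// Hypothetical sparse accumulator keyed by the provider-supplied index.
#[derive(Default)]
struct ToolCalls {
    by_index: BTreeMap<u64, ToolCall>,
}

impl ToolCalls {
    fn push_name(&mut self, index: u64, name: &str) {
        self.by_index.entry(index).or_default().name.push_str(name);
    }

    fn push_arguments(&mut self, index: u64, fragment: &str) {
        // No parsing here: fragments are only appended in arrival order.
        self.by_index.entry(index).or_default().arguments.push_str(fragment);
    }
}

fn main() {
    let mut calls = ToolCalls::default();
    calls.push_name(0, "get_weather");
    calls.push_arguments(0, r#"{"city":"#);
    calls.push_arguments(0, r#""Paris"}"#);

    // A hostile or buggy index costs one map entry, not a huge allocation.
    calls.push_arguments(u64::MAX, "{}");

    assert_eq!(calls.by_index.len(), 2);
    assert_eq!(calls.by_index[&0].name, "get_weather");
    assert_eq!(calls.by_index[&0].arguments, r#"{"city":"Paris"}"#);
}
```

A `Vec` resized to the largest index seen would be simpler, but a single out-of-range index could then force an enormous allocation; the map keeps the cost proportional to the entries actually received.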

## Testing

The parsers are verified against a suite of spec conformance tests, including a
property that **feeding one byte at a time yields the same output as feeding the
whole input at once**.

```sh
cargo test --all-features
cargo clippy --all-features --all-targets -- -D warnings
```
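The byte-at-a-time property can be expressed as a small reusable check. The following is a sketch under stated assumptions rather than the crate's actual test code: `holds_for_all_splits`, `decode` and `decode_per_chunk` are hypothetical helpers, and the system under test is a toy UTF-8 decoder instead of the real parsers.

```rust
/// Buffers raw bytes and decodes once at the end, so a multibyte codepoint
/// split across chunks is reassembled before decoding.
fn decode(chunks: &[&[u8]]) -> String {
    let mut buf = Vec::new();
    for chunk in chunks {
        buf.extend_from_slice(chunk);
    }
    String::from_utf8_lossy(&buf).into_owned()
}

/// Naive variant: decodes each chunk on its own, which corrupts any
/// codepoint that straddles a chunk boundary.
fn decode_per_chunk(chunks: &[&[u8]]) -> String {
    chunks.iter().map(|c| String::from_utf8_lossy(c)).collect()
}

/// Returns true if `run` gives the same output for the whole input, for the
/// input fed one byte at a time, and for every two-way split of the input.
fn holds_for_all_splits(input: &[u8], run: impl Fn(&[&[u8]]) -> String) -> bool {
    let whole = run(&[input]);

    let bytewise: Vec<&[u8]> = input.chunks(1).collect();
    if run(&bytewise) != whole {
        return false;
    }

    (0..=input.len()).all(|i| {
        let (a, b) = input.split_at(i);
        run(&[a, b]) == whole
    })
}

fn main() {
    let input = "Café ☕".as_bytes();
    assert!(holds_for_all_splits(input, decode));
    // Decoding chunk by chunk breaks as soon as a split lands inside "é" or "☕".
    assert!(!holds_for_all_splits(input, decode_per_chunk));
}
```

Exhaustive two-way splits are cheap for short inputs; for longer or randomised inputs, a fuzzer is the better tool.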

### Benchmarks

Throughput is measured with [Criterion]; each benchmark reports MB/s for both a
single bulk `feed` and realistic 64-byte ragged chunks:

```sh
cargo bench
```

### Fuzzing

The chunk-boundary invariance property is also fuzzed with
[`cargo-fuzz`](https://github.com/rust-fuzz/cargo-fuzz): the same input split at
an arbitrary point must produce identical output to feeding it whole, for any
bytes, without panicking.

```sh
cargo +nightly fuzz run sse_parser
cargo +nightly fuzz run json_splitter
```

[Criterion]: https://github.com/bheisler/criterion.rs

## Contributing

Contributions are welcome. Please read [CONTRIBUTING.md](CONTRIBUTING.md) for the
local check commands (`fmt`, `clippy`, `test`, fuzzing) that CI enforces.

## License

MIT © ky2renzzz

[`sse`]: src/sse.rs
[`incremental_json`]: src/incremental_json.rs
[`accumulators::openai`]: src/accumulators/openai.rs
[`accumulators::anthropic`]: src/accumulators/anthropic.rs
[`accumulators::gemini`]: src/accumulators/gemini.rs