stream-rs 0.1.0

Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE, incremental JSON, OpenAI/Anthropic delta accumulators).
Documentation
//! End-to-end example: parse a real Anthropic Messages API SSE stream into the
//! final content blocks, with ordering validation.
//!
//! Run it:
//!
//! ```sh
//! cargo run --example anthropic_stream
//! ```
//!
//! No API key, no network. The bytes are a faithful capture of what the
//! Messages API sends with `"stream": true`: named `event:` lines paired with
//! JSON `data:` lines, again split at awkward boundaries.

use stream_rs::accumulators::anthropic::{AnthropicAccumulator, BlockKind};
use stream_rs::sse::SseParser;

/// Returns the canned wire bytes for the demo, one slice per simulated
/// network read.
///
/// The slices are meant to be fed to the parser in order. One
/// `content_block_delta` event is deliberately cut in the middle of its JSON
/// payload so that reassembly across reads is exercised.
fn wire_chunks() -> Vec<&'static [u8]> {
    const CHUNKS: [&[u8]; 12] = [
        // Message envelope opens.
        b"event: message_start\ndata: {\"type\":\"message_start\"}\n\n",
        // Block 0: plain text, delivered as fragments.
        b"event: content_block_start\ndata: {\"index\":0,\"content_block\":{\"type\":\"text\"}}\n\n",
        b"event: content_block_delta\ndata: {\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hello",
        // Remainder of the previous delta: its JSON straddles two reads.
        b"\"}}\n\n",
        b"event: content_block_delta\ndata: {\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\", world\"}}\n\n",
        b"event: content_block_stop\ndata: {\"index\":0}\n\n",
        // Block 1: tool_use, with its input JSON streamed piecewise.
        b"event: content_block_start\ndata: {\"index\":1,\"content_block\":{\"type\":\"tool_use\"}}\n\n",
        b"event: content_block_delta\ndata: {\"index\":1,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"{\\\"x\\\":\"}}\n\n",
        b"event: content_block_delta\ndata: {\"index\":1,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"1}\"}}\n\n",
        b"event: content_block_stop\ndata: {\"index\":1}\n\n",
        // Message envelope closes.
        b"event: message_delta\ndata: {\"delta\":{\"stop_reason\":\"end_turn\"}}\n\n",
        b"event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n",
    ];
    CHUNKS.to_vec()
}

/// Feeds the canned chunks through the SSE parser, routes every parsed event
/// into the accumulator, then prints and asserts the reassembled message.
///
/// # Panics
///
/// Panics if the reassembled blocks or the stop reason differ from the
/// expected values, or if `dispatch` hits an accumulator error.
fn main() {
    let mut parser = SseParser::new();
    let mut acc = AnthropicAccumulator::new();
    let mut events = Vec::new();

    for chunk in wire_chunks() {
        // The buffer is reused across reads; clear it so each event is
        // dispatched exactly once.
        events.clear();
        parser.feed(chunk, &mut events);
        for ev in &events {
            // An event without an explicit `event:` field is routed as "message".
            dispatch(ev.event.as_deref().unwrap_or("message"), &ev.data, &mut acc);
        }
    }

    // End of stream: drop the events already handled for the last chunk, then
    // route anything `finish` appends so trailing output is not silently lost.
    events.clear();
    parser.finish(&mut events);
    for ev in &events {
        dispatch(ev.event.as_deref().unwrap_or("message"), &ev.data, &mut acc);
    }

    for (i, block) in acc.blocks() {
        println!(
            "block[{i}] kind={:?} stopped={} text={:?}",
            block.kind, block.stopped, block.text
        );
    }
    println!("stop_reason : {:?}", acc.stop_reason());

    assert_eq!(acc.block(0).unwrap().text, "Hello, world");
    assert_eq!(acc.block(1).unwrap().text, r#"{"x":1}"#);
    assert_eq!(acc.stop_reason(), Some("end_turn"));
    println!("\nOK: message reassembled and block ordering validated.");
}

/// Feeds a single named SSE event to the matching accumulator method.
///
/// `event` is the SSE event name and `data` its raw JSON payload. Fields are
/// pulled out with the substring helpers in this file rather than a JSON
/// parser; real code should parse `data` with `serde_json`. Event names that
/// are not recognised are ignored.
///
/// # Panics
///
/// Panics when the accumulator returns an error for a `content_block_*` call.
fn dispatch(event: &str, data: &str, acc: &mut AnthropicAccumulator) {
    // Block index carried by the payload; a missing or unparsable one falls
    // back to 0.
    let block_index = || usize_after(data, "\"index\":").unwrap_or(0);

    match event {
        "message_start" => acc.message_start(),
        "message_stop" => acc.message_stop(),
        "message_delta" => {
            let stop_reason = string_after(data, "\"stop_reason\":\"");
            acc.message_delta(stop_reason.as_deref());
        }
        "content_block_start" => {
            let index = block_index();
            // Anything that does not mention "tool_use" is treated as text.
            let kind = match data.contains("\"tool_use\"") {
                true => BlockKind::ToolUse,
                false => BlockKind::Text,
            };
            // An error from the accumulator aborts the example on the spot.
            acc.content_block_start(index, kind).unwrap();
        }
        "content_block_delta" => {
            let index = block_index();
            // A text fragment takes precedence; partial tool input is only
            // looked for when no text field is present.
            if let Some(fragment) = string_after(data, "\"text\":\"") {
                acc.text_delta(index, &unescape(&fragment)).unwrap();
                return;
            }
            if let Some(fragment) = string_after(data, "\"partial_json\":\"") {
                acc.input_json_delta(index, &unescape(&fragment)).unwrap();
            }
        }
        "content_block_stop" => {
            let index = block_index();
            acc.content_block_stop(index).unwrap();
        }
        _ => {}
    }
}

/// Parses the run of ASCII digits that immediately follows the first
/// occurrence of `marker` in `haystack`.
///
/// Returns `None` when `marker` is absent, when no digit follows it, or when
/// the digits do not fit in a `usize`.
fn usize_after(haystack: &str, marker: &str) -> Option<usize> {
    let (_, tail) = haystack.split_once(marker)?;
    // ASCII digits are one byte each, so the count is also a valid byte offset.
    let digit_count = tail.bytes().take_while(u8::is_ascii_digit).count();
    tail[..digit_count].parse().ok()
}

/// Extracts the raw contents of the JSON string that starts right after the
/// first occurrence of `marker` in `haystack`.
///
/// `marker` is expected to end with the opening quote (for example
/// `"\"text\":\""`). Scanning stops at the first unescaped `"`. Escape
/// sequences are copied verbatim (backslash plus the following character), so
/// the result is still JSON-escaped.
///
/// Returns `None` when `marker` is absent. If no closing quote is found, the
/// text up to the end of `haystack` is returned.
fn string_after(haystack: &str, marker: &str) -> Option<String> {
    let start = haystack.find(marker)? + marker.len();
    let mut out = String::new();
    // Walk by `char`, not by byte: casting individual UTF-8 bytes to `char`
    // would turn every non-ASCII character into mojibake.
    let mut chars = haystack[start..].chars();
    while let Some(c) = chars.next() {
        match c {
            '\\' => {
                // Keep the escape intact and skip the escaped character so an
                // escaped quote does not end the string.
                out.push('\\');
                if let Some(escaped) = chars.next() {
                    out.push(escaped);
                }
            }
            '"' => return Some(out),
            other => out.push(other),
        }
    }
    Some(out)
}

/// Decodes JSON string escapes in `s` (the still-escaped contents of a JSON
/// string, without the surrounding quotes).
///
/// Handles `\"`, `\\`, `\/`, `\b`, `\f`, `\n`, `\r`, `\t` and `\uXXXX`,
/// including UTF-16 surrogate pairs. Anything that is not a valid escape (an
/// unknown letter, a truncated or unpaired `\u`, a trailing backslash) is
/// copied through unchanged.
fn unescape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut chars = s.chars();
    while let Some(c) = chars.next() {
        if c != '\\' {
            out.push(c);
            continue;
        }
        match chars.next() {
            Some('"') => out.push('"'),
            Some('\\') => out.push('\\'),
            Some('/') => out.push('/'),
            Some('b') => out.push('\u{0008}'),
            Some('f') => out.push('\u{000C}'),
            Some('n') => out.push('\n'),
            Some('r') => out.push('\r'),
            Some('t') => out.push('\t'),
            Some('u') => {
                // Decode on a copy of the iterator so a malformed escape can
                // be left in place without losing any input.
                let mut probe = chars.clone();
                match decode_unicode_escape(&mut probe) {
                    Some(decoded) => {
                        out.push(decoded);
                        chars = probe;
                    }
                    None => out.push_str("\\u"),
                }
            }
            Some(other) => {
                out.push('\\');
                out.push(other);
            }
            None => out.push('\\'),
        }
    }
    out
}

/// Decodes the part of a `\uXXXX` escape that follows `\u`, consuming a second
/// `\uXXXX` when the first unit is a high surrogate.
fn decode_unicode_escape(chars: &mut std::str::Chars<'_>) -> Option<char> {
    let first = hex4(chars)?;
    if !(0xD800..0xDC00).contains(&first) {
        // Covers ordinary code points; a lone low surrogate yields `None`.
        return char::from_u32(first);
    }
    if chars.next()? != '\\' || chars.next()? != 'u' {
        return None;
    }
    let second = hex4(chars)?;
    if !(0xDC00..0xE000).contains(&second) {
        return None;
    }
    char::from_u32(0x10000 + ((first - 0xD800) << 10) + (second - 0xDC00))
}

/// Reads exactly four hexadecimal digits and returns their value.
fn hex4(chars: &mut std::str::Chars<'_>) -> Option<u32> {
    let mut value = 0u32;
    for _ in 0..4 {
        value = value * 16 + chars.next()?.to_digit(16)?;
    }
    Some(value)
}