stream-rs 0.1.0

Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE, incremental JSON, OpenAI/Anthropic delta accumulators).
Documentation
//! End-to-end example: parse a real `OpenAI` `chat.completion.chunk` SSE stream
//! into final assistant text — including a streamed tool call.
//!
//! This is the whole point of the crate shown in one place: raw bytes off the
//! wire arrive in *arbitrary* chunks, and we turn them into a finished message
//! with **zero runtime dependencies** and no JSON library.
//!
//! Run it:
//!
//! ```sh
//! cargo run --example openai_stream
//! ```
//!
//! It needs no API key and no network — the bytes below are a faithful capture
//! of what `POST /v1/chat/completions` with `"stream": true` actually sends,
//! deliberately split at awkward byte boundaries (mid-CRLF, mid-JSON) to prove
//! the parser survives them.

use stream_rs::accumulators::openai::OpenAiAccumulator;
use stream_rs::sse::SseParser;

/// The demo fixture: one captured `OpenAI` streaming response, already cut
/// into the uneven reads a socket might deliver. Two of the cuts are
/// deliberately nasty — one lands between the `\r` and `\n` of a CRLF, the
/// other in the middle of a JSON string value.
fn wire_chunks() -> Vec<&'static [u8]> {
    const CAPTURE: &[&[u8]] = &[
        // 1. Role delta; its CRLF terminator straddles the first two reads.
        b"data: {\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\"}}]}\r",
        b"\n\r\n",
        // 2. Assistant text, a few tokens per event.
        b"data: {\"choices\":[{\"index\":0,\"delta\":{\"content\":\"The weather\"}}]}\n\n",
        b"data: {\"choices\":[{\"index\":0,\"delta\":{\"content\":\" in \"}}]}\n\n",
        // 3. One data line whose JSON is cut in half by the read boundary.
        b"data: {\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Par",
        b"is\"}}]}\n\n",
        // 4. A tool call whose `arguments` string arrives in two fragments.
        b"data: {\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_1\",\"function\":{\"name\":\"get_weather\",\"arguments\":\"{\\\"ci\"}}]}}]}\n\n",
        b"data: {\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"ty\\\":\\\"Paris\\\"}\"}}]}}]}\n\n",
        // 5. The finish reason, then the literal `[DONE]` terminator.
        b"data: {\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"tool_calls\"}]}\n\n",
        b"data: [DONE]\n\n",
    ];

    CAPTURE.to_vec()
}

/// Runs the demo: feeds every wire chunk through the SSE parser, folds each
/// event's JSON payload into the accumulator, then prints the assembled
/// choice and asserts it matches the expected text and tool call.
///
/// # Panics
///
/// Panics if no choice was accumulated or if any of the final assertions on
/// the reassembled message fail.
fn main() {
    let mut parser = SseParser::new();
    let mut acc = OpenAiAccumulator::new();
    // One event buffer, cleared and refilled for every read.
    let mut events = Vec::new();

    for chunk in wire_chunks() {
        events.clear();
        parser.feed(chunk, &mut events);

        for ev in &events {
            // OpenAI signals end-of-stream with a literal `[DONE]` data line.
            if ev.data == "[DONE]" {
                continue;
            }
            // Each event's data is one `chat.completion.chunk` JSON object.
            // We hand-extract the few fields we need so the example stays
            // dependency-free; in real code you would use serde_json here.
            apply_chunk(&ev.data, &mut acc);
        }
    }

    // End of input. Clear first so the events of the last read (already
    // applied above) are not processed twice, then apply whatever `finish`
    // pushes — otherwise anything it emits would be silently dropped.
    events.clear();
    parser.finish(&mut events);
    for ev in &events {
        if ev.data == "[DONE]" {
            continue;
        }
        apply_chunk(&ev.data, &mut acc);
    }

    // Report the assembled result.
    let choice = acc.choice(0).expect("a choice was streamed");
    println!("role           : {:?}", choice.role.as_deref());
    println!("content        : {:?}", choice.content);
    println!("finish_reason  : {:?}", choice.finish_reason.as_deref());
    for (i, tc) in &choice.tool_calls {
        println!(
            "tool_call[{i}]  : id={:?} name={:?} args={}",
            tc.id.as_deref(),
            tc.name.as_deref(),
            tc.arguments
        );
    }

    // The ragged chunking must not have changed the reassembled message.
    assert_eq!(choice.content, "The weather in Paris");
    assert_eq!(choice.finish_reason.as_deref(), Some("tool_calls"));
    assert_eq!(choice.tool_calls[&0].arguments, r#"{"city":"Paris"}"#);
    println!("\nOK: stream reassembled correctly across ragged chunk boundaries.");
}

/// Minimal, dependency-free field extraction from one chunk's JSON. This is a
/// toy reader for the example only — in production, parse `data` with your JSON
/// library and call the same accumulator methods.
///
/// `json` is the `data` payload of one SSE event. Whichever of `role`,
/// `content` and `finish_reason` appear as string values are forwarded to
/// `acc` for choice 0. If the chunk has a `tool_calls` key, the `id`, `name`
/// and `arguments` strings found *after* that key are forwarded as a fragment
/// of tool call 0.
fn apply_chunk(json: &str, acc: &mut OpenAiAccumulator) {
    let index = 0; // the example only streams choices[0]

    if let Some(role) = scalar_after(json, "\"role\":\"") {
        acc.push_role(index, &role);
    }
    if let Some(content) = scalar_after(json, "\"content\":\"") {
        acc.push_content(index, &unescape(&content));
    }
    if let Some(reason) = scalar_after(json, "\"finish_reason\":\"") {
        acc.set_finish_reason(index, &reason);
    }

    // Match the `tool_calls` *key* (note the trailing colon). A bare
    // `"tool_calls"` also matches the value in `"finish_reason":"tool_calls"`
    // and would push an empty tool-call fragment for the finish chunk.
    if let Some(at) = json.find("\"tool_calls\":") {
        // Search only from the key onwards, so a top-level `"id":"chatcmpl-…"`
        // on the chunk object is never mistaken for the tool call's id.
        let tool_json = &json[at..];
        let id = scalar_after(tool_json, "\"id\":\"");
        let name = scalar_after(tool_json, "\"name\":\"");
        let args = scalar_after(tool_json, "\"arguments\":\"").map(|s| unescape(&s));
        acc.push_tool_call(index, 0, id.as_deref(), name.as_deref(), args.as_deref());
    }
}

/// Return the raw contents of the JSON string literal that follows `marker`.
///
/// `marker` is expected to end with the literal's opening quote (for example
/// `"content":"`). The result runs up to, but excludes, the first quote that
/// is not preceded by a backslash escape. Escape sequences are copied through
/// verbatim (`\"` stays `\"`), so the caller decides how to unescape.
///
/// Returns `None` when `marker` does not occur in `haystack`. If the literal
/// is never closed, everything after the marker is returned.
fn scalar_after(haystack: &str, marker: &str) -> Option<String> {
    let start = haystack.find(marker)? + marker.len();
    let tail = &haystack[start..];

    // Scan by `char`, not by byte: pushing each byte `as char` would split a
    // multi-byte UTF-8 sequence into several Latin-1 characters.
    let mut escaped = false;
    for (pos, ch) in tail.char_indices() {
        if escaped {
            // Whatever follows a backslash belongs to the escape, even a quote.
            escaped = false;
        } else if ch == '\\' {
            escaped = true;
        } else if ch == '"' {
            return Some(tail[..pos].to_owned());
        }
    }

    // Unterminated literal: hand back what we have rather than nothing.
    Some(tail.to_owned())
}

/// Resolve the JSON string escapes in `s` in a single left-to-right pass.
///
/// Handles `\"`, `\\`, `\/`, `\b`, `\f`, `\n`, `\r`, `\t` and `\uXXXX`; a
/// high/low surrogate pair written as two `\u` escapes is combined into one
/// character. A lone surrogate becomes U+FFFD. Anything that is not a valid
/// escape (an unknown letter, a truncated `\u`, a trailing backslash) is
/// copied through unchanged.
fn unescape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut chars = s.chars();

    while let Some(c) = chars.next() {
        if c != '\\' {
            out.push(c);
            continue;
        }
        match chars.next() {
            Some('"') => out.push('"'),
            Some('\\') => out.push('\\'),
            Some('/') => out.push('/'),
            Some('b') => out.push('\u{0008}'),
            Some('f') => out.push('\u{000C}'),
            Some('n') => out.push('\n'),
            Some('r') => out.push('\r'),
            Some('t') => out.push('\t'),
            Some('u') => {
                let rest = chars.as_str();
                match hex4(rest) {
                    Some(unit) => {
                        let mut code = unit;
                        let mut used = 4;
                        // A high surrogate only makes sense together with
                        // the low surrogate escape that follows it.
                        if (0xD800..0xDC00).contains(&unit) {
                            let low = rest[4..].strip_prefix("\\u").and_then(hex4);
                            if let Some(low) = low.filter(|l| (0xDC00..0xE000).contains(l)) {
                                code = 0x1_0000 + ((unit - 0xD800) << 10) + (low - 0xDC00);
                                used = 10;
                            }
                        }
                        out.push(char::from_u32(code).unwrap_or(char::REPLACEMENT_CHARACTER));
                        // Skip the hex digits (and the paired escape, if any).
                        chars = rest[used..].chars();
                    }
                    // Not four hex digits: keep the text as it was.
                    None => out.push_str("\\u"),
                }
            }
            Some(other) => {
                out.push('\\');
                out.push(other);
            }
            None => out.push('\\'),
        }
    }
    out
}

/// Parse the first four bytes of `s` as hexadecimal digits, if they are.
fn hex4(s: &str) -> Option<u32> {
    let digits = s.get(..4)?;
    if digits.bytes().all(|b| b.is_ascii_hexdigit()) {
        u32::from_str_radix(digits, 16).ok()
    } else {
        None
    }
}