// stream-rs 0.1.0
//
// Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE,
// incremental JSON, OpenAI/Anthropic delta accumulators).
//! End-to-end example: parse a real Google Gemini `streamGenerateContent`
//! response into the final candidate text plus a streamed function call.
//!
//! Gemini's streaming endpoint sends a sequence of `GenerateContentResponse`
//! JSON objects — one complete object per delivery, concatenated back to back
//! (NDJSON-style) — delivered as ragged byte chunks. This is exactly what
//! [`JsonSplitter`] is for: it frames each complete top-level value out of the
//! stream, regardless of where the chunk boundaries fall. We then hand each
//! framed object to a [`GeminiAccumulator`].
//!
//! (When you request `?alt=sse`, Gemini wraps each object in a `data:` SSE
//! line instead; in that case pipe the bytes through [`SseParser`] first and
//! feed each event's `data` here. The framing concern is identical.)
//!
//! [`SseParser`]: stream_rs::sse::SseParser
//!
//! Run it:
//!
//! ```sh
//! cargo run --example gemini_stream
//! ```
//!
//! No API key, no network. The bytes below mimic what the REST endpoint sends,
//! deliberately split mid-JSON to prove the splitter survives it. The example
//! also calls [`JsonSplitter::finish`] at the end to confirm the stream closed
//! on a clean value boundary.

use stream_rs::accumulators::gemini::GeminiAccumulator;
use stream_rs::incremental_json::JsonSplitter;

/// A captured Gemini stream, pre-chopped into ragged chunks the way a socket
/// hands them to you. The first response object is split mid-string across two
/// reads; the rest arrive whole, concatenated back to back with no separator.
fn wire_chunks() -> Vec<&'static [u8]> {
    let reads: [&'static [u8]; 5] = [
        // Response 1 starts here, and the read ends inside the "The weath" string...
        br#"{"candidates":[{"content":{"parts":[{"text":"The weath"#,
        // ...which the next read finishes, closing response 1.
        br#"er"}]},"index":0}]}"#,
        // Response 2 follows immediately, with nothing between the objects.
        br#"{"candidates":[{"content":{"parts":[{"text":" in Paris"}]},"index":0}]}"#,
        // Response 3: a complete function call in a single read.
        br#"{"candidates":[{"content":{"parts":[{"functionCall":{"name":"get_weather","args":{"city":"Paris"}}}]},"index":0}]}"#,
        // Response 4: no content, just the terminal finishReason.
        br#"{"candidates":[{"finishReason":"STOP","index":0}]}"#,
    ];
    Vec::from(reads)
}

fn main() {
    let mut splitter = JsonSplitter::new();
    let mut acc = GeminiAccumulator::new();
    // Reused output buffer: holds only the values framed by the latest call.
    let mut values = Vec::new();

    for chunk in wire_chunks() {
        values.clear();
        splitter.feed(chunk, &mut values);
        for value in &values {
            apply_value(value, &mut acc);
        }
    }

    // The stream should end on a clean value boundary (the last object closed
    // with no buffered partial). finish() proves it. finish() also writes into
    // the output buffer, so clear the last chunk's values first (they were
    // already applied) and then apply anything it hands back instead of
    // silently dropping it.
    values.clear();
    splitter
        .finish(&mut values)
        .expect("Gemini stream ended on a clean JSON boundary");
    for value in &values {
        apply_value(value, &mut acc);
    }

    let c = acc.candidate(0).expect("a candidate was streamed");
    println!("text           : {:?}", c.text);
    println!("finish_reason  : {:?}", c.finish_reason.as_deref());
    for (i, call) in c.function_calls.iter().enumerate() {
        println!(
            "function_call[{i}]: name={:?} args={}",
            call.name, call.args
        );
    }

    assert_eq!(c.text, "The weather in Paris");
    assert_eq!(c.finish_reason.as_deref(), Some("STOP"));
    // Check the count first so a missing call fails with a clear message
    // rather than an index-out-of-bounds panic below.
    assert_eq!(c.function_calls.len(), 1, "exactly one function call was streamed");
    assert_eq!(c.function_calls[0].name, "get_weather");
    assert_eq!(c.function_calls[0].args, r#"{"city":"Paris"}"#);
    println!("\nOK: Gemini stream reassembled correctly across ragged chunk boundaries.");
}

/// Minimal, dependency-free extraction from one `GenerateContentResponse`.
///
/// This is a toy reader for the example only. It locates the *first* occurrence
/// of each key by plain substring search, with no whitespace allowed after the
/// colon. It therefore handles exactly the compact, single-part objects this
/// example streams and nothing fancier. For instance, a `"text"` key nested
/// inside function-call args would also match.
///
/// In production, parse `json` with a real JSON library and call the same
/// accumulator methods.
///
/// Updates are applied in a fixed order: text, then function call, then
/// finish reason.
fn apply_value(json: &str, acc: &mut GeminiAccumulator) {
    // Every update targets candidates[0]; the example streams no other candidate.
    let candidate = 0;

    if let Some(raw) = string_after(json, "\"text\":\"") {
        let text = unescape(&raw);
        acc.push_text(candidate, &text);
    }

    // A call is recorded only when both a name and a `{...}` args object exist;
    // the args lookup is skipped entirely when no name is present.
    let call = string_after(json, "\"name\":\"")
        .and_then(|name| object_after(json, "\"args\":").map(|args| (name, args)));
    if let Some((name, args)) = call {
        acc.push_function_call(candidate, &name, &args);
    }

    if let Some(reason) = string_after(json, "\"finishReason\":\"") {
        acc.set_finish_reason(candidate, &reason);
    }
}

/// Return the raw body of the JSON string literal that begins right after
/// `marker`. The marker is expected to include the opening quote, e.g.
/// `"\"text\":\""`.
///
/// Escape sequences are *not* decoded. A backslash skips the following byte,
/// so `\"` does not end the literal, and escapes are returned verbatim for
/// `unescape` to resolve. Non-ASCII text is preserved exactly.
///
/// Returns `None` if `marker` does not occur or the literal is never closed.
fn string_after(haystack: &str, marker: &str) -> Option<String> {
    let start = haystack.find(marker)? + marker.len();
    let literal = &haystack[start..];
    let bytes = literal.as_bytes();
    let mut i = 0;
    while let Some(&b) = bytes.get(i) {
        match b {
            // Skip the escaped byte so `\"` does not terminate the literal.
            b'\\' => i += 2,
            // `"` is ASCII and never appears inside a multi-byte UTF-8
            // sequence, so `i` is a char boundary and slicing is safe. Slicing
            // (instead of pushing `byte as char`) keeps non-ASCII text intact.
            b'"' => return Some(literal[..i].to_owned()),
            _ => i += 1,
        }
    }
    // Ran off the end without a closing quote: not a complete literal.
    None
}

/// Return the balanced `{...}` object literal that immediately follows
/// `marker`, including its outer braces.
///
/// Braces inside string literals (with `\` escapes honoured) do not affect the
/// nesting depth. Non-ASCII text inside the object is preserved exactly.
///
/// Returns `None` in three cases: `marker` is absent, the next character is not
/// `{`, or the object is never closed.
fn object_after(haystack: &str, marker: &str) -> Option<String> {
    let start = haystack.find(marker)? + marker.len();
    let rest = &haystack[start..];
    if !rest.starts_with('{') {
        return None;
    }
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;
    for (i, b) in rest.bytes().enumerate() {
        if in_string {
            if escaped {
                escaped = false;
            } else if b == b'\\' {
                escaped = true;
            } else if b == b'"' {
                in_string = false;
            }
            continue;
        }
        match b {
            b'"' => in_string = true,
            b'{' => depth += 1,
            b'}' => {
                // Cannot underflow: the first byte is `{`, and we return as
                // soon as depth drops back to zero.
                depth -= 1;
                if depth == 0 {
                    // `}` is ASCII, so `i + 1` is a char boundary; slicing the
                    // original `&str` keeps multi-byte UTF-8 intact.
                    return Some(rest[..=i].to_owned());
                }
            }
            _ => {}
        }
    }
    None
}

/// Decode the escape sequences in a raw JSON string body (the text between the
/// quotes, as returned by `string_after`).
///
/// Handles every escape JSON defines (RFC 8259 §7): `\"`, `\\`, `\/`, `\b`,
/// `\f`, `\n`, `\r`, `\t` and `\uXXXX`, including UTF-16 surrogate pairs.
///
/// - A malformed `\u` escape (bad hex digits, or an unpaired surrogate) decodes
///   to U+FFFD.
/// - Any other unknown escape is kept verbatim, and so is a trailing lone
///   backslash.
///
/// Decoding is a single left-to-right pass, so `\\n` yields a backslash followed
/// by `n`, not a newline.
fn unescape(s: &str) -> String {
    /// Read exactly four hex digits as one UTF-16 code unit.
    fn hex4(it: &mut std::str::Chars<'_>) -> Option<u32> {
        (0..4).try_fold(0u32, |acc, _| Some(acc * 16 + it.next()?.to_digit(16)?))
    }

    let mut out = String::with_capacity(s.len());
    let mut chars = s.chars();
    while let Some(c) = chars.next() {
        if c != '\\' {
            out.push(c);
            continue;
        }
        match chars.next() {
            Some('"') => out.push('"'),
            Some('\\') => out.push('\\'),
            Some('/') => out.push('/'),
            Some('b') => out.push('\u{0008}'),
            Some('f') => out.push('\u{000C}'),
            Some('n') => out.push('\n'),
            Some('r') => out.push('\r'),
            Some('t') => out.push('\t'),
            Some('u') => {
                let decoded = match hex4(&mut chars) {
                    // A high surrogate is only valid when immediately followed
                    // by a `\u` low surrogate; combine the pair into one scalar.
                    // Look ahead on a clone so nothing is consumed on mismatch.
                    Some(hi @ 0xD800..=0xDBFF) => {
                        let mut look = chars.clone();
                        match (look.next(), look.next(), hex4(&mut look)) {
                            (Some('\\'), Some('u'), Some(lo @ 0xDC00..=0xDFFF)) => {
                                chars = look;
                                char::from_u32(0x10000 + ((hi - 0xD800) << 10) + (lo - 0xDC00))
                            }
                            _ => None,
                        }
                    }
                    // from_u32 rejects lone low surrogates (0xDC00..=0xDFFF).
                    Some(unit) => char::from_u32(unit),
                    None => None,
                };
                out.push(decoded.unwrap_or(char::REPLACEMENT_CHARACTER));
            }
            Some(other) => {
                out.push('\\');
                out.push(other);
            }
            None => out.push('\\'),
        }
    }
    out
}