stream-rs 0.1.0

Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE, incremental JSON, OpenAI/Anthropic delta accumulators).
Documentation
//! Accumulator for Google Gemini `streamGenerateContent` responses.
//!
//! Gemini streams a generation as a sequence of `GenerateContentResponse`
//! objects. Each carries a `candidates` array; every candidate has an `index`
//! and a `content.parts` list whose entries are either a `text` fragment or a
//! `functionCall` (`{ name, args }`). Successive responses extend the same
//! candidate, so a streamed answer arrives as many small `text` parts that must
//! be concatenated, optionally followed by a `finishReason`.
//!
//! This accumulator folds those fragments into the final per-candidate text and
//! function calls. Like the other accumulators it is JSON-library agnostic:
//! parse each streamed object however you like and call the typed methods below.
//!
//! # Example
//!
//! ```
//! use stream_rs::accumulators::gemini::GeminiAccumulator;
//!
//! let mut acc = GeminiAccumulator::new();
//! acc.push_text(0, "Hel");
//! acc.push_text(0, "lo");
//! acc.set_finish_reason(0, "STOP");
//! assert_eq!(acc.candidate(0).unwrap().text, "Hello");
//! assert_eq!(acc.candidate(0).unwrap().finish_reason.as_deref(), Some("STOP"));
//! ```
//!
//! Function calls are recorded in arrival order. Gemini delivers a function
//! call's `args` as one complete JSON object (not fragmented like `OpenAI`), so
//! each [`push_function_call`](GeminiAccumulator::push_function_call) appends a
//! whole call:
//!
//! ```
//! use stream_rs::accumulators::gemini::GeminiAccumulator;
//!
//! let mut acc = GeminiAccumulator::new();
//! acc.push_function_call(0, "get_weather", r#"{"city":"Paris"}"#);
//! let call = &acc.candidate(0).unwrap().function_calls[0];
//! assert_eq!(call.name, "get_weather");
//! assert_eq!(call.args, r#"{"city":"Paris"}"#);
//! ```

use alloc::borrow::ToOwned;
use alloc::collections::BTreeMap;
use alloc::string::String;
use alloc::vec::Vec;

/// A function call emitted by the model.
///
/// One value corresponds to one `parts[].functionCall` entry. Both fields are
/// plain owned strings; this type does not parse or validate `args` as JSON.
/// The `Default` value has an empty `name` and empty `args`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FunctionCall {
    /// The function name.
    pub name: String,
    /// The function arguments as a JSON object string, exactly as received.
    pub args: String,
}

/// The assembled state of a single `candidates[index]`.
///
/// A freshly created (`Default`) candidate has empty `text`, no function
/// calls, and no finish reason; the accumulator fills these in as streamed
/// chunks are pushed.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Candidate {
    /// The concatenated text from every `parts[].text` fragment, in the order
    /// the fragments were pushed.
    pub text: String,
    /// Function calls in arrival order.
    pub function_calls: Vec<FunctionCall>,
    /// The `finishReason`, set when the final chunk for this candidate arrives.
    /// `None` until a finish reason has been recorded.
    pub finish_reason: Option<String>,
}

/// Accumulates Gemini `streamGenerateContent` deltas into final candidates.
///
/// Candidates are stored sparsely in a [`BTreeMap`] keyed by their
/// provider-supplied `index`, so a large or non-contiguous index never forces a
/// dense allocation up to that index.
#[derive(Debug, Default)]
pub struct GeminiAccumulator {
    candidates: BTreeMap<usize, Candidate>,
}

impl GeminiAccumulator {
    /// Build an accumulator that has not yet seen any candidate.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Extend the text of `candidates[index]` with one `parts[].text` fragment.
    ///
    /// The candidate entry is created on first use, so fragments may be pushed
    /// for any index without prior setup.
    pub fn push_text(&mut self, index: usize, fragment: &str) {
        let candidate = self.candidate_mut(index);
        candidate.text += fragment;
    }

    /// Append one `parts[].functionCall` to `candidates[index]`.
    ///
    /// `name` and `args` are copied as given. `args` should be the call's
    /// arguments serialized as a JSON object string, forwarded verbatim from
    /// your JSON parser; it is stored without being parsed or validated here.
    pub fn push_function_call(&mut self, index: usize, name: &str, args: &str) {
        let calls = &mut self.candidate_mut(index).function_calls;
        calls.push(FunctionCall {
            name: name.to_owned(),
            args: args.to_owned(),
        });
    }

    /// Store `reason` as the `finishReason` of `candidates[index]`, replacing
    /// any value recorded earlier.
    pub fn set_finish_reason(&mut self, index: usize, reason: &str) {
        let finish_reason = Some(reason.to_owned());
        self.candidate_mut(index).finish_reason = finish_reason;
    }

    /// Look up the state assembled so far for `candidates[index]`.
    ///
    /// Returns `None` when nothing has been pushed for that index.
    #[must_use]
    pub fn candidate(&self, index: usize) -> Option<&Candidate> {
        self.candidates.get(&index)
    }

    /// Iterate over every candidate seen so far as `(index, candidate)` pairs,
    /// in ascending index order. Indices that never received data are absent.
    pub fn candidates(&self) -> impl Iterator<Item = (usize, &Candidate)> {
        self.candidates
            .iter()
            .map(|(index, candidate)| (*index, candidate))
    }

    /// Fetch the mutable entry for `index`, inserting a default (empty)
    /// candidate the first time the index is seen.
    fn candidate_mut(&mut self, index: usize) -> &mut Candidate {
        let slot = self.candidates.entry(index);
        slot.or_default()
    }
}