stream-rs 0.1.0

Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE, incremental JSON, OpenAI/Anthropic delta accumulators).
Documentation
//! Accumulator for `OpenAI` `chat.completion.chunk` streaming deltas.
//!
//! `OpenAI` streams a chat completion as a sequence of chunks, each carrying a
//! `choices` array. Every choice has an `index` and a `delta` that may contain
//! a `role`, a `content` fragment, and/or `tool_calls` fragments. This type
//! folds those fragments into the final per-choice content and tool calls.
//!
//! It is JSON-library agnostic: parse each chunk however you like and call the
//! typed methods below. The accumulator never allocates per fragment beyond the
//! string growth it performs.
//!
//! Choices and tool calls are stored sparsely in a [`BTreeMap`] keyed by their
//! provider-supplied index, so a stream that reports a large or non-contiguous
//! `index` (whether by quirk or by a malicious server) costs only the entries
//! actually seen — it never forces a dense allocation up to that index.
//!
//! # Example
//!
//! ```
//! use stream_rs::accumulators::openai::OpenAiAccumulator;
//!
//! let mut acc = OpenAiAccumulator::new();
//! acc.push_role(0, "assistant");
//! acc.push_content(0, "Hel");
//! acc.push_content(0, "lo");
//! assert_eq!(acc.choice(0).unwrap().content, "Hello");
//! assert_eq!(acc.choice(0).unwrap().role.as_deref(), Some("assistant"));
//! ```

use alloc::borrow::ToOwned;
use alloc::collections::BTreeMap;
use alloc::string::String;

/// One streamed tool call being assembled.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ToolCall {
    /// The tool call id (set once, from the first fragment that carries it).
    pub id: Option<String>,
    /// The function name (set once).
    pub name: Option<String>,
    /// The function arguments, accumulated from streamed JSON fragments.
    pub arguments: String,
}

/// The assembled state of a single `choices[index]`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Choice {
    /// The role from the first delta that carried one.
    pub role: Option<String>,
    /// The concatenated text content.
    pub content: String,
    /// Tool calls keyed by their `tool_calls[i].index`.
    pub tool_calls: BTreeMap<usize, ToolCall>,
    /// The `finish_reason`, set when the final chunk for this choice arrives.
    pub finish_reason: Option<String>,
}

/// Accumulates `OpenAI` chat-completion streaming deltas into final choices.
#[derive(Debug, Default)]
pub struct OpenAiAccumulator {
    choices: BTreeMap<usize, Choice>,
}

impl OpenAiAccumulator {
    /// Create an empty accumulator.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Apply `delta.role` for `choices[index]`.
    ///
    /// The choice is created on first reference, so any `index` is valid; the
    /// role is recorded only from the first delta that carries one.
    pub fn push_role(&mut self, index: usize, role: &str) {
        let choice = self.choice_mut(index);
        if choice.role.is_none() {
            choice.role = Some(role.to_owned());
        }
    }

    /// Append a `delta.content` fragment for `choices[index]`.
    ///
    /// The choice is created on first reference, so any `index` is valid.
    pub fn push_content(&mut self, index: usize, fragment: &str) {
        self.choice_mut(index).content.push_str(fragment);
    }

    /// Set the `finish_reason` for `choices[index]`.
    pub fn set_finish_reason(&mut self, index: usize, reason: &str) {
        self.choice_mut(index).finish_reason = Some(reason.to_owned());
    }

    /// Apply a `tool_calls[tool_index]` fragment for `choices[index]`.
    ///
    /// Any of `id`, `name`, or `arguments` may be `None`/empty for a given
    /// fragment; `arguments` fragments are concatenated in arrival order.
    pub fn push_tool_call(
        &mut self,
        index: usize,
        tool_index: usize,
        id: Option<&str>,
        name: Option<&str>,
        arguments: Option<&str>,
    ) {
        let choice = self.choice_mut(index);
        let tc = choice.tool_calls.entry(tool_index).or_default();
        if let Some(id) = id {
            if tc.id.is_none() {
                tc.id = Some(id.to_owned());
            }
        }
        if let Some(name) = name {
            if tc.name.is_none() {
                tc.name = Some(name.to_owned());
            }
        }
        if let Some(args) = arguments {
            tc.arguments.push_str(args);
        }
    }

    /// Borrow the assembled `choices[index]`, if it exists.
    #[must_use]
    pub fn choice(&self, index: usize) -> Option<&Choice> {
        self.choices.get(&index)
    }

    /// All assembled choices in index order, skipping gaps that were never seen.
    pub fn choices(&self) -> impl Iterator<Item = (usize, &Choice)> {
        self.choices.iter().map(|(&i, c)| (i, c))
    }

    fn choice_mut(&mut self, index: usize) -> &mut Choice {
        self.choices.entry(index).or_default()
    }
}