stream-rs 0.1.0

Zero-dependency, spec-compliant streaming toolkit for LLM responses (SSE, incremental JSON, OpenAI/Anthropic delta accumulators).
Documentation
//! Accumulator for Anthropic Messages API streaming events.
//!
//! Anthropic streams a message as a sequence of typed events:
//!
//! * `message_start` — opens the message.
//! * `content_block_start` — opens a content block at `index` (text or `tool_use`).
//! * `content_block_delta` — appends a `text_delta` or `input_json_delta`.
//! * `content_block_stop` — closes the block at `index`.
//! * `message_delta` / `message_stop` — final metadata and end of stream.
//!
//! This accumulator enforces the legal ordering and folds the deltas into final
//! content blocks. The ordering rules it rejects with an [`AccumulateError`]:
//!
//! * any block event (`content_block_start` / delta / stop) before
//!   `message_start`;
//! * any block event after `message_stop`;
//! * a delta or stop for a block that was never started;
//! * a `content_block_start` for an index whose block is already open (not yet
//!   stopped);
//! * a delta whose kind does not match the block kind (`text_delta` into a
//!   `tool_use` block, or `input_json_delta` into a `text` block).
//!
//! It is JSON-library agnostic: dispatch each parsed event to the matching
//! method below.
//!
//! # Example
//!
//! ```
//! use stream_rs::accumulators::anthropic::{AnthropicAccumulator, BlockKind};
//!
//! let mut acc = AnthropicAccumulator::new();
//! acc.message_start();
//! acc.content_block_start(0, BlockKind::Text).unwrap();
//! acc.text_delta(0, "Hel").unwrap();
//! acc.text_delta(0, "lo").unwrap();
//! acc.content_block_stop(0).unwrap();
//! assert_eq!(acc.block(0).unwrap().text, "Hello");
//! ```

use alloc::borrow::ToOwned;
use alloc::collections::BTreeMap;
use alloc::format;
use alloc::string::String;

use crate::error::AccumulateError;

/// Which flavour of Anthropic content block a `content_block_start` opened.
///
/// The kind decides which delta event is legal for the block.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BlockKind {
    /// Plain text; fed by `text_delta` events.
    Text,
    /// Tool invocation; fed by `input_json_delta` events whose partial JSON
    /// is concatenated into the block's `text`.
    ToolUse,
}

/// One content block as folded together from its start, delta and stop events.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ContentBlock {
    /// Kind given by the block's `content_block_start`.
    pub kind: BlockKind,
    /// Concatenation of every delta fragment: prose for text blocks, raw
    /// (possibly incomplete) JSON for `tool_use` blocks.
    pub text: String,
    /// `true` once the matching `content_block_stop` has arrived.
    pub stopped: bool,
}

/// Folds a stream of Anthropic Messages events into finished content blocks.
///
/// Storage is a sparse [`BTreeMap`] keyed by block `index`. A huge or gappy
/// index therefore costs one map entry rather than a dense allocation up to
/// that index.
#[derive(Debug, Default)]
pub struct AnthropicAccumulator {
    /// Set by `message_start`; block events are rejected until it is true.
    started: bool,
    /// Set by `message_stop`; block events are rejected once it is true.
    stopped: bool,
    /// Blocks seen so far, ordered by their `index`.
    blocks: BTreeMap<usize, ContentBlock>,
    /// Latest `stop_reason` delivered by a `message_delta`, if any.
    stop_reason: Option<String>,
}

impl AnthropicAccumulator {
    /// Create an empty accumulator that has not yet seen `message_start`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Handle `message_start`. Idempotent: repeated calls have no further effect.
    pub fn message_start(&mut self) {
        self.started = true;
    }

    /// Handle `content_block_start` for `index`.
    ///
    /// A start for an index whose previous block has already been stopped
    /// replaces that block, discarding its accumulated text.
    ///
    /// # Errors
    /// Rejects a start before `message_start`, after `message_stop`, or for an
    /// index whose block is already open (started but not yet stopped).
    pub fn content_block_start(
        &mut self,
        index: usize,
        kind: BlockKind,
    ) -> Result<(), AccumulateError> {
        self.ensure_active("content_block_start")?;
        if matches!(self.blocks.get(&index), Some(b) if !b.stopped) {
            return Err(AccumulateError::UnexpectedEvent {
                got: format!("content_block_start for block {index} that is already open"),
            });
        }
        self.blocks.insert(
            index,
            ContentBlock {
                kind,
                text: String::new(),
                stopped: false,
            },
        );
        Ok(())
    }

    /// Append a `text_delta` to the text block at `index`.
    ///
    /// # Errors
    /// Rejects a delta before `message_start`, after `message_stop`, for a
    /// block that was never started or has already been stopped, or whose
    /// kind is not [`BlockKind::Text`].
    pub fn text_delta(&mut self, index: usize, fragment: &str) -> Result<(), AccumulateError> {
        self.delta(index, fragment, BlockKind::Text)
    }

    /// Append an `input_json_delta` (partial JSON) to the `tool_use` block at `index`.
    ///
    /// # Errors
    /// Rejects a delta before `message_start`, after `message_stop`, for a
    /// block that was never started or has already been stopped, or whose
    /// kind is not [`BlockKind::ToolUse`].
    pub fn input_json_delta(
        &mut self,
        index: usize,
        fragment: &str,
    ) -> Result<(), AccumulateError> {
        self.delta(index, fragment, BlockKind::ToolUse)
    }

    /// Handle `content_block_stop` for `index`.
    ///
    /// # Errors
    /// Rejects a stop before `message_start`, after `message_stop`, for a
    /// block that was never started, or for a block that is already stopped
    /// (a duplicate stop).
    pub fn content_block_stop(&mut self, index: usize) -> Result<(), AccumulateError> {
        self.ensure_active("content_block_stop")?;
        self.open_block_mut(index, "content_block_stop")?.stopped = true;
        Ok(())
    }

    /// Handle `message_delta`, recording its `stop_reason` when present.
    ///
    /// A `None` leaves any previously recorded reason untouched.
    pub fn message_delta(&mut self, stop_reason: Option<&str>) {
        if let Some(reason) = stop_reason {
            self.stop_reason = Some(reason.to_owned());
        }
    }

    /// Handle `message_stop`. After this, every block event is rejected.
    pub fn message_stop(&mut self) {
        self.stopped = true;
    }

    /// The `stop_reason`, if one was reported.
    #[must_use]
    pub fn stop_reason(&self) -> Option<&str> {
        self.stop_reason.as_deref()
    }

    /// Borrow the assembled block at `index`, if it exists.
    #[must_use]
    pub fn block(&self, index: usize) -> Option<&ContentBlock> {
        self.blocks.get(&index)
    }

    /// All assembled blocks in ascending index order.
    pub fn blocks(&self) -> impl Iterator<Item = (usize, &ContentBlock)> {
        self.blocks.iter().map(|(&i, b)| (i, b))
    }

    /// Reject any block event that arrives before `message_start` or after
    /// `message_stop`.
    fn ensure_active(&self, what: &str) -> Result<(), AccumulateError> {
        if !self.started {
            return Err(AccumulateError::UnexpectedEvent {
                got: format!("{what} before message_start"),
            });
        }
        if self.stopped {
            return Err(AccumulateError::UnexpectedEvent {
                got: format!("{what} after message_stop"),
            });
        }
        Ok(())
    }

    /// Shared body of the two delta handlers: append `fragment` to the open
    /// block at `index` after checking that its kind matches `expected`.
    fn delta(
        &mut self,
        index: usize,
        fragment: &str,
        expected: BlockKind,
    ) -> Result<(), AccumulateError> {
        self.ensure_active("content_block_delta")?;
        let block = self.open_block_mut(index, "content_block_delta")?;
        if block.kind != expected {
            return Err(AccumulateError::BlockKindMismatch {
                index,
                expected: kind_name(expected),
                actual: kind_name(block.kind),
            });
        }
        block.text.push_str(fragment);
        Ok(())
    }

    /// Borrow the block at `index`, requiring that it exists and has not yet
    /// received its `content_block_stop`.
    ///
    /// This blocks two ordering violations: appending to a closed block
    /// (which would silently mutate "final" content) and stopping a block
    /// twice.
    fn open_block_mut(
        &mut self,
        index: usize,
        what: &str,
    ) -> Result<&mut ContentBlock, AccumulateError> {
        let block = self.block_mut(index)?;
        if block.stopped {
            return Err(AccumulateError::UnexpectedEvent {
                got: format!("{what} for content block {index} that is already stopped"),
            });
        }
        Ok(block)
    }

    /// Borrow the block at `index`, failing if it was never started.
    fn block_mut(&mut self, index: usize) -> Result<&mut ContentBlock, AccumulateError> {
        self.blocks
            .get_mut(&index)
            .ok_or_else(|| AccumulateError::UnexpectedEvent {
                got: format!("delta/stop for content block {index} that was never started"),
            })
    }
}

/// Wire-style label for a [`BlockKind`] (`"text"` or `"tool_use"`), as shown
/// in accumulator error messages.
fn kind_name(kind: BlockKind) -> &'static str {
    let label = match kind {
        BlockKind::ToolUse => "tool_use",
        BlockKind::Text => "text",
    };
    label
}