llmkit-anthropic 0.1.0

Anthropic (Claude) Messages API provider adapter for llmkit-rs
Documentation
//! Anthropic SSE → [`StreamDelta`] parsing.
//!
//! Anthropic's event stream differs from OpenAI's: input tokens arrive in the
//! `message_start` event, output tokens in the trailing `message_delta`, tool
//! calls open with a `content_block_start` and stream arguments as
//! `input_json_delta`s. We normalise all of that into [`StreamDelta`].

use eventsource_stream::Eventsource;
use futures::StreamExt;
use llmkit_core::{ChatStream, LlmError, StreamDelta, TokenUsage};

use crate::types::{StreamContentBlock, StreamDeltaBlock, StreamEvent};

/// Adapts a streaming Anthropic response into a [`ChatStream`].
///
/// The response body is decoded as server-sent events, and every event's data
/// payload is deserialised into a [`StreamEvent`] which is then mapped as
/// follows:
///
/// * text deltas become [`StreamDelta::Text`];
/// * the start of a tool-use content block becomes a [`StreamDelta::ToolCall`]
///   carrying the call's `id` and `name` with an empty `input_delta`, and each
///   following JSON fragment becomes a `ToolCall` carrying only `input_delta`;
/// * token counts reported by `MessageStart` / `MessageDelta` are collected
///   and handed out once, in the closing [`StreamDelta::Done`].
///
/// `Done` is emitted after a `MessageStop` event, or when the underlying event
/// stream simply ends.
///
/// # Errors
///
/// * A failure reported by the event-stream decoder, or an `Error` event in
///   the stream, yields one [`LlmError::Stream`] and ends the stream without a
///   `Done` item.
/// * A payload that does not deserialise yields an [`LlmError::Stream`]
///   (`"malformed event: …"`) and processing carries on with the next event.
pub(crate) fn parse(resp: reqwest::Response) -> ChatStream {
    Box::pin(async_stream::stream! {
        let mut totals = TokenUsage::default();
        let mut sse = resp.bytes_stream().eventsource();

        loop {
            // Pull the next raw SSE frame; decoder failures are fatal.
            let raw = match sse.next().await {
                None => break,
                Some(Err(err)) => {
                    yield Err(LlmError::Stream(err.to_string()));
                    return;
                }
                Some(Ok(raw)) => raw,
            };

            // A payload we cannot decode is reported, then skipped.
            let decoded = match serde_json::from_str::<StreamEvent>(&raw.data) {
                Ok(decoded) => decoded,
                Err(err) => {
                    yield Err(LlmError::Stream(format!("malformed event: {err}")));
                    continue;
                }
            };

            match decoded {
                // --- terminal events ---
                StreamEvent::MessageStop => break,
                StreamEvent::Error { error } => {
                    yield Err(LlmError::Stream(error.message));
                    return;
                }

                // --- usage bookkeeping ---
                StreamEvent::MessageStart { message } => {
                    if let Some(reported) = message.usage {
                        totals.prompt = reported.input_tokens;
                    }
                }
                StreamEvent::MessageDelta { usage: Some(reported) } => {
                    totals.completion = reported.output_tokens;
                    // Only trust a non-zero prompt count here, so the value
                    // recorded from `MessageStart` is not wiped by a zero.
                    if reported.input_tokens > 0 {
                        totals.prompt = reported.input_tokens;
                    }
                }
                StreamEvent::MessageDelta { usage: None } => {}

                // --- tool calls ---
                StreamEvent::ContentBlockStart {
                    content_block: StreamContentBlock::ToolUse { id, name },
                    ..
                } => {
                    // Opening frame: identifies the call, arguments follow.
                    yield Ok(StreamDelta::ToolCall {
                        id: Some(id),
                        name: Some(name),
                        input_delta: String::new(),
                    });
                }
                StreamEvent::ContentBlockStart { .. } => {}
                StreamEvent::ContentBlockDelta {
                    delta: StreamDeltaBlock::InputJsonDelta { partial_json },
                    ..
                } => {
                    yield Ok(StreamDelta::ToolCall {
                        id: None,
                        name: None,
                        input_delta: partial_json,
                    });
                }

                // --- plain text ---
                StreamEvent::ContentBlockDelta {
                    delta: StreamDeltaBlock::TextDelta { text },
                    ..
                } => {
                    yield Ok(StreamDelta::Text(text));
                }

                // --- events that carry nothing we forward ---
                StreamEvent::ContentBlockDelta {
                    delta: StreamDeltaBlock::Unknown,
                    ..
                } => {}
                StreamEvent::ContentBlockStop { .. } => {}
                StreamEvent::Ping => {}
                StreamEvent::Unknown => {}
            }
        }

        yield Ok(StreamDelta::Done { usage: totals });
    })
}