use crabllm_core::{
AnthropicContentBlock, AnthropicResponse, AnthropicUsage, ChatCompletionChunk, Error, Usage,
};
use futures::{Stream, StreamExt, stream};
use serde::Serialize;
use std::collections::VecDeque;
/// One event of the Anthropic-style SSE stream produced by this module.
///
/// Serialized as an internally tagged object: the `type` field carries the
/// snake_case form of the variant name (`MessageStart` -> `"message_start"`),
/// which is the same string `event_name` returns for that variant.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AnthropicSseEvent {
    /// Opens the message; carries the initial message envelope.
    MessageStart { message: AnthropicResponse },
    /// Opens the content block at `index`.
    ContentBlockStart {
        index: u32,
        content_block: AnthropicContentBlock,
    },
    /// Incremental payload for the content block at `index`.
    ContentBlockDelta { index: u32, delta: BlockDelta },
    /// Closes the content block at `index`.
    ContentBlockStop { index: u32 },
    /// Message-level update carrying the stop information and usage.
    MessageDelta {
        delta: MessageDeltaPayload,
        usage: AnthropicUsage,
    },
    /// Terminates the message.
    MessageStop,
}
impl AnthropicSseEvent {
    /// Returns the wire name of this event.
    ///
    /// The returned string is identical to the `type` tag the variant is
    /// serialized with.
    pub fn event_name(&self) -> &'static str {
        match *self {
            Self::MessageStop => "message_stop",
            Self::MessageDelta { .. } => "message_delta",
            Self::ContentBlockStop { .. } => "content_block_stop",
            Self::ContentBlockDelta { .. } => "content_block_delta",
            Self::ContentBlockStart { .. } => "content_block_start",
            Self::MessageStart { .. } => "message_start",
        }
    }
}
/// Payload of a `content_block_delta` event.
///
/// Serialized as an internally tagged object whose `type` field is the
/// per-variant name given in the `rename` attributes below.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum BlockDelta {
    /// A fragment of text content.
    #[serde(rename = "text_delta")]
    Text { text: String },
    /// A fragment of thinking (reasoning) content.
    #[serde(rename = "thinking_delta")]
    Thinking { thinking: String },
    /// A fragment of a tool call's argument string.
    #[serde(rename = "input_json_delta")]
    InputJson { partial_json: String },
}
/// The `delta` object of a `message_delta` event.
///
/// Both fields are left out of the serialized output when they are `None`.
#[derive(Debug, Clone, Serialize)]
pub struct MessageDeltaPayload {
    /// Why the message stopped, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stop_reason: Option<String>,
    /// The stop sequence, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stop_sequence: Option<String>,
}
/// Adapts a stream of chat-completion chunks into a stream of
/// [`AnthropicSseEvent`]s.
///
/// Each poll first drains events already queued by the adapter state, then
/// yields a deferred upstream error (if one is stored), and only then pulls
/// the next chunk from `chunks`. When `chunks` ends, the adapter is finalized
/// with the fallback stop reason `"end_turn"`. An upstream error is handed to
/// the adapter, which stores it so it is yielded after any queued events.
/// Once the adapter is finished and its queue is empty, the stream ends.
pub fn to_anthropic_sse(
    chunks: impl Stream<Item = Result<ChatCompletionChunk, Error>> + Unpin + Send + 'static,
) -> impl Stream<Item = Result<AnthropicSseEvent, Error>> + Send + 'static {
    stream::unfold(
        (chunks, AdapterState::default()),
        |(mut upstream, mut adapter)| async move {
            let item = loop {
                // Queued events always go out before anything else.
                if let Some(event) = adapter.pending.pop_front() {
                    break Ok(event);
                }
                // A stored error is surfaced only once the queue is empty.
                if let Some(err) = adapter.deferred_error.take() {
                    adapter.finished = true;
                    break Err(err);
                }
                if adapter.finished {
                    return None;
                }
                // Nothing queued: pull more input and let the adapter queue
                // whatever events it produces, then loop to emit them.
                match upstream.next().await {
                    None => adapter.finalize("end_turn".to_string()),
                    Some(Err(e)) => adapter.abort_with_error(e),
                    Some(Ok(chunk)) => adapter.handle_chunk(chunk),
                }
            };
            Some((item, (upstream, adapter)))
        },
    )
}
/// Mutable state carried between polls of the adapted stream.
#[derive(Default)]
struct AdapterState {
    /// Set once the `message_start` event has been queued.
    started: bool,
    /// Set when no further input will be read; the stream ends after the
    /// queue (and any deferred error) has been drained.
    finished: bool,
    /// The content block that is currently open, if any.
    current: Option<CurrentBlock>,
    /// Index that the next opened content block will receive.
    next_index: u32,
    /// Events waiting to be yielded, oldest first.
    pending: VecDeque<AnthropicSseEvent>,
    /// Upstream error to yield after `pending` has been drained.
    deferred_error: Option<Error>,
    /// Most recent usage seen on any chunk.
    latest_usage: Option<Usage>,
    /// Stop reason mapped from the most recent finish reason seen.
    stop_reason: Option<String>,
}
/// Identifies the content block that is currently open.
///
/// `index` is always the block index used in the emitted events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CurrentBlock {
    /// An open text block.
    Text { index: u32 },
    /// An open thinking block.
    Thinking { index: u32 },
    /// An open tool-use block; `openai_index` is the `index` of the upstream
    /// tool-call delta that opened it, used to recognise follow-up deltas.
    ToolUse { index: u32, openai_index: u32 },
}
impl AdapterState {
fn handle_chunk(&mut self, chunk: ChatCompletionChunk) {
self.ensure_started(&chunk);
if let Some(usage) = chunk.usage {
self.latest_usage = Some(usage);
}
let Some(choice) = chunk.choices.into_iter().next() else {
return;
};
let delta = choice.delta;
if let Some(reasoning) = delta.reasoning_content
&& !reasoning.is_empty()
{
let index = self.switch_to_thinking();
self.pending
.push_back(AnthropicSseEvent::ContentBlockDelta {
index,
delta: BlockDelta::Thinking {
thinking: reasoning,
},
});
}
if let Some(text) = delta.content
&& !text.is_empty()
{
let index = self.switch_to_text();
self.pending
.push_back(AnthropicSseEvent::ContentBlockDelta {
index,
delta: BlockDelta::Text { text },
});
}
if let Some(tool_deltas) = delta.tool_calls {
for tc in tool_deltas {
self.handle_tool_delta(tc);
}
}
if let Some(reason) = choice.finish_reason {
self.stop_reason = Some(finish_reason_to_stop(&reason));
}
}
fn ensure_started(&mut self, chunk: &ChatCompletionChunk) {
if self.started {
return;
}
self.started = true;
let msg = AnthropicResponse {
id: chunk.id.clone(),
r#type: "message".to_string(),
role: "assistant".to_string(),
model: chunk.model.clone(),
content: Vec::new(),
stop_reason: None,
stop_sequence: None,
usage: empty_usage(),
};
self.pending
.push_back(AnthropicSseEvent::MessageStart { message: msg });
}
fn close_current(&mut self) {
if let Some(block) = self.current.take() {
let index = match block {
CurrentBlock::Text { index }
| CurrentBlock::Thinking { index }
| CurrentBlock::ToolUse { index, .. } => index,
};
self.pending
.push_back(AnthropicSseEvent::ContentBlockStop { index });
}
}
fn switch_to_text(&mut self) -> u32 {
if let Some(CurrentBlock::Text { index }) = self.current {
return index;
}
self.close_current();
let index = self.allocate_index();
self.pending
.push_back(AnthropicSseEvent::ContentBlockStart {
index,
content_block: AnthropicContentBlock::Text {
text: String::new(),
},
});
self.current = Some(CurrentBlock::Text { index });
index
}
fn switch_to_thinking(&mut self) -> u32 {
if let Some(CurrentBlock::Thinking { index }) = self.current {
return index;
}
self.close_current();
let index = self.allocate_index();
self.pending
.push_back(AnthropicSseEvent::ContentBlockStart {
index,
content_block: AnthropicContentBlock::Thinking {
thinking: String::new(),
signature: None,
},
});
self.current = Some(CurrentBlock::Thinking { index });
index
}
fn open_tool_use(&mut self, openai_index: u32, id: String, name: String) -> u32 {
self.close_current();
let index = self.allocate_index();
self.pending
.push_back(AnthropicSseEvent::ContentBlockStart {
index,
content_block: AnthropicContentBlock::ToolUse {
id,
name,
input: serde_json::json!({}),
},
});
self.current = Some(CurrentBlock::ToolUse {
index,
openai_index,
});
index
}
fn handle_tool_delta(&mut self, tc: crabllm_core::ToolCallDelta) {
let openai_index = tc.index;
let current_index = match self.current {
Some(CurrentBlock::ToolUse {
index,
openai_index: oi,
}) if oi == openai_index => index,
_ => {
let id = tc.id.clone().unwrap_or_default();
let name = tc
.function
.as_ref()
.and_then(|f| f.name.clone())
.unwrap_or_default();
self.open_tool_use(openai_index, id, name)
}
};
if let Some(func) = tc.function
&& let Some(args) = func.arguments
&& !args.is_empty()
{
self.pending
.push_back(AnthropicSseEvent::ContentBlockDelta {
index: current_index,
delta: BlockDelta::InputJson { partial_json: args },
});
}
}
fn allocate_index(&mut self) -> u32 {
let idx = self.next_index;
self.next_index += 1;
idx
}
fn finalize(&mut self, default_stop: String) {
debug_assert!(
self.started,
"finalize called before any chunk arrived — caller should have produced at least one chunk"
);
self.close_current();
let stop_reason = self.stop_reason.take().or(Some(default_stop));
let usage = self
.latest_usage
.take()
.map(usage_to_anthropic)
.unwrap_or_else(empty_usage);
self.pending.push_back(AnthropicSseEvent::MessageDelta {
delta: MessageDeltaPayload {
stop_reason,
stop_sequence: None,
},
usage,
});
self.pending.push_back(AnthropicSseEvent::MessageStop);
self.finished = true;
}
fn abort_with_error(&mut self, err: Error) {
if self.started {
self.finalize("error".to_string());
} else {
self.finished = true;
}
self.deferred_error = Some(err);
}
}
/// Builds a usage record with zero input and output tokens and no cache
/// counters.
fn empty_usage() -> AnthropicUsage {
    AnthropicUsage {
        cache_creation_input_tokens: None,
        cache_read_input_tokens: None,
        output_tokens: 0,
        input_tokens: 0,
    }
}
fn usage_to_anthropic(u: Usage) -> AnthropicUsage {
AnthropicUsage {
input_tokens: u.prompt_tokens,
output_tokens: u.completion_tokens,
cache_read_input_tokens: u.prompt_cache_hit_tokens,
cache_creation_input_tokens: u.prompt_cache_miss_tokens,
}
}
fn finish_reason_to_stop(reason: &crabllm_core::FinishReason) -> String {
use crabllm_core::FinishReason::*;
match reason {
Stop => "end_turn".to_string(),
Length => "max_tokens".to_string(),
ToolCalls => "tool_use".to_string(),
ContentFilter => "content_filter".to_string(),
Custom(s) => s.clone(),
}
}