use serde::{Deserialize, Serialize};
#[derive(Debug, Clone)]
pub enum StreamEvent {
MessageStart { id: String, model: String },
ContentBlockStart {
index: u32,
block_type: ContentBlockType,
},
ContentBlockDelta { index: u32, delta: ContentDelta },
ContentBlockStop { index: u32 },
MessageDelta {
stop_reason: Option<String>,
usage: StreamUsage,
},
MessageStop,
}
#[derive(Debug, Clone)]
pub enum ContentBlockType {
Text,
Thinking,
ToolUse { id: String, name: String },
}
#[derive(Debug, Clone)]
pub enum ContentDelta {
Text(String),
Thinking(String),
ToolInput(String),
}
#[derive(Debug, Clone, Default)]
pub struct StreamUsage {
pub input_tokens: u32,
pub output_tokens: u32,
}
#[derive(Debug, Clone, Copy)]
pub enum StreamProvider {
Anthropic,
AnthropicCompatible,
OpenAI,
OpenAICompatible,
Gemini,
AzureOpenAI,
Bedrock,
Ollama,
}
#[derive(Debug)]
pub struct StreamState {
model: String,
message_started: bool,
text_started: bool,
text_finished: bool,
thinking_started: bool,
thinking_finished: bool,
finished: bool,
stop_reason: Option<String>,
usage: Option<StreamUsage>,
#[allow(dead_code)]
tool_index_offset: u32,
#[allow(dead_code)]
tool_calls_count: u32,
}
impl StreamState {
pub fn new(model: String) -> Self {
Self {
model,
message_started: false,
text_started: false,
text_finished: false,
thinking_started: false,
thinking_finished: false,
finished: false,
stop_reason: None,
usage: None,
tool_index_offset: 0,
tool_calls_count: 0,
}
}
pub fn ingest_anthropic(&mut self, event: AnthropicStreamEvent) -> Vec<StreamEvent> {
let mut events = Vec::new();
match event {
AnthropicStreamEvent::MessageStart { message } => {
if !self.message_started {
self.message_started = true;
events.push(StreamEvent::MessageStart {
id: message.id,
model: message.model,
});
}
}
AnthropicStreamEvent::ContentBlockStart {
index,
content_block,
} => {
let block_type = match content_block {
AnthropicContentBlock::Text { .. } => ContentBlockType::Text,
AnthropicContentBlock::Thinking { .. } => ContentBlockType::Thinking,
AnthropicContentBlock::ToolUse { id, name, .. } => {
ContentBlockType::ToolUse { id, name }
}
};
events.push(StreamEvent::ContentBlockStart { index, block_type });
}
AnthropicStreamEvent::ContentBlockDelta { index, delta } => {
let content_delta = match delta {
AnthropicContentDelta::Text { text } => ContentDelta::Text(text),
AnthropicContentDelta::Thinking { thinking } => {
ContentDelta::Thinking(thinking)
}
AnthropicContentDelta::InputJson { partial_json } => {
ContentDelta::ToolInput(partial_json)
}
};
events.push(StreamEvent::ContentBlockDelta {
index,
delta: content_delta,
});
}
AnthropicStreamEvent::ContentBlockStop { index } => {
events.push(StreamEvent::ContentBlockStop { index });
}
AnthropicStreamEvent::MessageDelta { delta, usage } => {
self.stop_reason = delta.stop_reason;
self.usage = Some(StreamUsage {
input_tokens: usage.input_tokens,
output_tokens: usage.output_tokens,
});
events.push(StreamEvent::MessageDelta {
stop_reason: self.stop_reason.clone(),
usage: self.usage.clone().unwrap_or_default(),
});
}
AnthropicStreamEvent::MessageStop { .. } => {
events.push(StreamEvent::MessageStop);
}
}
events
}
pub fn ingest_openai(&mut self, chunk: OpenAiStreamChunk) -> Vec<StreamEvent> {
let mut events = Vec::new();
if !self.message_started {
self.message_started = true;
events.push(StreamEvent::MessageStart {
id: chunk.id.clone(),
model: chunk.model.clone().unwrap_or_else(|| self.model.clone()),
});
}
if let Some(usage) = chunk.usage {
self.usage = Some(StreamUsage {
input_tokens: usage.prompt_tokens,
output_tokens: usage.completion_tokens,
});
}
for choice in chunk.choices {
if let Some(reasoning) = choice.delta.reasoning_content.filter(|v| !v.is_empty()) {
if !self.thinking_started {
self.thinking_started = true;
events.push(StreamEvent::ContentBlockStart {
index: 0,
block_type: ContentBlockType::Thinking,
});
}
events.push(StreamEvent::ContentBlockDelta {
index: 0,
delta: ContentDelta::Thinking(reasoning),
});
}
if let Some(content) = choice.delta.content.filter(|v| !v.is_empty()) {
if self.thinking_started && !self.thinking_finished {
self.thinking_finished = true;
events.push(StreamEvent::ContentBlockStop { index: 0 });
}
let text_index = if self.thinking_started { 1 } else { 0 };
if !self.text_started {
self.text_started = true;
events.push(StreamEvent::ContentBlockStart {
index: text_index,
block_type: ContentBlockType::Text,
});
}
events.push(StreamEvent::ContentBlockDelta {
index: text_index,
delta: ContentDelta::Text(content),
});
}
for (i, tool_call) in choice.delta.tool_calls.into_iter().enumerate() {
let tool_index = (if self.thinking_started { 2 } else { 1 }) + i as u32;
if let Some(name) = tool_call.function.name {
events.push(StreamEvent::ContentBlockStart {
index: tool_index,
block_type: ContentBlockType::ToolUse {
id: tool_call.id.unwrap_or_default(),
name,
},
});
}
if let Some(args) = tool_call.function.arguments {
events.push(StreamEvent::ContentBlockDelta {
index: tool_index,
delta: ContentDelta::ToolInput(args),
});
}
}
if let Some(finish_reason) = choice.finish_reason {
self.stop_reason = Some(normalize_openai_finish_reason(&finish_reason));
}
}
events
}
pub fn ingest_ollama(&mut self, chunk: OllamaStreamChunk) -> Vec<StreamEvent> {
let mut events = Vec::new();
if !self.message_started {
self.message_started = true;
events.push(StreamEvent::MessageStart {
id: "".to_string(),
model: chunk.model.clone().unwrap_or_else(|| self.model.clone()),
});
}
if let Some(message) = &chunk.message {
if let Some(content) = &message.content {
if !content.is_empty() {
if !self.text_started {
self.text_started = true;
events.push(StreamEvent::ContentBlockStart {
index: 0,
block_type: ContentBlockType::Text,
});
}
events.push(StreamEvent::ContentBlockDelta {
index: 0,
delta: ContentDelta::Text(content.clone()),
});
}
}
}
if chunk.done {
if chunk.prompt_eval_count.is_some() || chunk.eval_count.is_some() {
self.usage = Some(StreamUsage {
input_tokens: chunk.prompt_eval_count.unwrap_or(0),
output_tokens: chunk.eval_count.unwrap_or(0),
});
}
if self.text_started && !self.text_finished {
self.text_finished = true;
events.push(StreamEvent::ContentBlockStop { index: 0 });
}
events.push(StreamEvent::MessageDelta {
stop_reason: Some("stop".to_string()),
usage: self.usage.clone().unwrap_or_default(),
});
events.push(StreamEvent::MessageStop);
}
events
}
pub fn finish(&mut self) -> Vec<StreamEvent> {
if self.finished {
return Vec::new();
}
self.finished = true;
let mut events = Vec::new();
if self.thinking_started && !self.thinking_finished {
self.thinking_finished = true;
events.push(StreamEvent::ContentBlockStop { index: 0 });
}
if self.text_started && !self.text_finished {
self.text_finished = true;
let text_index = if self.thinking_started { 1 } else { 0 };
events.push(StreamEvent::ContentBlockStop { index: text_index });
}
if self.message_started {
events.push(StreamEvent::MessageDelta {
stop_reason: self
.stop_reason
.clone()
.or_else(|| Some("end_turn".to_string())),
usage: self.usage.clone().unwrap_or_default(),
});
events.push(StreamEvent::MessageStop);
}
events
}
}
fn normalize_openai_finish_reason(reason: &str) -> String {
match reason {
"stop" => "end_turn".to_string(),
"tool_calls" => "tool_use".to_string(),
other => other.to_string(),
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AnthropicStreamEvent {
MessageStart { message: AnthropicMessageStart },
ContentBlockStart {
index: u32,
content_block: AnthropicContentBlock,
},
ContentBlockDelta {
index: u32,
delta: AnthropicContentDelta,
},
ContentBlockStop { index: u32 },
MessageDelta {
delta: AnthropicMessageDelta,
#[serde(default)]
usage: AnthropicStreamUsage,
},
MessageStop {},
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AnthropicMessageStart {
pub id: String,
#[serde(rename = "type")]
pub kind: String,
pub role: String,
pub model: String,
#[serde(default)]
pub content: Vec<AnthropicContentBlock>,
#[serde(default)]
pub stop_reason: Option<String>,
#[serde(default)]
pub stop_sequence: Option<String>,
#[serde(default)]
pub usage: AnthropicStreamUsage,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AnthropicContentBlock {
Text {
text: String,
},
Thinking {
thinking: String,
},
ToolUse {
id: String,
name: String,
input: serde_json::Value,
},
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AnthropicContentDelta {
#[serde(rename = "text_delta")]
Text { text: String },
#[serde(rename = "thinking_delta")]
Thinking { thinking: String },
#[serde(rename = "input_json_delta")]
InputJson { partial_json: String },
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AnthropicMessageDelta {
pub stop_reason: Option<String>,
pub stop_sequence: Option<String>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct AnthropicStreamUsage {
#[serde(default)]
pub input_tokens: u32,
#[serde(default)]
pub output_tokens: u32,
}
#[derive(Debug, Clone, Deserialize)]
pub struct OpenAiStreamChunk {
pub id: String,
#[serde(default)]
pub model: Option<String>,
#[serde(default)]
pub choices: Vec<OpenAiStreamChoice>,
#[serde(default)]
pub usage: Option<OpenAiStreamUsage>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct OpenAiStreamChoice {
pub delta: OpenAiStreamDelta,
#[serde(default)]
pub finish_reason: Option<String>,
}
#[derive(Debug, Default, Clone, Deserialize)]
pub struct OpenAiStreamDelta {
#[serde(default)]
pub content: Option<String>,
#[serde(default)]
pub reasoning_content: Option<String>,
#[serde(default)]
pub tool_calls: Vec<OpenAiStreamToolCall>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct OpenAiStreamToolCall {
#[serde(default)]
pub index: u32,
#[serde(default)]
pub id: Option<String>,
#[serde(default)]
pub function: OpenAiStreamFunction,
}
#[derive(Debug, Default, Clone, Deserialize)]
pub struct OpenAiStreamFunction {
#[serde(default)]
pub name: Option<String>,
#[serde(default)]
pub arguments: Option<String>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct OpenAiStreamUsage {
#[serde(default)]
pub prompt_tokens: u32,
#[serde(default)]
pub completion_tokens: u32,
}
#[derive(Debug, Clone, Deserialize)]
pub struct OllamaStreamChunk {
#[serde(default)]
pub model: Option<String>,
#[serde(default)]
pub message: Option<OllamaStreamMessage>,
#[serde(default)]
pub done: bool,
#[serde(default)]
pub prompt_eval_count: Option<u32>,
#[serde(default)]
pub eval_count: Option<u32>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct OllamaStreamMessage {
#[serde(default)]
pub role: Option<String>,
#[serde(default)]
pub content: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn stream_state_handles_anthropic_events() {
let mut state = StreamState::new("claude-sonnet-4-6".to_string());
let start_event = AnthropicStreamEvent::MessageStart {
message: AnthropicMessageStart {
id: "msg_123".to_string(),
kind: "message".to_string(),
role: "assistant".to_string(),
model: "claude-sonnet-4-6".to_string(),
content: vec![],
stop_reason: None,
stop_sequence: None,
usage: AnthropicStreamUsage::default(),
},
};
let events = state.ingest_anthropic(start_event);
assert!(matches!(events[0], StreamEvent::MessageStart { .. }));
}
#[test]
fn stream_state_handles_openai_events() {
let mut state = StreamState::new("gpt-4o".to_string());
let chunk = OpenAiStreamChunk {
id: "chatcmpl_123".to_string(),
model: Some("gpt-4o".to_string()),
choices: vec![OpenAiStreamChoice {
delta: OpenAiStreamDelta {
content: Some("Hello".to_string()),
..Default::default()
},
finish_reason: None,
}],
usage: None,
};
let events = state.ingest_openai(chunk);
assert!(matches!(events[0], StreamEvent::MessageStart { .. }));
assert!(matches!(events[1], StreamEvent::ContentBlockStart { .. }));
}
}