//! Conversion of Anthropic Messages API responses, complete and streaming,
//! into OpenAI Chat Completions responses and stream chunks.
use chrono::Utc;
use serde::{Deserialize, Serialize};
use crate::config::LogLevel;
/// A complete (non-streaming) response from the Anthropic Messages API.
#[derive(Debug, Deserialize)]
pub struct AnthropicResponse {
    pub content: Vec<AnthropicContentBlock>,
    pub stop_reason: Option<String>,
    pub usage: Option<AnthropicUsage>,
}
/// One entry of an Anthropic response's `content` array, discriminated by
/// its `type` field.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AnthropicContentBlock {
    Text { text: String },
    ToolUse {
        id: String,
        name: String,
        input: serde_json::Value,
    },
}
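// Representative `content` entries these variants are meant to match
// (a sketch written against the derives above, not captured API output):
//   {"type": "text", "text": "Hello"}
//   {"type": "tool_use", "id": "toolu_01...", "name": "get_weather", "input": {"city": "Paris"}}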
/// Token accounting attached to an Anthropic response.
#[derive(Debug, Deserialize)]
pub struct AnthropicUsage {
    pub input_tokens: Option<u32>,
    pub output_tokens: Option<u32>,
}
/// A server-sent event from the Anthropic streaming API, discriminated by
/// its `type` field.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AnthropicStreamEvent {
    ContentBlockDelta {
        delta: AnthropicDelta,
    },
    ContentBlockStart {
        content_block: AnthropicStreamContentBlock,
    },
    ContentBlockStop,
    MessageStop {
        stop_reason: Option<String>,
    },
    MessageStart {
        #[allow(dead_code)]
        message: serde_json::Value,
    },
    MessageDelta {
        delta: MessageDelta,
    },
    Ping,
}
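// Representative event payloads, abbreviated. Shapes follow the Anthropic
// Messages streaming API; fields not modeled above (e.g. "index", the inner
// "type" of a delta) are ignored by serde's default unknown-field handling:
//   {"type": "message_start", "message": {...}}
//   {"type": "content_block_start", "index": 0, "content_block": {"type": "tool_use", "id": "toolu_01...", "name": "get_weather"}}
//   {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}
//   {"type": "content_block_delta", "index": 0, "delta": {"type": "input_json_delta", "partial_json": "{\"city\""}}
//   {"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": null}}
//   {"type": "message_stop"}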
/// The payload of a `content_block_delta` event: either a text fragment or a
/// fragment of a tool call's JSON arguments.
#[derive(Debug, Deserialize)]
pub struct AnthropicDelta {
    pub text: Option<String>,
    pub partial_json: Option<String>,
}
/// The `content_block` payload of a `content_block_start` event.
#[derive(Debug, Deserialize)]
pub struct AnthropicStreamContentBlock {
    #[serde(rename = "type")]
    pub block_type: String,
    pub id: Option<String>,
    pub name: Option<String>,
}
/// The `delta` payload of a `message_delta` event.
#[derive(Debug, Deserialize)]
pub struct MessageDelta {
    pub stop_reason: Option<String>,
    #[allow(dead_code)]
    pub stop_sequence: Option<String>,
}
/// A non-streaming OpenAI Chat Completions response.
#[derive(Debug, Serialize)]
pub struct OpenAiResponse {
    pub id: String,
    pub object: String,
    pub created: i64,
    pub model: String,
    pub choices: Vec<OpenAiChoice>,
    pub usage: OpenAiUsage,
}
#[derive(Debug, Serialize)]
pub struct OpenAiChoice {
    pub index: u32,
    pub message: OpenAiResponseMessage,
    pub finish_reason: String,
}
#[derive(Debug, Serialize)]
pub struct OpenAiResponseMessage {
    pub role: String,
    pub content: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_calls: Option<Vec<OpenAiToolCall>>,
}
#[derive(Debug, Serialize)]
pub struct OpenAiToolCall {
    pub id: String,
    #[serde(rename = "type")]
    pub call_type: String,
    pub function: OpenAiFunctionCall,
}
#[derive(Debug, Serialize)]
pub struct OpenAiFunctionCall {
    pub name: String,
    /// JSON-encoded arguments, kept as a string per the OpenAI schema.
    pub arguments: String,
}
#[derive(Debug, Serialize)]
pub struct OpenAiUsage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
}
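// Serialized, a complete response looks like this (values illustrative,
// shape follows directly from the Serialize derives above):
//   {"id":"chatcmpl-...","object":"chat.completion","created":1700000000,"model":"...",
//    "choices":[{"index":0,"message":{"role":"assistant","content":"Hi"},"finish_reason":"stop"}],
//    "usage":{"prompt_tokens":12,"completion_tokens":3,"total_tokens":15}}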
/// One chunk of a streaming OpenAI Chat Completions response.
#[derive(Debug, Serialize)]
pub struct OpenAiStreamChunk {
    pub id: String,
    pub object: String,
    pub created: i64,
    pub model: String,
    pub choices: Vec<OpenAiStreamChoice>,
}
#[derive(Debug, Serialize)]
pub struct OpenAiStreamChoice {
    pub index: u32,
    pub delta: OpenAiStreamDelta,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub finish_reason: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct OpenAiStreamDelta {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_calls: Option<Vec<OpenAiStreamToolCall>>,
}
#[derive(Debug, Serialize)]
pub struct OpenAiStreamToolCall {
    pub index: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    #[serde(rename = "type", skip_serializing_if = "Option::is_none")]
    pub call_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub function: Option<OpenAiStreamFunctionCall>,
}
#[derive(Debug, Serialize)]
pub struct OpenAiStreamFunctionCall {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub arguments: Option<String>,
}
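// Serialized, a mid-stream text chunk looks like this (values illustrative;
// fields guarded by skip_serializing_if are simply absent):
//   {"id":"chatcmpl-...","object":"chat.completion.chunk","created":1700000000,
//    "model":"...","choices":[{"index":0,"delta":{"content":"Hel"}}]}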
/// A tool call being assembled from streaming events.
#[derive(Debug)]
pub struct StreamingToolCall {
    #[allow(dead_code)]
    pub id: String,
    pub name: String,
    pub arguments: String,
}
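// How a streaming tool call is assembled by `convert_stream_event` below
// (event shapes abbreviated):
//   content_block_start  {content_block: tool_use, id, name} -> StreamingToolCall created, arguments empty
//   content_block_delta  {partial_json: "{\"city\""}         -> fragment appended to `arguments`
//   content_block_delta  {partial_json: ":\"Paris\"}"}       -> fragment appended to `arguments`
//   message_stop                                             -> call taken and logged, finish_reason = "tool_calls"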
/// Converts Anthropic responses and stream events into their OpenAI
/// equivalents, emitting diagnostics at the configured log level.
pub struct AnthropicToOpenAiConverter {
    log_level: LogLevel,
}
const CHAT_COMPLETION_OBJECT: &str = "chat.completion";
const CHAT_COMPLETION_CHUNK_OBJECT: &str = "chat.completion.chunk";
const ASSISTANT_ROLE: &str = "assistant";
const FUNCTION_TOOL_TYPE: &str = "function";
impl AnthropicToOpenAiConverter {
    pub fn new(log_level: LogLevel) -> Self {
        Self { log_level }
    }
    /// Converts a complete Anthropic response into a non-streaming OpenAI
    /// Chat Completions response.
    pub fn convert(&self, response: AnthropicResponse, model: &str) -> OpenAiResponse {
        let mut message = OpenAiResponseMessage {
            role: ASSISTANT_ROLE.to_string(),
            content: None,
            tool_calls: None,
        };
        self.extract_text_content(&mut message, &response.content);
        self.extract_tool_calls(&mut message, &response.content);
        let finish_reason =
            self.determine_finish_reason(&response.stop_reason, &message.tool_calls);
        let usage = self.convert_usage(response.usage);
        OpenAiResponse {
            id: self.generate_response_id(),
            object: CHAT_COMPLETION_OBJECT.to_string(),
            created: Utc::now().timestamp(),
            model: model.to_string(),
            choices: vec![OpenAiChoice { index: 0, message, finish_reason }],
            usage,
        }
    }
    /// Converts one Anthropic stream event into at most one OpenAI chunk,
    /// returning `None` for events with nothing to forward.
    ///
    /// The three `&mut` parameters carry per-stream state across calls: the
    /// tool call currently being assembled, whether any tool call has been
    /// seen, and a stop reason captured from an earlier `message_delta`.
    pub fn convert_stream_event(
        &self,
        event: &AnthropicStreamEvent,
        model: &str,
        current_tool_call: &mut Option<StreamingToolCall>,
        has_tool_calls: &mut bool,
        stop_reason_from_delta: &mut Option<String>,
    ) -> Option<OpenAiStreamChunk> {
        match event {
            AnthropicStreamEvent::ContentBlockDelta { delta } => {
                self.handle_content_delta(delta, model, current_tool_call)
            }
            AnthropicStreamEvent::ContentBlockStart { content_block } => {
                self.handle_content_start(content_block, model, current_tool_call, has_tool_calls)
            }
            AnthropicStreamEvent::ContentBlockStop => self.handle_content_stop(current_tool_call),
            AnthropicStreamEvent::MessageStart { .. } => self.handle_message_start(),
            AnthropicStreamEvent::MessageDelta { delta } => {
                self.handle_message_delta(delta, stop_reason_from_delta)
            }
            AnthropicStreamEvent::Ping => None,
            AnthropicStreamEvent::MessageStop { stop_reason } => self.handle_message_stop(
                stop_reason,
                model,
                current_tool_call,
                has_tool_calls,
                stop_reason_from_delta,
            ),
        }
    }
    /// Joins all text blocks into `message.content`, leaving it `None` when
    /// the response contains no text.
    fn extract_text_content(
        &self,
        message: &mut OpenAiResponseMessage,
        content_blocks: &[AnthropicContentBlock],
    ) {
        let text_content: Vec<&str> = content_blocks
            .iter()
            .filter_map(|block| {
                if let AnthropicContentBlock::Text { text } = block {
                    Some(text.as_str())
                } else {
                    None
                }
            })
            .collect();
        if !text_content.is_empty() {
            message.content = Some(text_content.join(""));
        }
    }
    /// Maps `tool_use` blocks onto OpenAI tool calls, serializing each
    /// block's `input` value into a JSON argument string.
    fn extract_tool_calls(
        &self,
        message: &mut OpenAiResponseMessage,
        content_blocks: &[AnthropicContentBlock],
    ) {
        let tool_use_blocks: Vec<_> = content_blocks
            .iter()
            .filter_map(|block| {
                if let AnthropicContentBlock::ToolUse { id, name, input } = block {
                    Some((id, name, input))
                } else {
                    None
                }
            })
            .collect();
        if tool_use_blocks.is_empty() {
            return;
        }
        self.debug(&format!(
            "Found {} tool call(s) in Anthropic response",
            tool_use_blocks.len()
        ));
        message.tool_calls = Some(
            tool_use_blocks
                .into_iter()
                .map(|(id, name, input)| {
                    let args_str =
                        serde_json::to_string(input).unwrap_or_else(|_| "{}".to_string());
                    self.debug(&format!("Tool call: {}({})", name, args_str));
                    OpenAiToolCall {
                        id: id.clone(),
                        call_type: FUNCTION_TOOL_TYPE.to_string(),
                        function: OpenAiFunctionCall {
                            name: name.clone(),
                            arguments: args_str,
                        },
                    }
                })
                .collect(),
        );
    }
    /// Maps an Anthropic `stop_reason` onto an OpenAI `finish_reason`.
    fn determine_finish_reason(
        &self,
        stop_reason: &Option<String>,
        tool_calls: &Option<Vec<OpenAiToolCall>>,
    ) -> String {
        match stop_reason.as_deref() {
            Some("end_turn") | Some("stop_sequence") => "stop",
            Some("tool_use") => "tool_calls",
            Some("max_tokens") => "length",
            // Unknown or absent stop reasons: infer from the presence of
            // tool calls, defaulting to "stop" to match the streaming path.
            _ => {
                if tool_calls.is_some() {
                    "tool_calls"
                } else {
                    "stop"
                }
            }
        }
        .to_string()
    }
    /// Converts Anthropic token counts, treating missing values as zero.
    fn convert_usage(&self, usage: Option<AnthropicUsage>) -> OpenAiUsage {
        let usage = usage.unwrap_or(AnthropicUsage { input_tokens: None, output_tokens: None });
        let prompt_tokens = usage.input_tokens.unwrap_or(0);
        let completion_tokens = usage.output_tokens.unwrap_or(0);
        OpenAiUsage {
            prompt_tokens,
            completion_tokens,
            total_tokens: prompt_tokens + completion_tokens,
        }
    }
    /// Generates a response id from the current timestamp. Note that two
    /// responses created within the same millisecond will share an id; a
    /// UUID would avoid that if uniqueness matters.
    fn generate_response_id(&self) -> String {
        format!("chatcmpl-{}", Utc::now().timestamp_millis())
    }
    /// Routes a `content_block_delta`: text fragments become content chunks,
    /// `partial_json` fragments extend the tool call in progress.
    fn handle_content_delta(
        &self,
        delta: &AnthropicDelta,
        model: &str,
        current_tool_call: &mut Option<StreamingToolCall>,
    ) -> Option<OpenAiStreamChunk> {
        if let Some(text) = &delta.text {
            self.create_text_chunk(text, model)
        } else if let Some(partial_json) = &delta.partial_json {
            self.handle_tool_argument_delta(partial_json, model, current_tool_call)
        } else {
            None
        }
    }
    /// Wraps a text fragment in a single-choice OpenAI stream chunk.
    pub fn create_text_chunk(&self, text: &str, model: &str) -> Option<OpenAiStreamChunk> {
        Some(OpenAiStreamChunk {
            id: self.generate_response_id(),
            object: CHAT_COMPLETION_CHUNK_OBJECT.to_string(),
            created: Utc::now().timestamp(),
            model: model.to_string(),
            choices: vec![OpenAiStreamChoice {
                index: 0,
                delta: OpenAiStreamDelta { content: Some(text.to_string()), tool_calls: None },
                finish_reason: None,
            }],
        })
    }
    /// Appends a JSON fragment to the tool call in progress and forwards it
    /// as an OpenAI tool-call delta. Fragments arriving with no tool call in
    /// progress are dropped.
    fn handle_tool_argument_delta(
        &self,
        partial_json: &str,
        model: &str,
        current_tool_call: &mut Option<StreamingToolCall>,
    ) -> Option<OpenAiStreamChunk> {
        if let Some(tool_call) = current_tool_call.as_mut() {
            self.debug(&format!(
                "[STREAM] Tool call arguments delta for {}: {}",
                tool_call.name, partial_json
            ));
            tool_call.arguments.push_str(partial_json);
            Some(self.create_tool_argument_chunk(partial_json, model))
        } else {
            None
        }
    }
    fn create_tool_argument_chunk(&self, partial_json: &str, model: &str) -> OpenAiStreamChunk {
        OpenAiStreamChunk {
            id: self.generate_response_id(),
            object: CHAT_COMPLETION_CHUNK_OBJECT.to_string(),
            created: Utc::now().timestamp(),
            model: model.to_string(),
            choices: vec![OpenAiStreamChoice {
                index: 0,
                delta: OpenAiStreamDelta {
                    content: None,
                    tool_calls: Some(vec![OpenAiStreamToolCall {
                        index: 0,
                        id: None,
                        call_type: None,
                        function: Some(OpenAiStreamFunctionCall {
                            name: None,
                            arguments: Some(partial_json.to_string()),
                        }),
                    }]),
                },
                finish_reason: None,
            }],
        }
    }
    /// Starts assembling a tool call when a `tool_use` block opens; text
    /// blocks and malformed tool blocks (missing id or name) produce nothing.
    fn handle_content_start(
        &self,
        content_block: &AnthropicStreamContentBlock,
        model: &str,
        current_tool_call: &mut Option<StreamingToolCall>,
        has_tool_calls: &mut bool,
    ) -> Option<OpenAiStreamChunk> {
        if content_block.block_type != "tool_use" {
            return None;
        }
        if let (Some(id), Some(name)) = (&content_block.id, &content_block.name) {
            self.debug(&format!("[STREAM] Tool call started: {} (id: {})", name, id));
            *has_tool_calls = true;
            *current_tool_call = Some(StreamingToolCall {
                id: id.clone(),
                name: name.clone(),
                arguments: String::new(),
            });
            Some(self.create_tool_start_chunk(id, name, model))
        } else {
            None
        }
    }
    fn create_tool_start_chunk(&self, id: &str, name: &str, model: &str) -> OpenAiStreamChunk {
        OpenAiStreamChunk {
            id: self.generate_response_id(),
            object: CHAT_COMPLETION_CHUNK_OBJECT.to_string(),
            created: Utc::now().timestamp(),
            model: model.to_string(),
            choices: vec![OpenAiStreamChoice {
                index: 0,
                delta: OpenAiStreamDelta {
                    content: None,
                    tool_calls: Some(vec![OpenAiStreamToolCall {
                        index: 0,
                        id: Some(id.to_string()),
                        call_type: Some(FUNCTION_TOOL_TYPE.to_string()),
                        function: Some(OpenAiStreamFunctionCall {
                            name: Some(name.to_string()),
                            arguments: Some(String::new()),
                        }),
                    }]),
                },
                finish_reason: None,
            }],
        }
    }
    /// Logs the end of a tool-use block; no chunk is emitted because clients
    /// reassemble arguments from the deltas already sent.
    fn handle_content_stop(
        &self,
        current_tool_call: &Option<StreamingToolCall>,
    ) -> Option<OpenAiStreamChunk> {
        if let Some(tool_call) = current_tool_call {
            self.debug(&format!("[STREAM] Tool call block stopped: {}", tool_call.name));
        }
        None
    }
    fn handle_message_start(&self) -> Option<OpenAiStreamChunk> {
        self.debug("[STREAM] Message start");
        None
    }
    /// Captures the `stop_reason` from a `message_delta` so that the later
    /// `message_stop` handler can use it; emits no chunk itself.
    fn handle_message_delta(
        &self,
        delta: &MessageDelta,
        stop_reason_from_delta: &mut Option<String>,
    ) -> Option<OpenAiStreamChunk> {
        if let Some(stop_reason) = &delta.stop_reason {
            *stop_reason_from_delta = Some(stop_reason.clone());
            self.debug(&format!("[STREAM] Message delta - stop_reason: {:?}", stop_reason));
        }
        None
    }
    /// Emits the terminal chunk. A stop reason captured from `message_delta`
    /// takes precedence over one carried on `message_stop` itself, and any
    /// observed tool call forces `finish_reason` to "tool_calls".
    fn handle_message_stop(
        &self,
        stop_reason: &Option<String>,
        model: &str,
        current_tool_call: &mut Option<StreamingToolCall>,
        has_tool_calls: &bool,
        stop_reason_from_delta: &mut Option<String>,
    ) -> Option<OpenAiStreamChunk> {
        let effective_stop_reason = stop_reason_from_delta.as_deref().or(stop_reason.as_deref());
        let finish_reason = if *has_tool_calls || current_tool_call.is_some() {
            "tool_calls"
        } else {
            match effective_stop_reason {
                Some("tool_use") => "tool_calls",
                Some("end_turn") | Some("stop_sequence") => "stop",
                Some("max_tokens") => "length",
                _ => "stop",
            }
        };
        self.debug(&format!(
            "[STREAM] Message stop - reason: {:?}, hasToolCalls: {}, finish_reason: {}",
            effective_stop_reason, has_tool_calls, finish_reason
        ));
        *stop_reason_from_delta = None;
        if let Some(tool_call) = current_tool_call.take() {
            self.debug(&format!(
                "[STREAM] Completed tool call: {}({})",
                tool_call.name, tool_call.arguments
            ));
        }
        Some(OpenAiStreamChunk {
            id: self.generate_response_id(),
            object: CHAT_COMPLETION_CHUNK_OBJECT.to_string(),
            created: Utc::now().timestamp(),
            model: model.to_string(),
            choices: vec![OpenAiStreamChoice {
                index: 0,
                delta: OpenAiStreamDelta { content: None, tool_calls: None },
                finish_reason: Some(finish_reason.to_string()),
            }],
        })
    }
    /// Logs `msg` at debug level when trace logging is enabled.
    pub(crate) fn debug(&self, msg: &str) {
        if self.log_level.is_trace_enabled() {
            tracing::debug!("[TRACE] {}", msg);
        }
    }
}
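#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of the deserialization side. The JSON payloads are
    // hand-written to match the derives above, not captured API output.
    // Converter methods are not exercised here because constructing one
    // requires a crate::config::LogLevel value.

    #[test]
    fn parses_response_with_text_and_tool_use() {
        let json = r#"{
            "content": [
                {"type": "text", "text": "Let me check."},
                {"type": "tool_use", "id": "toolu_01", "name": "get_weather", "input": {"city": "Paris"}}
            ],
            "stop_reason": "tool_use",
            "usage": {"input_tokens": 12, "output_tokens": 34}
        }"#;
        let parsed: AnthropicResponse = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.content.len(), 2);
        assert!(matches!(&parsed.content[1], AnthropicContentBlock::ToolUse { .. }));
        assert_eq!(parsed.stop_reason.as_deref(), Some("tool_use"));
    }

    #[test]
    fn parses_content_block_delta_event() {
        // The inner "type" field is not modeled by AnthropicDelta and is ignored.
        let json = r#"{"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hi"}}"#;
        let event: AnthropicStreamEvent = serde_json::from_str(json).unwrap();
        assert!(matches!(event, AnthropicStreamEvent::ContentBlockDelta { .. }));
    }
}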