use async_trait::async_trait;
use std::collections::HashMap;
use std::time::Instant;
use crate::error::LlmError;
use crate::params::{AnthropicParameterMapper, ParameterMapper};
use crate::stream::{ChatStream, ChatStreamEvent};
use crate::tracing::ProviderTracer;
use crate::traits::ChatCapability;
use crate::types::*;
use super::types::*;
use super::utils::*;
#[derive(Clone)]
pub struct AnthropicChatCapability {
pub api_key: String,
pub base_url: String,
pub http_client: reqwest::Client,
pub http_config: HttpConfig,
pub parameter_mapper: AnthropicParameterMapper,
anthropic_params: AnthropicSpecificParams,
pub common_params: CommonParams,
}
impl AnthropicChatCapability {
pub fn new(
api_key: String,
base_url: String,
http_client: reqwest::Client,
http_config: HttpConfig,
anthropic_params: AnthropicSpecificParams,
common_params: CommonParams,
) -> Self {
Self {
api_key,
base_url,
http_client,
http_config,
parameter_mapper: AnthropicParameterMapper,
anthropic_params,
common_params,
}
}
pub fn build_chat_request_body(
&self,
request: &ChatRequest,
anthropic_params: Option<&super::types::AnthropicSpecificParams>,
) -> Result<serde_json::Value, LlmError> {
let mut body = self
.parameter_mapper
.map_common_params(&request.common_params);
if let Some(ref provider_params) = request.provider_params {
body = self
.parameter_mapper
.merge_provider_params(body, provider_params);
}
if let Some(params) = anthropic_params {
if let Some(ref thinking_config) = params.thinking_config {
body["thinking"] = thinking_config.to_request_params();
}
if let Some(ref metadata) = params.metadata {
body["metadata"] = metadata.clone();
}
}
self.parameter_mapper.validate_params(&body)?;
let (messages, system) = convert_messages(&request.messages)?;
body["messages"] = serde_json::to_value(messages)?;
if let Some(system_content) = system {
body["system"] = serde_json::Value::String(system_content);
}
if let Some(ref tools) = request.tools {
let anthropic_tools = convert_tools_to_anthropic_format(tools)?;
body["tools"] = serde_json::Value::Array(anthropic_tools);
}
if request.stream {
body["stream"] = serde_json::Value::Bool(true);
}
Ok(body)
}
pub fn parse_chat_response(
&self,
response: AnthropicChatResponse,
) -> Result<ChatResponse, LlmError> {
let (content, tool_calls) = parse_response_content_and_tools(&response.content);
let finish_reason = parse_finish_reason(response.stop_reason.as_deref());
let usage = create_usage_from_response(response.usage);
let _metadata = ResponseMetadata {
id: Some(response.id.clone()),
model: Some(response.model.clone()),
created: Some(chrono::Utc::now()), provider: "anthropic".to_string(),
request_id: None,
};
let mut provider_data = HashMap::new();
if let Some(thinking_content) = extract_thinking_content(&response.content) {
provider_data.insert(
"thinking".to_string(),
serde_json::Value::String(thinking_content),
);
}
if let Some(stop_seq) = response.stop_sequence {
provider_data.insert(
"stop_sequence".to_string(),
serde_json::Value::String(stop_seq),
);
}
Ok(ChatResponse {
id: Some(response.id),
content,
model: Some(response.model),
usage,
finish_reason,
tool_calls,
thinking: extract_thinking_content(&response.content),
metadata: provider_data,
})
}
}
#[async_trait]
impl ChatCapability for AnthropicChatCapability {
async fn chat_with_tools(
&self,
messages: Vec<ChatMessage>,
tools: Option<Vec<Tool>>,
) -> Result<ChatResponse, LlmError> {
let start_time = Instant::now();
let request = ChatRequest {
messages,
tools,
common_params: self.common_params.clone(),
provider_params: None,
http_config: None,
web_search: None,
stream: false,
};
let model = request.common_params.model.clone();
let tracer = ProviderTracer::new("anthropic").with_model(model);
let headers = build_headers(&self.api_key, &self.http_config.headers)?;
let body = self.build_chat_request_body(&request, Some(&self.anthropic_params))?;
let url = crate::utils::url::join_url(&self.base_url, "v1/messages");
tracer.trace_request_start("POST", &url);
tracer.trace_request_details(&headers, &body);
let response = self
.http_client
.post(&url)
.headers(headers)
.json(&body)
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
tracer.trace_request_error(status.as_u16(), &error_text, start_time);
if let Ok(error_json) = serde_json::from_str::<serde_json::Value>(&error_text)
&& let Some(error_obj) = error_json.get("error")
{
let error_type = error_obj
.get("type")
.and_then(|t| t.as_str())
.unwrap_or("unknown");
let error_message = error_obj
.get("message")
.and_then(|m| m.as_str())
.unwrap_or("Unknown error");
return Err(map_anthropic_error(
status.as_u16(),
error_type,
error_message,
error_json.clone(),
));
}
return Err(LlmError::ApiError {
code: status.as_u16(),
message: format!("Anthropic API error: {error_text}"),
details: serde_json::from_str(&error_text).ok(),
});
}
tracer.trace_response_success(response.status().as_u16(), start_time, response.headers());
let response_text = response.text().await?;
tracer.trace_response_body(&response_text);
let anthropic_response: AnthropicChatResponse = serde_json::from_str(&response_text)?;
let chat_response = self.parse_chat_response(anthropic_response)?;
tracer.trace_request_complete(start_time, chat_response.content.all_text().len());
Ok(chat_response)
}
async fn chat_stream(
&self,
messages: Vec<ChatMessage>,
tools: Option<Vec<Tool>>,
) -> Result<ChatStream, LlmError> {
let request = ChatRequest {
messages,
tools,
common_params: self.common_params.clone(),
provider_params: None,
http_config: None,
web_search: None,
stream: true,
};
let anthropic_params = crate::params::AnthropicParams::default();
let streaming = super::streaming::AnthropicStreaming::new(
anthropic_params,
self.http_client.clone(),
self.api_key.clone(),
self.base_url.clone(),
self.http_config.clone(),
);
streaming.create_chat_stream(request).await
}
}
impl AnthropicChatCapability {
pub fn parse_sse_event(chunk: &str) -> Option<Result<ChatStreamEvent, LlmError>> {
for line in chunk.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with(':') {
continue;
}
if let Some(data) = line.strip_prefix("data: ") {
if data == "[DONE]" {
return None;
}
match serde_json::from_str::<AnthropicStreamEvent>(data) {
Ok(event) => {
return Self::handle_stream_event(event);
}
Err(e) => {
return Some(Err(LlmError::ParseError(format!(
"Failed to parse stream event: {e}"
))));
}
}
}
}
None
}
fn handle_stream_event(
event: AnthropicStreamEvent,
) -> Option<Result<ChatStreamEvent, LlmError>> {
match event.r#type.as_str() {
"message_start" => {
None
}
"content_block_start" => {
None
}
"content_block_delta" => {
match serde_json::from_value::<ContentBlockDeltaEvent>(event.data) {
Ok(delta_event) => {
match delta_event.delta {
AnthropicDelta::TextDelta { text } => {
Some(Ok(ChatStreamEvent::ContentDelta {
delta: text,
index: Some(delta_event.index as usize),
}))
}
AnthropicDelta::ThinkingDelta { thinking } => {
Some(Ok(ChatStreamEvent::ThinkingDelta { delta: thinking }))
}
AnthropicDelta::InputJsonDelta { partial_json } => {
Some(Ok(ChatStreamEvent::ContentDelta {
delta: partial_json,
index: Some(delta_event.index as usize),
}))
}
AnthropicDelta::SignatureDelta { signature } => {
Some(Ok(ChatStreamEvent::ThinkingDelta { delta: signature }))
}
}
}
Err(e) => Some(Err(LlmError::ParseError(format!(
"Failed to parse content block delta: {e}"
)))),
}
}
"content_block_stop" => {
None
}
"message_delta" => {
None
}
"message_stop" => {
None
}
"ping" => {
None
}
"error" => {
let error_msg = event
.data
.get("error")
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
.unwrap_or("Unknown streaming error");
Some(Err(LlmError::StreamError(error_msg.to_string())))
}
_ => {
None
}
}
}
}