use super::Provider;
use crate::error::AppError;
use crate::telemetry::provider_metrics::{MetricsExtractor, ProviderMetrics};
use async_trait::async_trait;
use axum::http::HeaderMap;
use serde_json::{Value, json};
use tracing::{debug, error};
use std::cell::RefCell;
use axum::{
body::{Body, to_bytes},
http::{HeaderValue, Response},
};
use chrono;
thread_local! {
static ANTHROPIC_INPUT_TOKENS: RefCell<Option<u32>> = RefCell::new(None);
}
pub struct AnthropicProvider {
base_url: String,
}
impl AnthropicProvider {
pub fn new() -> Self {
Self {
base_url: "https://api.anthropic.com".to_string(),
}
}
}
#[async_trait]
impl Provider for AnthropicProvider {
fn base_url(&self) -> String {
self.base_url.clone()
}
fn name(&self) -> &str {
"anthropic"
}
fn transform_path(&self, path: &str) -> String {
if path.contains("/chat/completions") {
"/v1/messages".to_string()
} else {
path.to_string()
}
}
fn process_headers(&self, original_headers: &HeaderMap) -> Result<HeaderMap, AppError> {
debug!("Processing Anthropic request headers");
let mut headers = HeaderMap::new();
headers.insert(
http::header::CONTENT_TYPE,
http::header::HeaderValue::from_static("application/json"),
);
headers.insert(
http::header::HeaderName::from_static("anthropic-version"),
http::header::HeaderValue::from_static("2023-06-01"),
);
if let Some(auth) = original_headers
.get("authorization")
.and_then(|h| h.to_str().ok())
{
debug!("Converting Bearer token to x-api-key format");
let api_key = auth.trim_start_matches("Bearer ");
headers.insert(
http::header::HeaderName::from_static("x-api-key"),
http::header::HeaderValue::from_str(api_key).map_err(|_| {
error!("Failed to process Anthropic authorization header");
AppError::InvalidHeader
})?,
);
} else {
error!("No authorization header found for Anthropic request");
return Err(AppError::MissingApiKey);
}
Ok(headers)
}
async fn process_response(&self, response: Response<Body>) -> Result<Response<Body>, AppError> {
let (mut parts, body) = response.into_parts();
let is_streaming = parts.headers.get(http::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map_or(false, |ct| ct.contains("text/event-stream"));
if is_streaming {
let request_id = parts.headers.get("request-id")
.and_then(|v| v.to_str().ok())
.map(|id| id.to_string());
if let Some(id) = request_id {
debug!("Adding Anthropic request ID to streaming response headers: {}", id);
if let Ok(header_value) = HeaderValue::from_str(&id) {
parts.headers.insert("x-request-id", header_value);
}
} else {
debug!("No request-id header found for Anthropic streaming response");
}
debug!("Returning streaming response without transformation");
return Ok(Response::from_parts(parts, body));
}
let bytes = to_bytes(body, usize::MAX).await?;
let request_id = parts.headers.get("request-id")
.and_then(|v| v.to_str().ok())
.map(|id| id.to_string());
if let Some(id) = request_id.clone() {
debug!("Adding Anthropic request ID from headers to response: {}", id);
if let Ok(header_value) = HeaderValue::from_str(&id) {
parts.headers.insert("x-request-id", header_value);
}
}
if let Ok(json) = serde_json::from_slice::<Value>(&bytes) {
debug!("Successfully parsed response body as JSON: {:?}", json);
let body_request_id = if request_id.is_none() {
if let Some(id) = json.get("id").and_then(|v| v.as_str()) {
Some(id.to_string())
} else if let Some(message) = json.get("message") {
message.get("id").and_then(|v| v.as_str()).map(|id| id.to_string())
} else {
None
}
} else {
None
};
if let Some(id) = body_request_id {
debug!("Adding Anthropic request ID from body to response headers: {}", id);
if let Ok(header_value) = HeaderValue::from_str(&id) {
parts.headers.insert("x-request-id", header_value);
}
}
let json_id = json.get("id").and_then(|v| v.as_str()).map(String::from);
let transformed_response = transform_anthropic_to_openai_format(json);
debug!("Transformed Anthropic response to OpenAI format");
if !parts.headers.contains_key("x-request-id") {
if let Some(id) = json_id {
debug!("Setting x-request-id header from Anthropic response ID: {}", id);
if let Ok(header_value) = HeaderValue::from_str(&id) {
parts.headers.insert("x-request-id", header_value);
}
}
}
return Ok(Response::from_parts(parts, Body::from(serde_json::to_vec(&transformed_response)?)));
} else {
debug!("Failed to parse response body as JSON, returning original response");
}
Ok(Response::from_parts(parts, Body::from(bytes)))
}
}
pub struct AnthropicMetricsExtractor;
impl MetricsExtractor for AnthropicMetricsExtractor {
fn extract_metrics(&self, response_body: &Value) -> ProviderMetrics {
debug!("Extracting Anthropic metrics from response: {}", response_body);
let mut metrics = ProviderMetrics::default();
if let Some(usage) = response_body.get("usage") {
metrics.input_tokens = usage.get("input_tokens").and_then(|v| v.as_u64()).map(|v| v as u32);
metrics.output_tokens = usage.get("output_tokens").and_then(|v| v.as_u64()).map(|v| v as u32);
if let (Some(input), Some(output)) = (metrics.input_tokens, metrics.output_tokens) {
metrics.total_tokens = Some(input + output);
}
else if metrics.input_tokens.is_some() {
metrics.total_tokens = metrics.input_tokens;
}
else if metrics.output_tokens.is_some() {
metrics.total_tokens = metrics.output_tokens;
}
}
if let Some(model) = response_body.get("model").and_then(|v| v.as_str()) {
metrics.model = model.to_string();
if let Some(total_tokens) = metrics.total_tokens {
metrics.cost = Some(calculate_anthropic_cost(&metrics.model, total_tokens));
}
}
if let Some(id) = response_body.get("id").and_then(|v| v.as_str()) {
debug!("Found Anthropic message ID: {}", id);
metrics.request_id = Some(id.to_string());
} else if let Some(message) = response_body.get("message") {
if let Some(id) = message.get("id").and_then(|v| v.as_str()) {
debug!("Found Anthropic message ID in message object: {}", id);
metrics.request_id = Some(id.to_string());
}
}
debug!("Final extracted Anthropic metrics: {:?}", metrics);
metrics
}
fn try_extract_provider_specific_streaming_metrics(&self, chunk: &str) -> Option<ProviderMetrics> {
debug!("Attempting to extract metrics from Anthropic streaming chunk: {}", chunk);
if let Ok(json) = serde_json::from_str::<Value>(chunk) {
let event_type = json.get("type").and_then(|t| t.as_str())?;
let mut metrics = ProviderMetrics::default();
metrics.model = "claude".to_string();
if let Some(model) = json.get("message").and_then(|m| m.get("model")).and_then(|v| v.as_str()) {
metrics.model = model.to_string();
}
if event_type == "message_start" {
if let Some(message) = json.get("message") {
if let Some(usage) = message.get("usage") {
if let Some(input_tokens) = usage.get("input_tokens").and_then(|v| v.as_u64()).map(|v| v as u32) {
ANTHROPIC_INPUT_TOKENS.with(|tokens| {
*tokens.borrow_mut() = Some(input_tokens);
debug!("Stored input tokens from message_start: {}", input_tokens);
});
metrics.input_tokens = Some(input_tokens);
metrics.total_tokens = Some(input_tokens);
if let Some(id) = message.get("id").and_then(|v| v.as_str()) {
metrics.request_id = Some(id.to_string());
}
return Some(metrics);
}
}
}
}
else if event_type == "message_delta" {
if let Some(usage) = json.get("usage") {
if let Some(output_tokens) = usage.get("output_tokens").and_then(|v| v.as_u64()).map(|v| v as u32) {
let input_tokens = ANTHROPIC_INPUT_TOKENS.with(|tokens| *tokens.borrow());
metrics.output_tokens = Some(output_tokens);
metrics.input_tokens = input_tokens;
if let Some(input) = input_tokens {
metrics.total_tokens = Some(input + output_tokens);
metrics.cost = Some(calculate_anthropic_cost(&metrics.model, input + output_tokens));
debug!("Combined stored input tokens ({}) with output tokens ({}) in message_delta",
input, output_tokens);
} else {
metrics.total_tokens = Some(output_tokens);
metrics.cost = Some(calculate_anthropic_cost(&metrics.model, output_tokens));
debug!("No stored input tokens, using only output tokens ({}) in message_delta", output_tokens);
}
return Some(metrics);
}
}
}
}
None
}
}
fn calculate_anthropic_cost(model: &str, total_tokens: u32) -> f64 {
let tokens = total_tokens as f64;
match model {
m if m.contains("claude-3.5-sonnet") => tokens * 0.000003,
m if m.contains("claude-3-opus") => tokens * 0.000015, m if m.contains("claude-3-sonnet") => tokens * 0.000003, m if m.contains("claude-3-haiku") => tokens * 0.000000125,
m if m.contains("claude-2") => tokens * 0.000008,
m if m.contains("claude-instant") => tokens * 0.000001,
m if m.contains("claude-3") => tokens * 0.000003, m if m.contains("claude") => tokens * 0.000002,
_ => {
debug!("Unknown Anthropic model for cost calculation: {}", model);
tokens * 0.000002 },
}
}
fn transform_anthropic_to_openai_format(anthropic_response: Value) -> Value {
let content = if let Some(content_array) = anthropic_response.get("content").and_then(|c| c.as_array()) {
let mut text = String::new();
for item in content_array {
if let Some(item_text) = item.get("text").and_then(|t| t.as_str()) {
text.push_str(item_text);
}
}
text
} else {
anthropic_response.get("content").and_then(|c| c.as_str()).unwrap_or("").to_string()
};
let usage = {
let mut usage_map = json!({
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
});
if let Some(anthropic_usage) = anthropic_response.get("usage") {
if let Some(input_tokens) = anthropic_usage.get("input_tokens").and_then(|t| t.as_u64()) {
usage_map["prompt_tokens"] = json!(input_tokens);
}
if let Some(output_tokens) = anthropic_usage.get("output_tokens").and_then(|t| t.as_u64()) {
usage_map["completion_tokens"] = json!(output_tokens);
}
let prompt_tokens = usage_map["prompt_tokens"].as_u64().unwrap_or(0);
let completion_tokens = usage_map["completion_tokens"].as_u64().unwrap_or(0);
usage_map["total_tokens"] = json!(prompt_tokens + completion_tokens);
}
usage_map
};
let finish_reason = match anthropic_response.get("stop_reason").and_then(|r| r.as_str()) {
Some("end_turn") => "stop",
Some("max_tokens") => "length",
Some("stop_sequence") => "stop",
Some(reason) => reason,
None => "stop" };
let mut transformed = json!({
"id": anthropic_response.get("id").unwrap_or(&Value::Null),
"object": "chat.completion",
"created": chrono::Utc::now().timestamp(),
"model": anthropic_response.get("model").unwrap_or(&Value::Null),
"type": anthropic_response.get("type").unwrap_or(&json!("message")),
"role": anthropic_response.get("role").unwrap_or(&json!("assistant")),
"choices": [{
"index": 0,
"message": {
"role": anthropic_response.get("role").unwrap_or(&json!("assistant")),
"content": content
},
"finish_reason": finish_reason
}],
"usage": usage,
"system_fingerprint": format!("anthropic-{}", anthropic_response.get("model").and_then(|m| m.as_str()).unwrap_or("claude"))
});
if let Some(seed) = anthropic_response.get("seed") {
if let Some(choices) = transformed.get_mut("choices").and_then(|c| c.as_array_mut()) {
for choice in choices {
if seed.is_number() {
choice["seed"] = json!(seed.to_string());
} else {
choice["seed"] = seed.clone();
}
}
}
}
transformed
}