pub(crate) mod data;
use crate::llm::attachments::validate_request_attachments;
use crate::llm::{
ChatOutcome, ChatRequest, ChatResponse, ContentBlock, LlmProvider, StreamBox, StreamDelta,
ThinkingConfig, ThinkingMode, Usage,
};
use anyhow::Result;
use async_trait::async_trait;
use data::{
ApiMessagesRequest, ApiOutputConfig, ApiThinkingConfig, build_api_messages, build_api_tools,
is_message_stop_event, map_content_blocks, map_stop_reason, parse_sse_event,
take_next_sse_event,
};
use futures::StreamExt;
use reqwest::StatusCode;
const API_BASE_URL: &str = "https://api.anthropic.com";
const API_VERSION: &str = "2023-06-01";
const CLAUDE_CODE_VERSION: &str = "2.1.75";
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;
pub const MODEL_HAIKU_35: &str = "claude-3-5-haiku-20241022";
pub const MODEL_SONNET_35: &str = "claude-3-5-sonnet-20241022";
pub const MODEL_SONNET_4: &str = "claude-sonnet-4-20250514";
pub const MODEL_OPUS_4: &str = "claude-opus-4-20250514";
pub const MODEL_HAIKU_45: &str = "claude-haiku-4-5-20251001";
pub const MODEL_SONNET_45: &str = "claude-sonnet-4-5-20250929";
pub const MODEL_SONNET_46: &str = "claude-sonnet-4-6";
pub const MODEL_OPUS_46: &str = "claude-opus-4-6";
const CLAUDE_CODE_TOOLS: &[&str] = &[
"Read",
"Write",
"Edit",
"Bash",
"Grep",
"Glob",
"AskUserQuestion",
"EnterPlanMode",
"ExitPlanMode",
"KillShell",
"NotebookEdit",
"Skill",
"Task",
"TaskOutput",
"TodoWrite",
"WebFetch",
"WebSearch",
];
fn to_claude_code_name(name: &str) -> String {
let lower = name.to_lowercase();
for cc_name in CLAUDE_CODE_TOOLS {
if cc_name.to_lowercase() == lower {
return (*cc_name).to_string();
}
}
name.to_string()
}
fn from_claude_code_name(name: &str, original_names: &[String]) -> String {
let lower = name.to_lowercase();
for original in original_names {
if original.to_lowercase() == lower {
return original.clone();
}
}
name.to_string()
}
#[must_use]
pub fn is_oauth_token(api_key: &str) -> bool {
api_key.starts_with("sk-ant-oat")
}
#[derive(Clone, Debug)]
enum AuthMode {
ApiKey,
OAuth,
}
#[derive(Clone)]
pub struct AnthropicProvider {
client: reqwest::Client,
api_key: String,
model: String,
base_url: String,
auth_mode: AuthMode,
#[allow(dead_code)]
original_tool_names: Vec<String>,
thinking: Option<ThinkingConfig>,
extra_headers: Vec<(String, String)>,
}
impl AnthropicProvider {
#[must_use]
pub fn new(api_key: String, model: String) -> Self {
let auth_mode = if is_oauth_token(&api_key) {
AuthMode::OAuth
} else {
AuthMode::ApiKey
};
let client = reqwest::Client::builder()
.connect_timeout(std::time::Duration::from_secs(30))
.tcp_keepalive(std::time::Duration::from_secs(30))
.build()
.unwrap_or_default();
Self {
client,
api_key,
model,
base_url: API_BASE_URL.to_owned(),
auth_mode,
original_tool_names: Vec::new(),
thinking: None,
extra_headers: Vec::new(),
}
}
#[must_use]
pub const fn is_oauth(&self) -> bool {
matches!(self.auth_mode, AuthMode::OAuth)
}
fn apply_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
let builder = if self.api_key.is_empty() {
builder.header("anthropic-version", API_VERSION)
} else {
match self.auth_mode {
AuthMode::ApiKey => builder
.header("x-api-key", &self.api_key)
.header("anthropic-version", API_VERSION),
AuthMode::OAuth => {
let mut beta_features = vec![
"claude-code-20250219",
"oauth-2025-04-20",
"fine-grained-tool-streaming-2025-05-14",
];
if !self.requires_adaptive_thinking() {
beta_features.push("interleaved-thinking-2025-05-14");
}
builder
.header("Authorization", format!("Bearer {}", self.api_key))
.header("anthropic-version", API_VERSION)
.header("anthropic-beta", beta_features.join(","))
.header("user-agent", format!("claude-cli/{CLAUDE_CODE_VERSION}"))
.header("x-app", "cli")
}
}
};
self.extra_headers
.iter()
.fold(builder, |b, (k, v)| b.header(k.as_str(), v.as_str()))
}
const OAUTH_IDENTITY: &'static str =
"You are Claude Code, Anthropic's official CLI for Claude.";
fn build_system_prompt_for_request<'a>(
&self,
system: &'a str,
) -> Option<data::ApiSystemPrompt<'a>> {
let cc = Self::cache_control();
match self.auth_mode {
AuthMode::ApiKey => data::build_api_system_prompt(system, Some(cc)),
AuthMode::OAuth => {
let mut blocks = vec![data::ApiSystemBlock {
block_type: "text",
text: Self::OAUTH_IDENTITY,
cache_control: Some(cc.clone()),
}];
if !system.is_empty() {
blocks.push(data::ApiSystemBlock {
block_type: "text",
text: system,
cache_control: Some(cc),
});
}
Some(data::ApiSystemPrompt::Blocks(blocks))
}
}
}
const fn cache_control() -> data::ApiCacheControl {
data::ApiCacheControl::ephemeral()
}
fn build_cached_api_messages(request: &ChatRequest) -> Vec<data::ApiMessage> {
let mut messages = build_api_messages(request);
data::apply_cache_control_to_last_user_message(&mut messages, Self::cache_control());
messages
}
fn effective_max_tokens(&self, request: &ChatRequest) -> u32 {
if request.max_tokens_explicit {
request.max_tokens
} else {
self.default_max_tokens()
}
}
#[must_use]
pub fn haiku(api_key: String) -> Self {
Self::new(api_key, MODEL_HAIKU_45.to_owned())
}
#[must_use]
pub fn sonnet(api_key: String) -> Self {
Self::new(api_key, MODEL_SONNET_46.to_owned())
}
#[must_use]
pub fn sonnet_45(api_key: String) -> Self {
Self::new(api_key, MODEL_SONNET_45.to_owned())
}
#[must_use]
pub fn sonnet_46(api_key: String) -> Self {
Self::new(api_key, MODEL_SONNET_46.to_owned())
}
#[must_use]
pub fn opus(api_key: String) -> Self {
Self::new(api_key, MODEL_OPUS_46.to_owned())
}
#[must_use]
pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
self.thinking = Some(thinking);
self
}
#[must_use]
pub fn with_base_url(mut self, base_url: String) -> Self {
self.base_url = base_url;
self
}
#[must_use]
pub fn with_extra_headers(mut self, headers: Vec<(String, String)>) -> Self {
self.extra_headers = headers;
self
}
fn requires_adaptive_thinking(&self) -> bool {
matches!(self.model.as_str(), MODEL_SONNET_46 | MODEL_OPUS_46)
}
}
#[async_trait]
#[allow(clippy::too_many_lines)]
impl LlmProvider for AnthropicProvider {
async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
Ok(thinking) => thinking,
Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
};
if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
return Ok(ChatOutcome::InvalidRequest(error.to_string()));
}
let messages = Self::build_cached_api_messages(&request);
let tools = if self.is_oauth() {
build_api_tools(&request).map(|tools| {
tools
.into_iter()
.map(|mut t| {
t.name = to_claude_code_name(&t.name);
t
})
.collect::<Vec<_>>()
})
} else {
build_api_tools(&request)
};
let thinking = thinking_config
.as_ref()
.map(ApiThinkingConfig::from_thinking_config);
let output_config = thinking_config
.as_ref()
.and_then(|t| t.effort)
.map(|effort| ApiOutputConfig { effort });
let system = self.build_system_prompt_for_request(&request.system);
let max_tokens = self.effective_max_tokens(&request);
let api_request = ApiMessagesRequest {
model: Some(&self.model),
max_tokens,
system,
messages: &messages,
tools: tools.as_deref(),
stream: false,
thinking,
output_config,
anthropic_version: None,
};
log::debug!(
"Anthropic LLM request model={} max_tokens={} oauth={}",
self.model,
max_tokens,
self.is_oauth()
);
if log::log_enabled!(log::Level::Debug) {
match serde_json::to_string_pretty(&api_request) {
Ok(json) => log::debug!("Anthropic API request payload:\n{json}"),
Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
}
}
let builder = self
.client
.post(format!("{}/v1/messages", self.base_url))
.header("Content-Type", "application/json");
let response = self
.apply_auth(builder)
.json(&api_request)
.send()
.await
.map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
let status = response.status();
let bytes = response
.bytes()
.await
.map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
log::debug!(
"Anthropic LLM response status={} body_len={}",
status,
bytes.len()
);
if status == StatusCode::TOO_MANY_REQUESTS {
return Ok(ChatOutcome::RateLimited);
}
if status.is_server_error() {
let body = String::from_utf8_lossy(&bytes);
log::error!("Anthropic server error status={status} body={body}");
return Ok(ChatOutcome::ServerError(body.into_owned()));
}
if status.is_client_error() {
let body = String::from_utf8_lossy(&bytes);
log::warn!("Anthropic client error status={status} body={body}");
return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
}
let api_response: data::ApiResponse = serde_json::from_slice(&bytes)
.map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
log::debug!(
"Anthropic API response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
api_response.id,
api_response.model,
api_response.stop_reason,
api_response.usage.total_input_tokens(),
api_response.usage.output,
api_response.content.len()
);
let mut content = map_content_blocks(api_response.content);
if self.is_oauth() {
let original_names: Vec<String> = request
.tools
.as_ref()
.map(|ts| ts.iter().map(|t| t.name.clone()).collect())
.unwrap_or_default();
for block in &mut content {
if let ContentBlock::ToolUse { name, .. } = block {
*name = from_claude_code_name(name, &original_names);
}
}
}
let stop_reason = api_response.stop_reason.as_ref().map(map_stop_reason);
Ok(ChatOutcome::Success(ChatResponse {
id: api_response.id,
content,
model: api_response.model,
stop_reason,
usage: Usage {
input_tokens: api_response.usage.total_input_tokens(),
output_tokens: api_response.usage.output,
cached_input_tokens: api_response.usage.cached_input_tokens(),
},
}))
}
fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
Box::pin(async_stream::stream! {
let is_oauth = self.is_oauth();
let original_tool_names: Vec<String> = request
.tools
.as_ref()
.map(|ts| ts.iter().map(|t| t.name.clone()).collect())
.unwrap_or_default();
if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
yield Ok(StreamDelta::Error {
message: error.to_string(),
recoverable: false,
});
return;
}
let messages = Self::build_cached_api_messages(&request);
let tools = if is_oauth {
build_api_tools(&request).map(|tools| {
tools
.into_iter()
.map(|mut t| {
t.name = to_claude_code_name(&t.name);
t
})
.collect::<Vec<_>>()
})
} else {
build_api_tools(&request)
};
let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
Ok(thinking) => thinking,
Err(error) => {
yield Ok(StreamDelta::Error {
message: error.to_string(),
recoverable: false,
});
return;
}
};
let thinking = thinking_config
.as_ref()
.map(ApiThinkingConfig::from_thinking_config);
let output_config = thinking_config
.as_ref()
.and_then(|t| t.effort)
.map(|effort| ApiOutputConfig { effort });
let system = self.build_system_prompt_for_request(&request.system);
let max_tokens = self.effective_max_tokens(&request);
let api_request = ApiMessagesRequest {
model: Some(&self.model),
max_tokens,
system,
messages: &messages,
tools: tools.as_deref(),
stream: true,
thinking,
output_config,
anthropic_version: None,
};
log::debug!("Anthropic streaming LLM request model={} max_tokens={} oauth={}", self.model, max_tokens, is_oauth);
if log::log_enabled!(log::Level::Debug) {
match serde_json::to_string_pretty(&api_request) {
Ok(json) => log::debug!("Anthropic streaming API request payload:\n{json}"),
Err(e) => log::debug!("Failed to serialize streaming request for logging: {e}"),
}
}
let builder = self
.client
.post(format!("{}/v1/messages", self.base_url))
.header("Content-Type", "application/json");
let response = match self
.apply_auth(builder)
.json(&api_request)
.send()
.await
{
Ok(r) => r,
Err(e) => {
yield Err(anyhow::anyhow!("request failed: {e}"));
return;
}
};
let status = response.status();
if status == StatusCode::TOO_MANY_REQUESTS {
yield Ok(StreamDelta::Error {
message: "Rate limited".to_string(),
recoverable: true,
});
return;
}
if status.is_server_error() {
let body = response.text().await.unwrap_or_default();
log::error!("Anthropic server error status={status} body={body}");
yield Ok(StreamDelta::Error {
message: body,
recoverable: true,
});
return;
}
if status.is_client_error() {
let body = response.text().await.unwrap_or_default();
log::warn!("Anthropic client error status={status} body={body}");
yield Ok(StreamDelta::Error {
message: body,
recoverable: false,
});
return;
}
let mut stream = response.bytes_stream();
let mut buffer = String::new();
let mut input_tokens: u32 = 0;
let mut output_tokens: u32 = 0;
let mut cached_input_tokens: u32 = 0;
let mut tool_ids: std::collections::HashMap<usize, String> =
std::collections::HashMap::new();
let mut received_message_stop = false;
let mut pending_stop_reason: Option<crate::llm::StopReason> = None;
let mut chunk_count: u64 = 0;
let mut total_bytes: u64 = 0;
struct StreamDropGuard {
completed: bool,
chunk_count: u64,
}
impl Drop for StreamDropGuard {
fn drop(&mut self) {
if !self.completed {
eprintln!(
"[agent-sdk] CRITICAL: SSE stream DROPPED at chunk_count={} - task was cancelled!",
self.chunk_count
);
log::error!(
"SSE stream was DROPPED before completion at chunk_count={} - the consuming task was likely cancelled",
self.chunk_count
);
}
}
}
let mut drop_guard = StreamDropGuard { completed: false, chunk_count: 0 };
log::debug!("Starting SSE stream processing");
while let Some(chunk_result) = stream.next().await {
let chunk = match chunk_result {
Ok(c) => c,
Err(e) => {
log::error!("Stream error while reading chunk error={e} chunk_count={chunk_count} total_bytes={total_bytes}");
yield Err(anyhow::anyhow!("stream error: {e}"));
return;
}
};
chunk_count += 1;
total_bytes += chunk.len() as u64;
drop_guard.chunk_count = chunk_count;
if chunk_count.is_multiple_of(10) {
log::debug!("SSE chunk progress: chunk_count={chunk_count} total_bytes={total_bytes}");
}
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(event_block) = take_next_sse_event(&mut buffer) {
if is_message_stop_event(&event_block) {
log::debug!("Received message_stop event chunk_count={chunk_count} total_bytes={total_bytes}");
received_message_stop = true;
}
if let Some(mut delta) = parse_sse_event(
&event_block,
&mut input_tokens,
&mut output_tokens,
&mut cached_input_tokens,
&mut tool_ids,
&mut pending_stop_reason,
) {
if is_oauth
&& let StreamDelta::ToolUseStart { ref mut name, .. } = delta
{
*name = from_claude_code_name(name, &original_tool_names);
}
yield Ok(delta);
}
if is_message_stop_event(&event_block) {
yield Ok(StreamDelta::Done {
stop_reason: pending_stop_reason.take(),
});
}
}
}
log::debug!(
"SSE stream ended chunk_count={chunk_count} total_bytes={total_bytes} buffer_remaining={} received_message_stop={received_message_stop}",
buffer.len()
);
let remaining = buffer.trim();
if !remaining.is_empty() {
log::debug!(
"Processing remaining buffer content remaining_len={} remaining_preview={}",
remaining.len(),
remaining.chars().take(100).collect::<String>()
);
if is_message_stop_event(remaining) {
received_message_stop = true;
}
if let Some(mut delta) = parse_sse_event(
remaining,
&mut input_tokens,
&mut output_tokens,
&mut cached_input_tokens,
&mut tool_ids,
&mut pending_stop_reason,
) {
if is_oauth
&& let StreamDelta::ToolUseStart { ref mut name, .. } = delta
{
*name = from_claude_code_name(name, &original_tool_names);
}
yield Ok(delta);
}
if is_message_stop_event(remaining) {
yield Ok(StreamDelta::Done {
stop_reason: pending_stop_reason.take(),
});
}
}
drop_guard.completed = true;
if !received_message_stop {
log::warn!(
"SSE stream ended without message_stop event - stream may have been interrupted chunk_count={chunk_count} total_bytes={total_bytes}"
);
yield Ok(StreamDelta::Error {
message: "Stream ended unexpectedly without completion".to_string(),
recoverable: true,
});
}
})
}
fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
let Some(thinking) = thinking else {
return Ok(());
};
if self
.capabilities()
.is_some_and(|caps| !caps.supports_thinking)
{
return Err(anyhow::anyhow!(
"thinking is not supported for provider={} model={}",
self.provider(),
self.model()
));
}
if matches!(thinking.mode, ThinkingMode::Adaptive)
&& !self
.capabilities()
.is_some_and(|caps| caps.supports_adaptive_thinking)
{
return Err(anyhow::anyhow!(
"adaptive thinking is not supported for provider={} model={}",
self.provider(),
self.model()
));
}
if self.requires_adaptive_thinking()
&& matches!(thinking.mode, ThinkingMode::Enabled { .. })
{
return Err(anyhow::anyhow!(
"budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
self.provider(),
self.model()
));
}
Ok(())
}
fn model(&self) -> &str {
&self.model
}
fn provider(&self) -> &'static str {
"anthropic"
}
fn configured_thinking(&self) -> Option<&ThinkingConfig> {
self.thinking.as_ref()
}
fn default_max_tokens(&self) -> u32 {
let model_max = self
.capabilities()
.and_then(|caps| caps.max_output_tokens)
.or_else(|| {
crate::model_capabilities::default_max_output_tokens(self.provider(), self.model())
})
.unwrap_or(4096);
model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_creates_provider_with_custom_model() {
let provider =
AnthropicProvider::new("test-api-key".to_string(), "custom-model".to_string());
assert_eq!(provider.model(), "custom-model");
assert_eq!(provider.provider(), "anthropic");
}
#[test]
fn test_haiku_factory_creates_haiku_provider() {
let provider = AnthropicProvider::haiku("test-api-key".to_string());
assert_eq!(provider.model(), MODEL_HAIKU_45);
assert_eq!(provider.provider(), "anthropic");
}
#[test]
fn test_only_anthropic_46_models_accept_adaptive_thinking() {
let sonnet_46 = AnthropicProvider::sonnet_46("test-api-key".to_string());
assert!(
sonnet_46
.validate_thinking_config(Some(&ThinkingConfig::adaptive()))
.is_ok()
);
let sonnet_45 = AnthropicProvider::sonnet_45("test-api-key".to_string());
let error = sonnet_45
.validate_thinking_config(Some(&ThinkingConfig::adaptive()))
.unwrap_err();
assert!(
error
.to_string()
.contains("adaptive thinking is not supported")
);
}
#[test]
fn test_anthropic_46_models_reject_budgeted_thinking() {
let sonnet_46 = AnthropicProvider::sonnet_46("test-api-key".to_string());
let error = sonnet_46
.validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
.unwrap_err();
assert!(error.to_string().contains("ThinkingConfig::adaptive()"));
}
#[test]
fn test_sonnet_factory_creates_sonnet_provider() {
let provider = AnthropicProvider::sonnet("test-api-key".to_string());
assert_eq!(provider.model(), MODEL_SONNET_46);
assert_eq!(provider.provider(), "anthropic");
}
#[test]
fn test_sonnet_45_factory_creates_sonnet_provider() {
let provider = AnthropicProvider::sonnet_45("test-api-key".to_string());
assert_eq!(provider.model(), MODEL_SONNET_45);
assert_eq!(provider.provider(), "anthropic");
}
#[test]
fn test_sonnet_46_factory_creates_sonnet_provider() {
let provider = AnthropicProvider::sonnet_46("test-api-key".to_string());
assert_eq!(provider.model(), MODEL_SONNET_46);
assert_eq!(provider.provider(), "anthropic");
}
#[test]
fn test_opus_factory_creates_opus_provider() {
let provider = AnthropicProvider::opus("test-api-key".to_string());
assert_eq!(provider.model(), MODEL_OPUS_46);
assert_eq!(provider.provider(), "anthropic");
}
#[test]
fn test_model_constants_have_expected_values() {
assert!(MODEL_HAIKU_35.contains("haiku"));
assert!(MODEL_SONNET_35.contains("sonnet"));
assert!(MODEL_SONNET_4.contains("sonnet"));
assert!(MODEL_SONNET_46.contains("sonnet"));
assert!(MODEL_OPUS_4.contains("opus"));
}
#[test]
fn test_provider_is_cloneable() {
let provider = AnthropicProvider::new("test-api-key".to_string(), "test-model".to_string());
let cloned = provider.clone();
assert_eq!(provider.model(), cloned.model());
assert_eq!(provider.provider(), cloned.provider());
}
}