1pub(crate) mod data;
8
9use crate::attachments::validate_request_attachments;
10use crate::provider::LlmProvider;
11use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
12use agent_sdk_foundation::llm::{
13 CacheTtl, ChatOutcome, ChatRequest, ChatResponse, ContentBlock, ThinkingConfig, ThinkingMode,
14 Usage,
15};
16use anyhow::Result;
17use async_trait::async_trait;
18use data::{
19 ApiMessagesRequest, ApiOutputConfig, ApiThinkingConfig, ApiToolChoice, build_api_messages,
20 build_api_tools_with_cache, is_message_stop_event, map_content_blocks, map_stop_reason,
21 parse_sse_event, take_next_sse_event,
22};
23use futures::StreamExt;
24use reqwest::StatusCode;
25
/// Default Anthropic API origin (overridable via `AnthropicProvider::with_base_url`).
const API_BASE_URL: &str = "https://api.anthropic.com";
/// Value sent in the `anthropic-version` header on every request.
const API_VERSION: &str = "2023-06-01";
/// Claude Code CLI version advertised in the OAuth `user-agent` header.
const CLAUDE_CODE_VERSION: &str = "2.1.75";
/// Upper bound applied to the default `max_tokens` when a request sets none.
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;
/// Page size requested from `GET /v1/models`.
const MODELS_PAGE_LIMIT: u32 = 1_000;
/// Safety cap on how many `GET /v1/models` pages are fetched.
const MODELS_MAX_PAGES: usize = 100;
36
// Claude 3.5 / Claude 4 model identifiers (date-suffixed snapshots).

/// Claude 3.5 Haiku.
pub const MODEL_HAIKU_35: &str = "claude-3-5-haiku-20241022";
/// Claude 3.5 Sonnet.
pub const MODEL_SONNET_35: &str = "claude-3-5-sonnet-20241022";
/// Claude Sonnet 4.
pub const MODEL_SONNET_4: &str = "claude-sonnet-4-20250514";
/// Claude Opus 4.
pub const MODEL_OPUS_4: &str = "claude-opus-4-20250514";

// Claude 4.5+ model identifiers.

/// Claude Haiku 4.5.
pub const MODEL_HAIKU_45: &str = "claude-haiku-4-5-20251001";
/// Claude Sonnet 4.5.
pub const MODEL_SONNET_45: &str = "claude-sonnet-4-5-20250929";
/// Claude Sonnet 4.6.
pub const MODEL_SONNET_46: &str = "claude-sonnet-4-6";
/// Claude Opus 4.6.
pub const MODEL_OPUS_46: &str = "claude-opus-4-6";
/// Claude Opus 4.7.
pub const MODEL_OPUS_47: &str = "claude-opus-4-7";
/// Claude Opus 4.8.
pub const MODEL_OPUS_48: &str = "claude-opus-4-8";
/// Claude Fable 5.
pub const MODEL_FABLE_5: &str = "claude-fable-5";
49
/// Canonical spellings of the Claude Code built-in tools.
///
/// In OAuth mode, a caller tool whose name equals one of these (compared by
/// lowercase form) is sent to the API under the spelling listed here.
const CLAUDE_CODE_TOOLS: &[&str] = &[
    "Read",
    "Write",
    "Edit",
    "Bash",
    "Grep",
    "Glob",
    "AskUserQuestion",
    "EnterPlanMode",
    "ExitPlanMode",
    "KillShell",
    "NotebookEdit",
    "Skill",
    "Task",
    "TaskOutput",
    "TodoWrite",
    "WebFetch",
    "WebSearch",
];

/// Returns the Claude Code spelling of `name`, or `name` unchanged when it is
/// not one of [`CLAUDE_CODE_TOOLS`].
///
/// Names are compared by their Unicode lowercase forms.
fn to_claude_code_name(name: &str) -> String {
    let needle = name.to_lowercase();
    CLAUDE_CODE_TOOLS
        .iter()
        .find(|canonical| canonical.to_lowercase() == needle)
        .map_or_else(|| name.to_owned(), |canonical| (*canonical).to_owned())
}
86
/// Maps a tool name returned by the API in OAuth mode back to the caller's
/// original spelling.
///
/// Outgoing names only ever change case (see `to_claude_code_name`), so the
/// reverse lookup is case-insensitive (Unicode lowercase comparison). An exact
/// match is preferred first so a name that already is one of the caller's
/// spellings is never redirected to a different tool that merely differs in
/// case. Names matching nothing are returned unchanged.
fn from_claude_code_name(name: &str, original_names: &[String]) -> String {
    if let Some(exact) = original_names
        .iter()
        .find(|original| original.as_str() == name)
    {
        return exact.clone();
    }
    let lower = name.to_lowercase();
    original_names
        .iter()
        .find(|original| original.to_lowercase() == lower)
        .map_or_else(|| name.to_string(), String::clone)
}
97
98fn oauth_tool_name_collision(
106 tools: Option<&[agent_sdk_foundation::llm::Tool]>,
107) -> Option<(String, String)> {
108 let tools = tools?;
109 for (index, tool) in tools.iter().enumerate() {
110 for other in &tools[index + 1..] {
111 if tool.name != other.name && tool.name.eq_ignore_ascii_case(&other.name) {
112 return Some((tool.name.clone(), other.name.clone()));
113 }
114 }
115 }
116 None
117}
118
/// Builds the user-facing error for a case-insensitive OAuth tool-name clash
/// between `first` and `second`.
fn oauth_tool_collision_message(first: &str, second: &str) -> String {
    [
        "OAuth tool names collide case-insensitively: '",
        first,
        "' and '",
        second,
        "' would map to the same Claude Code tool name; rename one to disambiguate",
    ]
    .concat()
}
124
/// Returns `true` if `api_key` looks like an Anthropic OAuth access token
/// (prefix `sk-ant-oat`) rather than a regular API key.
#[must_use]
pub fn is_oauth_token(api_key: &str) -> bool {
    const OAUTH_TOKEN_PREFIX: &str = "sk-ant-oat";
    api_key.starts_with(OAUTH_TOKEN_PREFIX)
}
130
131struct AnthropicModelsPage {
134 models: Vec<crate::provider::ModelInfo>,
135 has_more: bool,
136 last_id: Option<String>,
137}
138
139fn parse_models_page(body: &str) -> Result<AnthropicModelsPage> {
146 #[derive(serde::Deserialize)]
147 struct ListResponse {
148 #[serde(default)]
149 data: Vec<ModelRow>,
150 #[serde(default)]
151 has_more: bool,
152 #[serde(default)]
153 last_id: Option<String>,
154 }
155 #[derive(serde::Deserialize)]
156 struct ModelRow {
157 id: String,
158 #[serde(default)]
159 display_name: Option<String>,
160 }
161 let parsed: ListResponse = serde_json::from_str(body)
162 .map_err(|e| anyhow::anyhow!("failed to parse Anthropic models list: {e}"))?;
163 let models = parsed
164 .data
165 .into_iter()
166 .map(|row| crate::provider::ModelInfo {
167 id: row.id,
168 display_name: row.display_name,
169 context_window: None,
170 max_output_tokens: None,
171 })
172 .collect();
173 Ok(AnthropicModelsPage {
174 models,
175 has_more: parsed.has_more,
176 last_id: parsed.last_id,
177 })
178}
179
180struct CacheRegions {
184 tools: Option<data::ApiCacheControl>,
185 system: Option<data::ApiCacheControl>,
186 messages: Option<data::ApiCacheControl>,
187}
188
189impl CacheRegions {
190 const DISABLED: Self = Self {
192 tools: None,
193 system: None,
194 messages: None,
195 };
196}
197
/// How requests are authenticated; chosen once from the key's prefix when the
/// provider is constructed.
///
/// Unit-only, so it is `Copy` and comparable rather than needing clones or
/// `matches!` at every use.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AuthMode {
    /// Key sent in the `x-api-key` header.
    ApiKey,
    /// Bearer token with Claude Code headers and tool-name mapping.
    OAuth,
}
206
207#[derive(Clone)]
209pub struct AnthropicProvider {
210 client: reqwest::Client,
211 api_key: String,
212 model: String,
213 base_url: String,
214 auth_mode: AuthMode,
215 thinking: Option<ThinkingConfig>,
216 extra_headers: Vec<(String, String)>,
218}
219
220impl AnthropicProvider {
221 pub const API_KEY_ENV: &'static str = "ANTHROPIC_API_KEY";
223
224 #[must_use]
229 pub fn new(api_key: impl Into<String>, model: impl Into<String>) -> Self {
230 let api_key = api_key.into();
231 let model = model.into();
232 let auth_mode = if is_oauth_token(&api_key) {
233 AuthMode::OAuth
234 } else {
235 AuthMode::ApiKey
236 };
237
238 let client = reqwest::Client::builder()
243 .connect_timeout(std::time::Duration::from_secs(30))
244 .tcp_keepalive(std::time::Duration::from_secs(30))
245 .build()
246 .unwrap_or_default();
247
248 Self {
249 client,
250 api_key,
251 model,
252 base_url: API_BASE_URL.to_owned(),
253 auth_mode,
254 thinking: None,
255 extra_headers: Vec::new(),
256 }
257 }
258
259 #[must_use]
261 pub const fn is_oauth(&self) -> bool {
262 matches!(self.auth_mode, AuthMode::OAuth)
263 }
264
265 fn apply_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
271 let builder = if self.api_key.is_empty() {
272 builder.header("anthropic-version", API_VERSION)
273 } else {
274 match self.auth_mode {
275 AuthMode::ApiKey => builder
276 .header("x-api-key", &self.api_key)
277 .header("anthropic-version", API_VERSION),
278 AuthMode::OAuth => {
279 let mut beta_features = vec![
283 "claude-code-20250219",
284 "oauth-2025-04-20",
285 "fine-grained-tool-streaming-2025-05-14",
286 ];
287 if !self.requires_adaptive_thinking() {
288 beta_features.push("interleaved-thinking-2025-05-14");
289 }
290 builder
291 .header("Authorization", format!("Bearer {}", self.api_key))
292 .header("anthropic-version", API_VERSION)
293 .header("anthropic-beta", beta_features.join(","))
294 .header("user-agent", format!("claude-cli/{CLAUDE_CODE_VERSION}"))
295 .header("x-app", "cli")
296 }
297 }
298 };
299 self.extra_headers
300 .iter()
301 .fold(builder, |b, (k, v)| b.header(k.as_str(), v.as_str()))
302 }
303
304 const OAUTH_IDENTITY: &'static str =
305 "You are Claude Code, Anthropic's official CLI for Claude.";
306
307 fn build_system_prompt_for_request<'a>(
313 &self,
314 system: &'a str,
315 cache_control: Option<data::ApiCacheControl>,
316 ) -> Option<data::ApiSystemPrompt<'a>> {
317 match self.auth_mode {
318 AuthMode::ApiKey => data::build_api_system_prompt(system, cache_control),
319 AuthMode::OAuth => {
320 let mut blocks = vec![data::ApiSystemBlock {
321 block_type: "text",
322 text: Self::OAUTH_IDENTITY,
323 cache_control: cache_control.clone(),
324 }];
325 if !system.is_empty() {
326 blocks.push(data::ApiSystemBlock {
327 block_type: "text",
328 text: system,
329 cache_control,
330 });
331 }
332 Some(data::ApiSystemPrompt::Blocks(blocks))
333 }
334 }
335 }
336
337 fn cache_regions(request: &ChatRequest) -> CacheRegions {
346 let (enabled, ttl, max_breakpoints) =
347 request.cache.as_ref().map_or((true, None, None), |cfg| {
348 (cfg.enabled, cfg.ttl, cfg.max_breakpoints)
349 });
350 if !enabled {
351 return CacheRegions::DISABLED;
352 }
353 let control = data::ApiCacheControl::ephemeral_with_ttl(ttl.map(CacheTtl::as_wire_str));
354 let limit = max_breakpoints.unwrap_or(u8::MAX);
355 CacheRegions {
356 tools: (limit >= 1).then(|| control.clone()),
357 system: (limit >= 2).then(|| control.clone()),
358 messages: (limit >= 3).then_some(control),
359 }
360 }
361
362 fn build_cached_api_messages(
363 request: &ChatRequest,
364 cache_control: Option<data::ApiCacheControl>,
365 ) -> Vec<data::ApiMessage> {
366 let mut messages = build_api_messages(request);
367 if let Some(cache_control) = cache_control {
368 data::apply_cache_control_to_last_user_message(&mut messages, cache_control);
369 }
370 messages
371 }
372
373 fn effective_max_tokens(&self, request: &ChatRequest) -> u32 {
374 if request.max_tokens_explicit {
375 request.max_tokens
376 } else {
377 self.default_max_tokens()
378 }
379 }
380
381 #[must_use]
394 pub fn from_env() -> Self {
395 Self::try_from_env().unwrap_or_else(|e| panic!("{e}"))
396 }
397
398 pub fn try_from_env() -> Result<Self> {
406 let api_key = std::env::var(Self::API_KEY_ENV).map_err(|_| {
407 anyhow::anyhow!("environment variable `{}` is not set", Self::API_KEY_ENV)
408 })?;
409 Ok(Self::sonnet(api_key))
410 }
411
412 #[must_use]
414 pub fn haiku(api_key: impl Into<String>) -> Self {
415 Self::new(api_key, MODEL_HAIKU_45)
416 }
417
418 #[must_use]
420 pub fn sonnet(api_key: impl Into<String>) -> Self {
421 Self::new(api_key, MODEL_SONNET_46)
422 }
423
424 #[must_use]
426 pub fn sonnet_45(api_key: impl Into<String>) -> Self {
427 Self::new(api_key, MODEL_SONNET_45)
428 }
429
430 #[must_use]
432 pub fn sonnet_46(api_key: impl Into<String>) -> Self {
433 Self::new(api_key, MODEL_SONNET_46)
434 }
435
436 #[must_use]
438 pub fn opus(api_key: impl Into<String>) -> Self {
439 Self::new(api_key, MODEL_OPUS_46)
440 }
441
442 #[must_use]
449 pub fn opus_47(api_key: impl Into<String>) -> Self {
450 Self::new(api_key, MODEL_OPUS_47)
451 }
452
453 #[must_use]
460 pub fn opus_48(api_key: impl Into<String>) -> Self {
461 Self::new(api_key, MODEL_OPUS_48)
462 }
463
464 #[must_use]
473 pub fn fable(api_key: impl Into<String>) -> Self {
474 Self::new(api_key, MODEL_FABLE_5)
475 }
476
477 #[must_use]
479 pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
480 self.thinking = Some(thinking);
481 self
482 }
483
484 #[must_use]
486 pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
487 self.base_url = base_url.into();
488 self
489 }
490
491 #[must_use]
493 pub fn with_extra_headers(mut self, headers: Vec<(String, String)>) -> Self {
494 self.extra_headers = headers;
495 self
496 }
497
498 fn requires_adaptive_thinking(&self) -> bool {
499 matches!(
500 self.model.as_str(),
501 MODEL_SONNET_46 | MODEL_OPUS_46 | MODEL_OPUS_47 | MODEL_OPUS_48 | MODEL_FABLE_5
502 )
503 }
504}
505
506#[async_trait]
507#[allow(clippy::too_many_lines)]
508impl LlmProvider for AnthropicProvider {
509 async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
510 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
511 Ok(thinking) => thinking,
512 Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
513 };
514 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
515 return Ok(ChatOutcome::InvalidRequest(error.to_string()));
516 }
517 if self.is_oauth()
518 && let Some((first, second)) = oauth_tool_name_collision(request.tools.as_deref())
519 {
520 return Ok(ChatOutcome::InvalidRequest(oauth_tool_collision_message(
521 &first, &second,
522 )));
523 }
524 let CacheRegions {
525 tools: tools_cache,
526 system: system_cache,
527 messages: messages_cache,
528 } = Self::cache_regions(&request);
529 let messages = Self::build_cached_api_messages(&request, messages_cache);
530 let tools = if self.is_oauth() {
531 build_api_tools_with_cache(&request, tools_cache).map(|tools| {
532 tools
533 .into_iter()
534 .map(|mut t| {
535 t.name = to_claude_code_name(&t.name);
536 t
537 })
538 .collect::<Vec<_>>()
539 })
540 } else {
541 build_api_tools_with_cache(&request, tools_cache)
542 };
543 let thinking = thinking_config
544 .as_ref()
545 .map(ApiThinkingConfig::from_thinking_config);
546 let output_config = thinking_config
547 .as_ref()
548 .and_then(|t| t.effort)
549 .map(|effort| ApiOutputConfig { effort });
550
551 let system = self.build_system_prompt_for_request(&request.system, system_cache);
552 let max_tokens = self.effective_max_tokens(&request);
553 let tool_choice = request
554 .tool_choice
555 .as_ref()
556 .map(ApiToolChoice::from_tool_choice);
557
558 let api_request = ApiMessagesRequest {
559 model: Some(&self.model),
560 max_tokens,
561 system,
562 messages: &messages,
563 tools: tools.as_deref(),
564 tool_choice,
565 stream: false,
566 thinking,
567 output_config,
568 anthropic_version: None,
569 };
570
571 log::debug!(
572 "Anthropic LLM request model={} max_tokens={} oauth={}",
573 self.model,
574 max_tokens,
575 self.is_oauth()
576 );
577
578 if log::log_enabled!(log::Level::Debug) {
580 match serde_json::to_string_pretty(&api_request) {
581 Ok(json) => log::debug!("Anthropic API request payload:\n{json}"),
582 Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
583 }
584 }
585
586 let builder = self
587 .client
588 .post(format!("{}/v1/messages", self.base_url))
589 .header("Content-Type", "application/json");
590 let response = self
591 .apply_auth(builder)
592 .json(&api_request)
593 .send()
594 .await
595 .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
596
597 let status = response.status();
598 let retry_after = if status == StatusCode::TOO_MANY_REQUESTS {
601 crate::http::retry_after_from_headers(response.headers())
602 } else {
603 None
604 };
605 let bytes = response
606 .bytes()
607 .await
608 .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
609
610 log::debug!(
611 "Anthropic LLM response status={} body_len={}",
612 status,
613 bytes.len()
614 );
615
616 if status == StatusCode::TOO_MANY_REQUESTS {
617 return Ok(ChatOutcome::RateLimited(retry_after));
618 }
619
620 if status.is_server_error() {
621 let body = String::from_utf8_lossy(&bytes);
622 log::error!("Anthropic server error status={status} body={body}");
623 return Ok(ChatOutcome::ServerError(body.into_owned()));
624 }
625
626 if status.is_client_error() {
627 let body = String::from_utf8_lossy(&bytes);
628 log::warn!("Anthropic client error status={status} body={body}");
629 return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
630 }
631
632 let api_response: data::ApiResponse = serde_json::from_slice(&bytes)
633 .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
634
635 log::debug!(
637 "Anthropic API response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
638 api_response.id,
639 api_response.model,
640 api_response.stop_reason,
641 api_response.usage.total_input_tokens(),
642 api_response.usage.output,
643 api_response.content.len()
644 );
645
646 let mut content = map_content_blocks(api_response.content);
647
648 if self.is_oauth() {
650 let original_names: Vec<String> = request
651 .tools
652 .as_ref()
653 .map(|ts| ts.iter().map(|t| t.name.clone()).collect())
654 .unwrap_or_default();
655 for block in &mut content {
656 if let ContentBlock::ToolUse { name, .. } = block {
657 *name = from_claude_code_name(name, &original_names);
658 }
659 }
660 }
661
662 let stop_reason = api_response.stop_reason.as_ref().map(map_stop_reason);
663
664 Ok(ChatOutcome::Success(ChatResponse {
665 id: api_response.id,
666 content,
667 model: api_response.model,
668 stop_reason,
669 usage: Usage {
670 input_tokens: api_response.usage.total_input_tokens(),
671 output_tokens: api_response.usage.output,
672 cached_input_tokens: api_response.usage.cached_input_tokens(),
673 cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
674 },
675 }))
676 }
677
678 fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
679 Box::pin(async_stream::stream! {
680 let is_oauth = self.is_oauth();
681 let original_tool_names: Vec<String> = request
682 .tools
683 .as_ref()
684 .map(|ts| ts.iter().map(|t| t.name.clone()).collect())
685 .unwrap_or_default();
686
687 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
688 yield Ok(StreamDelta::Error {
689 message: error.to_string(),
690 kind: StreamErrorKind::InvalidRequest,
691 });
692 return;
693 }
694
695 if is_oauth
696 && let Some((first, second)) = oauth_tool_name_collision(request.tools.as_deref())
697 {
698 yield Ok(StreamDelta::Error {
699 message: oauth_tool_collision_message(&first, &second),
700 kind: StreamErrorKind::InvalidRequest,
701 });
702 return;
703 }
704
705 let CacheRegions {
706 tools: tools_cache,
707 system: system_cache,
708 messages: messages_cache,
709 } = Self::cache_regions(&request);
710 let messages = Self::build_cached_api_messages(&request, messages_cache);
711 let tools = if is_oauth {
712 build_api_tools_with_cache(&request, tools_cache).map(|tools| {
713 tools
714 .into_iter()
715 .map(|mut t| {
716 t.name = to_claude_code_name(&t.name);
717 t
718 })
719 .collect::<Vec<_>>()
720 })
721 } else {
722 build_api_tools_with_cache(&request, tools_cache)
723 };
724 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
725 Ok(thinking) => thinking,
726 Err(error) => {
727 yield Ok(StreamDelta::Error {
728 message: error.to_string(),
729 kind: StreamErrorKind::InvalidRequest,
730 });
731 return;
732 }
733 };
734 let thinking = thinking_config
735 .as_ref()
736 .map(ApiThinkingConfig::from_thinking_config);
737 let output_config = thinking_config
738 .as_ref()
739 .and_then(|t| t.effort)
740 .map(|effort| ApiOutputConfig { effort });
741
742 let system = self.build_system_prompt_for_request(&request.system, system_cache);
743 let max_tokens = self.effective_max_tokens(&request);
744 let tool_choice = request
745 .tool_choice
746 .as_ref()
747 .map(ApiToolChoice::from_tool_choice);
748
749 let api_request = ApiMessagesRequest {
750 model: Some(&self.model),
751 max_tokens,
752 system,
753 messages: &messages,
754 tools: tools.as_deref(),
755 tool_choice,
756 stream: true,
757 thinking,
758 output_config,
759 anthropic_version: None,
760 };
761
762 log::debug!("Anthropic streaming LLM request model={} max_tokens={} oauth={}", self.model, max_tokens, is_oauth);
763
764 if log::log_enabled!(log::Level::Debug) {
766 match serde_json::to_string_pretty(&api_request) {
767 Ok(json) => log::debug!("Anthropic streaming API request payload:\n{json}"),
768 Err(e) => log::debug!("Failed to serialize streaming request for logging: {e}"),
769 }
770 }
771
772 let builder = self
773 .client
774 .post(format!("{}/v1/messages", self.base_url))
775 .header("Content-Type", "application/json");
776 let response = match self
777 .apply_auth(builder)
778 .json(&api_request)
779 .send()
780 .await
781 {
782 Ok(r) => r,
783 Err(e) => {
784 yield Err(anyhow::anyhow!("request failed: {e}"));
785 return;
786 }
787 };
788
789 let status = response.status();
790
791 if status == StatusCode::TOO_MANY_REQUESTS {
792 yield Ok(StreamDelta::Error {
793 message: "Rate limited".to_string(),
794 kind: StreamErrorKind::RateLimited,
795 });
796 return;
797 }
798
799 if status.is_server_error() {
800 let body = response.text().await.unwrap_or_default();
801 log::error!("Anthropic server error status={status} body={body}");
802 yield Ok(StreamDelta::Error {
803 message: body,
804 kind: StreamErrorKind::ServerError,
805 });
806 return;
807 }
808
809 if status.is_client_error() {
810 let body = response.text().await.unwrap_or_default();
811 log::warn!("Anthropic client error status={status} body={body}");
812 yield Ok(StreamDelta::Error {
813 message: body,
814 kind: StreamErrorKind::InvalidRequest,
815 });
816 return;
817 }
818
819 let mut stream = response.bytes_stream();
821 let mut buffer = String::new();
822 let mut input_tokens: u32 = 0;
823 let mut output_tokens: u32 = 0;
824 let mut cached_input_tokens: u32 = 0;
825 let mut cache_creation_input_tokens: u32 = 0;
826 let mut tool_ids: std::collections::HashMap<usize, String> =
828 std::collections::HashMap::new();
829
830 let mut received_message_stop = false;
831 let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
832 let mut chunk_count: u64 = 0;
833 let mut total_bytes: u64 = 0;
834
835 struct StreamDropGuard {
837 completed: bool,
838 chunk_count: u64,
839 }
840 impl Drop for StreamDropGuard {
841 fn drop(&mut self) {
842 if !self.completed {
843 log::debug!(
847 "SSE stream dropped before completion at chunk_count={} (task was likely cancelled)",
848 self.chunk_count
849 );
850 }
851 }
852 }
853 let mut drop_guard = StreamDropGuard { completed: false, chunk_count: 0 };
854
855 log::debug!("Starting SSE stream processing");
856
857 while let Some(chunk_result) = stream.next().await {
858 let chunk = match chunk_result {
859 Ok(c) => c,
860 Err(e) => {
861 log::error!("Stream error while reading chunk error={e} chunk_count={chunk_count} total_bytes={total_bytes}");
862 yield Err(anyhow::anyhow!("stream error: {e}"));
863 return;
864 }
865 };
866
867 chunk_count += 1;
868 total_bytes += chunk.len() as u64;
869 drop_guard.chunk_count = chunk_count;
870
871 if chunk_count.is_multiple_of(10) {
873 log::debug!("SSE chunk progress: chunk_count={chunk_count} total_bytes={total_bytes}");
874 }
875 buffer.push_str(&String::from_utf8_lossy(&chunk));
876
877 while let Some(event_block) = take_next_sse_event(&mut buffer) {
879 if is_message_stop_event(&event_block) {
881 log::debug!("Received message_stop event chunk_count={chunk_count} total_bytes={total_bytes}");
882 received_message_stop = true;
883 }
884
885 if let Some(mut delta) = parse_sse_event(
887 &event_block,
888 &mut input_tokens,
889 &mut output_tokens,
890 &mut cached_input_tokens,
891 &mut cache_creation_input_tokens,
892 &mut tool_ids,
893 &mut pending_stop_reason,
894 ) {
895 if is_oauth
897 && let StreamDelta::ToolUseStart { ref mut name, .. } = delta
898 {
899 *name = from_claude_code_name(name, &original_tool_names);
900 }
901 yield Ok(delta);
902 }
903 if is_message_stop_event(&event_block) {
905 yield Ok(StreamDelta::Done {
906 stop_reason: pending_stop_reason.take(),
907 });
908 }
909 }
910 }
911
912 log::debug!(
913 "SSE stream ended chunk_count={chunk_count} total_bytes={total_bytes} buffer_remaining={} received_message_stop={received_message_stop}",
914 buffer.len()
915 );
916
917 let remaining = buffer.trim();
919 if !remaining.is_empty() {
920 log::debug!(
921 "Processing remaining buffer content remaining_len={} remaining_preview={}",
922 remaining.len(),
923 remaining.chars().take(100).collect::<String>()
924 );
925
926 if is_message_stop_event(remaining) {
928 received_message_stop = true;
929 }
930
931 if let Some(mut delta) = parse_sse_event(
932 remaining,
933 &mut input_tokens,
934 &mut output_tokens,
935 &mut cached_input_tokens,
936 &mut cache_creation_input_tokens,
937 &mut tool_ids,
938 &mut pending_stop_reason,
939 ) {
940 if is_oauth
941 && let StreamDelta::ToolUseStart { ref mut name, .. } = delta
942 {
943 *name = from_claude_code_name(name, &original_tool_names);
944 }
945 yield Ok(delta);
946 }
947 if is_message_stop_event(remaining) {
949 yield Ok(StreamDelta::Done {
950 stop_reason: pending_stop_reason.take(),
951 });
952 }
953 }
954
955 drop_guard.completed = true;
957
958 if !received_message_stop {
960 log::warn!(
961 "SSE stream ended without message_stop event - stream may have been interrupted chunk_count={chunk_count} total_bytes={total_bytes}"
962 );
963 yield Ok(StreamDelta::Error {
964 message: "Stream ended unexpectedly without completion".to_string(),
965 kind: StreamErrorKind::ServerError,
966 });
967 }
968 })
969 }
970
971 fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
972 let Some(thinking) = thinking else {
973 return Ok(());
974 };
975
976 if self
977 .capabilities()
978 .is_some_and(|caps| !caps.supports_thinking)
979 {
980 return Err(anyhow::anyhow!(
981 "thinking is not supported for provider={} model={}",
982 self.provider(),
983 self.model()
984 ));
985 }
986
987 if matches!(thinking.mode, ThinkingMode::Adaptive)
988 && !self
989 .capabilities()
990 .is_some_and(|caps| caps.supports_adaptive_thinking)
991 {
992 return Err(anyhow::anyhow!(
993 "adaptive thinking is not supported for provider={} model={}",
994 self.provider(),
995 self.model()
996 ));
997 }
998
999 if self.requires_adaptive_thinking()
1000 && matches!(thinking.mode, ThinkingMode::Enabled { .. })
1001 {
1002 return Err(anyhow::anyhow!(
1003 "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
1004 self.provider(),
1005 self.model()
1006 ));
1007 }
1008
1009 Ok(())
1010 }
1011
1012 async fn list_models(&self) -> Result<Vec<crate::provider::ModelInfo>> {
1013 let mut models = Vec::new();
1017 let mut after_id: Option<String> = None;
1018 for _ in 0..MODELS_MAX_PAGES {
1019 let mut query: Vec<(&str, String)> = vec![("limit", MODELS_PAGE_LIMIT.to_string())];
1020 if let Some(after) = &after_id {
1021 query.push(("after_id", after.clone()));
1022 }
1023 let builder = self
1024 .client
1025 .get(format!("{}/v1/models", self.base_url))
1026 .header("Content-Type", "application/json")
1027 .query(&query);
1028 let builder = self.apply_auth(builder);
1029 let body =
1030 crate::impls::model_listing::fetch_model_list_body(builder, "Anthropic").await?;
1031 let page = parse_models_page(&body)?;
1032 models.extend(page.models);
1033 if !page.has_more {
1034 return Ok(models);
1035 }
1036 match page.last_id {
1037 Some(last) => after_id = Some(last),
1038 None => return Ok(models),
1040 }
1041 }
1042 Ok(models)
1043 }
1044
1045 fn model(&self) -> &str {
1046 &self.model
1047 }
1048
1049 fn provider(&self) -> &'static str {
1050 "anthropic"
1051 }
1052
1053 fn configured_thinking(&self) -> Option<&ThinkingConfig> {
1054 self.thinking.as_ref()
1055 }
1056
1057 fn default_max_tokens(&self) -> u32 {
1058 let model_max = self
1059 .capabilities()
1060 .and_then(|caps| caps.max_output_tokens)
1061 .or_else(|| {
1062 crate::model_capabilities::default_max_output_tokens(self.provider(), self.model())
1063 })
1064 .unwrap_or(4096);
1065 model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
1066 }
1067}
1068
1069#[cfg(test)]
1070mod tests {
1071 use super::*;
1072
1073 const ANTHROPIC_MODELS_FIXTURE: &str = r#"{
1074 "data": [
1075 {"type": "model", "id": "claude-opus-4-8", "display_name": "Claude Opus 4.8"},
1076 {"type": "model", "id": "claude-sonnet-4-5", "display_name": "Claude Sonnet 4.5"}
1077 ],
1078 "has_more": false
1079 }"#;
1080
1081 #[test]
1082 fn parse_models_page_reads_id_and_display_name() -> anyhow::Result<()> {
1083 let page = parse_models_page(ANTHROPIC_MODELS_FIXTURE)?;
1084 assert_eq!(page.models.len(), 2);
1085 assert_eq!(page.models[0].id, "claude-opus-4-8");
1086 assert_eq!(
1087 page.models[0].display_name.as_deref(),
1088 Some("Claude Opus 4.8")
1089 );
1090 assert_eq!(page.models[0].context_window, None);
1092 assert_eq!(page.models[0].max_output_tokens, None);
1093 assert!(!page.has_more);
1095 assert_eq!(page.last_id, None);
1096 Ok(())
1097 }
1098
1099 #[tokio::test]
1100 async fn list_models_follows_pagination_across_pages() -> anyhow::Result<()> {
1101 use wiremock::matchers::{method, path, query_param, query_param_is_missing};
1102 use wiremock::{Mock, MockServer, ResponseTemplate};
1103
1104 let server = MockServer::start().await;
1105
1106 Mock::given(method("GET"))
1109 .and(path("/v1/models"))
1110 .and(query_param_is_missing("after_id"))
1111 .respond_with(ResponseTemplate::new(200).set_body_string(
1112 r#"{
1113 "data": [
1114 {"type": "model", "id": "claude-opus-4-8", "display_name": "Opus"},
1115 {"type": "model", "id": "claude-sonnet-4-5", "display_name": "Sonnet"}
1116 ],
1117 "has_more": true,
1118 "last_id": "claude-sonnet-4-5"
1119 }"#,
1120 ))
1121 .mount(&server)
1122 .await;
1123
1124 Mock::given(method("GET"))
1126 .and(path("/v1/models"))
1127 .and(query_param("after_id", "claude-sonnet-4-5"))
1128 .respond_with(ResponseTemplate::new(200).set_body_string(
1129 r#"{
1130 "data": [
1131 {"type": "model", "id": "claude-haiku-4-5", "display_name": "Haiku"}
1132 ],
1133 "has_more": false,
1134 "last_id": "claude-haiku-4-5"
1135 }"#,
1136 ))
1137 .mount(&server)
1138 .await;
1139
1140 let provider = AnthropicProvider::new("test-key-not-a-secret", "claude-test")
1141 .with_base_url(server.uri());
1142 let models = provider.list_models().await?;
1143
1144 let ids: Vec<&str> = models.iter().map(|m| m.id.as_str()).collect();
1146 assert_eq!(
1147 ids,
1148 vec!["claude-opus-4-8", "claude-sonnet-4-5", "claude-haiku-4-5"]
1149 );
1150 Ok(())
1151 }
1152
1153 #[test]
1158 fn test_new_creates_provider_with_custom_model() {
1159 let provider = AnthropicProvider::new("test-api-key", "custom-model");
1160
1161 assert_eq!(provider.model(), "custom-model");
1162 assert_eq!(provider.provider(), "anthropic");
1163 }
1164
1165 #[test]
1166 fn test_haiku_factory_creates_haiku_provider() {
1167 let provider = AnthropicProvider::haiku("test-api-key".to_string());
1168
1169 assert_eq!(provider.model(), MODEL_HAIKU_45);
1170 assert_eq!(provider.provider(), "anthropic");
1171 }
1172
1173 #[test]
1174 fn test_only_anthropic_46_models_accept_adaptive_thinking() {
1175 let sonnet_46 = AnthropicProvider::sonnet_46("test-api-key".to_string());
1176 assert!(
1177 sonnet_46
1178 .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
1179 .is_ok()
1180 );
1181
1182 let sonnet_45 = AnthropicProvider::sonnet_45("test-api-key".to_string());
1183 let error = sonnet_45
1184 .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
1185 .unwrap_err();
1186 assert!(
1187 error
1188 .to_string()
1189 .contains("adaptive thinking is not supported")
1190 );
1191 }
1192
1193 #[test]
1194 fn test_anthropic_46_models_reject_budgeted_thinking() {
1195 let sonnet_46 = AnthropicProvider::sonnet_46("test-api-key".to_string());
1196 let error = sonnet_46
1197 .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
1198 .unwrap_err();
1199 assert!(error.to_string().contains("ThinkingConfig::adaptive()"));
1200 }
1201
1202 #[test]
1203 fn test_opus_47_rejects_budgeted_thinking() {
1204 let opus_47 = AnthropicProvider::opus_47("test-api-key".to_string());
1209 let error = opus_47
1210 .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
1211 .unwrap_err();
1212 assert!(
1213 error.to_string().contains("ThinkingConfig::adaptive()"),
1214 "expected migration hint, got: {error}"
1215 );
1216 }
1217
1218 #[test]
1219 fn test_opus_47_accepts_adaptive_thinking() {
1220 let opus_47 = AnthropicProvider::opus_47("test-api-key".to_string());
1221 assert!(
1222 opus_47
1223 .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
1224 .is_ok()
1225 );
1226 assert!(
1227 opus_47
1228 .validate_thinking_config(Some(&ThinkingConfig::adaptive_with_effort(
1229 agent_sdk_foundation::llm::Effort::High
1230 )))
1231 .is_ok()
1232 );
1233 }
1234
1235 #[test]
1236 fn test_opus_47_factory_creates_opus_47_provider() {
1237 let provider = AnthropicProvider::opus_47("test-api-key".to_string());
1238 assert_eq!(provider.model(), MODEL_OPUS_47);
1239 assert_eq!(provider.provider(), "anthropic");
1240 }
1241
1242 #[test]
1243 fn test_opus_48_rejects_budgeted_thinking() {
1244 let opus_48 = AnthropicProvider::opus_48("test-api-key".to_string());
1249 let error = opus_48
1250 .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
1251 .unwrap_err();
1252 assert!(
1253 error.to_string().contains("ThinkingConfig::adaptive()"),
1254 "expected migration hint, got: {error}"
1255 );
1256 }
1257
/// Opus 4.8 must accept adaptive thinking, both with the default effort and
/// with an explicit `Effort::High`.
///
/// Each case reports the validator's own error on failure, so a regression
/// shows *why* the config was rejected instead of a bare `is_ok()` failure.
#[test]
fn test_opus_48_accepts_adaptive_thinking() {
    let opus_48 = AnthropicProvider::opus_48("test-api-key".to_string());
    let cases = [
        ("adaptive()", ThinkingConfig::adaptive()),
        (
            "adaptive_with_effort(High)",
            ThinkingConfig::adaptive_with_effort(agent_sdk_foundation::llm::Effort::High),
        ),
    ];
    for (label, config) in &cases {
        if let Err(error) = opus_48.validate_thinking_config(Some(config)) {
            panic!("expected ThinkingConfig::{label} to be accepted, got: {error}");
        }
    }
}
1274
#[test]
fn test_opus_48_factory_creates_opus_48_provider() {
    // The named constructor must select the Opus 4.8 model id on the
    // Anthropic provider.
    let key = String::from("test-api-key");
    let opus = AnthropicProvider::opus_48(key);
    assert_eq!(opus.model(), MODEL_OPUS_48);
    assert_eq!(opus.provider(), "anthropic");
}
1281
#[test]
fn test_fable_5_rejects_budgeted_thinking() {
    // A fixed thinking budget must be refused for Fable 5, and the error
    // text has to point the caller at the adaptive constructor instead.
    let provider = AnthropicProvider::fable(String::from("test-api-key"));
    let budgeted = ThinkingConfig::new(10_000);
    let message = provider
        .validate_thinking_config(Some(&budgeted))
        .unwrap_err()
        .to_string();
    assert!(
        message.contains("ThinkingConfig::adaptive()"),
        "expected migration hint, got: {message}"
    );
}
1296
/// Fable 5 must accept adaptive thinking, both with the default effort and
/// with an explicit `Effort::High`.
///
/// Each case reports the validator's own error on failure, so a regression
/// shows *why* the config was rejected instead of a bare `is_ok()` failure.
#[test]
fn test_fable_5_accepts_adaptive_thinking() {
    let fable = AnthropicProvider::fable("test-api-key".to_string());
    let cases = [
        ("adaptive()", ThinkingConfig::adaptive()),
        (
            "adaptive_with_effort(High)",
            ThinkingConfig::adaptive_with_effort(agent_sdk_foundation::llm::Effort::High),
        ),
    ];
    for (label, config) in &cases {
        if let Err(error) = fable.validate_thinking_config(Some(config)) {
            panic!("expected ThinkingConfig::{label} to be accepted, got: {error}");
        }
    }
}
1313
#[test]
fn test_fable_factory_creates_fable_5_provider() {
    // `fable()` must select the Fable 5 model id on the Anthropic provider.
    let key = String::from("test-api-key");
    let fable = AnthropicProvider::fable(key);
    assert_eq!(fable.model(), MODEL_FABLE_5);
    assert_eq!(fable.provider(), "anthropic");
}
1320
#[test]
fn test_sonnet_factory_creates_sonnet_provider() {
    // The unversioned `sonnet()` shortcut currently resolves to Sonnet 4.6.
    let key = String::from("test-api-key");
    let sonnet = AnthropicProvider::sonnet(key);

    assert_eq!(sonnet.model(), MODEL_SONNET_46);
    assert_eq!(sonnet.provider(), "anthropic");
}
1328
#[test]
fn test_sonnet_45_factory_creates_sonnet_provider() {
    // The pinned `sonnet_45()` constructor must select the Sonnet 4.5 model id.
    let key = String::from("test-api-key");
    let sonnet = AnthropicProvider::sonnet_45(key);

    assert_eq!(sonnet.model(), MODEL_SONNET_45);
    assert_eq!(sonnet.provider(), "anthropic");
}
1336
#[test]
fn test_sonnet_46_factory_creates_sonnet_provider() {
    // The pinned `sonnet_46()` constructor must select the Sonnet 4.6 model id.
    let key = String::from("test-api-key");
    let sonnet = AnthropicProvider::sonnet_46(key);

    assert_eq!(sonnet.model(), MODEL_SONNET_46);
    assert_eq!(sonnet.provider(), "anthropic");
}
1344
#[test]
fn test_opus_factory_creates_opus_provider() {
    // The unversioned `opus()` shortcut currently resolves to Opus 4.6.
    let key = String::from("test-api-key");
    let opus = AnthropicProvider::opus(key);

    assert_eq!(opus.model(), MODEL_OPUS_46);
    assert_eq!(opus.provider(), "anthropic");
}
1352
/// Sanity-checks every exported model id constant.
///
/// For each constant this asserts that:
/// - it names the model family the constant claims (e.g. `MODEL_OPUS_47` contains `opus`);
/// - it uses the `claude-` prefix shared by all ids in this module.
///
/// It also asserts that no two constants share a value, which catches copy-paste mistakes
/// when a new model is added.
#[test]
fn test_model_constants_have_expected_values() {
    let expected_families = [
        (MODEL_HAIKU_35, "haiku"),
        (MODEL_HAIKU_45, "haiku"),
        (MODEL_SONNET_35, "sonnet"),
        (MODEL_SONNET_4, "sonnet"),
        (MODEL_SONNET_45, "sonnet"),
        (MODEL_SONNET_46, "sonnet"),
        (MODEL_OPUS_4, "opus"),
        (MODEL_OPUS_46, "opus"),
        (MODEL_OPUS_47, "opus"),
        (MODEL_OPUS_48, "opus"),
        (MODEL_FABLE_5, "fable"),
    ];

    for &(model, family) in &expected_families {
        assert!(
            model.starts_with("claude-"),
            "{model} is missing the claude- prefix"
        );
        assert!(
            model.contains(family),
            "{model} should name the {family} family"
        );
    }

    let unique: std::collections::HashSet<&str> =
        expected_families.iter().map(|&(model, _)| model).collect();
    assert_eq!(
        unique.len(),
        expected_families.len(),
        "model id constants must be distinct"
    );
}
1365
#[test]
fn test_provider_is_cloneable() {
    // A clone must report the same model and provider name as its source.
    let original = AnthropicProvider::new("test-api-key", "test-model");
    let duplicate = Clone::clone(&original);

    assert_eq!(original.model(), duplicate.model());
    assert_eq!(original.provider(), duplicate.provider());
}
1378
1379 fn tool(name: &str) -> agent_sdk_foundation::llm::Tool {
1384 agent_sdk_foundation::llm::Tool {
1385 name: name.to_string(),
1386 description: "desc".to_string(),
1387 input_schema: serde_json::json!({ "type": "object" }),
1388 display_name: name.to_string(),
1389 tier: agent_sdk_foundation::ToolTier::Observe,
1390 }
1391 }
1392
1393 fn request_with_tools(tools: Vec<agent_sdk_foundation::llm::Tool>) -> ChatRequest {
1394 ChatRequest {
1395 system: String::new(),
1396 messages: vec![agent_sdk_foundation::llm::Message::user("hi")],
1397 tools: Some(tools),
1398 max_tokens: 1024,
1399 max_tokens_explicit: true,
1400 session_id: None,
1401 cached_content: None,
1402 thinking: None,
1403 tool_choice: None,
1404 response_format: None,
1405 cache: None,
1406 }
1407 }
1408
#[test]
fn test_oauth_tool_name_collision_detects_case_variants() {
    // "task" and "Task" differ only by letter case, so they must be reported
    // as a collision.
    let colliding: Vec<_> = ["task", "Task"].iter().copied().map(tool).collect();
    assert!(oauth_tool_name_collision(Some(&colliding)).is_some());
}
1415
#[test]
fn test_oauth_tool_name_collision_allows_distinct_names() {
    // Names that stay distinct after case folding, and the absence of any
    // tools, must not be reported as collisions.
    let distinct: Vec<_> = ["read", "write", "Read_File"]
        .iter()
        .copied()
        .map(tool)
        .collect();
    assert!(oauth_tool_name_collision(Some(&distinct)).is_none());
    assert!(oauth_tool_name_collision(None).is_none());
}
1422
1423 #[tokio::test]
1424 async fn test_oauth_chat_rejects_case_colliding_tools() -> anyhow::Result<()> {
1425 let provider = AnthropicProvider::new("sk-ant-oat-test", MODEL_SONNET_45);
1427 assert!(provider.is_oauth());
1428 let request = request_with_tools(vec![tool("task"), tool("Task")]);
1429 let outcome = provider.chat(request).await?;
1430 match outcome {
1431 ChatOutcome::InvalidRequest(msg) => {
1432 assert!(msg.contains("collide case-insensitively"), "got: {msg}");
1433 }
1434 other => panic!("expected InvalidRequest, got {other:?}"),
1435 }
1436 Ok(())
1437 }
1438
/// An API-key (non-OAuth) provider must not be subject to the OAuth tool-name
/// collision gate, even for a tool list that the gate would reject.
///
/// This test only pins the two preconditions: the provider is not OAuth, and
/// the tool list really does collide. It never calls `chat`. With the gate
/// skipped, the request would presumably go to the live API.
/// NOTE(review): a mocked transport would let this test assert the gate
/// directly.
///
/// No async runtime or fallible step is involved, so this is a plain
/// `#[test]`. Provider construction works outside a runtime, as
/// `test_provider_is_cloneable` shows.
#[test]
fn test_api_key_chat_does_not_apply_oauth_collision_gate() {
    let provider = AnthropicProvider::new("sk-ant-api-test", MODEL_SONNET_45);
    assert!(!provider.is_oauth());
    let tools = vec![tool("task"), tool("Task")];
    assert!(oauth_tool_name_collision(Some(&tools)).is_some());
}
1452}