Skip to main content

agent_sdk_providers/impls/
anthropic.rs

1//! Anthropic API provider implementation.
2//!
3//! This module provides an implementation of `LlmProvider` for the Anthropic
4//! Messages API using reqwest for HTTP calls. Supports both streaming and
5//! non-streaming responses.
6
7pub(crate) mod data;
8
9use crate::attachments::validate_request_attachments;
10use crate::provider::LlmProvider;
11use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
12use agent_sdk_foundation::llm::{
13    ChatOutcome, ChatRequest, ChatResponse, ContentBlock, ThinkingConfig, ThinkingMode, Usage,
14};
15use anyhow::Result;
16use async_trait::async_trait;
17use data::{
18    ApiMessagesRequest, ApiOutputConfig, ApiThinkingConfig, ApiToolChoice, build_api_messages,
19    build_api_tools, is_message_stop_event, map_content_blocks, map_stop_reason, parse_sse_event,
20    take_next_sse_event,
21};
22use futures::StreamExt;
23use reqwest::StatusCode;
24
/// Default origin of the Anthropic Messages API; `AnthropicProvider::with_base_url`
/// overrides it per provider.
const API_BASE_URL: &str = "https://api.anthropic.com";
/// Value sent in the `anthropic-version` header on every request.
const API_VERSION: &str = "2023-06-01";
/// Claude Code release advertised in the OAuth-mode `user-agent` header.
const CLAUDE_CODE_VERSION: &str = "2.1.75";
/// Conservative output-token cap (32k).
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the fallback used by the provider's default max-token logic — confirm.
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;

// Dated model snapshots.

/// Claude 3.5 Haiku (2024-10-22 snapshot).
pub const MODEL_HAIKU_35: &str = "claude-3-5-haiku-20241022";
/// Claude 3.5 Sonnet (2024-10-22 snapshot).
pub const MODEL_SONNET_35: &str = "claude-3-5-sonnet-20241022";
/// Claude Sonnet 4 (2025-05-14 snapshot).
pub const MODEL_SONNET_4: &str = "claude-sonnet-4-20250514";
/// Claude Opus 4 (2025-05-14 snapshot).
pub const MODEL_OPUS_4: &str = "claude-opus-4-20250514";

/// Claude Haiku 4.5 (2025-10-01 snapshot).
pub const MODEL_HAIKU_45: &str = "claude-haiku-4-5-20251001";
/// Claude Sonnet 4.5 (2025-09-29 snapshot).
pub const MODEL_SONNET_45: &str = "claude-sonnet-4-5-20250929";
/// Claude Sonnet 4.6.
pub const MODEL_SONNET_46: &str = "claude-sonnet-4-6";
/// Claude Opus 4.6.
pub const MODEL_OPUS_46: &str = "claude-opus-4-6";
/// Claude Opus 4.7.
pub const MODEL_OPUS_47: &str = "claude-opus-4-7";
/// Claude Opus 4.8.
pub const MODEL_OPUS_48: &str = "claude-opus-4-8";
/// Claude Fable 5.
pub const MODEL_FABLE_5: &str = "claude-fable-5";
42
/// Tool names spelled exactly as Claude Code spells them, used in OAuth mode.
///
/// OAuth tokens require tool names in Claude Code's casing. Before a request
/// is sent, each tool name is compared against this list case-insensitively
/// and rewritten to the spelling below. Names that are not listed pass
/// through untouched, so listing a tool the caller never registers is
/// harmless. That lets the list cover tools that may be registered later.
///
/// Source: <https://cchistory.mariozechner.at/data/prompts-2.1.11.md>
const CLAUDE_CODE_TOOLS: &[&str] = &[
    "Read",
    "Write",
    "Edit",
    "Bash",
    "Grep",
    "Glob",
    "AskUserQuestion",
    "EnterPlanMode",
    "ExitPlanMode",
    "KillShell",
    "NotebookEdit",
    "Skill",
    "Task",
    "TaskOutput",
    "TodoWrite",
    "WebFetch",
    "WebSearch",
];
68
69/// Maps a tool name to Claude Code's canonical casing (case-insensitive match).
70fn to_claude_code_name(name: &str) -> String {
71    let lower = name.to_lowercase();
72    for cc_name in CLAUDE_CODE_TOOLS {
73        if cc_name.to_lowercase() == lower {
74            return (*cc_name).to_string();
75        }
76    }
77    name.to_string()
78}
79
/// Maps a tool name returned by the API back to the caller's original name.
///
/// Matching rules, in order:
/// 1. An original name identical to `name` wins.
/// 2. Otherwise, the first original name equal to `name` case-insensitively
///    is used. This undoes the casing applied by `to_claude_code_name`.
/// 3. If nothing matches, `name` is returned unchanged.
///
/// The exact-match pass matters when the caller registered tools whose names
/// differ only by case (e.g. `search` and `Search`). Both are sent verbatim,
/// and the model's choice must not be redirected to whichever happens to be
/// listed first.
fn from_claude_code_name(name: &str, original_names: &[String]) -> String {
    if let Some(exact) = original_names.iter().find(|original| original.as_str() == name) {
        return exact.clone();
    }
    let lower = name.to_lowercase();
    original_names
        .iter()
        .find(|original| original.to_lowercase() == lower)
        .map_or_else(|| name.to_string(), String::clone)
}
90
/// Reports whether `api_key` is an Anthropic OAuth access token.
///
/// Detection is purely by prefix: any key beginning with `sk-ant-oat`
/// (e.g. `sk-ant-oat01-…`) counts as OAuth.
#[must_use]
pub fn is_oauth_token(api_key: &str) -> bool {
    const OAUTH_TOKEN_PREFIX: &str = "sk-ant-oat";
    api_key.starts_with(OAUTH_TOKEN_PREFIX)
}
96
/// Authentication mode for the Anthropic provider.
///
/// Chosen once in `AnthropicProvider::new` from the shape of the API key.
/// It is a fieldless two-state flag, so it is `Copy` and comparable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AuthMode {
    /// Standard API key authentication (`x-api-key` header).
    ApiKey,
    /// OAuth token authentication (Bearer header + Claude Code identity).
    OAuth,
}
105
106/// Anthropic LLM provider using the Messages API.
107#[derive(Clone)]
108pub struct AnthropicProvider {
109    client: reqwest::Client,
110    api_key: String,
111    model: String,
112    base_url: String,
113    auth_mode: AuthMode,
114    /// Original tool names for reverse mapping in OAuth mode (reserved for future use).
115    #[allow(dead_code)]
116    original_tool_names: Vec<String>,
117    thinking: Option<ThinkingConfig>,
118    /// Extra headers applied to every request (e.g. for gateway authentication).
119    extra_headers: Vec<(String, String)>,
120}
121
122impl AnthropicProvider {
123    /// The conventional environment variable holding the Anthropic API key.
124    pub const API_KEY_ENV: &'static str = "ANTHROPIC_API_KEY";
125
126    /// Create a new Anthropic provider with the specified API key and model.
127    ///
128    /// Automatically detects OAuth tokens (`sk-ant-oat-*`) and switches to
129    /// Bearer auth with Claude Code identity headers.
130    #[must_use]
131    pub fn new(api_key: impl Into<String>, model: impl Into<String>) -> Self {
132        let api_key = api_key.into();
133        let model = model.into();
134        let auth_mode = if is_oauth_token(&api_key) {
135            AuthMode::OAuth
136        } else {
137            AuthMode::ApiKey
138        };
139
140        // Configure client with appropriate timeouts for streaming
141        // - No overall timeout (streaming can take a long time)
142        // - 30 second connect timeout
143        // - TCP keepalive to prevent connection drops
144        let client = reqwest::Client::builder()
145            .connect_timeout(std::time::Duration::from_secs(30))
146            .tcp_keepalive(std::time::Duration::from_secs(30))
147            .build()
148            .unwrap_or_default();
149
150        Self {
151            client,
152            api_key,
153            model,
154            base_url: API_BASE_URL.to_owned(),
155            auth_mode,
156            original_tool_names: Vec::new(),
157            thinking: None,
158            extra_headers: Vec::new(),
159        }
160    }
161
162    /// Returns whether this provider is using OAuth authentication.
163    #[must_use]
164    pub const fn is_oauth(&self) -> bool {
165        matches!(self.auth_mode, AuthMode::OAuth)
166    }
167
168    /// Applies authentication headers to a request builder.
169    ///
170    /// When `api_key` is empty the provider-specific credential header is
171    /// skipped — useful for BYOK gateways where auth is handled externally
172    /// via `extra_headers`.
173    fn apply_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
174        let builder = if self.api_key.is_empty() {
175            builder.header("anthropic-version", API_VERSION)
176        } else {
177            match self.auth_mode {
178                AuthMode::ApiKey => builder
179                    .header("x-api-key", &self.api_key)
180                    .header("anthropic-version", API_VERSION),
181                AuthMode::OAuth => {
182                    // Build beta features list matching Claude Code's behaviour.
183                    // Adaptive-thinking models (4.6) have interleaved thinking
184                    // built-in; for older reasoning models we need the explicit beta.
185                    let mut beta_features = vec![
186                        "claude-code-20250219",
187                        "oauth-2025-04-20",
188                        "fine-grained-tool-streaming-2025-05-14",
189                    ];
190                    if !self.requires_adaptive_thinking() {
191                        beta_features.push("interleaved-thinking-2025-05-14");
192                    }
193                    builder
194                        .header("Authorization", format!("Bearer {}", self.api_key))
195                        .header("anthropic-version", API_VERSION)
196                        .header("anthropic-beta", beta_features.join(","))
197                        .header("user-agent", format!("claude-cli/{CLAUDE_CODE_VERSION}"))
198                        .header("x-app", "cli")
199                }
200            }
201        };
202        self.extra_headers
203            .iter()
204            .fold(builder, |b, (k, v)| b.header(k.as_str(), v.as_str()))
205    }
206
207    const OAUTH_IDENTITY: &'static str =
208        "You are Claude Code, Anthropic's official CLI for Claude.";
209
210    /// Build the system prompt payload, accounting for OAuth mode.
211    ///
212    /// In OAuth mode the identity string must be a **separate** system block
213    /// (matching the layout Claude Code itself sends).  For API-key auth the
214    /// user-supplied system prompt is sent as a single block.
215    fn build_system_prompt_for_request<'a>(
216        &self,
217        system: &'a str,
218    ) -> Option<data::ApiSystemPrompt<'a>> {
219        let cc = Self::cache_control();
220
221        match self.auth_mode {
222            AuthMode::ApiKey => data::build_api_system_prompt(system, Some(cc)),
223            AuthMode::OAuth => {
224                let mut blocks = vec![data::ApiSystemBlock {
225                    block_type: "text",
226                    text: Self::OAUTH_IDENTITY,
227                    cache_control: Some(cc.clone()),
228                }];
229                if !system.is_empty() {
230                    blocks.push(data::ApiSystemBlock {
231                        block_type: "text",
232                        text: system,
233                        cache_control: Some(cc),
234                    });
235                }
236                Some(data::ApiSystemPrompt::Blocks(blocks))
237            }
238        }
239    }
240
241    const fn cache_control() -> data::ApiCacheControl {
242        data::ApiCacheControl::ephemeral()
243    }
244
245    fn build_cached_api_messages(request: &ChatRequest) -> Vec<data::ApiMessage> {
246        let mut messages = build_api_messages(request);
247        data::apply_cache_control_to_last_user_message(&mut messages, Self::cache_control());
248        messages
249    }
250
251    fn effective_max_tokens(&self, request: &ChatRequest) -> u32 {
252        if request.max_tokens_explicit {
253            request.max_tokens
254        } else {
255            self.default_max_tokens()
256        }
257    }
258
259    /// Create a provider using Claude Sonnet, reading the API key from the
260    /// conventional [`ANTHROPIC_API_KEY`](Self::API_KEY_ENV) environment
261    /// variable.
262    ///
263    /// This is the zero-ceremony on-ramp for the quickstart. Use
264    /// [`try_from_env`](Self::try_from_env) if you want to handle a missing
265    /// key without a panic.
266    ///
267    /// # Panics
268    ///
269    /// Panics if `ANTHROPIC_API_KEY` is not set. Prefer
270    /// [`try_from_env`](Self::try_from_env) outside of examples/tests.
271    #[must_use]
272    pub fn from_env() -> Self {
273        Self::try_from_env().unwrap_or_else(|e| panic!("{e}"))
274    }
275
276    /// Create a provider using Claude Sonnet, reading the API key from the
277    /// conventional [`ANTHROPIC_API_KEY`](Self::API_KEY_ENV) environment
278    /// variable.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if `ANTHROPIC_API_KEY` is unset or not valid UTF-8.
283    pub fn try_from_env() -> Result<Self> {
284        let api_key = std::env::var(Self::API_KEY_ENV).map_err(|_| {
285            anyhow::anyhow!("environment variable `{}` is not set", Self::API_KEY_ENV)
286        })?;
287        Ok(Self::sonnet(api_key))
288    }
289
290    /// Create a provider using Claude Haiku 4.5.
291    #[must_use]
292    pub fn haiku(api_key: impl Into<String>) -> Self {
293        Self::new(api_key, MODEL_HAIKU_45)
294    }
295
296    /// Create a provider using Claude Sonnet 4.6.
297    #[must_use]
298    pub fn sonnet(api_key: impl Into<String>) -> Self {
299        Self::new(api_key, MODEL_SONNET_46)
300    }
301
302    /// Create a provider using Claude Sonnet 4.5.
303    #[must_use]
304    pub fn sonnet_45(api_key: impl Into<String>) -> Self {
305        Self::new(api_key, MODEL_SONNET_45)
306    }
307
308    /// Create a provider using Claude Sonnet 4.6.
309    #[must_use]
310    pub fn sonnet_46(api_key: impl Into<String>) -> Self {
311        Self::new(api_key, MODEL_SONNET_46)
312    }
313
314    /// Create a provider using Claude Opus 4.6.
315    #[must_use]
316    pub fn opus(api_key: impl Into<String>) -> Self {
317        Self::new(api_key, MODEL_OPUS_46)
318    }
319
320    /// Create a provider using Claude Opus 4.7.
321    ///
322    /// Note: Opus 4.7 requires adaptive thinking. Passing a
323    /// `ThinkingConfig` with `ThinkingMode::Enabled { budget_tokens }`
324    /// will return an `InvalidRequest` — use `ThinkingConfig::adaptive()`
325    /// or `ThinkingConfig::adaptive_with_effort(_)` instead.
326    #[must_use]
327    pub fn opus_47(api_key: impl Into<String>) -> Self {
328        Self::new(api_key, MODEL_OPUS_47)
329    }
330
331    /// Create a provider using Claude Opus 4.8.
332    ///
333    /// Note: Opus 4.8 requires adaptive thinking. Passing a
334    /// `ThinkingConfig` with `ThinkingMode::Enabled { budget_tokens }`
335    /// will return an `InvalidRequest` — use `ThinkingConfig::adaptive()`
336    /// or `ThinkingConfig::adaptive_with_effort(_)` instead.
337    #[must_use]
338    pub fn opus_48(api_key: impl Into<String>) -> Self {
339        Self::new(api_key, MODEL_OPUS_48)
340    }
341
342    /// Create a provider using Claude Fable 5.
343    ///
344    /// Note: Fable 5 is adaptive-only — the API applies adaptive thinking
345    /// even when no thinking config is sent, and raw chain of thought is
346    /// never returned (thinking blocks arrive with empty content). Passing a
347    /// `ThinkingConfig` with `ThinkingMode::Enabled { budget_tokens }`
348    /// will return an `InvalidRequest` — use `ThinkingConfig::adaptive()`
349    /// or `ThinkingConfig::adaptive_with_effort(_)` instead.
350    #[must_use]
351    pub fn fable(api_key: impl Into<String>) -> Self {
352        Self::new(api_key, MODEL_FABLE_5)
353    }
354
355    /// Set the provider-owned thinking configuration for this model.
356    #[must_use]
357    pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
358        self.thinking = Some(thinking);
359        self
360    }
361
362    /// Override the base URL (default: `https://api.anthropic.com`).
363    #[must_use]
364    pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
365        self.base_url = base_url.into();
366        self
367    }
368
369    /// Add extra HTTP headers applied to every request.
370    #[must_use]
371    pub fn with_extra_headers(mut self, headers: Vec<(String, String)>) -> Self {
372        self.extra_headers = headers;
373        self
374    }
375
376    fn requires_adaptive_thinking(&self) -> bool {
377        matches!(
378            self.model.as_str(),
379            MODEL_SONNET_46 | MODEL_OPUS_46 | MODEL_OPUS_47 | MODEL_OPUS_48 | MODEL_FABLE_5
380        )
381    }
382}
383
384#[async_trait]
385#[allow(clippy::too_many_lines)]
386impl LlmProvider for AnthropicProvider {
387    async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
388        let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
389            Ok(thinking) => thinking,
390            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
391        };
392        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
393            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
394        }
395        let messages = Self::build_cached_api_messages(&request);
396        let tools = if self.is_oauth() {
397            build_api_tools(&request).map(|tools| {
398                tools
399                    .into_iter()
400                    .map(|mut t| {
401                        t.name = to_claude_code_name(&t.name);
402                        t
403                    })
404                    .collect::<Vec<_>>()
405            })
406        } else {
407            build_api_tools(&request)
408        };
409        let thinking = thinking_config
410            .as_ref()
411            .map(ApiThinkingConfig::from_thinking_config);
412        let output_config = thinking_config
413            .as_ref()
414            .and_then(|t| t.effort)
415            .map(|effort| ApiOutputConfig { effort });
416
417        let system = self.build_system_prompt_for_request(&request.system);
418        let max_tokens = self.effective_max_tokens(&request);
419        let tool_choice = request
420            .tool_choice
421            .as_ref()
422            .map(ApiToolChoice::from_tool_choice);
423
424        let api_request = ApiMessagesRequest {
425            model: Some(&self.model),
426            max_tokens,
427            system,
428            messages: &messages,
429            tools: tools.as_deref(),
430            tool_choice,
431            stream: false,
432            thinking,
433            output_config,
434            anthropic_version: None,
435        };
436
437        log::debug!(
438            "Anthropic LLM request model={} max_tokens={} oauth={}",
439            self.model,
440            max_tokens,
441            self.is_oauth()
442        );
443
444        // Log full request payload for debugging
445        if log::log_enabled!(log::Level::Debug) {
446            match serde_json::to_string_pretty(&api_request) {
447                Ok(json) => log::debug!("Anthropic API request payload:\n{json}"),
448                Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
449            }
450        }
451
452        let builder = self
453            .client
454            .post(format!("{}/v1/messages", self.base_url))
455            .header("Content-Type", "application/json");
456        let response = self
457            .apply_auth(builder)
458            .json(&api_request)
459            .send()
460            .await
461            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
462
463        let status = response.status();
464        let bytes = response
465            .bytes()
466            .await
467            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
468
469        log::debug!(
470            "Anthropic LLM response status={} body_len={}",
471            status,
472            bytes.len()
473        );
474
475        if status == StatusCode::TOO_MANY_REQUESTS {
476            return Ok(ChatOutcome::RateLimited);
477        }
478
479        if status.is_server_error() {
480            let body = String::from_utf8_lossy(&bytes);
481            log::error!("Anthropic server error status={status} body={body}");
482            return Ok(ChatOutcome::ServerError(body.into_owned()));
483        }
484
485        if status.is_client_error() {
486            let body = String::from_utf8_lossy(&bytes);
487            log::warn!("Anthropic client error status={status} body={body}");
488            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
489        }
490
491        let api_response: data::ApiResponse = serde_json::from_slice(&bytes)
492            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
493
494        // Log the full response for debugging
495        log::debug!(
496            "Anthropic API response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
497            api_response.id,
498            api_response.model,
499            api_response.stop_reason,
500            api_response.usage.total_input_tokens(),
501            api_response.usage.output,
502            api_response.content.len()
503        );
504
505        let mut content = map_content_blocks(api_response.content);
506
507        // Reverse-map tool names from Claude Code casing back to original names
508        if self.is_oauth() {
509            let original_names: Vec<String> = request
510                .tools
511                .as_ref()
512                .map(|ts| ts.iter().map(|t| t.name.clone()).collect())
513                .unwrap_or_default();
514            for block in &mut content {
515                if let ContentBlock::ToolUse { name, .. } = block {
516                    *name = from_claude_code_name(name, &original_names);
517                }
518            }
519        }
520
521        let stop_reason = api_response.stop_reason.as_ref().map(map_stop_reason);
522
523        Ok(ChatOutcome::Success(ChatResponse {
524            id: api_response.id,
525            content,
526            model: api_response.model,
527            stop_reason,
528            usage: Usage {
529                input_tokens: api_response.usage.total_input_tokens(),
530                output_tokens: api_response.usage.output,
531                cached_input_tokens: api_response.usage.cached_input_tokens(),
532                cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
533            },
534        }))
535    }
536
537    fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
538        Box::pin(async_stream::stream! {
539            let is_oauth = self.is_oauth();
540            let original_tool_names: Vec<String> = request
541                .tools
542                .as_ref()
543                .map(|ts| ts.iter().map(|t| t.name.clone()).collect())
544                .unwrap_or_default();
545
546            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
547                yield Ok(StreamDelta::Error {
548                    message: error.to_string(),
549                    kind: StreamErrorKind::InvalidRequest,
550                });
551                return;
552            }
553
554            let messages = Self::build_cached_api_messages(&request);
555            let tools = if is_oauth {
556                build_api_tools(&request).map(|tools| {
557                    tools
558                        .into_iter()
559                        .map(|mut t| {
560                            t.name = to_claude_code_name(&t.name);
561                            t
562                        })
563                        .collect::<Vec<_>>()
564                })
565            } else {
566                build_api_tools(&request)
567            };
568            let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
569                Ok(thinking) => thinking,
570                Err(error) => {
571                    yield Ok(StreamDelta::Error {
572                        message: error.to_string(),
573                        kind: StreamErrorKind::InvalidRequest,
574                    });
575                    return;
576                }
577            };
578            let thinking = thinking_config
579                .as_ref()
580                .map(ApiThinkingConfig::from_thinking_config);
581            let output_config = thinking_config
582                .as_ref()
583                .and_then(|t| t.effort)
584                .map(|effort| ApiOutputConfig { effort });
585
586            let system = self.build_system_prompt_for_request(&request.system);
587            let max_tokens = self.effective_max_tokens(&request);
588            let tool_choice = request
589                .tool_choice
590                .as_ref()
591                .map(ApiToolChoice::from_tool_choice);
592
593            let api_request = ApiMessagesRequest {
594                model: Some(&self.model),
595                max_tokens,
596                system,
597                messages: &messages,
598                tools: tools.as_deref(),
599                tool_choice,
600                stream: true,
601                thinking,
602                output_config,
603                anthropic_version: None,
604            };
605
606            log::debug!("Anthropic streaming LLM request model={} max_tokens={} oauth={}", self.model, max_tokens, is_oauth);
607
608            // Log full request payload for debugging
609            if log::log_enabled!(log::Level::Debug) {
610                match serde_json::to_string_pretty(&api_request) {
611                    Ok(json) => log::debug!("Anthropic streaming API request payload:\n{json}"),
612                    Err(e) => log::debug!("Failed to serialize streaming request for logging: {e}"),
613                }
614            }
615
616            let builder = self
617                .client
618                .post(format!("{}/v1/messages", self.base_url))
619                .header("Content-Type", "application/json");
620            let response = match self
621                .apply_auth(builder)
622                .json(&api_request)
623                .send()
624                .await
625            {
626                Ok(r) => r,
627                Err(e) => {
628                    yield Err(anyhow::anyhow!("request failed: {e}"));
629                    return;
630                }
631            };
632
633            let status = response.status();
634
635            if status == StatusCode::TOO_MANY_REQUESTS {
636                yield Ok(StreamDelta::Error {
637                    message: "Rate limited".to_string(),
638                    kind: StreamErrorKind::RateLimited,
639                });
640                return;
641            }
642
643            if status.is_server_error() {
644                let body = response.text().await.unwrap_or_default();
645                log::error!("Anthropic server error status={status} body={body}");
646                yield Ok(StreamDelta::Error {
647                    message: body,
648                    kind: StreamErrorKind::ServerError,
649                });
650                return;
651            }
652
653            if status.is_client_error() {
654                let body = response.text().await.unwrap_or_default();
655                log::warn!("Anthropic client error status={status} body={body}");
656                yield Ok(StreamDelta::Error {
657                    message: body,
658                    kind: StreamErrorKind::InvalidRequest,
659                });
660                return;
661            }
662
663            // Process SSE stream
664            let mut stream = response.bytes_stream();
665            let mut buffer = String::new();
666            let mut input_tokens: u32 = 0;
667            let mut output_tokens: u32 = 0;
668            let mut cached_input_tokens: u32 = 0;
669            let mut cache_creation_input_tokens: u32 = 0;
670            // Track tool IDs by block index for correlating input deltas
671            let mut tool_ids: std::collections::HashMap<usize, String> =
672                std::collections::HashMap::new();
673
674            let mut received_message_stop = false;
675            let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
676            let mut chunk_count: u64 = 0;
677            let mut total_bytes: u64 = 0;
678
679            // Drop guard to detect if the stream is dropped before completion
680            struct StreamDropGuard {
681                completed: bool,
682                chunk_count: u64,
683            }
684            impl Drop for StreamDropGuard {
685                fn drop(&mut self) {
686                    if !self.completed {
687                        // Stream drops are expected when the user cancels a running
688                        // agent loop (Esc / Ctrl-C).  Log at debug level so it does
689                        // not surface as noise in every cancelled session.
690                        log::debug!(
691                            "SSE stream dropped before completion at chunk_count={} (task was likely cancelled)",
692                            self.chunk_count
693                        );
694                    }
695                }
696            }
697            let mut drop_guard = StreamDropGuard { completed: false, chunk_count: 0 };
698
699            log::debug!("Starting SSE stream processing");
700
701            while let Some(chunk_result) = stream.next().await {
702                let chunk = match chunk_result {
703                    Ok(c) => c,
704                    Err(e) => {
705                        log::error!("Stream error while reading chunk error={e} chunk_count={chunk_count} total_bytes={total_bytes}");
706                        yield Err(anyhow::anyhow!("stream error: {e}"));
707                        return;
708                    }
709                };
710
711                chunk_count += 1;
712                total_bytes += chunk.len() as u64;
713                drop_guard.chunk_count = chunk_count;
714
715                // Log progress every 10 chunks to show HTTP stream is alive
716                if chunk_count.is_multiple_of(10) {
717                    log::debug!("SSE chunk progress: chunk_count={chunk_count} total_bytes={total_bytes}");
718                }
719                buffer.push_str(&String::from_utf8_lossy(&chunk));
720
721                // Process complete SSE events (terminated by a blank line)
722                while let Some(event_block) = take_next_sse_event(&mut buffer) {
723                    // Track if we received message_stop
724                    if is_message_stop_event(&event_block) {
725                        log::debug!("Received message_stop event chunk_count={chunk_count} total_bytes={total_bytes}");
726                        received_message_stop = true;
727                    }
728
729                    // Parse SSE event
730                    if let Some(mut delta) = parse_sse_event(
731                        &event_block,
732                        &mut input_tokens,
733                        &mut output_tokens,
734                        &mut cached_input_tokens,
735                        &mut cache_creation_input_tokens,
736                        &mut tool_ids,
737                        &mut pending_stop_reason,
738                    ) {
739                        // Reverse-map tool names from Claude Code casing
740                        if is_oauth
741                            && let StreamDelta::ToolUseStart { ref mut name, .. } = delta
742                        {
743                            *name = from_claude_code_name(name, &original_tool_names);
744                        }
745                        yield Ok(delta);
746                    }
747                    // After message_stop (which emits Usage), emit Done
748                    if is_message_stop_event(&event_block) {
749                        yield Ok(StreamDelta::Done {
750                            stop_reason: pending_stop_reason.take(),
751                        });
752                    }
753                }
754            }
755
756            log::debug!(
757                "SSE stream ended chunk_count={chunk_count} total_bytes={total_bytes} buffer_remaining={} received_message_stop={received_message_stop}",
758                buffer.len()
759            );
760
761            // Process any remaining buffer content (handles incomplete final chunk)
762            let remaining = buffer.trim();
763            if !remaining.is_empty() {
764                log::debug!(
765                    "Processing remaining buffer content remaining_len={} remaining_preview={}",
766                    remaining.len(),
767                    remaining.chars().take(100).collect::<String>()
768                );
769
770                // Track if remaining buffer contains message_stop
771                if is_message_stop_event(remaining) {
772                    received_message_stop = true;
773                }
774
775                if let Some(mut delta) = parse_sse_event(
776                    remaining,
777                    &mut input_tokens,
778                    &mut output_tokens,
779                    &mut cached_input_tokens,
780                    &mut cache_creation_input_tokens,
781                    &mut tool_ids,
782                    &mut pending_stop_reason,
783                ) {
784                    if is_oauth
785                        && let StreamDelta::ToolUseStart { ref mut name, .. } = delta
786                    {
787                        *name = from_claude_code_name(name, &original_tool_names);
788                    }
789                    yield Ok(delta);
790                }
791                // After message_stop (which emits Usage), emit Done
792                if is_message_stop_event(remaining) {
793                    yield Ok(StreamDelta::Done {
794                        stop_reason: pending_stop_reason.take(),
795                    });
796                }
797            }
798
799            // Mark stream as properly completed
800            drop_guard.completed = true;
801
802            // If stream ended without message_stop, emit a server-error (transient) signal
803            if !received_message_stop {
804                log::warn!(
805                    "SSE stream ended without message_stop event - stream may have been interrupted chunk_count={chunk_count} total_bytes={total_bytes}"
806                );
807                yield Ok(StreamDelta::Error {
808                    message: "Stream ended unexpectedly without completion".to_string(),
809                    kind: StreamErrorKind::ServerError,
810                });
811            }
812        })
813    }
814
815    fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
816        let Some(thinking) = thinking else {
817            return Ok(());
818        };
819
820        if self
821            .capabilities()
822            .is_some_and(|caps| !caps.supports_thinking)
823        {
824            return Err(anyhow::anyhow!(
825                "thinking is not supported for provider={} model={}",
826                self.provider(),
827                self.model()
828            ));
829        }
830
831        if matches!(thinking.mode, ThinkingMode::Adaptive)
832            && !self
833                .capabilities()
834                .is_some_and(|caps| caps.supports_adaptive_thinking)
835        {
836            return Err(anyhow::anyhow!(
837                "adaptive thinking is not supported for provider={} model={}",
838                self.provider(),
839                self.model()
840            ));
841        }
842
843        if self.requires_adaptive_thinking()
844            && matches!(thinking.mode, ThinkingMode::Enabled { .. })
845        {
846            return Err(anyhow::anyhow!(
847                "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
848                self.provider(),
849                self.model()
850            ));
851        }
852
853        Ok(())
854    }
855
856    fn model(&self) -> &str {
857        &self.model
858    }
859
860    fn provider(&self) -> &'static str {
861        "anthropic"
862    }
863
864    fn configured_thinking(&self) -> Option<&ThinkingConfig> {
865        self.thinking.as_ref()
866    }
867
868    fn default_max_tokens(&self) -> u32 {
869        let model_max = self
870            .capabilities()
871            .and_then(|caps| caps.max_output_tokens)
872            .or_else(|| {
873                crate::model_capabilities::default_max_output_tokens(self.provider(), self.model())
874            })
875            .unwrap_or(4096);
876        model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
877    }
878}
879
#[cfg(test)]
mod tests {
    use super::*;

    // ===================
    // Constructor Tests
    // ===================

    #[test]
    fn test_new_creates_provider_with_custom_model() {
        let provider = AnthropicProvider::new("test-api-key", "custom-model");

        assert_eq!(provider.model(), "custom-model");
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_haiku_factory_creates_haiku_provider() {
        let provider = AnthropicProvider::haiku("test-api-key".to_string());

        assert_eq!(provider.model(), MODEL_HAIKU_45);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_adaptive_thinking_accepted_on_sonnet_46_rejected_on_sonnet_45() {
        let sonnet_46 = AnthropicProvider::sonnet_46("test-api-key".to_string());
        assert!(
            sonnet_46
                .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
                .is_ok()
        );

        let sonnet_45 = AnthropicProvider::sonnet_45("test-api-key".to_string());
        let error = sonnet_45
            .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
            .unwrap_err();
        assert!(
            error
                .to_string()
                .contains("adaptive thinking is not supported")
        );
    }

    #[test]
    fn test_validate_thinking_config_accepts_none() {
        // `None` means "no thinking requested" and must never be rejected,
        // whatever the model.
        let providers = [
            AnthropicProvider::new("test-api-key", "custom-model"),
            AnthropicProvider::sonnet_45("test-api-key".to_string()),
            AnthropicProvider::sonnet_46("test-api-key".to_string()),
            AnthropicProvider::opus_47("test-api-key".to_string()),
            AnthropicProvider::fable("test-api-key".to_string()),
        ];
        for provider in providers {
            assert!(
                provider.validate_thinking_config(None).is_ok(),
                "None thinking rejected for model={}",
                provider.model()
            );
        }
    }

    #[test]
    fn test_anthropic_46_models_reject_budgeted_thinking() {
        let sonnet_46 = AnthropicProvider::sonnet_46("test-api-key".to_string());
        let error = sonnet_46
            .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
            .unwrap_err();
        assert!(error.to_string().contains("ThinkingConfig::adaptive()"));
    }

    #[test]
    fn test_opus_47_rejects_budgeted_thinking() {
        // Opus 4.7 follows the same adaptive-only policy as 4.6. Without
        // this guard the provider would serialise `thinking.type.enabled`
        // and get a 400 back from the API — we want a clear SDK-level
        // error instead.
        let opus_47 = AnthropicProvider::opus_47("test-api-key".to_string());
        let error = opus_47
            .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
            .unwrap_err();
        assert!(
            error.to_string().contains("ThinkingConfig::adaptive()"),
            "expected migration hint, got: {error}"
        );
    }

    #[test]
    fn test_opus_47_accepts_adaptive_thinking() {
        let opus_47 = AnthropicProvider::opus_47("test-api-key".to_string());
        assert!(
            opus_47
                .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
                .is_ok()
        );
        assert!(
            opus_47
                .validate_thinking_config(Some(&ThinkingConfig::adaptive_with_effort(
                    agent_sdk_foundation::llm::Effort::High
                )))
                .is_ok()
        );
    }

    #[test]
    fn test_opus_47_factory_creates_opus_47_provider() {
        let provider = AnthropicProvider::opus_47("test-api-key".to_string());
        assert_eq!(provider.model(), MODEL_OPUS_47);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_opus_48_rejects_budgeted_thinking() {
        // Opus 4.8 follows the same adaptive-only policy as 4.6/4.7. Without
        // this guard the provider would serialise `thinking.type.enabled`
        // and get a 400 back from the API — we want a clear SDK-level
        // error instead.
        let opus_48 = AnthropicProvider::opus_48("test-api-key".to_string());
        let error = opus_48
            .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
            .unwrap_err();
        assert!(
            error.to_string().contains("ThinkingConfig::adaptive()"),
            "expected migration hint, got: {error}"
        );
    }

    #[test]
    fn test_opus_48_accepts_adaptive_thinking() {
        let opus_48 = AnthropicProvider::opus_48("test-api-key".to_string());
        assert!(
            opus_48
                .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
                .is_ok()
        );
        assert!(
            opus_48
                .validate_thinking_config(Some(&ThinkingConfig::adaptive_with_effort(
                    agent_sdk_foundation::llm::Effort::High
                )))
                .is_ok()
        );
    }

    #[test]
    fn test_opus_48_factory_creates_opus_48_provider() {
        let provider = AnthropicProvider::opus_48("test-api-key".to_string());
        assert_eq!(provider.model(), MODEL_OPUS_48);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_fable_5_rejects_budgeted_thinking() {
        // Fable 5 is adaptive-only: the API applies adaptive thinking even
        // when `thinking` is unset and rejects budget-based configs. Fail
        // fast with a migration hint instead of a 400 from the API.
        let fable = AnthropicProvider::fable("test-api-key".to_string());
        let error = fable
            .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
            .unwrap_err();
        assert!(
            error.to_string().contains("ThinkingConfig::adaptive()"),
            "expected migration hint, got: {error}"
        );
    }

    #[test]
    fn test_fable_5_accepts_adaptive_thinking() {
        let fable = AnthropicProvider::fable("test-api-key".to_string());
        assert!(
            fable
                .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
                .is_ok()
        );
        assert!(
            fable
                .validate_thinking_config(Some(&ThinkingConfig::adaptive_with_effort(
                    agent_sdk_foundation::llm::Effort::High
                )))
                .is_ok()
        );
    }

    #[test]
    fn test_fable_factory_creates_fable_5_provider() {
        let provider = AnthropicProvider::fable("test-api-key".to_string());
        assert_eq!(provider.model(), MODEL_FABLE_5);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_sonnet_factory_creates_sonnet_provider() {
        let provider = AnthropicProvider::sonnet("test-api-key".to_string());

        assert_eq!(provider.model(), MODEL_SONNET_46);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_sonnet_45_factory_creates_sonnet_provider() {
        let provider = AnthropicProvider::sonnet_45("test-api-key".to_string());

        assert_eq!(provider.model(), MODEL_SONNET_45);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_sonnet_46_factory_creates_sonnet_provider() {
        let provider = AnthropicProvider::sonnet_46("test-api-key".to_string());

        assert_eq!(provider.model(), MODEL_SONNET_46);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_opus_factory_creates_opus_provider() {
        let provider = AnthropicProvider::opus("test-api-key".to_string());

        assert_eq!(provider.model(), MODEL_OPUS_46);
        assert_eq!(provider.provider(), "anthropic");
    }

    // ===================
    // Default Max Tokens Tests
    // ===================

    #[test]
    fn test_default_max_tokens_is_positive_and_capped() {
        let providers = [
            AnthropicProvider::new("test-api-key", "custom-model"),
            AnthropicProvider::haiku("test-api-key".to_string()),
            AnthropicProvider::sonnet("test-api-key".to_string()),
            AnthropicProvider::sonnet_45("test-api-key".to_string()),
            AnthropicProvider::opus("test-api-key".to_string()),
            AnthropicProvider::opus_47("test-api-key".to_string()),
            AnthropicProvider::opus_48("test-api-key".to_string()),
            AnthropicProvider::fable("test-api-key".to_string()),
        ];
        for provider in providers {
            let max = provider.default_max_tokens();
            assert!(max > 0, "zero default max_tokens for model={}", provider.model());
            assert!(
                max <= DEFAULT_SAFE_MAX_OUTPUT_TOKENS,
                "default max_tokens {max} exceeds safe cap for model={}",
                provider.model()
            );
        }
    }

    // ===================
    // Model Constants Tests
    // ===================

    #[test]
    fn test_model_constants_have_expected_values() {
        assert!(MODEL_HAIKU_35.contains("haiku"));
        assert!(MODEL_HAIKU_45.contains("haiku"));
        assert!(MODEL_SONNET_35.contains("sonnet"));
        assert!(MODEL_SONNET_4.contains("sonnet"));
        assert!(MODEL_SONNET_45.contains("sonnet"));
        assert!(MODEL_SONNET_46.contains("sonnet"));
        assert!(MODEL_OPUS_4.contains("opus"));
        assert!(MODEL_OPUS_46.contains("opus"));
        assert!(MODEL_OPUS_47.contains("opus"));
        assert!(MODEL_OPUS_48.contains("opus"));
        assert!(MODEL_FABLE_5.contains("fable"));
    }

    #[test]
    fn test_model_constants_are_distinct_claude_ids() {
        let all = [
            MODEL_HAIKU_35,
            MODEL_SONNET_35,
            MODEL_SONNET_4,
            MODEL_OPUS_4,
            MODEL_HAIKU_45,
            MODEL_SONNET_45,
            MODEL_SONNET_46,
            MODEL_OPUS_46,
            MODEL_OPUS_47,
            MODEL_OPUS_48,
            MODEL_FABLE_5,
        ];
        for (index, id) in all.iter().enumerate() {
            assert!(id.starts_with("claude-"), "unexpected model id: {id}");
            for other in &all[index + 1..] {
                assert_ne!(id, other, "duplicate model id: {id}");
            }
        }
    }

    // ===================
    // Clone Tests
    // ===================

    #[test]
    fn test_provider_is_cloneable() {
        let provider = AnthropicProvider::new("test-api-key", "test-model");
        let cloned = provider.clone();

        assert_eq!(provider.model(), cloned.model());
        assert_eq!(provider.provider(), cloned.provider());
    }
}