Skip to main content

agent_sdk_providers/impls/
anthropic.rs

1//! Anthropic API provider implementation.
2//!
3//! This module provides an implementation of `LlmProvider` for the Anthropic
4//! Messages API using reqwest for HTTP calls. Supports both streaming and
5//! non-streaming responses.
6
7pub(crate) mod data;
8
9use crate::attachments::validate_request_attachments;
10use crate::provider::LlmProvider;
11use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
12use agent_sdk_foundation::llm::{
13    ChatOutcome, ChatRequest, ChatResponse, ContentBlock, ThinkingConfig, ThinkingMode, Usage,
14};
15use anyhow::Result;
16use async_trait::async_trait;
17use data::{
18    ApiMessagesRequest, ApiOutputConfig, ApiThinkingConfig, ApiToolChoice, build_api_messages,
19    build_api_tools, is_message_stop_event, map_content_blocks, map_stop_reason, parse_sse_event,
20    take_next_sse_event,
21};
22use futures::StreamExt;
23use reqwest::StatusCode;
24
/// Default API root; `AnthropicProvider::with_base_url` overrides it.
const API_BASE_URL: &str = "https://api.anthropic.com";
/// Value sent in the `anthropic-version` header on every request.
const API_VERSION: &str = "2023-06-01";
/// Claude Code CLI version advertised in the OAuth-mode `user-agent` header.
const CLAUDE_CODE_VERSION: &str = "2.1.75";
/// Output-token cap; presumably the fallback when a request does not set
/// `max_tokens` explicitly. NOTE(review): its use is outside this view — confirm.
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;

/// Model ID for Claude 3.5 Haiku.
pub const MODEL_HAIKU_35: &str = "claude-3-5-haiku-20241022";
/// Model ID for Claude 3.5 Sonnet.
pub const MODEL_SONNET_35: &str = "claude-3-5-sonnet-20241022";
/// Model ID for Claude Sonnet 4.
pub const MODEL_SONNET_4: &str = "claude-sonnet-4-20250514";
/// Model ID for Claude Opus 4.
pub const MODEL_OPUS_4: &str = "claude-opus-4-20250514";

/// Model ID for Claude Haiku 4.5 (used by [`AnthropicProvider::haiku`]).
pub const MODEL_HAIKU_45: &str = "claude-haiku-4-5-20251001";
/// Model ID for Claude Sonnet 4.5 (used by [`AnthropicProvider::sonnet_45`]).
pub const MODEL_SONNET_45: &str = "claude-sonnet-4-5-20250929";
/// Model ID for Claude Sonnet 4.6 (the default for [`AnthropicProvider::sonnet`]).
/// This model only accepts adaptive thinking.
pub const MODEL_SONNET_46: &str = "claude-sonnet-4-6";
/// Model ID for Claude Opus 4.6. This model only accepts adaptive thinking.
pub const MODEL_OPUS_46: &str = "claude-opus-4-6";
/// Model ID for Claude Opus 4.7. This model only accepts adaptive thinking.
pub const MODEL_OPUS_47: &str = "claude-opus-4-7";
/// Model ID for Claude Opus 4.8. This model only accepts adaptive thinking.
pub const MODEL_OPUS_48: &str = "claude-opus-4-8";
41
/// Claude Code tool names in their canonical casing, used in OAuth mode.
///
/// When using OAuth tokens, tool names must match Claude Code's exact casing.
/// `to_claude_code_name` rewrites an outgoing tool name to the entry here
/// that matches case-insensitively. It passes unknown names through
/// unchanged, so extra entries are harmless — they future-proof against new
/// tools being registered later.
///
/// Source: <https://cchistory.mariozechner.at/data/prompts-2.1.11.md>
/// NOTE(review): this list comes from Claude Code 2.1.11, while
/// `CLAUDE_CODE_VERSION` advertises 2.1.75 — confirm it is still current.
const CLAUDE_CODE_TOOLS: &[&str] = &[
    "Read",
    "Write",
    "Edit",
    "Bash",
    "Grep",
    "Glob",
    "AskUserQuestion",
    "EnterPlanMode",
    "ExitPlanMode",
    "KillShell",
    "NotebookEdit",
    "Skill",
    "Task",
    "TaskOutput",
    "TodoWrite",
    "WebFetch",
    "WebSearch",
];
67
68/// Maps a tool name to Claude Code's canonical casing (case-insensitive match).
69fn to_claude_code_name(name: &str) -> String {
70    let lower = name.to_lowercase();
71    for cc_name in CLAUDE_CODE_TOOLS {
72        if cc_name.to_lowercase() == lower {
73            return (*cc_name).to_string();
74        }
75    }
76    name.to_string()
77}
78
/// Inverse of `to_claude_code_name`: finds the caller's original spelling.
///
/// Returns the first entry of `original_names` that equals `name`, ignoring
/// case. If nothing matches, `name` is returned unchanged.
fn from_claude_code_name(name: &str, original_names: &[String]) -> String {
    let wanted = name.to_lowercase();
    original_names
        .iter()
        .find(|candidate| candidate.to_lowercase() == wanted)
        .map_or_else(|| name.to_owned(), String::clone)
}
89
/// Reports whether `api_key` is an OAuth access token rather than a plain
/// API key, i.e. whether it begins with `sk-ant-oat`.
#[must_use]
pub fn is_oauth_token(api_key: &str) -> bool {
    const OAUTH_TOKEN_PREFIX: &str = "sk-ant-oat";
    api_key.strip_prefix(OAUTH_TOKEN_PREFIX).is_some()
}
95
/// Authentication mode for the Anthropic provider.
///
/// `AnthropicProvider::new` chooses the mode from the shape of the key
/// (see `is_oauth_token`). The enum is fieldless, so it is `Copy` and
/// comparable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AuthMode {
    /// Standard API key authentication (`x-api-key` header).
    ApiKey,
    /// OAuth token authentication (`Authorization: Bearer` header plus the
    /// Claude Code identity headers and system block).
    OAuth,
}
104
105/// Anthropic LLM provider using the Messages API.
106#[derive(Clone)]
107pub struct AnthropicProvider {
108    client: reqwest::Client,
109    api_key: String,
110    model: String,
111    base_url: String,
112    auth_mode: AuthMode,
113    /// Original tool names for reverse mapping in OAuth mode (reserved for future use).
114    #[allow(dead_code)]
115    original_tool_names: Vec<String>,
116    thinking: Option<ThinkingConfig>,
117    /// Extra headers applied to every request (e.g. for gateway authentication).
118    extra_headers: Vec<(String, String)>,
119}
120
121impl AnthropicProvider {
122    /// The conventional environment variable holding the Anthropic API key.
123    pub const API_KEY_ENV: &'static str = "ANTHROPIC_API_KEY";
124
125    /// Create a new Anthropic provider with the specified API key and model.
126    ///
127    /// Automatically detects OAuth tokens (`sk-ant-oat-*`) and switches to
128    /// Bearer auth with Claude Code identity headers.
129    #[must_use]
130    pub fn new(api_key: impl Into<String>, model: impl Into<String>) -> Self {
131        let api_key = api_key.into();
132        let model = model.into();
133        let auth_mode = if is_oauth_token(&api_key) {
134            AuthMode::OAuth
135        } else {
136            AuthMode::ApiKey
137        };
138
139        // Configure client with appropriate timeouts for streaming
140        // - No overall timeout (streaming can take a long time)
141        // - 30 second connect timeout
142        // - TCP keepalive to prevent connection drops
143        let client = reqwest::Client::builder()
144            .connect_timeout(std::time::Duration::from_secs(30))
145            .tcp_keepalive(std::time::Duration::from_secs(30))
146            .build()
147            .unwrap_or_default();
148
149        Self {
150            client,
151            api_key,
152            model,
153            base_url: API_BASE_URL.to_owned(),
154            auth_mode,
155            original_tool_names: Vec::new(),
156            thinking: None,
157            extra_headers: Vec::new(),
158        }
159    }
160
161    /// Returns whether this provider is using OAuth authentication.
162    #[must_use]
163    pub const fn is_oauth(&self) -> bool {
164        matches!(self.auth_mode, AuthMode::OAuth)
165    }
166
167    /// Applies authentication headers to a request builder.
168    ///
169    /// When `api_key` is empty the provider-specific credential header is
170    /// skipped — useful for BYOK gateways where auth is handled externally
171    /// via `extra_headers`.
172    fn apply_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
173        let builder = if self.api_key.is_empty() {
174            builder.header("anthropic-version", API_VERSION)
175        } else {
176            match self.auth_mode {
177                AuthMode::ApiKey => builder
178                    .header("x-api-key", &self.api_key)
179                    .header("anthropic-version", API_VERSION),
180                AuthMode::OAuth => {
181                    // Build beta features list matching Claude Code's behaviour.
182                    // Adaptive-thinking models (4.6) have interleaved thinking
183                    // built-in; for older reasoning models we need the explicit beta.
184                    let mut beta_features = vec![
185                        "claude-code-20250219",
186                        "oauth-2025-04-20",
187                        "fine-grained-tool-streaming-2025-05-14",
188                    ];
189                    if !self.requires_adaptive_thinking() {
190                        beta_features.push("interleaved-thinking-2025-05-14");
191                    }
192                    builder
193                        .header("Authorization", format!("Bearer {}", self.api_key))
194                        .header("anthropic-version", API_VERSION)
195                        .header("anthropic-beta", beta_features.join(","))
196                        .header("user-agent", format!("claude-cli/{CLAUDE_CODE_VERSION}"))
197                        .header("x-app", "cli")
198                }
199            }
200        };
201        self.extra_headers
202            .iter()
203            .fold(builder, |b, (k, v)| b.header(k.as_str(), v.as_str()))
204    }
205
206    const OAUTH_IDENTITY: &'static str =
207        "You are Claude Code, Anthropic's official CLI for Claude.";
208
209    /// Build the system prompt payload, accounting for OAuth mode.
210    ///
211    /// In OAuth mode the identity string must be a **separate** system block
212    /// (matching the layout Claude Code itself sends).  For API-key auth the
213    /// user-supplied system prompt is sent as a single block.
214    fn build_system_prompt_for_request<'a>(
215        &self,
216        system: &'a str,
217    ) -> Option<data::ApiSystemPrompt<'a>> {
218        let cc = Self::cache_control();
219
220        match self.auth_mode {
221            AuthMode::ApiKey => data::build_api_system_prompt(system, Some(cc)),
222            AuthMode::OAuth => {
223                let mut blocks = vec![data::ApiSystemBlock {
224                    block_type: "text",
225                    text: Self::OAUTH_IDENTITY,
226                    cache_control: Some(cc.clone()),
227                }];
228                if !system.is_empty() {
229                    blocks.push(data::ApiSystemBlock {
230                        block_type: "text",
231                        text: system,
232                        cache_control: Some(cc),
233                    });
234                }
235                Some(data::ApiSystemPrompt::Blocks(blocks))
236            }
237        }
238    }
239
240    const fn cache_control() -> data::ApiCacheControl {
241        data::ApiCacheControl::ephemeral()
242    }
243
244    fn build_cached_api_messages(request: &ChatRequest) -> Vec<data::ApiMessage> {
245        let mut messages = build_api_messages(request);
246        data::apply_cache_control_to_last_user_message(&mut messages, Self::cache_control());
247        messages
248    }
249
250    fn effective_max_tokens(&self, request: &ChatRequest) -> u32 {
251        if request.max_tokens_explicit {
252            request.max_tokens
253        } else {
254            self.default_max_tokens()
255        }
256    }
257
258    /// Create a provider using Claude Sonnet, reading the API key from the
259    /// conventional [`ANTHROPIC_API_KEY`](Self::API_KEY_ENV) environment
260    /// variable.
261    ///
262    /// This is the zero-ceremony on-ramp for the quickstart. Use
263    /// [`try_from_env`](Self::try_from_env) if you want to handle a missing
264    /// key without a panic.
265    ///
266    /// # Panics
267    ///
268    /// Panics if `ANTHROPIC_API_KEY` is not set. Prefer
269    /// [`try_from_env`](Self::try_from_env) outside of examples/tests.
270    #[must_use]
271    pub fn from_env() -> Self {
272        Self::try_from_env().unwrap_or_else(|e| panic!("{e}"))
273    }
274
275    /// Create a provider using Claude Sonnet, reading the API key from the
276    /// conventional [`ANTHROPIC_API_KEY`](Self::API_KEY_ENV) environment
277    /// variable.
278    ///
279    /// # Errors
280    ///
281    /// Returns an error if `ANTHROPIC_API_KEY` is unset or not valid UTF-8.
282    pub fn try_from_env() -> Result<Self> {
283        let api_key = std::env::var(Self::API_KEY_ENV).map_err(|_| {
284            anyhow::anyhow!("environment variable `{}` is not set", Self::API_KEY_ENV)
285        })?;
286        Ok(Self::sonnet(api_key))
287    }
288
289    /// Create a provider using Claude Haiku 4.5.
290    #[must_use]
291    pub fn haiku(api_key: impl Into<String>) -> Self {
292        Self::new(api_key, MODEL_HAIKU_45)
293    }
294
295    /// Create a provider using Claude Sonnet 4.6.
296    #[must_use]
297    pub fn sonnet(api_key: impl Into<String>) -> Self {
298        Self::new(api_key, MODEL_SONNET_46)
299    }
300
301    /// Create a provider using Claude Sonnet 4.5.
302    #[must_use]
303    pub fn sonnet_45(api_key: impl Into<String>) -> Self {
304        Self::new(api_key, MODEL_SONNET_45)
305    }
306
307    /// Create a provider using Claude Sonnet 4.6.
308    #[must_use]
309    pub fn sonnet_46(api_key: impl Into<String>) -> Self {
310        Self::new(api_key, MODEL_SONNET_46)
311    }
312
313    /// Create a provider using Claude Opus 4.6.
314    #[must_use]
315    pub fn opus(api_key: impl Into<String>) -> Self {
316        Self::new(api_key, MODEL_OPUS_46)
317    }
318
319    /// Create a provider using Claude Opus 4.7.
320    ///
321    /// Note: Opus 4.7 requires adaptive thinking. Passing a
322    /// `ThinkingConfig` with `ThinkingMode::Enabled { budget_tokens }`
323    /// will return an `InvalidRequest` — use `ThinkingConfig::adaptive()`
324    /// or `ThinkingConfig::adaptive_with_effort(_)` instead.
325    #[must_use]
326    pub fn opus_47(api_key: impl Into<String>) -> Self {
327        Self::new(api_key, MODEL_OPUS_47)
328    }
329
330    /// Create a provider using Claude Opus 4.8.
331    ///
332    /// Note: Opus 4.8 requires adaptive thinking. Passing a
333    /// `ThinkingConfig` with `ThinkingMode::Enabled { budget_tokens }`
334    /// will return an `InvalidRequest` — use `ThinkingConfig::adaptive()`
335    /// or `ThinkingConfig::adaptive_with_effort(_)` instead.
336    #[must_use]
337    pub fn opus_48(api_key: impl Into<String>) -> Self {
338        Self::new(api_key, MODEL_OPUS_48)
339    }
340
341    /// Set the provider-owned thinking configuration for this model.
342    #[must_use]
343    pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
344        self.thinking = Some(thinking);
345        self
346    }
347
348    /// Override the base URL (default: `https://api.anthropic.com`).
349    #[must_use]
350    pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
351        self.base_url = base_url.into();
352        self
353    }
354
355    /// Add extra HTTP headers applied to every request.
356    #[must_use]
357    pub fn with_extra_headers(mut self, headers: Vec<(String, String)>) -> Self {
358        self.extra_headers = headers;
359        self
360    }
361
362    fn requires_adaptive_thinking(&self) -> bool {
363        matches!(
364            self.model.as_str(),
365            MODEL_SONNET_46 | MODEL_OPUS_46 | MODEL_OPUS_47 | MODEL_OPUS_48
366        )
367    }
368}
369
370#[async_trait]
371#[allow(clippy::too_many_lines)]
372impl LlmProvider for AnthropicProvider {
373    async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
374        let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
375            Ok(thinking) => thinking,
376            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
377        };
378        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
379            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
380        }
381        let messages = Self::build_cached_api_messages(&request);
382        let tools = if self.is_oauth() {
383            build_api_tools(&request).map(|tools| {
384                tools
385                    .into_iter()
386                    .map(|mut t| {
387                        t.name = to_claude_code_name(&t.name);
388                        t
389                    })
390                    .collect::<Vec<_>>()
391            })
392        } else {
393            build_api_tools(&request)
394        };
395        let thinking = thinking_config
396            .as_ref()
397            .map(ApiThinkingConfig::from_thinking_config);
398        let output_config = thinking_config
399            .as_ref()
400            .and_then(|t| t.effort)
401            .map(|effort| ApiOutputConfig { effort });
402
403        let system = self.build_system_prompt_for_request(&request.system);
404        let max_tokens = self.effective_max_tokens(&request);
405        let tool_choice = request
406            .tool_choice
407            .as_ref()
408            .map(ApiToolChoice::from_tool_choice);
409
410        let api_request = ApiMessagesRequest {
411            model: Some(&self.model),
412            max_tokens,
413            system,
414            messages: &messages,
415            tools: tools.as_deref(),
416            tool_choice,
417            stream: false,
418            thinking,
419            output_config,
420            anthropic_version: None,
421        };
422
423        log::debug!(
424            "Anthropic LLM request model={} max_tokens={} oauth={}",
425            self.model,
426            max_tokens,
427            self.is_oauth()
428        );
429
430        // Log full request payload for debugging
431        if log::log_enabled!(log::Level::Debug) {
432            match serde_json::to_string_pretty(&api_request) {
433                Ok(json) => log::debug!("Anthropic API request payload:\n{json}"),
434                Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
435            }
436        }
437
438        let builder = self
439            .client
440            .post(format!("{}/v1/messages", self.base_url))
441            .header("Content-Type", "application/json");
442        let response = self
443            .apply_auth(builder)
444            .json(&api_request)
445            .send()
446            .await
447            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
448
449        let status = response.status();
450        let bytes = response
451            .bytes()
452            .await
453            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
454
455        log::debug!(
456            "Anthropic LLM response status={} body_len={}",
457            status,
458            bytes.len()
459        );
460
461        if status == StatusCode::TOO_MANY_REQUESTS {
462            return Ok(ChatOutcome::RateLimited);
463        }
464
465        if status.is_server_error() {
466            let body = String::from_utf8_lossy(&bytes);
467            log::error!("Anthropic server error status={status} body={body}");
468            return Ok(ChatOutcome::ServerError(body.into_owned()));
469        }
470
471        if status.is_client_error() {
472            let body = String::from_utf8_lossy(&bytes);
473            log::warn!("Anthropic client error status={status} body={body}");
474            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
475        }
476
477        let api_response: data::ApiResponse = serde_json::from_slice(&bytes)
478            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
479
480        // Log the full response for debugging
481        log::debug!(
482            "Anthropic API response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
483            api_response.id,
484            api_response.model,
485            api_response.stop_reason,
486            api_response.usage.total_input_tokens(),
487            api_response.usage.output,
488            api_response.content.len()
489        );
490
491        let mut content = map_content_blocks(api_response.content);
492
493        // Reverse-map tool names from Claude Code casing back to original names
494        if self.is_oauth() {
495            let original_names: Vec<String> = request
496                .tools
497                .as_ref()
498                .map(|ts| ts.iter().map(|t| t.name.clone()).collect())
499                .unwrap_or_default();
500            for block in &mut content {
501                if let ContentBlock::ToolUse { name, .. } = block {
502                    *name = from_claude_code_name(name, &original_names);
503                }
504            }
505        }
506
507        let stop_reason = api_response.stop_reason.as_ref().map(map_stop_reason);
508
509        Ok(ChatOutcome::Success(ChatResponse {
510            id: api_response.id,
511            content,
512            model: api_response.model,
513            stop_reason,
514            usage: Usage {
515                input_tokens: api_response.usage.total_input_tokens(),
516                output_tokens: api_response.usage.output,
517                cached_input_tokens: api_response.usage.cached_input_tokens(),
518                cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
519            },
520        }))
521    }
522
523    fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
524        Box::pin(async_stream::stream! {
525            let is_oauth = self.is_oauth();
526            let original_tool_names: Vec<String> = request
527                .tools
528                .as_ref()
529                .map(|ts| ts.iter().map(|t| t.name.clone()).collect())
530                .unwrap_or_default();
531
532            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
533                yield Ok(StreamDelta::Error {
534                    message: error.to_string(),
535                    kind: StreamErrorKind::InvalidRequest,
536                });
537                return;
538            }
539
540            let messages = Self::build_cached_api_messages(&request);
541            let tools = if is_oauth {
542                build_api_tools(&request).map(|tools| {
543                    tools
544                        .into_iter()
545                        .map(|mut t| {
546                            t.name = to_claude_code_name(&t.name);
547                            t
548                        })
549                        .collect::<Vec<_>>()
550                })
551            } else {
552                build_api_tools(&request)
553            };
554            let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
555                Ok(thinking) => thinking,
556                Err(error) => {
557                    yield Ok(StreamDelta::Error {
558                        message: error.to_string(),
559                        kind: StreamErrorKind::InvalidRequest,
560                    });
561                    return;
562                }
563            };
564            let thinking = thinking_config
565                .as_ref()
566                .map(ApiThinkingConfig::from_thinking_config);
567            let output_config = thinking_config
568                .as_ref()
569                .and_then(|t| t.effort)
570                .map(|effort| ApiOutputConfig { effort });
571
572            let system = self.build_system_prompt_for_request(&request.system);
573            let max_tokens = self.effective_max_tokens(&request);
574            let tool_choice = request
575                .tool_choice
576                .as_ref()
577                .map(ApiToolChoice::from_tool_choice);
578
579            let api_request = ApiMessagesRequest {
580                model: Some(&self.model),
581                max_tokens,
582                system,
583                messages: &messages,
584                tools: tools.as_deref(),
585                tool_choice,
586                stream: true,
587                thinking,
588                output_config,
589                anthropic_version: None,
590            };
591
592            log::debug!("Anthropic streaming LLM request model={} max_tokens={} oauth={}", self.model, max_tokens, is_oauth);
593
594            // Log full request payload for debugging
595            if log::log_enabled!(log::Level::Debug) {
596                match serde_json::to_string_pretty(&api_request) {
597                    Ok(json) => log::debug!("Anthropic streaming API request payload:\n{json}"),
598                    Err(e) => log::debug!("Failed to serialize streaming request for logging: {e}"),
599                }
600            }
601
602            let builder = self
603                .client
604                .post(format!("{}/v1/messages", self.base_url))
605                .header("Content-Type", "application/json");
606            let response = match self
607                .apply_auth(builder)
608                .json(&api_request)
609                .send()
610                .await
611            {
612                Ok(r) => r,
613                Err(e) => {
614                    yield Err(anyhow::anyhow!("request failed: {e}"));
615                    return;
616                }
617            };
618
619            let status = response.status();
620
621            if status == StatusCode::TOO_MANY_REQUESTS {
622                yield Ok(StreamDelta::Error {
623                    message: "Rate limited".to_string(),
624                    kind: StreamErrorKind::RateLimited,
625                });
626                return;
627            }
628
629            if status.is_server_error() {
630                let body = response.text().await.unwrap_or_default();
631                log::error!("Anthropic server error status={status} body={body}");
632                yield Ok(StreamDelta::Error {
633                    message: body,
634                    kind: StreamErrorKind::ServerError,
635                });
636                return;
637            }
638
639            if status.is_client_error() {
640                let body = response.text().await.unwrap_or_default();
641                log::warn!("Anthropic client error status={status} body={body}");
642                yield Ok(StreamDelta::Error {
643                    message: body,
644                    kind: StreamErrorKind::InvalidRequest,
645                });
646                return;
647            }
648
649            // Process SSE stream
650            let mut stream = response.bytes_stream();
651            let mut buffer = String::new();
652            let mut input_tokens: u32 = 0;
653            let mut output_tokens: u32 = 0;
654            let mut cached_input_tokens: u32 = 0;
655            let mut cache_creation_input_tokens: u32 = 0;
656            // Track tool IDs by block index for correlating input deltas
657            let mut tool_ids: std::collections::HashMap<usize, String> =
658                std::collections::HashMap::new();
659
660            let mut received_message_stop = false;
661            let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
662            let mut chunk_count: u64 = 0;
663            let mut total_bytes: u64 = 0;
664
665            // Drop guard to detect if the stream is dropped before completion
666            struct StreamDropGuard {
667                completed: bool,
668                chunk_count: u64,
669            }
670            impl Drop for StreamDropGuard {
671                fn drop(&mut self) {
672                    if !self.completed {
673                        // Stream drops are expected when the user cancels a running
674                        // agent loop (Esc / Ctrl-C).  Log at debug level so it does
675                        // not surface as noise in every cancelled session.
676                        log::debug!(
677                            "SSE stream dropped before completion at chunk_count={} (task was likely cancelled)",
678                            self.chunk_count
679                        );
680                    }
681                }
682            }
683            let mut drop_guard = StreamDropGuard { completed: false, chunk_count: 0 };
684
685            log::debug!("Starting SSE stream processing");
686
687            while let Some(chunk_result) = stream.next().await {
688                let chunk = match chunk_result {
689                    Ok(c) => c,
690                    Err(e) => {
691                        log::error!("Stream error while reading chunk error={e} chunk_count={chunk_count} total_bytes={total_bytes}");
692                        yield Err(anyhow::anyhow!("stream error: {e}"));
693                        return;
694                    }
695                };
696
697                chunk_count += 1;
698                total_bytes += chunk.len() as u64;
699                drop_guard.chunk_count = chunk_count;
700
701                // Log progress every 10 chunks to show HTTP stream is alive
702                if chunk_count.is_multiple_of(10) {
703                    log::debug!("SSE chunk progress: chunk_count={chunk_count} total_bytes={total_bytes}");
704                }
705                buffer.push_str(&String::from_utf8_lossy(&chunk));
706
707                // Process complete SSE events (terminated by a blank line)
708                while let Some(event_block) = take_next_sse_event(&mut buffer) {
709                    // Track if we received message_stop
710                    if is_message_stop_event(&event_block) {
711                        log::debug!("Received message_stop event chunk_count={chunk_count} total_bytes={total_bytes}");
712                        received_message_stop = true;
713                    }
714
715                    // Parse SSE event
716                    if let Some(mut delta) = parse_sse_event(
717                        &event_block,
718                        &mut input_tokens,
719                        &mut output_tokens,
720                        &mut cached_input_tokens,
721                        &mut cache_creation_input_tokens,
722                        &mut tool_ids,
723                        &mut pending_stop_reason,
724                    ) {
725                        // Reverse-map tool names from Claude Code casing
726                        if is_oauth
727                            && let StreamDelta::ToolUseStart { ref mut name, .. } = delta
728                        {
729                            *name = from_claude_code_name(name, &original_tool_names);
730                        }
731                        yield Ok(delta);
732                    }
733                    // After message_stop (which emits Usage), emit Done
734                    if is_message_stop_event(&event_block) {
735                        yield Ok(StreamDelta::Done {
736                            stop_reason: pending_stop_reason.take(),
737                        });
738                    }
739                }
740            }
741
742            log::debug!(
743                "SSE stream ended chunk_count={chunk_count} total_bytes={total_bytes} buffer_remaining={} received_message_stop={received_message_stop}",
744                buffer.len()
745            );
746
747            // Process any remaining buffer content (handles incomplete final chunk)
748            let remaining = buffer.trim();
749            if !remaining.is_empty() {
750                log::debug!(
751                    "Processing remaining buffer content remaining_len={} remaining_preview={}",
752                    remaining.len(),
753                    remaining.chars().take(100).collect::<String>()
754                );
755
756                // Track if remaining buffer contains message_stop
757                if is_message_stop_event(remaining) {
758                    received_message_stop = true;
759                }
760
761                if let Some(mut delta) = parse_sse_event(
762                    remaining,
763                    &mut input_tokens,
764                    &mut output_tokens,
765                    &mut cached_input_tokens,
766                    &mut cache_creation_input_tokens,
767                    &mut tool_ids,
768                    &mut pending_stop_reason,
769                ) {
770                    if is_oauth
771                        && let StreamDelta::ToolUseStart { ref mut name, .. } = delta
772                    {
773                        *name = from_claude_code_name(name, &original_tool_names);
774                    }
775                    yield Ok(delta);
776                }
777                // After message_stop (which emits Usage), emit Done
778                if is_message_stop_event(remaining) {
779                    yield Ok(StreamDelta::Done {
780                        stop_reason: pending_stop_reason.take(),
781                    });
782                }
783            }
784
785            // Mark stream as properly completed
786            drop_guard.completed = true;
787
788            // If stream ended without message_stop, emit a server-error (transient) signal
789            if !received_message_stop {
790                log::warn!(
791                    "SSE stream ended without message_stop event - stream may have been interrupted chunk_count={chunk_count} total_bytes={total_bytes}"
792                );
793                yield Ok(StreamDelta::Error {
794                    message: "Stream ended unexpectedly without completion".to_string(),
795                    kind: StreamErrorKind::ServerError,
796                });
797            }
798        })
799    }
800
801    fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
802        let Some(thinking) = thinking else {
803            return Ok(());
804        };
805
806        if self
807            .capabilities()
808            .is_some_and(|caps| !caps.supports_thinking)
809        {
810            return Err(anyhow::anyhow!(
811                "thinking is not supported for provider={} model={}",
812                self.provider(),
813                self.model()
814            ));
815        }
816
817        if matches!(thinking.mode, ThinkingMode::Adaptive)
818            && !self
819                .capabilities()
820                .is_some_and(|caps| caps.supports_adaptive_thinking)
821        {
822            return Err(anyhow::anyhow!(
823                "adaptive thinking is not supported for provider={} model={}",
824                self.provider(),
825                self.model()
826            ));
827        }
828
829        if self.requires_adaptive_thinking()
830            && matches!(thinking.mode, ThinkingMode::Enabled { .. })
831        {
832            return Err(anyhow::anyhow!(
833                "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
834                self.provider(),
835                self.model()
836            ));
837        }
838
839        Ok(())
840    }
841
    /// Returns the model identifier this provider was constructed with
    /// (e.g. `MODEL_SONNET_46` for `AnthropicProvider::sonnet_46`, or the
    /// custom name passed to `AnthropicProvider::new`).
    fn model(&self) -> &str {
        &self.model
    }
845
    /// Returns the fixed provider name `"anthropic"`; it is also interpolated
    /// into this provider's validation error messages.
    fn provider(&self) -> &'static str {
        "anthropic"
    }
849
    /// Returns a borrow of the provider-level thinking configuration stored on
    /// this instance, or `None` if none is stored.
    // NOTE(review): how `self.thinking` gets populated is not visible in this
    // chunk — presumably a builder/setter elsewhere; confirm before relying on it.
    fn configured_thinking(&self) -> Option<&ThinkingConfig> {
        self.thinking.as_ref()
    }
853
854    fn default_max_tokens(&self) -> u32 {
855        let model_max = self
856            .capabilities()
857            .and_then(|caps| caps.max_output_tokens)
858            .or_else(|| {
859                crate::model_capabilities::default_max_output_tokens(self.provider(), self.model())
860            })
861            .unwrap_or(4096);
862        model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
863    }
864}
865
#[cfg(test)]
mod tests {
    use super::*;

    // ===================
    // Helpers
    // ===================

    /// Asserts that `provider` rejects budgeted (`budget_tokens`) thinking
    /// with an error that points callers at `ThinkingConfig::adaptive()`.
    fn assert_rejects_budgeted_thinking(provider: &AnthropicProvider) {
        let error = provider
            .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
            .unwrap_err();
        assert!(
            error.to_string().contains("ThinkingConfig::adaptive()"),
            "expected migration hint for model={}, got: {error}",
            provider.model()
        );
    }

    /// Asserts that `provider` accepts adaptive thinking, both with the
    /// default effort and with an explicit `Effort::High`.
    fn assert_accepts_adaptive_thinking(provider: &AnthropicProvider) {
        assert!(
            provider
                .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
                .is_ok()
        );
        assert!(
            provider
                .validate_thinking_config(Some(&ThinkingConfig::adaptive_with_effort(
                    agent_sdk_foundation::llm::Effort::High
                )))
                .is_ok()
        );
    }

    // ===================
    // Constructor Tests
    // ===================

    #[test]
    fn test_new_creates_provider_with_custom_model() {
        let provider = AnthropicProvider::new("test-api-key", "custom-model");

        assert_eq!(provider.model(), "custom-model");
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_haiku_factory_creates_haiku_provider() {
        let provider = AnthropicProvider::haiku("test-api-key".to_string());

        assert_eq!(provider.model(), MODEL_HAIKU_45);
        assert_eq!(provider.provider(), "anthropic");
    }

    // ===================
    // Thinking Validation Tests
    // ===================

    #[test]
    fn test_validate_thinking_config_accepts_none() {
        // Nothing requested means nothing to validate, even on models that
        // only allow adaptive thinking.
        for provider in [
            AnthropicProvider::sonnet_45("test-api-key".to_string()),
            AnthropicProvider::opus_48("test-api-key".to_string()),
        ] {
            assert!(provider.validate_thinking_config(None).is_ok());
        }
    }

    #[test]
    fn test_adaptive_thinking_accepted_on_46_and_rejected_on_45() {
        let sonnet_46 = AnthropicProvider::sonnet_46("test-api-key".to_string());
        assert!(
            sonnet_46
                .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
                .is_ok()
        );

        let sonnet_45 = AnthropicProvider::sonnet_45("test-api-key".to_string());
        let error = sonnet_45
            .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
            .unwrap_err();
        assert!(
            error
                .to_string()
                .contains("adaptive thinking is not supported")
        );
    }

    #[test]
    fn test_anthropic_46_models_reject_budgeted_thinking() {
        assert_rejects_budgeted_thinking(&AnthropicProvider::sonnet_46(
            "test-api-key".to_string(),
        ));
    }

    #[test]
    fn test_opus_47_rejects_budgeted_thinking() {
        // Opus 4.7 follows the same adaptive-only policy as 4.6. Without
        // this guard the provider would serialise `thinking.type.enabled`
        // and get a 400 back from the API — we want a clear SDK-level
        // error instead.
        assert_rejects_budgeted_thinking(&AnthropicProvider::opus_47("test-api-key".to_string()));
    }

    #[test]
    fn test_opus_47_accepts_adaptive_thinking() {
        assert_accepts_adaptive_thinking(&AnthropicProvider::opus_47("test-api-key".to_string()));
    }

    #[test]
    fn test_opus_47_factory_creates_opus_47_provider() {
        let provider = AnthropicProvider::opus_47("test-api-key".to_string());
        assert_eq!(provider.model(), MODEL_OPUS_47);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_opus_48_rejects_budgeted_thinking() {
        // Opus 4.8 follows the same adaptive-only policy as 4.6/4.7. Without
        // this guard the provider would serialise `thinking.type.enabled`
        // and get a 400 back from the API — we want a clear SDK-level
        // error instead.
        assert_rejects_budgeted_thinking(&AnthropicProvider::opus_48("test-api-key".to_string()));
    }

    #[test]
    fn test_opus_48_accepts_adaptive_thinking() {
        assert_accepts_adaptive_thinking(&AnthropicProvider::opus_48("test-api-key".to_string()));
    }

    #[test]
    fn test_opus_48_factory_creates_opus_48_provider() {
        let provider = AnthropicProvider::opus_48("test-api-key".to_string());
        assert_eq!(provider.model(), MODEL_OPUS_48);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_sonnet_factory_creates_sonnet_provider() {
        let provider = AnthropicProvider::sonnet("test-api-key".to_string());

        assert_eq!(provider.model(), MODEL_SONNET_46);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_sonnet_45_factory_creates_sonnet_provider() {
        let provider = AnthropicProvider::sonnet_45("test-api-key".to_string());

        assert_eq!(provider.model(), MODEL_SONNET_45);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_sonnet_46_factory_creates_sonnet_provider() {
        let provider = AnthropicProvider::sonnet_46("test-api-key".to_string());

        assert_eq!(provider.model(), MODEL_SONNET_46);
        assert_eq!(provider.provider(), "anthropic");
    }

    #[test]
    fn test_opus_factory_creates_opus_provider() {
        let provider = AnthropicProvider::opus("test-api-key".to_string());

        assert_eq!(provider.model(), MODEL_OPUS_46);
        assert_eq!(provider.provider(), "anthropic");
    }

    // ===================
    // Default Max Tokens Tests
    // ===================

    #[test]
    fn test_default_max_tokens_stays_within_safe_bounds() {
        // Covers known models and a custom one with no capability data; the
        // result must always land in the clamp range.
        let providers = [
            AnthropicProvider::new("test-api-key", "custom-model"),
            AnthropicProvider::haiku("test-api-key".to_string()),
            AnthropicProvider::sonnet("test-api-key".to_string()),
            AnthropicProvider::opus("test-api-key".to_string()),
            AnthropicProvider::opus_48("test-api-key".to_string()),
        ];
        for provider in providers {
            let max_tokens = provider.default_max_tokens();
            assert!(
                (4096..=DEFAULT_SAFE_MAX_OUTPUT_TOKENS).contains(&max_tokens),
                "model={} default_max_tokens={max_tokens}",
                provider.model()
            );
        }
    }

    // ===================
    // Model Constants Tests
    // ===================

    #[test]
    fn test_model_constants_have_expected_values() {
        // Every exported model constant, paired with the family name its
        // identifier must contain.
        const ALL_MODELS: &[(&str, &str)] = &[
            (MODEL_HAIKU_35, "haiku"),
            (MODEL_SONNET_35, "sonnet"),
            (MODEL_SONNET_4, "sonnet"),
            (MODEL_OPUS_4, "opus"),
            (MODEL_HAIKU_45, "haiku"),
            (MODEL_SONNET_45, "sonnet"),
            (MODEL_SONNET_46, "sonnet"),
            (MODEL_OPUS_46, "opus"),
            (MODEL_OPUS_47, "opus"),
            (MODEL_OPUS_48, "opus"),
        ];

        for &(model, family) in ALL_MODELS {
            assert!(model.starts_with("claude-"), "unexpected model id: {model}");
            assert!(model.contains(family), "{model} should contain {family}");
        }
    }

    // ===================
    // Clone Tests
    // ===================

    #[test]
    fn test_provider_is_cloneable() {
        let provider = AnthropicProvider::new("test-api-key", "test-model");
        let cloned = provider.clone();

        assert_eq!(provider.model(), cloned.model());
        assert_eq!(provider.provider(), cloned.provider());
    }
}