Skip to main content

dot/provider/anthropic/
mod.rs

1mod auth;
2mod stream;
3mod types;
4
5use auth::{AnthropicAuth, AuthResolved, refresh_oauth_token};
6use stream::process_sse_stream;
7use types::{AnthropicRequest, convert_messages, convert_tools};
8
9use std::{
10    future::Future,
11    pin::Pin,
12    time::{SystemTime, UNIX_EPOCH},
13};
14
15use anyhow::Context;
16use tokio::sync::{mpsc, mpsc::UnboundedReceiver};
17use tracing::warn;
18
19use crate::provider::{Message, Provider, StreamEvent, StreamEventType, ToolDefinition};
20
21pub struct AnthropicProvider {
22    client: reqwest::Client,
23    model: String,
24    auth: tokio::sync::Mutex<AnthropicAuth>,
25    cached_models: std::sync::Mutex<Option<Vec<String>>>,
26}
27
28impl AnthropicProvider {
29    pub fn new_with_api_key(api_key: impl Into<String>, model: impl Into<String>) -> Self {
30        Self {
31            client: reqwest::Client::builder()
32                .user_agent("dot/0.1.0")
33                .build()
34                .expect("Failed to build reqwest client"),
35            model: model.into(),
36            auth: tokio::sync::Mutex::new(AnthropicAuth::ApiKey(api_key.into())),
37            cached_models: std::sync::Mutex::new(None),
38        }
39    }
40
41    pub fn new_with_oauth(
42        access_token: impl Into<String>,
43        refresh_token: impl Into<String>,
44        expires_at: i64,
45        model: impl Into<String>,
46    ) -> Self {
47        Self {
48            client: reqwest::Client::builder()
49                .user_agent("claude-code/2.1.49 (external, cli)")
50                .build()
51                .expect("Failed to build reqwest client"),
52            model: model.into(),
53            auth: tokio::sync::Mutex::new(AnthropicAuth::OAuth {
54                access_token: access_token.into(),
55                refresh_token: refresh_token.into(),
56                expires_at,
57            }),
58            cached_models: std::sync::Mutex::new(None),
59        }
60    }
61
62    async fn fetch_model_context_window(&self, model: &str) -> Option<u32> {
63        let auth = self.resolve_auth().await.ok()?;
64        let url = format!("https://api.anthropic.com/v1/models/{model}");
65        let mut req = self
66            .client
67            .get(&url)
68            .header(&auth.header_name, &auth.header_value)
69            .header("anthropic-version", "2023-06-01");
70        if auth.is_oauth {
71            req = req
72                .header(
73                    "anthropic-beta",
74                    "oauth-2025-04-20,interleaved-thinking-2025-05-14",
75                )
76                .header("user-agent", "claude-code/2.1.49 (external, cli)");
77        }
78        let data: serde_json::Value = req.send().await.ok()?.json().await.ok()?;
79        data["context_window"].as_u64().map(|v| v as u32)
80    }
81
82    fn model_context_window(model: &str) -> u32 {
83        if model.contains("claude") {
84            return 200_000;
85        }
86        0
87    }
88
89    async fn resolve_auth(&self) -> anyhow::Result<AuthResolved> {
90        let mut auth = self.auth.lock().await;
91        match &*auth {
92            AnthropicAuth::ApiKey(key) => Ok(AuthResolved {
93                header_name: "x-api-key".to_string(),
94                header_value: key.clone(),
95                is_oauth: false,
96            }),
97            AnthropicAuth::OAuth {
98                access_token,
99                refresh_token,
100                expires_at,
101            } => {
102                let now = SystemTime::now()
103                    .duration_since(UNIX_EPOCH)
104                    .unwrap_or_default()
105                    .as_secs() as i64;
106                // Handle legacy millis-format expires_at from older credentials
107                let expires_at_secs = if *expires_at > 1_000_000_000_000 {
108                    *expires_at / 1000
109                } else {
110                    *expires_at
111                };
112
113                let token = if now >= expires_at_secs - 60 {
114                    let rt = refresh_token.clone();
115                    match refresh_oauth_token(&self.client, &rt).await {
116                        Ok((new_token, new_expires_at)) => {
117                            if let AnthropicAuth::OAuth {
118                                access_token,
119                                expires_at,
120                                ..
121                            } = &mut *auth
122                            {
123                                *access_token = new_token.clone();
124                                *expires_at = new_expires_at;
125                            }
126                            new_token
127                        }
128                        Err(e) => {
129                            warn!("OAuth token refresh failed: {e}");
130                            access_token.clone()
131                        }
132                    }
133                } else {
134                    access_token.clone()
135                };
136
137                Ok(AuthResolved {
138                    header_name: "Authorization".to_string(),
139                    header_value: format!("Bearer {token}"),
140                    is_oauth: true,
141                })
142            }
143        }
144    }
145}
146
147impl Provider for AnthropicProvider {
148    fn name(&self) -> &str {
149        "anthropic"
150    }
151
152    fn model(&self) -> &str {
153        &self.model
154    }
155
156    fn set_model(&mut self, model: String) {
157        self.model = model;
158    }
159
160    fn available_models(&self) -> Vec<String> {
161        let cache = self.cached_models.lock().unwrap();
162        cache.clone().unwrap_or_default()
163    }
164
165    fn context_window(&self) -> u32 {
166        Self::model_context_window(&self.model)
167    }
168
169    fn supports_server_compaction(&self) -> bool {
170        true
171    }
172
173    fn fetch_context_window(
174        &self,
175    ) -> Pin<Box<dyn Future<Output = anyhow::Result<u32>> + Send + '_>> {
176        Box::pin(async move {
177            if let Some(cw) = self.fetch_model_context_window(&self.model).await {
178                return Ok(cw);
179            }
180            Ok(Self::model_context_window(&self.model))
181        })
182    }
183
184    fn fetch_models(
185        &self,
186    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<String>>> + Send + '_>> {
187        Box::pin(async move {
188            {
189                let cache = self.cached_models.lock().unwrap();
190                if let Some(ref models) = *cache {
191                    return Ok(models.clone());
192                }
193            }
194            let auth = self.resolve_auth().await?;
195            let mut all_models: Vec<String> = Vec::new();
196            let mut after_id: Option<String> = None;
197
198            loop {
199                let mut url = "https://api.anthropic.com/v1/models?limit=1000".to_string();
200                if let Some(ref cursor) = after_id {
201                    url.push_str(&format!("&after_id={cursor}"));
202                }
203
204                let mut req = self
205                    .client
206                    .get(&url)
207                    .header(&auth.header_name, &auth.header_value)
208                    .header("anthropic-version", "2023-06-01");
209
210                if auth.is_oauth {
211                    req = req
212                        .header(
213                            "anthropic-beta",
214                            "oauth-2025-04-20,interleaved-thinking-2025-05-14",
215                        )
216                        .header("user-agent", "claude-code/2.1.49 (external, cli)");
217                }
218
219                let resp = req
220                    .send()
221                    .await
222                    .context("Failed to fetch Anthropic models")?;
223
224                if !resp.status().is_success() {
225                    let status = resp.status();
226                    let body = resp.text().await.unwrap_or_default();
227                    return Err(anyhow::anyhow!(
228                        "Anthropic models API error {status}: {body}"
229                    ));
230                }
231
232                let data: serde_json::Value = resp
233                    .json()
234                    .await
235                    .context("Failed to parse Anthropic models response")?;
236
237                if let Some(arr) = data["data"].as_array() {
238                    for m in arr {
239                        if let Some(id) = m["id"].as_str() {
240                            all_models.push(id.to_string());
241                        }
242                    }
243                }
244
245                let has_more = data["has_more"].as_bool().unwrap_or(false);
246                if !has_more {
247                    break;
248                }
249
250                match data["last_id"].as_str() {
251                    Some(last) => after_id = Some(last.to_string()),
252                    None => break,
253                }
254            }
255
256            if all_models.is_empty() {
257                return Err(anyhow::anyhow!("Anthropic models API returned empty list"));
258            }
259
260            all_models.sort();
261            let mut cache = self.cached_models.lock().unwrap();
262            *cache = Some(all_models.clone());
263
264            Ok(all_models)
265        })
266    }
267
268    fn stream(
269        &self,
270        messages: &[Message],
271        system: Option<&str>,
272        tools: &[ToolDefinition],
273        max_tokens: u32,
274        thinking_budget: u32,
275    ) -> Pin<Box<dyn Future<Output = anyhow::Result<UnboundedReceiver<StreamEvent>>> + Send + '_>>
276    {
277        self.stream_with_model(
278            &self.model,
279            messages,
280            system,
281            tools,
282            max_tokens,
283            thinking_budget,
284        )
285    }
286
287    fn stream_with_model(
288        &self,
289        model: &str,
290        messages: &[Message],
291        system: Option<&str>,
292        tools: &[ToolDefinition],
293        max_tokens: u32,
294        thinking_budget: u32,
295    ) -> Pin<Box<dyn Future<Output = anyhow::Result<UnboundedReceiver<StreamEvent>>> + Send + '_>>
296    {
297        let messages = messages.to_vec();
298        let system = system.map(String::from);
299        let tools = tools.to_vec();
300        let model = model.to_string();
301
302        Box::pin(async move {
303            let auth = self.resolve_auth().await?;
304
305            let url = if auth.is_oauth {
306                "https://api.anthropic.com/v1/messages?beta=true".to_string()
307            } else {
308                "https://api.anthropic.com/v1/messages".to_string()
309            };
310
311            let thinking = if thinking_budget >= 1024 {
312                Some(serde_json::json!({
313                    "type": "enabled",
314                    "budget_tokens": thinking_budget,
315                }))
316            } else {
317                None
318            };
319
320            let effective_max_tokens = if thinking_budget >= 1024 {
321                max_tokens.max(thinking_budget.saturating_add(4096))
322            } else {
323                max_tokens
324            };
325
326            let context_management = Some(serde_json::json!({
327                "edits": [{ "type": "compact_20260112" }]
328            }));
329
330            let body = AnthropicRequest {
331                model: &model,
332                messages: convert_messages(&messages),
333                max_tokens: effective_max_tokens,
334                stream: true,
335                system: system.as_deref(),
336                tools: convert_tools(&tools),
337                temperature: 1.0,
338                thinking,
339                context_management,
340            };
341
342            let mut req_builder = self
343                .client
344                .post(&url)
345                .header(&auth.header_name, &auth.header_value)
346                .header("anthropic-version", "2023-06-01")
347                .header("content-type", "application/json");
348
349            let compact_beta = "compact-2026-01-12";
350            if auth.is_oauth {
351                req_builder = req_builder
352                    .header(
353                        "anthropic-beta",
354                        format!("oauth-2025-04-20,interleaved-thinking-2025-05-14,{compact_beta}"),
355                    )
356                    .header("user-agent", "claude-code/2.1.49 (external, cli)");
357            } else if thinking_budget >= 1024 {
358                req_builder = req_builder.header(
359                    "anthropic-beta",
360                    format!("interleaved-thinking-2025-05-14,{compact_beta}"),
361                );
362            } else {
363                req_builder = req_builder.header("anthropic-beta", compact_beta);
364            }
365
366            let response = req_builder
367                .json(&body)
368                .send()
369                .await
370                .context("Failed to connect to Anthropic API")?;
371
372            if !response.status().is_success() {
373                let status = response.status();
374                let body_text = response.text().await.unwrap_or_default();
375                return Err(anyhow::anyhow!("Anthropic API error {status}: {body_text}"));
376            }
377
378            let (tx, rx) = mpsc::unbounded_channel::<StreamEvent>();
379            let tx_clone = tx.clone();
380
381            tokio::spawn(async move {
382                if let Err(e) = process_sse_stream(response, tx_clone.clone()).await {
383                    let _ = tx_clone.send(StreamEvent {
384                        event_type: StreamEventType::Error(e.to_string()),
385                    });
386                }
387            });
388
389            Ok(rx)
390        })
391    }
392}