1mod auth;
2mod stream;
3mod types;
4
5use auth::{AnthropicAuth, AuthResolved, refresh_oauth_token};
6use stream::process_sse_stream;
7use types::{AnthropicRequest, convert_messages, convert_tools};
8
9use std::{
10 future::Future,
11 pin::Pin,
12 time::{SystemTime, UNIX_EPOCH},
13};
14
15use anyhow::Context;
16use tokio::sync::{mpsc, mpsc::UnboundedReceiver};
17use tracing::warn;
18
19use crate::provider::{Message, Provider, StreamEvent, StreamEventType, ToolDefinition};
20
21pub struct AnthropicProvider {
22 client: reqwest::Client,
23 model: String,
24 auth: tokio::sync::Mutex<AnthropicAuth>,
25 cached_models: std::sync::Mutex<Option<Vec<String>>>,
26}
27
28impl AnthropicProvider {
29 pub fn new_with_api_key(api_key: impl Into<String>, model: impl Into<String>) -> Self {
30 Self {
31 client: reqwest::Client::builder()
32 .user_agent("dot/0.1.0")
33 .build()
34 .expect("Failed to build reqwest client"),
35 model: model.into(),
36 auth: tokio::sync::Mutex::new(AnthropicAuth::ApiKey(api_key.into())),
37 cached_models: std::sync::Mutex::new(None),
38 }
39 }
40
41 pub fn new_with_oauth(
42 access_token: impl Into<String>,
43 refresh_token: impl Into<String>,
44 expires_at: i64,
45 model: impl Into<String>,
46 ) -> Self {
47 Self {
48 client: reqwest::Client::builder()
49 .user_agent("claude-code/2.1.49 (external, cli)")
50 .build()
51 .expect("Failed to build reqwest client"),
52 model: model.into(),
53 auth: tokio::sync::Mutex::new(AnthropicAuth::OAuth {
54 access_token: access_token.into(),
55 refresh_token: refresh_token.into(),
56 expires_at,
57 }),
58 cached_models: std::sync::Mutex::new(None),
59 }
60 }
61
62 async fn fetch_model_context_window(&self, model: &str) -> Option<u32> {
63 let auth = self.resolve_auth().await.ok()?;
64 let url = format!("https://api.anthropic.com/v1/models/{model}");
65 let mut req = self
66 .client
67 .get(&url)
68 .header(&auth.header_name, &auth.header_value)
69 .header("anthropic-version", "2023-06-01");
70 if auth.is_oauth {
71 req = req
72 .header(
73 "anthropic-beta",
74 "oauth-2025-04-20,interleaved-thinking-2025-05-14",
75 )
76 .header("user-agent", "claude-code/2.1.49 (external, cli)");
77 }
78 let data: serde_json::Value = req.send().await.ok()?.json().await.ok()?;
79 data["context_window"].as_u64().map(|v| v as u32)
80 }
81
/// Static fallback for a model's context window size.
///
/// Returns `200_000` for any model id containing the substring `"claude"`
/// (case-sensitive), and `0` for everything else.
fn model_context_window(model: &str) -> u32 {
    if model.contains("claude") {
        200_000
    } else {
        0
    }
}
88
89 async fn resolve_auth(&self) -> anyhow::Result<AuthResolved> {
90 let mut auth = self.auth.lock().await;
91 match &*auth {
92 AnthropicAuth::ApiKey(key) => Ok(AuthResolved {
93 header_name: "x-api-key".to_string(),
94 header_value: key.clone(),
95 is_oauth: false,
96 }),
97 AnthropicAuth::OAuth {
98 access_token,
99 refresh_token,
100 expires_at,
101 } => {
102 let now = SystemTime::now()
103 .duration_since(UNIX_EPOCH)
104 .unwrap_or_default()
105 .as_secs() as i64;
106 let expires_at_secs = if *expires_at > 1_000_000_000_000 {
108 *expires_at / 1000
109 } else {
110 *expires_at
111 };
112
113 let token = if now >= expires_at_secs - 60 {
114 let rt = refresh_token.clone();
115 match refresh_oauth_token(&self.client, &rt).await {
116 Ok((new_token, new_expires_at)) => {
117 if let AnthropicAuth::OAuth {
118 access_token,
119 expires_at,
120 ..
121 } = &mut *auth
122 {
123 *access_token = new_token.clone();
124 *expires_at = new_expires_at;
125 }
126 new_token
127 }
128 Err(e) => {
129 warn!("OAuth token refresh failed: {e}");
130 access_token.clone()
131 }
132 }
133 } else {
134 access_token.clone()
135 };
136
137 Ok(AuthResolved {
138 header_name: "Authorization".to_string(),
139 header_value: format!("Bearer {token}"),
140 is_oauth: true,
141 })
142 }
143 }
144 }
145}
146
147impl Provider for AnthropicProvider {
148 fn name(&self) -> &str {
149 "anthropic"
150 }
151
152 fn model(&self) -> &str {
153 &self.model
154 }
155
156 fn set_model(&mut self, model: String) {
157 self.model = model;
158 }
159
160 fn available_models(&self) -> Vec<String> {
161 let cache = self.cached_models.lock().unwrap();
162 cache.clone().unwrap_or_default()
163 }
164
165 fn context_window(&self) -> u32 {
166 Self::model_context_window(&self.model)
167 }
168
169 fn supports_server_compaction(&self) -> bool {
170 true
171 }
172
173 fn fetch_context_window(
174 &self,
175 ) -> Pin<Box<dyn Future<Output = anyhow::Result<u32>> + Send + '_>> {
176 Box::pin(async move {
177 if let Some(cw) = self.fetch_model_context_window(&self.model).await {
178 return Ok(cw);
179 }
180 Ok(Self::model_context_window(&self.model))
181 })
182 }
183
184 fn fetch_models(
185 &self,
186 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<String>>> + Send + '_>> {
187 Box::pin(async move {
188 {
189 let cache = self.cached_models.lock().unwrap();
190 if let Some(ref models) = *cache {
191 return Ok(models.clone());
192 }
193 }
194 let auth = self.resolve_auth().await?;
195 let mut all_models: Vec<String> = Vec::new();
196 let mut after_id: Option<String> = None;
197
198 loop {
199 let mut url = "https://api.anthropic.com/v1/models?limit=1000".to_string();
200 if let Some(ref cursor) = after_id {
201 url.push_str(&format!("&after_id={cursor}"));
202 }
203
204 let mut req = self
205 .client
206 .get(&url)
207 .header(&auth.header_name, &auth.header_value)
208 .header("anthropic-version", "2023-06-01");
209
210 if auth.is_oauth {
211 req = req
212 .header(
213 "anthropic-beta",
214 "oauth-2025-04-20,interleaved-thinking-2025-05-14",
215 )
216 .header("user-agent", "claude-code/2.1.49 (external, cli)");
217 }
218
219 let resp = req
220 .send()
221 .await
222 .context("Failed to fetch Anthropic models")?;
223
224 if !resp.status().is_success() {
225 let status = resp.status();
226 let body = resp.text().await.unwrap_or_default();
227 return Err(anyhow::anyhow!(
228 "Anthropic models API error {status}: {body}"
229 ));
230 }
231
232 let data: serde_json::Value = resp
233 .json()
234 .await
235 .context("Failed to parse Anthropic models response")?;
236
237 if let Some(arr) = data["data"].as_array() {
238 for m in arr {
239 if let Some(id) = m["id"].as_str() {
240 all_models.push(id.to_string());
241 }
242 }
243 }
244
245 let has_more = data["has_more"].as_bool().unwrap_or(false);
246 if !has_more {
247 break;
248 }
249
250 match data["last_id"].as_str() {
251 Some(last) => after_id = Some(last.to_string()),
252 None => break,
253 }
254 }
255
256 if all_models.is_empty() {
257 return Err(anyhow::anyhow!("Anthropic models API returned empty list"));
258 }
259
260 all_models.sort();
261 let mut cache = self.cached_models.lock().unwrap();
262 *cache = Some(all_models.clone());
263
264 Ok(all_models)
265 })
266 }
267
268 fn stream(
269 &self,
270 messages: &[Message],
271 system: Option<&str>,
272 tools: &[ToolDefinition],
273 max_tokens: u32,
274 thinking_budget: u32,
275 ) -> Pin<Box<dyn Future<Output = anyhow::Result<UnboundedReceiver<StreamEvent>>> + Send + '_>>
276 {
277 self.stream_with_model(
278 &self.model,
279 messages,
280 system,
281 tools,
282 max_tokens,
283 thinking_budget,
284 )
285 }
286
287 fn stream_with_model(
288 &self,
289 model: &str,
290 messages: &[Message],
291 system: Option<&str>,
292 tools: &[ToolDefinition],
293 max_tokens: u32,
294 thinking_budget: u32,
295 ) -> Pin<Box<dyn Future<Output = anyhow::Result<UnboundedReceiver<StreamEvent>>> + Send + '_>>
296 {
297 let messages = messages.to_vec();
298 let system = system.map(String::from);
299 let tools = tools.to_vec();
300 let model = model.to_string();
301
302 Box::pin(async move {
303 let auth = self.resolve_auth().await?;
304
305 let url = if auth.is_oauth {
306 "https://api.anthropic.com/v1/messages?beta=true".to_string()
307 } else {
308 "https://api.anthropic.com/v1/messages".to_string()
309 };
310
311 let thinking = if thinking_budget >= 1024 {
312 Some(serde_json::json!({
313 "type": "enabled",
314 "budget_tokens": thinking_budget,
315 }))
316 } else {
317 None
318 };
319
320 let effective_max_tokens = if thinking_budget >= 1024 {
321 max_tokens.max(thinking_budget.saturating_add(4096))
322 } else {
323 max_tokens
324 };
325
326 let context_management = Some(serde_json::json!({
327 "edits": [{ "type": "compact_20260112" }]
328 }));
329
330 let body = AnthropicRequest {
331 model: &model,
332 messages: convert_messages(&messages),
333 max_tokens: effective_max_tokens,
334 stream: true,
335 system: system.as_deref(),
336 tools: convert_tools(&tools),
337 temperature: 1.0,
338 thinking,
339 context_management,
340 };
341
342 let mut req_builder = self
343 .client
344 .post(&url)
345 .header(&auth.header_name, &auth.header_value)
346 .header("anthropic-version", "2023-06-01")
347 .header("content-type", "application/json");
348
349 let compact_beta = "compact-2026-01-12";
350 if auth.is_oauth {
351 req_builder = req_builder
352 .header(
353 "anthropic-beta",
354 format!("oauth-2025-04-20,interleaved-thinking-2025-05-14,{compact_beta}"),
355 )
356 .header("user-agent", "claude-code/2.1.49 (external, cli)");
357 } else if thinking_budget >= 1024 {
358 req_builder = req_builder.header(
359 "anthropic-beta",
360 format!("interleaved-thinking-2025-05-14,{compact_beta}"),
361 );
362 } else {
363 req_builder = req_builder.header("anthropic-beta", compact_beta);
364 }
365
366 let response = req_builder
367 .json(&body)
368 .send()
369 .await
370 .context("Failed to connect to Anthropic API")?;
371
372 if !response.status().is_success() {
373 let status = response.status();
374 let body_text = response.text().await.unwrap_or_default();
375 return Err(anyhow::anyhow!("Anthropic API error {status}: {body_text}"));
376 }
377
378 let (tx, rx) = mpsc::unbounded_channel::<StreamEvent>();
379 let tx_clone = tx.clone();
380
381 tokio::spawn(async move {
382 if let Err(e) = process_sse_stream(response, tx_clone.clone()).await {
383 let _ = tx_clone.send(StreamEvent {
384 event_type: StreamEventType::Error(e.to_string()),
385 });
386 }
387 });
388
389 Ok(rx)
390 })
391 }
392}