1use std::path::Path;
6use std::time::Duration;
7
8use chrono::Utc;
9
10use crate::cache::{Cache, acquire_lock};
11use crate::error::{AppError, Result};
12use crate::usage::AnthropicSnapshot;
13
14use super::creds::{self, OauthCreds};
15use super::oauth;
16use super::types::UsageResponse;
17
/// Default URL of the OAuth usage endpoint; used as the default `Endpoints::usage`.
pub const USAGE_URL: &str = "https://api.anthropic.com/api/oauth/usage";
/// Value sent in the `anthropic-beta` request header by `fetch_usage`.
pub const USAGE_BETA_HEADER: &str = "oauth-2025-04-20";
/// Upper bound applied by `fetch_snapshot` to the whole usage request
/// (sending it and reading the body).
const HTTP_TIMEOUT: Duration = Duration::from_secs(10);
/// Upper bound applied by `fetch_snapshot` to a token refresh call.
const REFRESH_TIMEOUT: Duration = Duration::from_secs(25);
/// Timeout handed to `acquire_lock` for the cache lock.
///
/// NOTE(review): 45s exceeds `REFRESH_TIMEOUT + HTTP_TIMEOUT` (35s), presumably so
/// a waiter outlasts the worst case of another lock holder — confirm.
const LOCK_TIMEOUT: Duration = Duration::from_secs(45);
23
/// URLs of the two remote endpoints that `fetch_snapshot` talks to.
///
/// Kept as plain strings so callers can substitute other URLs; the tests in
/// this file point them at a local mock server.
#[derive(Debug, Clone)]
pub struct Endpoints {
    /// URL requested with `GET` by `fetch_usage`.
    pub usage: String,
    /// URL handed to `oauth::refresh` when the access token needs refreshing.
    pub token: String,
}
30
31impl Default for Endpoints {
32 fn default() -> Self {
33 Self {
34 usage: USAGE_URL.into(),
35 token: oauth::TOKEN_URL.into(),
36 }
37 }
38}
39
/// Result of `fetch_snapshot`: the usage snapshot plus information about
/// where it came from.
#[derive(Debug, Clone)]
pub struct FetchOutcome {
    /// The usage data, either freshly fetched or parsed from the cache.
    pub snapshot: AnthropicSnapshot,
    /// `true` when the snapshot was served from the cache after a failed
    /// refresh or fetch; `false` for a live fetch or a cache hit within the TTL.
    pub stale: bool,
    /// Most recent error as `(status, message)`, if any. `fetch_snapshot` uses
    /// status `0` for errors that did not come with an HTTP status.
    pub last_error: Option<(u16, String)>,
    /// Age of the payload the snapshot was built from: `Some(Duration::ZERO)`
    /// for a live fetch, otherwise whatever `Cache::payload_age` reports.
    pub cache_age: Option<Duration>,
}
52
53pub async fn fetch_snapshot(
57 client: &reqwest::Client,
58 creds_path: &Path,
59 cache: &Cache,
60 endpoints: &Endpoints,
61 cache_ttl: Duration,
62) -> Result<FetchOutcome> {
63 cache.ensure_dir()?;
64 let _lock = acquire_lock(&cache.lock_path(), LOCK_TIMEOUT)?;
65
66 let mut creds = creds::read_from(creds_path)?;
69 let plan_label = creds.claude_ai_oauth.plan_label();
70
71 if let Some(bytes) = cache.fresh_payload(cache_ttl)? {
72 return Ok(reuse_cache(bytes, plan_label, cache, false));
73 }
74
75 let mut auth_ok = true;
77 let mut refresh_transient = false;
78 let now = Utc::now().timestamp();
79 if oauth::needs_refresh(creds.claude_ai_oauth.expires_at_secs(), now) {
80 match tokio::time::timeout(
81 REFRESH_TIMEOUT,
82 oauth::refresh(
83 client,
84 &endpoints.token,
85 &creds.claude_ai_oauth.refresh_token,
86 ),
87 )
88 .await
89 {
90 Ok(Ok(rr)) => {
91 creds.claude_ai_oauth.access_token = rr.access_token;
92 if let Some(new_rt) = rr.refresh_token {
93 creds.claude_ai_oauth.refresh_token = new_rt;
94 }
95 creds.claude_ai_oauth.expires_at_ms =
96 Utc::now().timestamp_millis() + (rr.expires_in as i64) * 1000;
97 let _ = creds::write_back(creds_path, &creds.claude_ai_oauth);
100 }
101 Ok(Err(AppError::Http { status, body })) => {
102 auth_ok = false;
103 cache.write_last_error(status, &body);
104 }
105 Ok(Err(e)) if e.is_transient() => {
106 auth_ok = false;
107 refresh_transient = true;
108 }
109 Ok(Err(e)) => {
110 auth_ok = false;
111 cache.write_last_error(0, &e.to_string());
112 }
113 Err(_elapsed) => {
114 auth_ok = false;
115 refresh_transient = true;
116 }
117 }
118 }
119
120 if !auth_ok {
121 return handle_auth_failure(cache, plan_label, refresh_transient);
122 }
123
124 match tokio::time::timeout(
126 HTTP_TIMEOUT,
127 fetch_usage(client, &endpoints.usage, &creds.claude_ai_oauth),
128 )
129 .await
130 {
131 Ok(Ok(bytes)) => {
132 cache.write_payload(&bytes)?;
133 let snap = parse_payload(&bytes, plan_label.clone())?;
134 Ok(FetchOutcome {
135 snapshot: snap,
136 stale: false,
137 last_error: None,
138 cache_age: Some(Duration::ZERO),
139 })
140 }
141 Ok(Err(AppError::Http { status, body })) => {
142 cache.mark_stale();
143 cache.write_last_error(status, &body);
144 fallback_to_cache(cache, plan_label, Some((status, body)))
145 }
146 Ok(Err(e)) if e.is_transient() => {
147 fallback_to_cache_silent(cache, plan_label)
149 }
150 Ok(Err(e)) => {
151 cache.mark_stale();
152 cache.write_last_error(0, &e.to_string());
153 fallback_to_cache(cache, plan_label, Some((0, e.to_string())))
154 }
155 Err(_elapsed) => fallback_to_cache_silent(cache, plan_label),
156 }
157}
158
159fn reuse_cache(bytes: Vec<u8>, plan_label: String, cache: &Cache, stale: bool) -> FetchOutcome {
160 let snap =
161 parse_payload(&bytes, plan_label).unwrap_or_else(|_| empty_snapshot("Unknown".into()));
162 FetchOutcome {
163 snapshot: snap,
164 stale,
165 last_error: cache.read_last_error(),
166 cache_age: cache.payload_age(),
167 }
168}
169
170fn fallback_to_cache(
171 cache: &Cache,
172 plan_label: String,
173 last_error: Option<(u16, String)>,
174) -> Result<FetchOutcome> {
175 let Some(bytes) = cache.maybe_payload()? else {
176 return Err(AppError::Other("no usable cache".into()));
177 };
178 let snap = parse_payload(&bytes, plan_label)?;
179 Ok(FetchOutcome {
180 snapshot: snap,
181 stale: true,
182 last_error,
183 cache_age: cache.payload_age(),
184 })
185}
186
187fn fallback_to_cache_silent(cache: &Cache, plan_label: String) -> Result<FetchOutcome> {
188 let Some(bytes) = cache.maybe_payload()? else {
189 return Err(AppError::Transport(
190 "no cache and network unreachable".into(),
191 ));
192 };
193 let snap = parse_payload(&bytes, plan_label)?;
194 Ok(FetchOutcome {
195 snapshot: snap,
196 stale: true,
197 last_error: cache.read_last_error(),
198 cache_age: cache.payload_age(),
199 })
200}
201
202fn handle_auth_failure(cache: &Cache, plan_label: String, transient: bool) -> Result<FetchOutcome> {
203 let Some(bytes) = cache.maybe_payload()? else {
204 return if transient {
205 Err(AppError::Transport(
206 "no cache and refresh failed transiently".into(),
207 ))
208 } else {
209 Err(AppError::Credentials(
210 "token refresh failed; run `claude` to re-auth".into(),
211 ))
212 };
213 };
214 let snap = parse_payload(&bytes, plan_label)?;
215 Ok(FetchOutcome {
216 snapshot: snap,
217 stale: true,
218 last_error: cache.read_last_error(),
219 cache_age: cache.payload_age(),
220 })
221}
222
223fn parse_payload(bytes: &[u8], plan_label: String) -> Result<AnthropicSnapshot> {
224 let resp: UsageResponse = serde_json::from_slice(bytes)?;
225 Ok(resp.into_snapshot(plan_label))
226}
227
228fn empty_snapshot(plan_label: String) -> AnthropicSnapshot {
229 UsageResponse::default().into_snapshot(plan_label)
230}
231
232async fn fetch_usage(client: &reqwest::Client, url: &str, creds: &OauthCreds) -> Result<Vec<u8>> {
233 let resp = client
234 .get(url)
235 .header("Authorization", format!("Bearer {}", creds.access_token))
236 .header("anthropic-beta", USAGE_BETA_HEADER)
237 .send()
238 .await?;
239
240 let status = resp.status();
241 let bytes = resp.bytes().await?;
242
243 if status.is_success() {
244 let _: UsageResponse = serde_json::from_slice(&bytes)
247 .map_err(|e| AppError::Schema(format!("usage response unparseable: {e}")))?;
248 Ok(bytes.to_vec())
249 } else {
250 let body = String::from_utf8_lossy(&bytes).into_owned();
251 let msg =
252 oauth::parse_error_body(&body).unwrap_or_else(|| body.chars().take(200).collect());
253 Err(AppError::Http {
254 status: status.as_u16(),
255 body: msg,
256 })
257 }
258}
259
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::{NamedTempFile, TempDir};

    /// Writes a credentials file whose `expiresAt` lies one hour in the future.
    /// The returned handle must be kept alive for as long as the path is used.
    fn future_creds() -> NamedTempFile {
        let expires_ms = Utc::now().timestamp_millis() + 3_600_000;
        let json = format!(
            r#"{{"claudeAiOauth":{{
                "accessToken":"AT","refreshToken":"RT",
                "expiresAt": {expires_ms},
                "subscriptionType":"max","rateLimitTier":"default_claude_max_5x"
            }}}}"#
        );
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(json.as_bytes()).unwrap();
        file.flush().unwrap();
        file
    }

    /// Creates a cache rooted in a fresh temporary directory. The `TempDir` is
    /// returned so the caller keeps the directory alive.
    fn cache_fixture() -> (TempDir, Cache) {
        let dir = TempDir::new().unwrap();
        let root = dir.path().join("anthropic");
        let cache = Cache::at(root);
        cache.ensure_dir().unwrap();
        (dir, cache)
    }

    /// Endpoint set whose usage and token paths hang off `base`.
    fn endpoints_on(base: &str) -> Endpoints {
        Endpoints {
            usage: format!("{base}/api/oauth/usage"),
            token: format!("{base}/v1/oauth/token"),
        }
    }

    /// A payload inside the TTL is returned as-is; the endpoints point at an
    /// address that is not expected to be contacted.
    #[tokio::test]
    async fn fresh_cache_skips_network() {
        let (_tmp, cache) = cache_fixture();
        let cached = br#"{"five_hour":{"utilization":42,"resets_at":"2026-05-23T17:30:00Z"},
                "seven_day":{"utilization":15,"resets_at":"2026-05-30T12:00:00Z"}}"#;
        cache.write_payload(cached).unwrap();

        let creds = future_creds();
        let client = reqwest::Client::new();
        let unreachable = "http://localhost:1/should-not-be-called";
        let endpoints = Endpoints {
            usage: unreachable.into(),
            token: unreachable.into(),
        };

        let ttl = Duration::from_secs(60);
        let outcome = fetch_snapshot(&client, creds.path(), &cache, &endpoints, ttl)
            .await
            .unwrap();

        assert_eq!(outcome.snapshot.session.utilization_pct, 42);
        assert!(!outcome.stale);
    }

    /// With a zero TTL the mock server is queried and its payload ends up in
    /// the cache.
    #[tokio::test]
    async fn live_fetch_writes_cache_and_returns_snapshot() {
        let mut server = mockito::Server::new_async().await;
        let usage_mock = server
            .mock("GET", "/api/oauth/usage")
            .with_status(200)
            .with_body(
                r#"{"five_hour":{"utilization":50,"resets_at":"2026-05-23T17:30:00Z"},
                "seven_day":{"utilization":25,"resets_at":"2026-05-30T12:00:00Z"}}"#,
            )
            .create_async()
            .await;

        let (_tmp, cache) = cache_fixture();
        let creds = future_creds();
        let client = reqwest::Client::new();
        let endpoints = endpoints_on(&server.url());

        let ttl = Duration::from_secs(0);
        let outcome = fetch_snapshot(&client, creds.path(), &cache, &endpoints, ttl)
            .await
            .unwrap();

        assert_eq!(outcome.snapshot.session.utilization_pct, 50);
        assert!(!outcome.stale);
        usage_mock.assert_async().await;
        assert!(cache.maybe_payload().unwrap().is_some());
    }

    /// A 429 from the usage endpoint yields the cached snapshot, marked stale,
    /// with the server's status and message as `last_error`.
    #[tokio::test]
    async fn http_429_falls_back_to_stale_cache() {
        let mut server = mockito::Server::new_async().await;
        server
            .mock("GET", "/api/oauth/usage")
            .with_status(429)
            .with_body(r#"{"error":{"type":"rate_limit_error","message":"slow down"}}"#)
            .create_async()
            .await;

        let (_tmp, cache) = cache_fixture();
        let cached = br#"{"five_hour":{"utilization":12,"resets_at":"2026-05-23T17:30:00Z"},
                "seven_day":{"utilization":5,"resets_at":"2026-05-30T12:00:00Z"}}"#;
        cache.write_payload(cached).unwrap();

        let creds = future_creds();
        let client = reqwest::Client::new();
        let endpoints = endpoints_on(&server.url());

        let ttl = Duration::from_secs(0);
        let outcome = fetch_snapshot(&client, creds.path(), &cache, &endpoints, ttl)
            .await
            .unwrap();

        assert!(outcome.stale);
        assert_eq!(outcome.snapshot.session.utilization_pct, 12);
        assert_eq!(outcome.last_error, Some((429, "slow down".to_string())));
    }

    /// With an empty cache and endpoints on 127.0.0.1:1, the call fails with
    /// an error that reports itself as transient.
    #[tokio::test]
    async fn no_cache_and_no_network_returns_error() {
        let (_tmp, cache) = cache_fixture();
        let creds = future_creds();
        let client = reqwest::Client::builder()
            .timeout(Duration::from_millis(200))
            .build()
            .unwrap();
        let endpoints = endpoints_on("http://127.0.0.1:1");

        let ttl = Duration::from_secs(0);
        let err = fetch_snapshot(&client, creds.path(), &cache, &endpoints, ttl)
            .await
            .unwrap_err();

        assert!(err.is_transient(), "expected transient error, got {err:?}");
    }
}