Skip to main content

ai_usagebar/anthropic/
fetch.rs

1//! Stitches together: read creds → maybe-refresh → GET usage → cache result.
2//!
3//! Mirrors claudebar:402-491 — the lock + refresh + fetch state machine.
4
5use std::path::Path;
6use std::time::Duration;
7
8use chrono::Utc;
9
10use crate::cache::{Cache, acquire_lock};
11use crate::error::{AppError, Result};
12use crate::usage::AnthropicSnapshot;
13
14use super::creds::{self, OauthCreds};
15use super::oauth;
16use super::types::UsageResponse;
17
18pub const USAGE_URL: &str = "https://api.anthropic.com/api/oauth/usage";
19pub const USAGE_BETA_HEADER: &str = "oauth-2025-04-20";
20const HTTP_TIMEOUT: Duration = Duration::from_secs(10);
21const REFRESH_TIMEOUT: Duration = Duration::from_secs(25);
22const LOCK_TIMEOUT: Duration = Duration::from_secs(45);
23
24/// Endpoints (parameterized for tests).
25#[derive(Debug, Clone)]
26pub struct Endpoints {
27    pub usage: String,
28    pub token: String,
29}
30
31impl Default for Endpoints {
32    fn default() -> Self {
33        Self {
34            usage: USAGE_URL.into(),
35            token: oauth::TOKEN_URL.into(),
36        }
37    }
38}
39
40/// What we ultimately hand back to the renderer.
41#[derive(Debug, Clone)]
42pub struct FetchOutcome {
43    pub snapshot: AnthropicSnapshot,
44    /// True if this snapshot came from the on-disk cache because the live
45    /// fetch failed — the widget shows a `⏸` indicator in this case.
46    pub stale: bool,
47    /// Last fetch error, if any — drives the `.last_error` tooltip line.
48    pub last_error: Option<(u16, String)>,
49    /// When the on-disk cache was written. Drives the "Updated HH:MM" line.
50    pub cache_age: Option<Duration>,
51}
52
53/// High-level entry point. Reads creds, refreshes if needed, fetches usage,
54/// writes back the cache, and returns the snapshot — falling back to cache on
55/// failure. All under a flock so multi-monitor Waybar instances coexist.
56pub async fn fetch_snapshot(
57    client: &reqwest::Client,
58    creds_path: &Path,
59    cache: &Cache,
60    endpoints: &Endpoints,
61    cache_ttl: Duration,
62) -> Result<FetchOutcome> {
63    cache.ensure_dir()?;
64    let _lock = acquire_lock(&cache.lock_path(), LOCK_TIMEOUT)?;
65
66    // Fast path: cache is fresh, no work needed. We still need creds for the
67    // plan label though, so read them either way.
68    let mut creds = creds::read_from(creds_path)?;
69    let plan_label = creds.claude_ai_oauth.plan_label();
70
71    if let Some(bytes) = cache.fresh_payload(cache_ttl)? {
72        return Ok(reuse_cache(bytes, plan_label, cache, false));
73    }
74
75    // Maybe refresh.
76    let mut auth_ok = true;
77    let mut refresh_transient = false;
78    let now = Utc::now().timestamp();
79    if oauth::needs_refresh(creds.claude_ai_oauth.expires_at_secs(), now) {
80        match tokio::time::timeout(
81            REFRESH_TIMEOUT,
82            oauth::refresh(
83                client,
84                &endpoints.token,
85                &creds.claude_ai_oauth.refresh_token,
86            ),
87        )
88        .await
89        {
90            Ok(Ok(rr)) => {
91                creds.claude_ai_oauth.access_token = rr.access_token;
92                if let Some(new_rt) = rr.refresh_token {
93                    creds.claude_ai_oauth.refresh_token = new_rt;
94                }
95                creds.claude_ai_oauth.expires_at_ms =
96                    Utc::now().timestamp_millis() + (rr.expires_in as i64) * 1000;
97                // Best-effort persist; the refresh worked, so callers should
98                // still see fresh data even if writing the cred file failed.
99                let _ = creds::write_back(creds_path, &creds.claude_ai_oauth);
100            }
101            Ok(Err(AppError::Http { status, body })) => {
102                auth_ok = false;
103                cache.write_last_error(status, &body);
104            }
105            Ok(Err(e)) if e.is_transient() => {
106                auth_ok = false;
107                refresh_transient = true;
108            }
109            Ok(Err(e)) => {
110                auth_ok = false;
111                cache.write_last_error(0, &e.to_string());
112            }
113            Err(_elapsed) => {
114                auth_ok = false;
115                refresh_transient = true;
116            }
117        }
118    }
119
120    if !auth_ok {
121        return handle_auth_failure(cache, plan_label, refresh_transient);
122    }
123
124    // Fetch usage.
125    match tokio::time::timeout(
126        HTTP_TIMEOUT,
127        fetch_usage(client, &endpoints.usage, &creds.claude_ai_oauth),
128    )
129    .await
130    {
131        Ok(Ok(bytes)) => {
132            cache.write_payload(&bytes)?;
133            let snap = parse_payload(&bytes, plan_label.clone())?;
134            Ok(FetchOutcome {
135                snapshot: snap,
136                stale: false,
137                last_error: None,
138                cache_age: Some(Duration::ZERO),
139            })
140        }
141        Ok(Err(AppError::Http { status, body })) => {
142            cache.mark_stale();
143            cache.write_last_error(status, &body);
144            fallback_to_cache(cache, plan_label, Some((status, body)))
145        }
146        Ok(Err(e)) if e.is_transient() => {
147            // Reuse cache silently; no last_error write.
148            fallback_to_cache_silent(cache, plan_label)
149        }
150        Ok(Err(e)) => {
151            cache.mark_stale();
152            cache.write_last_error(0, &e.to_string());
153            fallback_to_cache(cache, plan_label, Some((0, e.to_string())))
154        }
155        Err(_elapsed) => fallback_to_cache_silent(cache, plan_label),
156    }
157}
158
159fn reuse_cache(bytes: Vec<u8>, plan_label: String, cache: &Cache, stale: bool) -> FetchOutcome {
160    let snap =
161        parse_payload(&bytes, plan_label).unwrap_or_else(|_| empty_snapshot("Unknown".into()));
162    FetchOutcome {
163        snapshot: snap,
164        stale,
165        last_error: cache.read_last_error(),
166        cache_age: cache.payload_age(),
167    }
168}
169
170fn fallback_to_cache(
171    cache: &Cache,
172    plan_label: String,
173    last_error: Option<(u16, String)>,
174) -> Result<FetchOutcome> {
175    let Some(bytes) = cache.maybe_payload()? else {
176        return Err(AppError::Other("no usable cache".into()));
177    };
178    let snap = parse_payload(&bytes, plan_label)?;
179    Ok(FetchOutcome {
180        snapshot: snap,
181        stale: true,
182        last_error,
183        cache_age: cache.payload_age(),
184    })
185}
186
187fn fallback_to_cache_silent(cache: &Cache, plan_label: String) -> Result<FetchOutcome> {
188    let Some(bytes) = cache.maybe_payload()? else {
189        return Err(AppError::Transport(
190            "no cache and network unreachable".into(),
191        ));
192    };
193    let snap = parse_payload(&bytes, plan_label)?;
194    Ok(FetchOutcome {
195        snapshot: snap,
196        stale: true,
197        last_error: cache.read_last_error(),
198        cache_age: cache.payload_age(),
199    })
200}
201
202fn handle_auth_failure(cache: &Cache, plan_label: String, transient: bool) -> Result<FetchOutcome> {
203    let Some(bytes) = cache.maybe_payload()? else {
204        return if transient {
205            Err(AppError::Transport(
206                "no cache and refresh failed transiently".into(),
207            ))
208        } else {
209            Err(AppError::Credentials(
210                "token refresh failed; run `claude` to re-auth".into(),
211            ))
212        };
213    };
214    let snap = parse_payload(&bytes, plan_label)?;
215    Ok(FetchOutcome {
216        snapshot: snap,
217        stale: true,
218        last_error: cache.read_last_error(),
219        cache_age: cache.payload_age(),
220    })
221}
222
223fn parse_payload(bytes: &[u8], plan_label: String) -> Result<AnthropicSnapshot> {
224    let resp: UsageResponse = serde_json::from_slice(bytes)?;
225    Ok(resp.into_snapshot(plan_label))
226}
227
228fn empty_snapshot(plan_label: String) -> AnthropicSnapshot {
229    UsageResponse::default().into_snapshot(plan_label)
230}
231
232async fn fetch_usage(client: &reqwest::Client, url: &str, creds: &OauthCreds) -> Result<Vec<u8>> {
233    let resp = client
234        .get(url)
235        .header("Authorization", format!("Bearer {}", creds.access_token))
236        .header("anthropic-beta", USAGE_BETA_HEADER)
237        .send()
238        .await?;
239
240    let status = resp.status();
241    let bytes = resp.bytes().await?;
242
243    if status.is_success() {
244        // Validate it's a usage shape — keep claudebar's "must have five_hour"
245        // sanity check (claudebar:385).
246        let _: UsageResponse = serde_json::from_slice(&bytes)
247            .map_err(|e| AppError::Schema(format!("usage response unparseable: {e}")))?;
248        Ok(bytes.to_vec())
249    } else {
250        let body = String::from_utf8_lossy(&bytes).into_owned();
251        let msg =
252            oauth::parse_error_body(&body).unwrap_or_else(|| body.chars().take(200).collect());
253        Err(AppError::Http {
254            status: status.as_u16(),
255            body: msg,
256        })
257    }
258}
259
260#[cfg(test)]
261mod tests {
262    use super::*;
263    use std::io::Write;
264    use tempfile::{NamedTempFile, TempDir};
265
266    fn future_creds() -> NamedTempFile {
267        let mut f = NamedTempFile::new().unwrap();
268        // Expires 1 hour from now → no refresh needed in tests.
269        let expires_ms = (Utc::now().timestamp_millis()) + 3_600_000;
270        let s = format!(
271            r#"{{"claudeAiOauth":{{
272                "accessToken":"AT","refreshToken":"RT",
273                "expiresAt": {expires_ms},
274                "subscriptionType":"max","rateLimitTier":"default_claude_max_5x"
275            }}}}"#
276        );
277        f.write_all(s.as_bytes()).unwrap();
278        f.flush().unwrap();
279        f
280    }
281
282    fn cache_fixture() -> (TempDir, Cache) {
283        let td = TempDir::new().unwrap();
284        let cache = Cache::at(td.path().join("anthropic"));
285        cache.ensure_dir().unwrap();
286        (td, cache)
287    }
288
289    #[tokio::test]
290    async fn fresh_cache_skips_network() {
291        let (_td, cache) = cache_fixture();
292        cache
293            .write_payload(
294                br#"{"five_hour":{"utilization":42,"resets_at":"2026-05-23T17:30:00Z"},
295                     "seven_day":{"utilization":15,"resets_at":"2026-05-30T12:00:00Z"}}"#,
296            )
297            .unwrap();
298
299        let creds = future_creds();
300        let client = reqwest::Client::new();
301        let endpoints = Endpoints {
302            usage: "http://localhost:1/should-not-be-called".into(),
303            token: "http://localhost:1/should-not-be-called".into(),
304        };
305        let outcome = fetch_snapshot(
306            &client,
307            creds.path(),
308            &cache,
309            &endpoints,
310            Duration::from_secs(60),
311        )
312        .await
313        .unwrap();
314        assert_eq!(outcome.snapshot.session.utilization_pct, 42);
315        assert!(!outcome.stale);
316    }
317
318    #[tokio::test]
319    async fn live_fetch_writes_cache_and_returns_snapshot() {
320        let mut server = mockito::Server::new_async().await;
321        let m = server
322            .mock("GET", "/api/oauth/usage")
323            .with_status(200)
324            .with_body(
325                r#"{"five_hour":{"utilization":50,"resets_at":"2026-05-23T17:30:00Z"},
326                    "seven_day":{"utilization":25,"resets_at":"2026-05-30T12:00:00Z"}}"#,
327            )
328            .create_async()
329            .await;
330
331        let (_td, cache) = cache_fixture();
332        let creds = future_creds();
333        let client = reqwest::Client::new();
334        let endpoints = Endpoints {
335            usage: format!("{}/api/oauth/usage", server.url()),
336            token: format!("{}/v1/oauth/token", server.url()),
337        };
338        let outcome = fetch_snapshot(
339            &client,
340            creds.path(),
341            &cache,
342            &endpoints,
343            Duration::from_secs(0),
344        )
345        .await
346        .unwrap();
347        assert_eq!(outcome.snapshot.session.utilization_pct, 50);
348        assert!(!outcome.stale);
349        m.assert_async().await;
350        // Cache should now exist.
351        assert!(cache.maybe_payload().unwrap().is_some());
352    }
353
354    #[tokio::test]
355    async fn http_429_falls_back_to_stale_cache() {
356        let mut server = mockito::Server::new_async().await;
357        server
358            .mock("GET", "/api/oauth/usage")
359            .with_status(429)
360            .with_body(r#"{"error":{"type":"rate_limit_error","message":"slow down"}}"#)
361            .create_async()
362            .await;
363
364        let (_td, cache) = cache_fixture();
365        cache
366            .write_payload(
367                br#"{"five_hour":{"utilization":12,"resets_at":"2026-05-23T17:30:00Z"},
368                     "seven_day":{"utilization":5,"resets_at":"2026-05-30T12:00:00Z"}}"#,
369            )
370            .unwrap();
371        // Force the cache to be considered stale by setting TTL = 0.
372        let creds = future_creds();
373        let client = reqwest::Client::new();
374        let endpoints = Endpoints {
375            usage: format!("{}/api/oauth/usage", server.url()),
376            token: format!("{}/v1/oauth/token", server.url()),
377        };
378        let outcome = fetch_snapshot(
379            &client,
380            creds.path(),
381            &cache,
382            &endpoints,
383            Duration::from_secs(0),
384        )
385        .await
386        .unwrap();
387        assert!(outcome.stale);
388        assert_eq!(outcome.snapshot.session.utilization_pct, 12);
389        assert_eq!(outcome.last_error.as_ref().map(|(c, _)| *c), Some(429));
390        assert_eq!(
391            outcome.last_error.as_ref().map(|(_, m)| m.as_str()),
392            Some("slow down")
393        );
394    }
395
396    #[tokio::test]
397    async fn no_cache_and_no_network_returns_error() {
398        // Point at a closed port so we get a transport error.
399        let (_td, cache) = cache_fixture();
400        let creds = future_creds();
401        let client = reqwest::Client::builder()
402            .timeout(Duration::from_millis(200))
403            .build()
404            .unwrap();
405        let endpoints = Endpoints {
406            usage: "http://127.0.0.1:1/api/oauth/usage".into(),
407            token: "http://127.0.0.1:1/v1/oauth/token".into(),
408        };
409        let err = fetch_snapshot(
410            &client,
411            creds.path(),
412            &cache,
413            &endpoints,
414            Duration::from_secs(0),
415        )
416        .await
417        .unwrap_err();
418        assert!(err.is_transient(), "expected transient error, got {err:?}");
419    }
420}