Skip to main content

linesmith_core/data_context/cascade/
mod.rs

1//! OAuth usage fallback cascade.
2//!
3//! Glues the slice modules (`cache`, `credentials`, `fetcher`, `jsonl`,
4//! `usage`) into the full cascade from `docs/specs/data-fetching.md`
5//! §OAuth fallback cascade. The orchestrator is a pure function keyed
6//! on injected dependencies so every branch is exercised without real
7//! I/O, network, or Keychain access.
8//!
9//! The lock-active short-circuit runs before the credentials read so
10//! a process observing another's backoff window can answer from disk
11//! (or the JSONL fallback) without paying the Keychain subprocess.
12//! `NoCredentials`-vs-`Timeout` masking is preserved because
13//! credentials are still resolved before any endpoint call that could
14//! time out.
15
16use std::sync::Arc;
17use std::time::Duration;
18
19use jiff::Timestamp;
20
21use super::cache::{CacheError, CacheStore, CachedUsage, Lock, LockStore};
22use super::credentials::Credentials;
23use super::error::{CredentialError, JsonlError, UsageError};
24use super::fetcher::{self, UsageTransport};
25use super::jsonl::{self, JsonlAggregate};
26use super::usage::{FiveHourWindow, JsonlUsage, SevenDayWindow, UsageApiResponse, UsageData};
27
/// How long a cached usage payload counts as fresh (three minutes),
/// per `docs/specs/data-fetching.md` §OAuth usage cache stack.
pub const DEFAULT_CACHE_DURATION: Duration = Duration::from_secs(3 * 60);

/// Shorter TTL used for error responses and for the lock-backoff
/// window after a non-429 failure, per `docs/specs/data-fetching.md`
/// §OAuth usage cache stack ("Error cache uses a shorter TTL (30s
/// default)").
pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);

/// Backoff used when a `429` carries no parseable `Retry-After`
/// (five minutes per ADR-0011 §Cache stack). Kept equal to
/// `DEFAULT_RATE_LIMIT_BACKOFF` in `fetcher.rs`.
pub const DEFAULT_RATE_LIMIT_BACKOFF: Duration = Duration::from_secs(5 * 60);

/// Base URL of the usage endpoint when the caller does not override
/// it, per ADR-0011 §Endpoint contract.
pub const DEFAULT_API_BASE_URL: &str = "https://api.anthropic.com";
44
/// Knobs consumed by [`resolve_usage`]. The `Default` impl yields the
/// out-of-box values from `docs/specs/data-fetching.md` §OAuth usage
/// cache stack.
#[derive(Debug, Clone)]
pub struct UsageCascadeConfig {
    /// Base URL handed to the fetcher for the usage endpoint call.
    pub api_base_url: String,
    /// Timeout handed to the fetcher for the usage endpoint call.
    pub timeout: Duration,
    /// Age below which a cached payload is served without refetching;
    /// also the length of the lock written after a successful fetch.
    pub cache_duration: Duration,
}
53
54impl Default for UsageCascadeConfig {
55    fn default() -> Self {
56        Self {
57            api_base_url: DEFAULT_API_BASE_URL.into(),
58            timeout: fetcher::DEFAULT_TIMEOUT,
59            cache_duration: DEFAULT_CACHE_DURATION,
60        }
61    }
62}
63
64/// Resolve OAuth usage data using the full fallback cascade.
65///
66/// `credentials` and `jsonl` are lazily evaluated: the cascade does
67/// NOT invoke either on a fresh-cache or stale-lock-serve path,
68/// preserving the "no Keychain subprocess on cache hits" guarantee.
69/// `cache` and `lock` being `None` is equivalent to pointing at paths
70/// that don't exist: reads degrade to "miss" and writes are skipped.
71/// Write failures fall into two classes. Real bugs (disk full,
72/// missing parent dir, EACCES) log via `lsm_error!` (bypasses the
73/// level gate). The documented Windows MoveFileEx race-loser case
74/// logs via `lsm_debug!` (suppressible) so multi-terminal Windows
75/// users don't get persistent stderr noise on healthy runs. Either
76/// way the cascade still returns fetched data.
77pub fn resolve_usage(
78    cache: Option<&CacheStore>,
79    lock: Option<&LockStore>,
80    transport: &dyn UsageTransport,
81    credentials: &dyn Fn() -> Arc<Result<Credentials, CredentialError>>,
82    jsonl: &dyn Fn() -> Result<JsonlAggregate, JsonlError>,
83    now: &dyn Fn() -> Timestamp,
84    config: &UsageCascadeConfig,
85) -> Result<UsageData, UsageError> {
86    let cache_entry = read_cache(cache);
87    let lock_entry = read_lock(lock);
88    let now_ts = now();
89
90    if let Some(entry) = &cache_entry {
91        if is_fresh(entry, now_ts, config.cache_duration) {
92            if let Some(data) = entry.data.clone() {
93                return Ok(cached_to_usage_data(data));
94            }
95        }
96    }
97
98    let lock_active = lock_entry
99        .as_ref()
100        .is_some_and(|l| l.blocked_until > now_ts.as_second());
101    if lock_active {
102        // Serve whatever we have without touching credentials: another
103        // process is in backoff and we must honor it.
104        let lock_error = lock_entry.as_ref().and_then(|l| l.error.as_deref());
105        let lock_from_401 = lock_error == Some("Unauthorized");
106        if let Some(entry) = &cache_entry {
107            // A lock from a 401 means the cached `data` was fetched
108            // with a now-revoked token; skip the stale-serve so
109            // invocation B (after A's 401) doesn't return the pre-401
110            // payload through this branch. Other lock errors (429,
111            // timeout) still serve stale — those are transient and
112            // the cached data was legitimately valid when fetched.
113            if !lock_from_401 {
114                if let Some(data) = entry.data.clone() {
115                    return Ok(cached_to_usage_data(data));
116                }
117            }
118            if let Some(cached) = &entry.error {
119                return jsonl_or(jsonl, now_ts, usage_error_from_code(&cached.code));
120            }
121        }
122        // No cache content (or a 401-lock bypassed the data entry):
123        // try JSONL before surfacing the lock's own error hint.
124        // Crucially, we still do NOT reach the endpoint — that would
125        // defeat the cross-process spam guard on cold-cache starts.
126        let lock_err = lock_error
127            .map(usage_error_from_code)
128            .unwrap_or(UsageError::RateLimited { retry_after: None });
129        return jsonl_or(jsonl, now_ts, lock_err);
130    }
131
132    let creds_arc = credentials();
133    let creds = match &*creds_arc {
134        Ok(c) => c.clone(),
135        // INVARIANT: credential failures never write a failure-lock.
136        // They're not network transients — the same error will recur
137        // on every invocation until the user fixes their creds file /
138        // Keychain ACL, so a lock would just replay the error and
139        // delay recovery. If a future CredentialError variant becomes
140        // genuinely retry-stable, add a matching `write_failure_lock`
141        // here and update the test suite accordingly.
142        Err(CredentialError::NoCredentials) => {
143            return jsonl_or(jsonl, now_ts, UsageError::NoCredentials)
144        }
145        Err(other) => {
146            // Preserve the specific variant so `rate-limit-segments.md`
147            // §Error message table can render `[Keychain error]` /
148            // `[Credentials unreadable]` etc. The `Clone` impl on
149            // `CredentialError` is lossy for io/serde inner errors but
150            // keeps the variant tag (all segments key off) intact.
151            return jsonl_or(jsonl, now_ts, UsageError::Credentials(other.clone()));
152        }
153    };
154
155    match fetcher::fetch_usage(transport, &config.api_base_url, &creds, config.timeout) {
156        Ok(response) => {
157            write_cache(cache, CachedUsage::with_data(response.clone()));
158            write_lock(
159                lock,
160                Lock {
161                    blocked_until: add_secs(now_ts.as_second(), config.cache_duration),
162                    error: None,
163                },
164            );
165            Ok(UsageData::Endpoint(response.into_endpoint_usage()))
166        }
167        // 401 is the sole failure-path exception to "serve stale on
168        // error": the cached payload is tied to a no-longer-valid
169        // token, so reusing it would mislead the user. JSONL, however,
170        // is independent of token validity — fall through to it before
171        // surfacing the error so a user with a revoked token still
172        // sees their local transcript totals. We deliberately do NOT
173        // write an "Unauthorized" error into the cache here, because
174        // the cached error would then outlive the lock and mask a
175        // subsequent unrelated lock (e.g. a 429 after token refresh).
176        // Instead, clear the cache file so a peer invocation arriving
177        // after this one (and before the lock expires) reads no cache
178        // entry, can't short-circuit on the fresh-cache check, and
179        // falls through to the `lock_from_401` guard.
180        //
181        // This only changes behavior after an invocation has reached
182        // the endpoint and seen the 401. When a token is revoked
183        // while the cache is still fresh, every render returns at
184        // the top-of-cascade `is_fresh` short-circuit until the
185        // entry expires; that single-process first-invocation window
186        // is fundamental at this layer. Closing it would require
187        // out-of-band revalidation (background probe, credentials-
188        // file watch) and is tracked separately.
189        Err(UsageError::Unauthorized) => {
190            if let Some(c) = cache {
191                if let Err(e) = c.clear() {
192                    // `lsm_error!` (bypasses `LINESMITH_LOG=off`)
193                    // matches the severity this module already uses
194                    // for cache-write failures: a clear failure
195                    // leaves peers serving revoked-token data
196                    // through the fresh-cache short-circuit until
197                    // the entry's TTL expires.
198                    crate::lsm_error!(
199                        "cascade: cache clear after 401 failed; peers may serve revoked-token data until TTL expires: {e}"
200                    );
201                }
202            }
203            write_failure_lock(lock, now_ts, &UsageError::Unauthorized);
204            jsonl_or(jsonl, now_ts, UsageError::Unauthorized)
205        }
206        Err(err) => {
207            // Persist the backoff so concurrent processes honor it —
208            // without this, every statusline invocation during a 429
209            // or outage re-hits the endpoint.
210            write_failure_lock(lock, now_ts, &err);
211            if let Some(entry) = &cache_entry {
212                if let Some(data) = entry.data.clone() {
213                    return Ok(cached_to_usage_data(data));
214                }
215            }
216            jsonl_or(jsonl, now_ts, err)
217        }
218    }
219}
220
221/// Build a [`UsageData::Jsonl`] from the aggregator if it produced any
222/// data; otherwise surface `fallback` unchanged. Callers pass the
223/// endpoint-path error they would have returned, so a JSONL miss
224/// preserves the original failure reason the user sees. `now` is
225/// threaded through so the mapping can clamp future-dated block
226/// starts (clock skew) to a sane bound — see [`build_jsonl_usage`].
227fn jsonl_or(
228    jsonl: &dyn Fn() -> Result<JsonlAggregate, JsonlError>,
229    now: Timestamp,
230    fallback: UsageError,
231) -> Result<UsageData, UsageError> {
232    match build_jsonl_usage(jsonl(), now) {
233        Some(data) => Ok(UsageData::Jsonl(data)),
234        None => Err(fallback),
235    }
236}
237
238fn build_jsonl_usage(
239    result: Result<JsonlAggregate, JsonlError>,
240    now: Timestamp,
241) -> Option<JsonlUsage> {
242    let agg = match result {
243        Ok(agg) => agg,
244        Err(JsonlError::NoEntries | JsonlError::DirectoryMissing) => return None,
245        Err(other) => {
246            // `DataContext::resolve_usage_default` already collapses
247            // IoError / ParseError to NoEntries with a warn trace, so
248            // this arm is only reachable from direct test callers. Warn
249            // anyway so any future cascade caller that threads the real
250            // aggregator error through leaves a stderr breadcrumb.
251            crate::lsm_warn!(
252                "cascade: JSONL fallback unavailable ({other}); surfacing endpoint error"
253            );
254            return None;
255        }
256    };
257    // Clamp `block.start` to `floor_to_grain(now, 3600)` so a future-dated
258    // entry (clock skew) can't produce an `ends_at` further out than
259    // the current window's nominal close. The aggregator deliberately
260    // keeps token counts intact under mild skew so users don't lose
261    // their current session; this clamp normalizes the reset-timer
262    // surface without corrupting those totals.
263    let now_floor = jsonl::floor_to_grain(now, 3600);
264    let five_hour = agg.five_hour.as_ref().map(|block| {
265        let start = block.start.min(now_floor);
266        FiveHourWindow::new(block.token_counts, start)
267    });
268    let seven_day = SevenDayWindow::new(agg.seven_day.token_counts);
269    // Reaching here implies the aggregator returned `Ok(...)`; any
270    // aggregator failure (including the mod.rs-collapsed variants)
271    // kept us out of this branch. Token counts may still be zero —
272    // a parseable record can lie outside the 7d window or outside
273    // any active 5h block — so `five_hour: None` and/or a
274    // zero-valued `seven_day` are valid post-conditions here.
275    Some(JsonlUsage::new(five_hour, seven_day))
276}
277
278fn read_cache(cache: Option<&CacheStore>) -> Option<CachedUsage> {
279    cache.and_then(|c| match c.read() {
280        Ok(hit) => hit,
281        Err(e) => {
282            log_cache_read_failure("cache", &e);
283            None
284        }
285    })
286}
287
288fn read_lock(lock: Option<&LockStore>) -> Option<Lock> {
289    lock.and_then(|l| match l.read() {
290        Ok(hit) => hit,
291        Err(e) => {
292            log_cache_read_failure("lock", &e);
293            None
294        }
295    })
296}
297
298/// A cache/lock read error always collapses to "miss" so the cascade
299/// keeps serving the user, but not every error is equivalent. Ephemeral
300/// kinds (`NotFound`, truncated-read) are normal cold-start / partial-
301/// write symptoms and stay at debug. Persistent kinds (permission,
302/// ENOSPC, corrupt payload) are config defects that won't self-heal and
303/// silently force every invocation back onto the endpoint — escalate
304/// those so a user chasing "why does my statusline hammer the API"
305/// finds the cause without `LINESMITH_LOG=debug`.
306fn log_cache_read_failure(kind: &str, err: &super::cache::CacheError) {
307    use std::io::ErrorKind;
308    let io_kind = match err {
309        super::cache::CacheError::Io { cause, .. }
310        | super::cache::CacheError::Persist { cause, .. } => cause.kind(),
311    };
312    match io_kind {
313        ErrorKind::NotFound | ErrorKind::UnexpectedEof => {
314            crate::lsm_debug!("cascade: {kind} read failed: {err}; treating as miss");
315        }
316        _ => {
317            crate::lsm_warn!("cascade: {kind} read failed: {err}");
318        }
319    }
320}
321
322fn write_cache(cache: Option<&CacheStore>, entry: CachedUsage) {
323    if let Some(c) = cache {
324        if let Err(e) = c.write(&entry) {
325            log_persist_error("cache", &e);
326        }
327    }
328}
329
330fn write_lock(lock: Option<&LockStore>, entry: Lock) {
331    if let Some(l) = lock {
332        if let Err(e) = l.write(&entry) {
333            log_persist_error("lock", &e);
334        }
335    }
336}
337
/// Severity route chosen by [`classify_persist_error`] for a failed
/// cache/lock write.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PersistLogClass {
    /// Real bug. Bypasses the log level gate, so it surfaces even
    /// with `LINESMITH_LOG=off`.
    Error,
    /// Expected Windows MoveFileEx race loser. Level-gated so healthy
    /// multi-terminal runs don't pollute stderr.
    Debug,
}
347
348/// Pure classification of a persistence failure. Returns the routing
349/// class plus the formatted message. Split from [`log_persist_error`]
350/// so tests can lock in the contract (route + message format) without
351/// touching global log state or capturing process stderr.
352fn classify_persist_error(kind: &str, err: &CacheError) -> (PersistLogClass, String) {
353    if is_transient_persist_race(err) {
354        (
355            PersistLogClass::Debug,
356            format!("cascade: {kind} write race-loser (Windows MoveFileEx): {err}"),
357        )
358    } else {
359        (
360            PersistLogClass::Error,
361            format!("cascade: {kind} write failed: {err}"),
362        )
363    }
364}
365
366/// Dispatch a classified persist error to the right severity sink.
367/// Generic over the emit closures so production callers pass the
368/// `lsm_debug!`/`lsm_error!` macros while tests pass capturing
369/// closures — the match arms themselves are shared, so a future
370/// arm-swap regression fails loud in the routing tests below.
371fn route_persist_error<D, E>(class: PersistLogClass, msg: &str, on_debug: D, on_error: E)
372where
373    D: FnOnce(&str),
374    E: FnOnce(&str),
375{
376    match class {
377        PersistLogClass::Debug => on_debug(msg),
378        PersistLogClass::Error => on_error(msg),
379    }
380}
381
382/// Real bugs (disk full, missing parent dir, EACCES) route through
383/// `lsm_error!`, which bypasses the level gate so a user with
384/// `LINESMITH_LOG=off` still sees the "statusline hammers the API"
385/// class of defect. The documented Windows MoveFileEx race-loser case
386/// (concurrent processes both calling `atomic_write_json`, the loser
387/// gets `PermissionDenied`) is expected per the cache.rs contract;
388/// route it through `lsm_debug!` so multi-terminal Windows users
389/// don't get persistent stderr noise on otherwise-healthy runs.
390fn log_persist_error(kind: &str, err: &CacheError) {
391    let (class, msg) = classify_persist_error(kind, err);
392    route_persist_error(
393        class,
394        &msg,
395        |s| crate::lsm_debug!("{s}"),
396        |s| crate::lsm_error!("{s}"),
397    );
398}
399
/// Windows build: a `Persist` failure whose I/O kind is
/// `PermissionDenied` is treated as the expected race-loser outcome
/// of concurrent writers; every other error is not.
#[cfg(windows)]
fn is_transient_persist_race(err: &CacheError) -> bool {
    use std::io::ErrorKind;
    match err {
        CacheError::Persist { cause, .. } => cause.kind() == ErrorKind::PermissionDenied,
        _ => false,
    }
}
408
409#[cfg(not(windows))]
410fn is_transient_persist_race(_err: &CacheError) -> bool {
411    // Unix `rename(2)` doesn't expose this race; PermissionDenied on
412    // Unix is always a real perm bug and stays loud.
413    false
414}
415
416fn write_failure_lock(lock: Option<&LockStore>, now_ts: Timestamp, err: &UsageError) {
417    let backoff = backoff_for_error(err);
418    write_lock(
419        lock,
420        Lock {
421            blocked_until: add_secs(now_ts.as_second(), backoff),
422            error: Some(err.code().to_string()),
423        },
424    );
425}
426
427fn backoff_for_error(err: &UsageError) -> Duration {
428    match err {
429        UsageError::RateLimited {
430            retry_after: Some(d),
431        } => *d,
432        UsageError::RateLimited { retry_after: None } => DEFAULT_RATE_LIMIT_BACKOFF,
433        _ => DEFAULT_ERROR_TTL,
434    }
435}
436
/// Add the whole seconds of `secs` to the epoch-second value
/// `base_ts`, saturating instead of overflowing. A duration whose
/// second count does not fit in `i64` is treated as `i64::MAX`.
fn add_secs(base_ts: i64, secs: Duration) -> i64 {
    // Saturating to i64::MAX is safe: `LockStore::read` caps the read
    // side (MAX_LOCK_DURATION ceiling in cache.rs), so a pathological
    // config gets sanitized on the next read.
    match i64::try_from(secs.as_secs()) {
        Ok(delta) => base_ts.saturating_add(delta),
        Err(_) => base_ts.saturating_add(i64::MAX),
    }
}
444
445/// Reconstruct a `UsageError` from a cached `.code()` tag. Used when
446/// an active lock or error-cached entry tells us "another process
447/// just saw X" and we want to honor that semantic downstream without
448/// having the full error payload. Unknown codes fall back to
449/// `NetworkError` — the most generic transient failure.
450///
451/// INVARIANT: credential-layer codes (`SubprocessFailed`, `MissingField`,
452/// `EmptyToken`, `IoError`) and JSONL-layer codes (`NoEntries`,
453/// `DirectoryMissing`) are intentionally NOT matched here and collapse
454/// to `NetworkError`. They're unreachable today because the credential
455/// arm at `resolve_usage` returns before any `write_failure_lock` call
456/// (see the matching "credential failures never write a failure-lock"
457/// invariant in `resolve_usage`), and JSONL errors never enter the
458/// cache's error-code path. If a future change persists one of those
459/// codes to the cache or lock, extend this match — the lsm-50fs bead
460/// tracks the structural fix.
461fn usage_error_from_code(code: &str) -> UsageError {
462    match code {
463        "NoCredentials" => UsageError::NoCredentials,
464        "Timeout" => UsageError::Timeout,
465        "RateLimited" => UsageError::RateLimited { retry_after: None },
466        "Unauthorized" => UsageError::Unauthorized,
467        "ParseError" => UsageError::ParseError,
468        _ => UsageError::NetworkError,
469    }
470}
471
472fn is_fresh(entry: &CachedUsage, now: Timestamp, ttl: Duration) -> bool {
473    // `cached_at > now` (clock skew) is filtered out by
474    // `CacheStore::read`, so a normal `age < ttl` check is enough.
475    match Duration::try_from(now.duration_since(entry.cached_at)) {
476        Ok(elapsed) => elapsed < ttl,
477        Err(_) => false,
478    }
479}
480
481fn cached_to_usage_data(data: super::cache::CachedData) -> UsageData {
482    let response: UsageApiResponse = data.into();
483    UsageData::Endpoint(response.into_endpoint_usage())
484}
485
486#[cfg(test)]
487mod tests;