linesmith_core/data_context/cascade/mod.rs
1//! OAuth usage fallback cascade.
2//!
3//! Glues the slice modules (`cache`, `credentials`, `fetcher`, `jsonl`,
4//! `usage`) into the full cascade from `docs/specs/data-fetching.md`
5//! §OAuth fallback cascade. The orchestrator is a pure function keyed
6//! on injected dependencies so every branch is exercised without real
7//! I/O, network, or Keychain access.
8//!
9//! The lock-active short-circuit runs before the credentials read so
10//! a process observing another's backoff window can answer from disk
11//! (or the JSONL fallback) without paying the Keychain subprocess.
12//! `NoCredentials`-vs-`Timeout` masking is preserved because
13//! credentials are still resolved before any endpoint call that could
14//! time out.
15
16use std::sync::Arc;
17use std::time::Duration;
18
19use jiff::Timestamp;
20
21use super::cache::{CacheError, CacheStore, CachedUsage, Lock, LockStore};
22use super::credentials::Credentials;
23use super::error::{CredentialError, JsonlError, UsageError};
24use super::fetcher::{self, UsageTransport};
25use super::jsonl::{self, JsonlAggregate};
26use super::usage::{FiveHourWindow, JsonlUsage, SevenDayWindow, UsageApiResponse, UsageData};
27
/// How long a cached endpoint payload counts as fresh (three minutes),
/// per `docs/specs/data-fetching.md` §OAuth usage cache stack.
pub const DEFAULT_CACHE_DURATION: Duration = Duration::from_secs(3 * 60);

/// Backoff written for non-429 failures and the TTL for error
/// responses. The spec (`docs/specs/data-fetching.md` §OAuth usage
/// cache stack) words it as "Error cache uses a shorter TTL (30s
/// default)".
pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);

/// Backoff used when a `429` carries no parseable `Retry-After`
/// (five minutes, per ADR-0011 §Cache stack). Kept equal to
/// `DEFAULT_RATE_LIMIT_BACKOFF` in `fetcher.rs`.
pub const DEFAULT_RATE_LIMIT_BACKOFF: Duration = Duration::from_secs(5 * 60);

/// Base URL of the usage endpoint, per ADR-0011 §Endpoint contract.
pub const DEFAULT_API_BASE_URL: &str = "https://api.anthropic.com";
44
/// Knobs passed to [`resolve_usage`]. The `Default` impl yields the
/// out-of-box values described in `docs/specs/data-fetching.md`
/// §OAuth usage cache stack.
#[derive(Debug, Clone)]
pub struct UsageCascadeConfig {
    /// Base URL handed to the fetcher for the usage endpoint call.
    pub api_base_url: String,
    /// Timeout handed to the fetcher for the usage endpoint call.
    pub timeout: Duration,
    /// Freshness window for cached payloads; also the length of the
    /// lock written after a successful fetch.
    pub cache_duration: Duration,
}
53
54impl Default for UsageCascadeConfig {
55 fn default() -> Self {
56 Self {
57 api_base_url: DEFAULT_API_BASE_URL.into(),
58 timeout: fetcher::DEFAULT_TIMEOUT,
59 cache_duration: DEFAULT_CACHE_DURATION,
60 }
61 }
62}
63
64/// Resolve OAuth usage data using the full fallback cascade.
65///
66/// `credentials` and `jsonl` are lazily evaluated: the cascade does
67/// NOT invoke either on a fresh-cache or stale-lock-serve path,
68/// preserving the "no Keychain subprocess on cache hits" guarantee.
69/// `cache` and `lock` being `None` is equivalent to pointing at paths
70/// that don't exist: reads degrade to "miss" and writes are skipped.
71/// Write failures fall into two classes. Real bugs (disk full,
72/// missing parent dir, EACCES) log via `lsm_error!` (bypasses the
73/// level gate). The documented Windows MoveFileEx race-loser case
74/// logs via `lsm_debug!` (suppressible) so multi-terminal Windows
75/// users don't get persistent stderr noise on healthy runs. Either
76/// way the cascade still returns fetched data.
77pub fn resolve_usage(
78 cache: Option<&CacheStore>,
79 lock: Option<&LockStore>,
80 transport: &dyn UsageTransport,
81 credentials: &dyn Fn() -> Arc<Result<Credentials, CredentialError>>,
82 jsonl: &dyn Fn() -> Result<JsonlAggregate, JsonlError>,
83 now: &dyn Fn() -> Timestamp,
84 config: &UsageCascadeConfig,
85) -> Result<UsageData, UsageError> {
86 let cache_entry = read_cache(cache);
87 let lock_entry = read_lock(lock);
88 let now_ts = now();
89
90 if let Some(entry) = &cache_entry {
91 if is_fresh(entry, now_ts, config.cache_duration) {
92 if let Some(data) = entry.data.clone() {
93 return Ok(cached_to_usage_data(data));
94 }
95 }
96 }
97
98 let lock_active = lock_entry
99 .as_ref()
100 .is_some_and(|l| l.blocked_until > now_ts.as_second());
101 if lock_active {
102 // Serve whatever we have without touching credentials: another
103 // process is in backoff and we must honor it.
104 let lock_error = lock_entry.as_ref().and_then(|l| l.error.as_deref());
105 let lock_from_401 = lock_error == Some("Unauthorized");
106 if let Some(entry) = &cache_entry {
107 // A lock from a 401 means the cached `data` was fetched
108 // with a now-revoked token; skip the stale-serve so
109 // invocation B (after A's 401) doesn't return the pre-401
110 // payload through this branch. Other lock errors (429,
111 // timeout) still serve stale — those are transient and
112 // the cached data was legitimately valid when fetched.
113 if !lock_from_401 {
114 if let Some(data) = entry.data.clone() {
115 return Ok(cached_to_usage_data(data));
116 }
117 }
118 if let Some(cached) = &entry.error {
119 return jsonl_or(jsonl, now_ts, usage_error_from_code(&cached.code));
120 }
121 }
122 // No cache content (or a 401-lock bypassed the data entry):
123 // try JSONL before surfacing the lock's own error hint.
124 // Crucially, we still do NOT reach the endpoint — that would
125 // defeat the cross-process spam guard on cold-cache starts.
126 let lock_err = lock_error
127 .map(usage_error_from_code)
128 .unwrap_or(UsageError::RateLimited { retry_after: None });
129 return jsonl_or(jsonl, now_ts, lock_err);
130 }
131
132 let creds_arc = credentials();
133 let creds = match &*creds_arc {
134 Ok(c) => c.clone(),
135 // INVARIANT: credential failures never write a failure-lock.
136 // They're not network transients — the same error will recur
137 // on every invocation until the user fixes their creds file /
138 // Keychain ACL, so a lock would just replay the error and
139 // delay recovery. If a future CredentialError variant becomes
140 // genuinely retry-stable, add a matching `write_failure_lock`
141 // here and update the test suite accordingly.
142 Err(CredentialError::NoCredentials) => {
143 return jsonl_or(jsonl, now_ts, UsageError::NoCredentials)
144 }
145 Err(other) => {
146 // Preserve the specific variant so `rate-limit-segments.md`
147 // §Error message table can render `[Keychain error]` /
148 // `[Credentials unreadable]` etc. The `Clone` impl on
149 // `CredentialError` is lossy for io/serde inner errors but
150 // keeps the variant tag (all segments key off) intact.
151 return jsonl_or(jsonl, now_ts, UsageError::Credentials(other.clone()));
152 }
153 };
154
155 match fetcher::fetch_usage(transport, &config.api_base_url, &creds, config.timeout) {
156 Ok(response) => {
157 write_cache(cache, CachedUsage::with_data(response.clone()));
158 write_lock(
159 lock,
160 Lock {
161 blocked_until: add_secs(now_ts.as_second(), config.cache_duration),
162 error: None,
163 },
164 );
165 Ok(UsageData::Endpoint(response.into_endpoint_usage()))
166 }
167 // 401 is the sole failure-path exception to "serve stale on
168 // error": the cached payload is tied to a no-longer-valid
169 // token, so reusing it would mislead the user. JSONL, however,
170 // is independent of token validity — fall through to it before
171 // surfacing the error so a user with a revoked token still
172 // sees their local transcript totals. We deliberately do NOT
173 // write an "Unauthorized" error into the cache here, because
174 // the cached error would then outlive the lock and mask a
175 // subsequent unrelated lock (e.g. a 429 after token refresh).
176 // Instead, clear the cache file so a peer invocation arriving
177 // after this one (and before the lock expires) reads no cache
178 // entry, can't short-circuit on the fresh-cache check, and
179 // falls through to the `lock_from_401` guard.
180 //
181 // This only changes behavior after an invocation has reached
182 // the endpoint and seen the 401. When a token is revoked
183 // while the cache is still fresh, every render returns at
184 // the top-of-cascade `is_fresh` short-circuit until the
185 // entry expires; that single-process first-invocation window
186 // is fundamental at this layer. Closing it would require
187 // out-of-band revalidation (background probe, credentials-
188 // file watch) and is tracked separately.
189 Err(UsageError::Unauthorized) => {
190 if let Some(c) = cache {
191 if let Err(e) = c.clear() {
192 // `lsm_error!` (bypasses `LINESMITH_LOG=off`)
193 // matches the severity this module already uses
194 // for cache-write failures: a clear failure
195 // leaves peers serving revoked-token data
196 // through the fresh-cache short-circuit until
197 // the entry's TTL expires.
198 crate::lsm_error!(
199 "cascade: cache clear after 401 failed; peers may serve revoked-token data until TTL expires: {e}"
200 );
201 }
202 }
203 write_failure_lock(lock, now_ts, &UsageError::Unauthorized);
204 jsonl_or(jsonl, now_ts, UsageError::Unauthorized)
205 }
206 Err(err) => {
207 // Persist the backoff so concurrent processes honor it —
208 // without this, every statusline invocation during a 429
209 // or outage re-hits the endpoint.
210 write_failure_lock(lock, now_ts, &err);
211 if let Some(entry) = &cache_entry {
212 if let Some(data) = entry.data.clone() {
213 return Ok(cached_to_usage_data(data));
214 }
215 }
216 jsonl_or(jsonl, now_ts, err)
217 }
218 }
219}
220
221/// Build a [`UsageData::Jsonl`] from the aggregator if it produced any
222/// data; otherwise surface `fallback` unchanged. Callers pass the
223/// endpoint-path error they would have returned, so a JSONL miss
224/// preserves the original failure reason the user sees. `now` is
225/// threaded through so the mapping can clamp future-dated block
226/// starts (clock skew) to a sane bound — see [`build_jsonl_usage`].
227fn jsonl_or(
228 jsonl: &dyn Fn() -> Result<JsonlAggregate, JsonlError>,
229 now: Timestamp,
230 fallback: UsageError,
231) -> Result<UsageData, UsageError> {
232 match build_jsonl_usage(jsonl(), now) {
233 Some(data) => Ok(UsageData::Jsonl(data)),
234 None => Err(fallback),
235 }
236}
237
238fn build_jsonl_usage(
239 result: Result<JsonlAggregate, JsonlError>,
240 now: Timestamp,
241) -> Option<JsonlUsage> {
242 let agg = match result {
243 Ok(agg) => agg,
244 Err(JsonlError::NoEntries | JsonlError::DirectoryMissing) => return None,
245 Err(other) => {
246 // `DataContext::resolve_usage_default` already collapses
247 // IoError / ParseError to NoEntries with a warn trace, so
248 // this arm is only reachable from direct test callers. Warn
249 // anyway so any future cascade caller that threads the real
250 // aggregator error through leaves a stderr breadcrumb.
251 crate::lsm_warn!(
252 "cascade: JSONL fallback unavailable ({other}); surfacing endpoint error"
253 );
254 return None;
255 }
256 };
257 // Clamp `block.start` to `floor_to_grain(now, 3600)` so a future-dated
258 // entry (clock skew) can't produce an `ends_at` further out than
259 // the current window's nominal close. The aggregator deliberately
260 // keeps token counts intact under mild skew so users don't lose
261 // their current session; this clamp normalizes the reset-timer
262 // surface without corrupting those totals.
263 let now_floor = jsonl::floor_to_grain(now, 3600);
264 let five_hour = agg.five_hour.as_ref().map(|block| {
265 let start = block.start.min(now_floor);
266 FiveHourWindow::new(block.token_counts, start)
267 });
268 let seven_day = SevenDayWindow::new(agg.seven_day.token_counts);
269 // Reaching here implies the aggregator returned `Ok(...)`; any
270 // aggregator failure (including the mod.rs-collapsed variants)
271 // kept us out of this branch. Token counts may still be zero —
272 // a parseable record can lie outside the 7d window or outside
273 // any active 5h block — so `five_hour: None` and/or a
274 // zero-valued `seven_day` are valid post-conditions here.
275 Some(JsonlUsage::new(five_hour, seven_day))
276}
277
278fn read_cache(cache: Option<&CacheStore>) -> Option<CachedUsage> {
279 cache.and_then(|c| match c.read() {
280 Ok(hit) => hit,
281 Err(e) => {
282 log_cache_read_failure("cache", &e);
283 None
284 }
285 })
286}
287
288fn read_lock(lock: Option<&LockStore>) -> Option<Lock> {
289 lock.and_then(|l| match l.read() {
290 Ok(hit) => hit,
291 Err(e) => {
292 log_cache_read_failure("lock", &e);
293 None
294 }
295 })
296}
297
298/// A cache/lock read error always collapses to "miss" so the cascade
299/// keeps serving the user, but not every error is equivalent. Ephemeral
300/// kinds (`NotFound`, truncated-read) are normal cold-start / partial-
301/// write symptoms and stay at debug. Persistent kinds (permission,
302/// ENOSPC, corrupt payload) are config defects that won't self-heal and
303/// silently force every invocation back onto the endpoint — escalate
304/// those so a user chasing "why does my statusline hammer the API"
305/// finds the cause without `LINESMITH_LOG=debug`.
306fn log_cache_read_failure(kind: &str, err: &super::cache::CacheError) {
307 use std::io::ErrorKind;
308 let io_kind = match err {
309 super::cache::CacheError::Io { cause, .. }
310 | super::cache::CacheError::Persist { cause, .. } => cause.kind(),
311 };
312 match io_kind {
313 ErrorKind::NotFound | ErrorKind::UnexpectedEof => {
314 crate::lsm_debug!("cascade: {kind} read failed: {err}; treating as miss");
315 }
316 _ => {
317 crate::lsm_warn!("cascade: {kind} read failed: {err}");
318 }
319 }
320}
321
322fn write_cache(cache: Option<&CacheStore>, entry: CachedUsage) {
323 if let Some(c) = cache {
324 if let Err(e) = c.write(&entry) {
325 log_persist_error("cache", &e);
326 }
327 }
328}
329
330fn write_lock(lock: Option<&LockStore>, entry: Lock) {
331 if let Some(l) = lock {
332 if let Err(e) = l.write(&entry) {
333 log_persist_error("lock", &e);
334 }
335 }
336}
337
/// Severity route chosen by [`classify_persist_error`] for a failed
/// cache/lock write.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PersistLogClass {
    /// Bypasses the log level gate, so real bugs surface even with
    /// `LINESMITH_LOG=off`.
    Error,
    /// Level-gated. Used for Windows MoveFileEx race losers, which are
    /// expected and shouldn't pollute stderr on healthy multi-terminal
    /// runs.
    Debug,
}
347
348/// Pure classification of a persistence failure. Returns the routing
349/// class plus the formatted message. Split from [`log_persist_error`]
350/// so tests can lock in the contract (route + message format) without
351/// touching global log state or capturing process stderr.
352fn classify_persist_error(kind: &str, err: &CacheError) -> (PersistLogClass, String) {
353 if is_transient_persist_race(err) {
354 (
355 PersistLogClass::Debug,
356 format!("cascade: {kind} write race-loser (Windows MoveFileEx): {err}"),
357 )
358 } else {
359 (
360 PersistLogClass::Error,
361 format!("cascade: {kind} write failed: {err}"),
362 )
363 }
364}
365
366/// Dispatch a classified persist error to the right severity sink.
367/// Generic over the emit closures so production callers pass the
368/// `lsm_debug!`/`lsm_error!` macros while tests pass capturing
369/// closures — the match arms themselves are shared, so a future
370/// arm-swap regression fails loud in the routing tests below.
371fn route_persist_error<D, E>(class: PersistLogClass, msg: &str, on_debug: D, on_error: E)
372where
373 D: FnOnce(&str),
374 E: FnOnce(&str),
375{
376 match class {
377 PersistLogClass::Debug => on_debug(msg),
378 PersistLogClass::Error => on_error(msg),
379 }
380}
381
382/// Real bugs (disk full, missing parent dir, EACCES) route through
383/// `lsm_error!`, which bypasses the level gate so a user with
384/// `LINESMITH_LOG=off` still sees the "statusline hammers the API"
385/// class of defect. The documented Windows MoveFileEx race-loser case
386/// (concurrent processes both calling `atomic_write_json`, the loser
387/// gets `PermissionDenied`) is expected per the cache.rs contract;
388/// route it through `lsm_debug!` so multi-terminal Windows users
389/// don't get persistent stderr noise on otherwise-healthy runs.
390fn log_persist_error(kind: &str, err: &CacheError) {
391 let (class, msg) = classify_persist_error(kind, err);
392 route_persist_error(
393 class,
394 &msg,
395 |s| crate::lsm_debug!("{s}"),
396 |s| crate::lsm_error!("{s}"),
397 );
398}
399
400#[cfg(windows)]
401fn is_transient_persist_race(err: &CacheError) -> bool {
402 matches!(
403 err,
404 CacheError::Persist { cause, .. }
405 if cause.kind() == std::io::ErrorKind::PermissionDenied
406 )
407}
408
409#[cfg(not(windows))]
410fn is_transient_persist_race(_err: &CacheError) -> bool {
411 // Unix `rename(2)` doesn't expose this race; PermissionDenied on
412 // Unix is always a real perm bug and stays loud.
413 false
414}
415
416fn write_failure_lock(lock: Option<&LockStore>, now_ts: Timestamp, err: &UsageError) {
417 let backoff = backoff_for_error(err);
418 write_lock(
419 lock,
420 Lock {
421 blocked_until: add_secs(now_ts.as_second(), backoff),
422 error: Some(err.code().to_string()),
423 },
424 );
425}
426
427fn backoff_for_error(err: &UsageError) -> Duration {
428 match err {
429 UsageError::RateLimited {
430 retry_after: Some(d),
431 } => *d,
432 UsageError::RateLimited { retry_after: None } => DEFAULT_RATE_LIMIT_BACKOFF,
433 _ => DEFAULT_ERROR_TTL,
434 }
435}
436
/// Add the whole seconds of `secs` to the epoch-second value
/// `base_ts`, saturating instead of overflowing.
///
/// Sub-second precision in `secs` is dropped. Saturating to
/// `i64::MAX` is safe because `LockStore::read` caps the read side
/// (the `MAX_LOCK_DURATION` ceiling in cache.rs), so a pathological
/// config is sanitized on the next read.
fn add_secs(base_ts: i64, secs: Duration) -> i64 {
    match i64::try_from(secs.as_secs()) {
        Ok(offset) => base_ts.saturating_add(offset),
        // More seconds than an i64 can hold: pin to the maximum offset.
        Err(_) => base_ts.saturating_add(i64::MAX),
    }
}
444
445/// Reconstruct a `UsageError` from a cached `.code()` tag. Used when
446/// an active lock or error-cached entry tells us "another process
447/// just saw X" and we want to honor that semantic downstream without
448/// having the full error payload. Unknown codes fall back to
449/// `NetworkError` — the most generic transient failure.
450///
451/// INVARIANT: credential-layer codes (`SubprocessFailed`, `MissingField`,
452/// `EmptyToken`, `IoError`) and JSONL-layer codes (`NoEntries`,
453/// `DirectoryMissing`) are intentionally NOT matched here and collapse
454/// to `NetworkError`. They're unreachable today because the credential
455/// arm at `resolve_usage` returns before any `write_failure_lock` call
456/// (see the matching "credential failures never write a failure-lock"
457/// invariant in `resolve_usage`), and JSONL errors never enter the
458/// cache's error-code path. If a future change persists one of those
459/// codes to the cache or lock, extend this match — the lsm-50fs bead
460/// tracks the structural fix.
461fn usage_error_from_code(code: &str) -> UsageError {
462 match code {
463 "NoCredentials" => UsageError::NoCredentials,
464 "Timeout" => UsageError::Timeout,
465 "RateLimited" => UsageError::RateLimited { retry_after: None },
466 "Unauthorized" => UsageError::Unauthorized,
467 "ParseError" => UsageError::ParseError,
468 _ => UsageError::NetworkError,
469 }
470}
471
472fn is_fresh(entry: &CachedUsage, now: Timestamp, ttl: Duration) -> bool {
473 // `cached_at > now` (clock skew) is filtered out by
474 // `CacheStore::read`, so a normal `age < ttl` check is enough.
475 match Duration::try_from(now.duration_since(entry.cached_at)) {
476 Ok(elapsed) => elapsed < ttl,
477 Err(_) => false,
478 }
479}
480
481fn cached_to_usage_data(data: super::cache::CachedData) -> UsageData {
482 let response: UsageApiResponse = data.into();
483 UsageData::Endpoint(response.into_endpoint_usage())
484}
485
486#[cfg(test)]
487mod tests;