use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use super::cache::{CacheError, CacheStore, CachedUsage, Lock, LockStore};
use super::credentials::Credentials;
use super::errors::{CredentialError, JsonlError, UsageError};
use super::fetcher::{self, UsageTransport};
use super::jsonl::{self, JsonlAggregate};
use super::usage::{FiveHourWindow, JsonlUsage, SevenDayWindow, UsageApiResponse, UsageData};
/// Default freshness window for cached usage data (180 seconds); the `Default`
/// impl of `UsageCascadeConfig` uses it for `cache_duration`.
pub const DEFAULT_CACHE_DURATION: Duration = Duration::from_secs(180);
/// Backoff (30 seconds) that `backoff_for_error` applies to every failure that
/// is not a rate limit.
pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);
/// Backoff (300 seconds) that `backoff_for_error` applies to a rate limit that
/// carries no `retry_after` value.
pub const DEFAULT_RATE_LIMIT_BACKOFF: Duration = Duration::from_secs(300);
/// Base URL the `Default` impl of `UsageCascadeConfig` uses for `api_base_url`.
pub const DEFAULT_API_BASE_URL: &str = "https://api.anthropic.com";
/// Tunables read by `resolve_usage`.
#[derive(Debug, Clone)]
pub struct UsageCascadeConfig {
/// Base URL handed to `fetcher::fetch_usage`.
pub api_base_url: String,
/// Timeout handed to `fetcher::fetch_usage`.
pub timeout: Duration,
/// Age below which a cache entry counts as fresh; also the length of the
/// lock that `resolve_usage` writes after a successful fetch.
pub cache_duration: Duration,
}
impl Default for UsageCascadeConfig {
    /// Builds a config from the module-level defaults and the fetcher's
    /// default timeout.
    fn default() -> Self {
        let api_base_url = String::from(DEFAULT_API_BASE_URL);
        Self {
            api_base_url,
            timeout: fetcher::DEFAULT_TIMEOUT,
            cache_duration: DEFAULT_CACHE_DURATION,
        }
    }
}
/// Resolves usage data by walking a fixed cascade of sources.
///
/// 1. A cache entry that is still fresh and carries data is returned
///    immediately; `credentials`, `jsonl` and `transport` are not touched.
/// 2. While a lock is active (`blocked_until` later than `now`), the endpoint
///    is never contacted. Cached data is served unless the lock records
///    `"Unauthorized"`; otherwise a cached error, or failing that the lock's
///    own error, becomes the fallback handed to the JSONL tier.
/// 3. A credential failure goes straight to the JSONL tier.
/// 4. Otherwise the endpoint is fetched. Success writes the cache plus a lock
///    lasting `config.cache_duration`. Failure writes a failure lock, then
///    serves cached data (never after `Unauthorized`) or falls to JSONL.
///
/// # Errors
///
/// Returns the `UsageError` of the tier that failed whenever the JSONL
/// fallback produces nothing.
pub fn resolve_usage(
    cache: Option<&CacheStore>,
    lock: Option<&LockStore>,
    transport: &dyn UsageTransport,
    credentials: &dyn Fn() -> Arc<Result<Credentials, CredentialError>>,
    jsonl: &dyn Fn() -> Result<JsonlAggregate, JsonlError>,
    now: &dyn Fn() -> DateTime<Utc>,
    config: &UsageCascadeConfig,
) -> Result<UsageData, UsageError> {
    let cache_entry = read_cache(cache);
    let lock_entry = read_lock(lock);
    let now_ts = now();

    // Tier 1: fresh cached data wins outright.
    let fresh_hit = cache_entry
        .as_ref()
        .filter(|entry| is_fresh(entry, now_ts, config.cache_duration))
        .and_then(|entry| entry.data.clone());
    if let Some(data) = fresh_hit {
        return Ok(cached_to_usage_data(data));
    }

    // Tier 2: an active lock gates the endpoint entirely.
    let now_secs = now_ts.timestamp();
    let active_lock = lock_entry
        .as_ref()
        .filter(|held| held.blocked_until > now_secs);
    if let Some(held) = active_lock {
        let lock_code = held.error.as_deref();
        let lock_from_401 = matches!(lock_code, Some("Unauthorized"));
        if let Some(entry) = cache_entry.as_ref() {
            // Cached data is not served while the lock stems from a 401.
            if !lock_from_401 {
                if let Some(data) = entry.data.clone() {
                    return Ok(cached_to_usage_data(data));
                }
            }
            if let Some(cached) = entry.error.as_ref() {
                return jsonl_or(jsonl, now_ts, usage_error_from_code(&cached.code));
            }
        }
        let lock_err = match lock_code {
            Some(code) => usage_error_from_code(code),
            None => UsageError::RateLimited { retry_after: None },
        };
        return jsonl_or(jsonl, now_ts, lock_err);
    }

    // Tier 3: credentials are only resolved once the cheaper tiers missed.
    let creds_arc = credentials();
    let creds = match &*creds_arc {
        Ok(found) => found.clone(),
        Err(problem) => {
            let fallback = match problem {
                CredentialError::NoCredentials => UsageError::NoCredentials,
                other => UsageError::Credentials(other.clone()),
            };
            return jsonl_or(jsonl, now_ts, fallback);
        }
    };

    // Tier 4: hit the endpoint.
    let failure =
        match fetcher::fetch_usage(transport, &config.api_base_url, &creds, config.timeout) {
            Ok(response) => {
                write_cache(cache, CachedUsage::with_data(response.clone()));
                let success_lock = Lock {
                    blocked_until: add_secs(now_ts.timestamp(), config.cache_duration),
                    error: None,
                };
                write_lock(lock, success_lock);
                return Ok(UsageData::Endpoint(response.into_endpoint_usage()));
            }
            Err(failure) => failure,
        };

    write_failure_lock(lock, now_ts, &failure);
    // After a 401 the cached data is skipped; any other failure may serve it.
    if !matches!(failure, UsageError::Unauthorized) {
        let stale = cache_entry.as_ref().and_then(|entry| entry.data.clone());
        if let Some(data) = stale {
            return Ok(cached_to_usage_data(data));
        }
    }
    jsonl_or(jsonl, now_ts, failure)
}
/// Invokes the JSONL source and wraps its aggregate as `UsageData::Jsonl`;
/// when `build_jsonl_usage` yields nothing, returns `fallback` as the error.
fn jsonl_or(
    jsonl: &dyn Fn() -> Result<JsonlAggregate, JsonlError>,
    now: DateTime<Utc>,
    fallback: UsageError,
) -> Result<UsageData, UsageError> {
    build_jsonl_usage(jsonl(), now)
        .map(UsageData::Jsonl)
        .ok_or(fallback)
}
/// Converts a JSONL aggregation result into a `JsonlUsage`.
///
/// Any error yields `None`; errors other than `NoEntries` and
/// `DirectoryMissing` are additionally logged as a warning. The start of the
/// five-hour block is capped at `now` floored to the hour.
fn build_jsonl_usage(
    result: Result<JsonlAggregate, JsonlError>,
    now: DateTime<Utc>,
) -> Option<JsonlUsage> {
    let aggregate = match result {
        Ok(aggregate) => aggregate,
        Err(other) => {
            let expected_miss =
                matches!(other, JsonlError::NoEntries | JsonlError::DirectoryMissing);
            if !expected_miss {
                crate::lsm_warn!(
                    "cascade: JSONL fallback unavailable ({other}); surfacing endpoint error"
                );
            }
            return None;
        }
    };

    let hour_floor = jsonl::floor_to_hour(now);
    let five_hour = aggregate.five_hour.as_ref().map(|active| {
        // Never let a block start later than the current hour floor.
        let capped_start = active.start.min(hour_floor);
        FiveHourWindow::new(active.token_counts, capped_start)
    });
    let seven_day = SevenDayWindow::new(aggregate.seven_day.token_counts);
    Some(JsonlUsage::new(five_hour, seven_day))
}
/// Reads the cache store if one is configured. A read failure is logged and
/// reported as `None`, the same as a miss.
fn read_cache(cache: Option<&CacheStore>) -> Option<CachedUsage> {
    let store = cache?;
    store.read().unwrap_or_else(|e| {
        log_cache_read_failure("cache", &e);
        None
    })
}
/// Reads the lock store if one is configured. A read failure is logged and
/// reported as `None`, the same as having no lock.
fn read_lock(lock: Option<&LockStore>) -> Option<Lock> {
    let store = lock?;
    store.read().unwrap_or_else(|e| {
        log_cache_read_failure("lock", &e);
        None
    })
}
fn log_cache_read_failure(kind: &str, err: &super::cache::CacheError) {
use std::io::ErrorKind;
let io_kind = match err {
super::cache::CacheError::Io { cause, .. }
| super::cache::CacheError::Persist { cause, .. } => cause.kind(),
};
match io_kind {
ErrorKind::NotFound | ErrorKind::UnexpectedEof => {
crate::lsm_debug!("cascade: {kind} read failed: {err}; treating as miss");
}
_ => {
crate::lsm_warn!("cascade: {kind} read failed: {err}");
}
}
}
/// Best-effort cache write: does nothing without a store, and only logs when
/// the write fails.
fn write_cache(cache: Option<&CacheStore>, entry: CachedUsage) {
    let Some(store) = cache else {
        return;
    };
    if let Err(e) = store.write(&entry) {
        log_persist_error("cache", &e);
    }
}
/// Best-effort lock write: does nothing without a store, and only logs when
/// the write fails.
fn write_lock(lock: Option<&LockStore>, entry: Lock) {
    let Some(store) = lock else {
        return;
    };
    if let Err(e) = store.write(&entry) {
        log_persist_error("lock", &e);
    }
}
/// Log level chosen for a failed cache/lock write; produced by
/// `classify_persist_error` and consumed by `route_persist_error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PersistLogClass {
/// Ordinary write failure.
Error,
/// Write failure that `is_transient_persist_race` recognises as a lost race.
Debug,
}
fn classify_persist_error(kind: &str, err: &CacheError) -> (PersistLogClass, String) {
if is_transient_persist_race(err) {
(
PersistLogClass::Debug,
format!("cascade: {kind} write race-loser (Windows MoveFileEx): {err}"),
)
} else {
(
PersistLogClass::Error,
format!("cascade: {kind} write failed: {err}"),
)
}
}
/// Hands `msg` to exactly one of the two sinks, selected by `class`; the other
/// sink is dropped without being called.
fn route_persist_error<D: FnOnce(&str), E: FnOnce(&str)>(
    class: PersistLogClass,
    msg: &str,
    on_debug: D,
    on_error: E,
) {
    match class {
        PersistLogClass::Error => on_error(msg),
        PersistLogClass::Debug => on_debug(msg),
    }
}
/// Classifies a failed cache/lock write and emits it through the matching log
/// macro (`lsm_debug!` or `lsm_error!`).
fn log_persist_error(kind: &str, err: &CacheError) {
    let (class, msg) = classify_persist_error(kind, err);
    let to_debug = |s: &str| crate::lsm_debug!("{s}");
    let to_error = |s: &str| crate::lsm_error!("{s}");
    route_persist_error(class, &msg, to_debug, to_error);
}
#[cfg(windows)]
fn is_transient_persist_race(err: &CacheError) -> bool {
matches!(
err,
CacheError::Persist { cause, .. }
if cause.kind() == std::io::ErrorKind::PermissionDenied
)
}
#[cfg(not(windows))]
fn is_transient_persist_race(_err: &CacheError) -> bool {
false
}
/// Writes a lock that records `err`'s code and blocks until `now_ts` plus the
/// backoff chosen by `backoff_for_error`.
fn write_failure_lock(lock: Option<&LockStore>, now_ts: DateTime<Utc>, err: &UsageError) {
    let blocked_until = add_secs(now_ts.timestamp(), backoff_for_error(err));
    let failure_lock = Lock {
        blocked_until,
        error: Some(err.code().to_string()),
    };
    write_lock(lock, failure_lock);
}
/// Picks how long to back off after `err`.
///
/// A rate limit uses its own `retry_after` when present and
/// `DEFAULT_RATE_LIMIT_BACKOFF` otherwise; every other error uses
/// `DEFAULT_ERROR_TTL`.
fn backoff_for_error(err: &UsageError) -> Duration {
    match err {
        UsageError::RateLimited { retry_after } => {
            retry_after.unwrap_or(DEFAULT_RATE_LIMIT_BACKOFF)
        }
        _ => DEFAULT_ERROR_TTL,
    }
}
/// Adds the whole seconds of `secs` to `base_ts`, saturating at `i64::MAX`
/// instead of overflowing. Sub-second precision in `secs` is ignored.
fn add_secs(base_ts: i64, secs: Duration) -> i64 {
    let delta = match i64::try_from(secs.as_secs()) {
        Ok(whole) => whole,
        // A second count beyond i64 range is treated as "as far as possible".
        Err(_) => i64::MAX,
    };
    base_ts.saturating_add(delta)
}
/// Maps a persisted error code back to a `UsageError`.
///
/// Codes not listed here become `UsageError::NetworkError`; a rate limit
/// rebuilt this way carries no `retry_after`.
fn usage_error_from_code(code: &str) -> UsageError {
    let rebuilt = match code {
        "Unauthorized" => UsageError::Unauthorized,
        "RateLimited" => UsageError::RateLimited { retry_after: None },
        "Timeout" => UsageError::Timeout,
        "ParseError" => UsageError::ParseError,
        "NoCredentials" => UsageError::NoCredentials,
        _ => UsageError::NetworkError,
    };
    rebuilt
}
/// True when `entry` was cached strictly less than `ttl` before `now`.
///
/// An entry stamped later than `now` makes the elapsed duration negative, so
/// the conversion to a std duration fails and the entry counts as not fresh.
fn is_fresh(entry: &CachedUsage, now: DateTime<Utc>, ttl: Duration) -> bool {
    let age = now.signed_duration_since(entry.cached_at).to_std();
    matches!(age, Ok(elapsed) if elapsed < ttl)
}
/// Converts cached data into a `UsageApiResponse` and wraps the resulting
/// endpoint usage as `UsageData::Endpoint`.
fn cached_to_usage_data(data: super::cache::CachedData) -> UsageData {
    let endpoint_usage = Into::<UsageApiResponse>::into(data).into_endpoint_usage();
    UsageData::Endpoint(endpoint_usage)
}
#[cfg(test)]
mod tests {
use super::*;
use std::cell::{Cell, RefCell};
use std::io;
use chrono::Duration as ChronoDuration;
use tempfile::TempDir;
use crate::data_context::cache::{CacheStore, CachedUsage, Lock, LockStore};
use crate::data_context::credentials::Credentials;
use crate::data_context::errors::CredentialError;
use crate::data_context::fetcher::{HttpResponse, UsageTransport};
use crate::data_context::jsonl::{
FiveHourBlock, JsonlAggregate, SevenDayWindow as JsonlSevenDayWindow, TokenCounts,
};
// Test double for `UsageTransport`: replays one canned result and counts calls.
struct FakeTransport {
// Canned outcome that every `get` call replays.
response: RefCell<io::Result<HttpResponse>>,
// Number of times `get` has been invoked.
calls: Cell<u32>,
}
impl FakeTransport {
    // Transport that answers every `get` with the given status, body and
    // optional `retry_after` value.
    fn ok(status: u16, body: &str, retry_after: Option<&str>) -> Self {
        let canned = HttpResponse {
            status,
            body: Vec::from(body.as_bytes()),
            retry_after: retry_after.map(str::to_owned),
        };
        Self {
            response: RefCell::new(Ok(canned)),
            calls: Cell::new(0),
        }
    }

    // Transport that fails every `get` with an I/O error of `kind`.
    fn err(kind: io::ErrorKind) -> Self {
        let failure = io::Error::new(kind, "fake");
        Self {
            response: RefCell::new(Err(failure)),
            calls: Cell::new(0),
        }
    }
}
impl UsageTransport for FakeTransport {
    // Bumps the call counter, then replays a copy of the canned result. The
    // arguments are ignored.
    fn get(&self, _url: &str, _token: &str, _timeout: Duration) -> io::Result<HttpResponse> {
        let seen = self.calls.get();
        self.calls.set(seen + 1);

        let canned = self.response.borrow();
        let replay = match canned.as_ref() {
            Err(e) => Err(io::Error::new(e.kind(), e.to_string())),
            Ok(r) => Ok(HttpResponse {
                status: r.status,
                body: r.body.clone(),
                retry_after: r.retry_after.clone(),
            }),
        };
        replay
    }
}
// Sample endpoint response body used by the tests: 42.0 utilization for the
// five-hour window and 33.0 for the seven-day window.
const SAMPLE_BODY: &str = r#"{
"five_hour": { "utilization": 42.0, "resets_at": "2026-04-19T05:00:00Z" },
"seven_day": { "utilization": 33.0, "resets_at": "2026-04-23T19:00:00Z" }
}"#;
// Parses `SAMPLE_BODY` into a `UsageApiResponse`; panics if it does not parse.
fn sample_response() -> UsageApiResponse {
    serde_json::from_str::<UsageApiResponse>(SAMPLE_BODY).unwrap()
}
fn config() -> UsageCascadeConfig {
UsageCascadeConfig::default()
}
// Clock closure frozen at the instant this helper is called.
fn now_fn() -> impl Fn() -> DateTime<Utc> {
    let frozen = Utc::now();
    move || frozen
}
// Credential source that succeeds with a fixed test token.
fn ok_creds() -> Arc<Result<Credentials, CredentialError>> {
    let creds = Credentials::for_testing("test-token");
    Arc::new(Ok(creds))
}
// Credential source that fails with `CredentialError::NoCredentials`.
fn no_creds() -> Arc<Result<Credentials, CredentialError>> {
    let missing: Result<Credentials, CredentialError> = Err(CredentialError::NoCredentials);
    Arc::new(missing)
}
// JSONL source that reports no entries, so the fallback tier yields nothing.
fn jsonl_empty() -> Result<JsonlAggregate, JsonlError> {
    Result::Err(JsonlError::NoEntries)
}
// JSONL source with no active five-hour block and a seven-day window built
// from `TokenCounts::from_parts(1_000_000, 200_000, 0, 0)`.
fn jsonl_ok() -> Result<JsonlAggregate, JsonlError> {
    let seven_day = JsonlSevenDayWindow {
        window_start: Utc::now() - ChronoDuration::days(7),
        token_counts: TokenCounts::from_parts(1_000_000, 200_000, 0, 0),
    };
    let aggregate = JsonlAggregate {
        five_hour: None,
        seven_day,
        source_paths: Vec::new(),
    };
    Ok(aggregate)
}
// JSONL source with an active five-hour block that started one hour ago, plus
// the same seven-day counts as `jsonl_ok`.
fn jsonl_ok_with_active_block() -> Result<JsonlAggregate, JsonlError> {
    let now = Utc::now();
    let active_block = FiveHourBlock {
        start: now - ChronoDuration::hours(1),
        actual_last_activity: now,
        token_counts: TokenCounts::from_parts(400_000, 20_000, 0, 0),
        models: vec!["claude-opus-4-7".into()],
        usage_limit_reset: None,
    };
    let seven_day = JsonlSevenDayWindow {
        window_start: now - ChronoDuration::days(7),
        token_counts: TokenCounts::from_parts(1_000_000, 200_000, 0, 0),
    };
    Ok(JsonlAggregate {
        five_hour: Some(active_block),
        seven_day,
        source_paths: Vec::new(),
    })
}
// Cache entry holding the sample response, back-dated by `age`.
fn stale_cache_entry(age: ChronoDuration) -> CachedUsage {
    let mut backdated = CachedUsage::with_data(sample_response());
    let stamp = Utc::now() - age;
    backdated.cached_at = stamp;
    backdated
}
// Asserts that `data` is the JSONL variant and matches the `jsonl_ok`
// fixture: no five-hour window and a seven-day total of 1_200_000 tokens.
fn assert_jsonl_matches_ok_fixture(data: &UsageData) {
    let j = match data {
        UsageData::Jsonl(j) => j,
        _ => panic!("expected UsageData::Jsonl, got {data:?}"),
    };
    assert!(
        j.five_hour.is_none(),
        "jsonl_ok fixture has no active 5h block",
    );
    let seven_day_total = j.seven_day.tokens.total();
    assert_eq!(
        seven_day_total,
        1_200_000,
        "7d total must match jsonl_ok fixture (1M input + 200k output)",
    );
}
// A freshly written cache entry must be returned as endpoint data without
// invoking the credentials closure, the JSONL closure, or the transport.
#[test]
fn fresh_disk_cache_short_circuits_without_reading_credentials() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&CachedUsage::with_data(sample_response()))
.unwrap();
// Counters prove the closures were never invoked.
let cred_calls = Cell::new(0u32);
let jsonl_calls = Cell::new(0u32);
let credentials = || {
cred_calls.set(cred_calls.get() + 1);
ok_creds()
};
let jsonl = || {
jsonl_calls.set(jsonl_calls.get() + 1);
jsonl_empty()
};
let transport = FakeTransport::ok(200, "", None);
let data = resolve_usage(
Some(&cache),
None,
&transport,
&credentials,
&jsonl,
&now_fn(),
&config(),
)
.expect("ok");
let UsageData::Endpoint(endpoint) = &data else {
panic!("expected endpoint variant, got {data:?}");
};
assert_eq!(endpoint.five_hour.unwrap().utilization.value(), 42.0);
assert_eq!(cred_calls.get(), 0, "credentials must not be called");
assert_eq!(jsonl_calls.get(), 0, "jsonl must not be called");
assert_eq!(transport.calls.get(), 0, "no HTTP on cache hit");
}
// A 10-minute-old cache entry with no lock on disk must trigger exactly one
// fetch, and the cache must be re-stamped by the successful response.
#[test]
fn stale_cache_without_lock_triggers_fetch_and_overwrites() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&stale_cache_entry(ChronoDuration::minutes(10)))
.unwrap();
let lock = LockStore::new(tmp.path().to_path_buf());
let transport = FakeTransport::ok(200, SAMPLE_BODY, None);
let data = resolve_usage(
Some(&cache),
Some(&lock),
&transport,
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.expect("ok");
assert!(matches!(data, UsageData::Endpoint(_)));
assert_eq!(transport.calls.get(), 1);
let refreshed = cache.read().unwrap().unwrap();
let age = Utc::now().signed_duration_since(refreshed.cached_at);
assert!(age.num_seconds() < 5, "cache must be re-stamped on success");
}
// With an active lock whose error string is not "Unauthorized", stale cached
// data is served and neither credentials nor the transport are touched.
#[test]
fn stale_cache_with_active_lock_serves_stale_without_credentials() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&stale_cache_entry(ChronoDuration::minutes(10)))
.unwrap();
let lock = LockStore::new(tmp.path().to_path_buf());
lock.write(&Lock {
blocked_until: Utc::now().timestamp() + 60,
error: Some("rate-limited".into()),
})
.unwrap();
let cred_calls = Cell::new(0u32);
let credentials = || {
cred_calls.set(cred_calls.get() + 1);
ok_creds()
};
let transport = FakeTransport::ok(200, "", None);
let data = resolve_usage(
Some(&cache),
Some(&lock),
&transport,
&credentials,
&jsonl_empty,
&now_fn(),
&config(),
)
.expect("ok");
assert!(matches!(data, UsageData::Endpoint(_)));
assert_eq!(
cred_calls.get(),
0,
"active lock must short-circuit before credentials read",
);
assert_eq!(transport.calls.get(), 0, "no HTTP when lock + stale cache");
}
// Missing credentials with empty JSONL must surface `NoCredentials` and never
// reach the transport (which would otherwise report a timeout).
#[test]
fn no_credentials_surfaces_nocredentials_not_timeout() {
let transport = FakeTransport::err(io::ErrorKind::TimedOut);
let err = resolve_usage(
None,
None,
&transport,
&no_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::NoCredentials));
assert_eq!(transport.calls.get(), 0, "no HTTP when credentials missing",);
}
// Missing credentials with JSONL data available must return the JSONL variant.
#[test]
fn no_credentials_falls_through_to_jsonl_when_available() {
let data = resolve_usage(
None,
None,
&FakeTransport::ok(200, "", None),
&no_creds,
&jsonl_ok,
&now_fn(),
&config(),
)
.expect("ok");
assert_jsonl_matches_ok_fixture(&data);
}
// Missing credentials and empty JSONL must surface `NoCredentials`.
#[test]
fn no_credentials_with_empty_jsonl_still_surfaces_nocredentials() {
let err = resolve_usage(
None,
None,
&FakeTransport::ok(200, "", None),
&no_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::NoCredentials));
}
// A successful fetch must populate the cache and write a lock whose
// `blocked_until` is about `cache_duration` from now (5-second tolerance).
#[test]
fn endpoint_200_writes_cache_and_lock() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
let lock = LockStore::new(tmp.path().to_path_buf());
let transport = FakeTransport::ok(200, SAMPLE_BODY, None);
let data = resolve_usage(
Some(&cache),
Some(&lock),
&transport,
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.expect("ok");
assert!(matches!(data, UsageData::Endpoint(_)));
assert!(cache.read().unwrap().is_some(), "cache must be populated");
let persisted_lock = lock.read().unwrap().unwrap();
let expected_blocked_until =
Utc::now().timestamp() + config().cache_duration.as_secs() as i64;
assert!(
(persisted_lock.blocked_until - expected_blocked_until).abs() < 5,
"lock blocked_until = {}, expected near {}",
persisted_lock.blocked_until,
expected_blocked_until,
);
}
// A 401 from the endpoint must fall back to JSONL data after exactly one
// transport call.
#[test]
fn endpoint_401_falls_through_to_jsonl_when_available() {
let transport = FakeTransport::ok(401, "", None);
let data = resolve_usage(
None,
None,
&transport,
&ok_creds,
&jsonl_ok,
&now_fn(),
&config(),
)
.expect("ok");
assert_jsonl_matches_ok_fixture(&data);
assert_eq!(transport.calls.get(), 1);
}
// A five-hour block whose start lies two hours in the future must produce a
// window that ends no later than `now + 5h`.
#[test]
fn jsonl_fallback_clamps_future_dated_block_start_to_now() {
let now = Utc::now();
let skewed_start = now + ChronoDuration::hours(2);
let skewed: Result<JsonlAggregate, JsonlError> = Ok(JsonlAggregate {
five_hour: Some(FiveHourBlock {
start: skewed_start,
actual_last_activity: now + ChronoDuration::minutes(30),
token_counts: TokenCounts::from_parts(100, 0, 0, 0),
models: vec!["claude-opus-4-7".into()],
usage_limit_reset: None,
}),
seven_day: JsonlSevenDayWindow {
window_start: now - ChronoDuration::days(7),
token_counts: TokenCounts::from_parts(100, 0, 0, 0),
},
source_paths: Vec::new(),
});
// Hands out a clone of the aggregate on every call.
let skewed_closure = || match &skewed {
Ok(agg) => Ok(agg.clone()),
Err(_) => Err(JsonlError::NoEntries),
};
// Same instant as the fixture, so the bound below is exact.
let now_clock = move || now;
let data = resolve_usage(
None,
None,
&FakeTransport::err(io::ErrorKind::TimedOut),
&ok_creds,
&skewed_closure,
&now_clock,
&config(),
)
.expect("ok");
let UsageData::Jsonl(j) = &data else {
panic!("expected jsonl variant, got {data:?}");
};
let window = j
.five_hour
.as_ref()
.expect("active block should populate five_hour window");
assert!(
window.ends_at() <= now + ChronoDuration::hours(5),
"ends_at={:?} must be clamped at/before now + 5h ({:?})",
window.ends_at(),
now + ChronoDuration::hours(5),
);
}
// An active block that started one hour ago must surface a five-hour window
// ending about four hours from now and carrying 420_000 tokens.
#[test]
fn jsonl_fallback_surfaces_five_hour_window_with_ends_at() {
let data = resolve_usage(
None,
None,
&FakeTransport::err(io::ErrorKind::TimedOut),
&ok_creds,
&jsonl_ok_with_active_block,
&now_fn(),
&config(),
)
.expect("ok");
let UsageData::Jsonl(j) = &data else {
panic!("expected jsonl variant, got {data:?}");
};
let window = j
.five_hour
.as_ref()
.expect("active block should populate five_hour window");
let expected_ends_at = Utc::now() + ChronoDuration::hours(4);
let drift = (window.ends_at() - expected_ends_at).num_seconds().abs();
assert!(
drift < 5,
"ends_at={:?} drifted {drift}s from expected",
window.ends_at(),
);
assert_eq!(window.tokens.total(), 420_000);
}
// A 401 with empty JSONL must surface `Unauthorized`.
#[test]
fn endpoint_401_with_empty_jsonl_surfaces_unauthorized() {
let err = resolve_usage(
None,
None,
&FakeTransport::ok(401, "", None),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::Unauthorized));
}
// A 401 must surface `Unauthorized` even though stale cached data exists.
#[test]
fn endpoint_401_does_not_serve_stale_cache() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&stale_cache_entry(ChronoDuration::minutes(10)))
.unwrap();
let err = resolve_usage(
Some(&cache),
None,
&FakeTransport::ok(401, "", None),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::Unauthorized));
}
// Invocation A receives a 401 and writes the lock; invocation B, sharing the
// same cache and lock stores, must also surface `Unauthorized` without
// calling its transport or serving the stale cache.
#[test]
fn invocation_after_401_does_not_serve_stale_cache_via_lock_active() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&stale_cache_entry(ChronoDuration::minutes(10)))
.unwrap();
let lock = LockStore::new(tmp.path().to_path_buf());
let transport_a = FakeTransport::ok(401, "", None);
let err_a = resolve_usage(
Some(&cache),
Some(&lock),
&transport_a,
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err_a, UsageError::Unauthorized));
let transport_b = FakeTransport::ok(200, SAMPLE_BODY, None);
let err_b = resolve_usage(
Some(&cache),
Some(&lock),
&transport_b,
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err_b, UsageError::Unauthorized));
assert_eq!(
transport_b.calls.get(),
0,
"active lock must still gate the endpoint on invocation B",
);
}
// Same A-then-B sequence as the stale-cache 401 test, but with JSONL data
// available: both invocations must return JSONL data and B must not call its
// transport.
#[test]
fn invocation_after_401_falls_through_to_jsonl_when_available() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&stale_cache_entry(ChronoDuration::minutes(10)))
.unwrap();
let lock = LockStore::new(tmp.path().to_path_buf());
let data_a = resolve_usage(
Some(&cache),
Some(&lock),
&FakeTransport::ok(401, "", None),
&ok_creds,
&jsonl_ok,
&now_fn(),
&config(),
)
.expect("A falls through to JSONL with jsonl_ok");
assert_jsonl_matches_ok_fixture(&data_a);
let transport_b = FakeTransport::ok(200, SAMPLE_BODY, None);
let data_b = resolve_usage(
Some(&cache),
Some(&lock),
&transport_b,
&ok_creds,
&jsonl_ok,
&now_fn(),
&config(),
)
.expect("B returns JSONL on lock-active path");
assert_jsonl_matches_ok_fixture(&data_b);
assert_eq!(transport_b.calls.get(), 0);
}
// A pre-written active lock recording "Unauthorized" must surface
// `Unauthorized` despite stale cached data, without any transport call.
#[test]
fn active_unauthorized_lock_rejects_stale_cached_data() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&stale_cache_entry(ChronoDuration::minutes(10)))
.unwrap();
let lock = LockStore::new(tmp.path().to_path_buf());
lock.write(&Lock {
blocked_until: Utc::now().timestamp() + 30,
error: Some("Unauthorized".into()),
})
.unwrap();
let transport = FakeTransport::ok(200, SAMPLE_BODY, None);
let err = resolve_usage(
Some(&cache),
Some(&lock),
&transport,
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::Unauthorized));
assert_eq!(transport.calls.get(), 0);
}
// A 429 carrying retry-after "120" must write a lock about 120 seconds out
// (5-second tolerance) with error code "RateLimited".
#[test]
fn endpoint_429_writes_lock_with_retry_after_backoff() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
let lock = LockStore::new(tmp.path().to_path_buf());
let _ = resolve_usage(
Some(&cache),
Some(&lock),
&FakeTransport::ok(429, "", Some("120")),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
);
let persisted = lock.read().unwrap().expect("lock must be written");
let expected = Utc::now().timestamp() + 120;
assert!(
(persisted.blocked_until - expected).abs() < 5,
"blocked_until={}, expected near {}",
persisted.blocked_until,
expected,
);
assert_eq!(persisted.error.as_deref(), Some("RateLimited"));
}
// A transport timeout must write a lock about `DEFAULT_ERROR_TTL` out
// (5-second tolerance) with error code "Timeout".
#[test]
fn endpoint_timeout_writes_lock_with_error_ttl() {
let tmp = TempDir::new().unwrap();
let lock = LockStore::new(tmp.path().to_path_buf());
let _ = resolve_usage(
None,
Some(&lock),
&FakeTransport::err(io::ErrorKind::TimedOut),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
);
let persisted = lock.read().unwrap().expect("lock must be written");
let expected = Utc::now().timestamp() + DEFAULT_ERROR_TTL.as_secs() as i64;
assert!(
(persisted.blocked_until - expected).abs() < 5,
"blocked_until={}, expected near {}",
persisted.blocked_until,
expected,
);
assert_eq!(persisted.error.as_deref(), Some("Timeout"));
}
// Invocation A receives a 429 and writes the lock; invocation B, sharing the
// stores, must report `RateLimited` without calling its transport.
#[test]
fn lock_written_on_429_blocks_next_process_from_hitting_endpoint() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
let lock = LockStore::new(tmp.path().to_path_buf());
let transport_a = FakeTransport::ok(429, "", Some("120"));
let _ = resolve_usage(
Some(&cache),
Some(&lock),
&transport_a,
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
);
let transport_b = FakeTransport::ok(200, SAMPLE_BODY, None);
let result_b = resolve_usage(
Some(&cache),
Some(&lock),
&transport_b,
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
);
assert!(matches!(result_b, Err(UsageError::RateLimited { .. })));
assert_eq!(
transport_b.calls.get(),
0,
"process B must not hit endpoint"
);
}
// A 401 must write a lock whose error code is "Unauthorized".
#[test]
fn endpoint_401_writes_lock_so_peers_skip_the_stale_token() {
let tmp = TempDir::new().unwrap();
let lock = LockStore::new(tmp.path().to_path_buf());
let _ = resolve_usage(
None,
Some(&lock),
&FakeTransport::ok(401, "", None),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
);
let persisted = lock.read().unwrap().expect("lock must be written");
assert_eq!(persisted.error.as_deref(), Some("Unauthorized"));
}
// A 429 with stale cached data must serve the cached endpoint data
// (five-hour utilization 42.0).
#[test]
fn endpoint_429_with_stale_cache_serves_stale() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&stale_cache_entry(ChronoDuration::minutes(10)))
.unwrap();
let data = resolve_usage(
Some(&cache),
None,
&FakeTransport::ok(429, "", Some("120")),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.expect("ok");
let UsageData::Endpoint(endpoint) = &data else {
panic!("expected endpoint variant, got {data:?}");
};
assert_eq!(endpoint.five_hour.unwrap().utilization.value(), 42.0);
}
// A 429 with no cache and empty JSONL must surface `RateLimited`.
#[test]
fn endpoint_429_with_empty_jsonl_surfaces_ratelimited() {
let err = resolve_usage(
None,
None,
&FakeTransport::ok(429, "", None),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::RateLimited { .. }));
}
// A 429 with no cache must fall back to JSONL data after exactly one
// transport call.
#[test]
fn endpoint_429_falls_through_to_jsonl_when_available() {
let transport = FakeTransport::ok(429, "", None);
let data = resolve_usage(
None,
None,
&transport,
&ok_creds,
&jsonl_ok,
&now_fn(),
&config(),
)
.expect("ok");
assert_jsonl_matches_ok_fixture(&data);
assert_eq!(transport.calls.get(), 1);
}
// A transport timeout with stale cached data must serve the cached data as
// the endpoint variant.
#[test]
fn endpoint_timeout_with_stale_cache_serves_stale() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&stale_cache_entry(ChronoDuration::minutes(10)))
.unwrap();
let data = resolve_usage(
Some(&cache),
None,
&FakeTransport::err(io::ErrorKind::TimedOut),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.expect("ok");
assert!(matches!(data, UsageData::Endpoint(_)));
}
// A transport timeout with no cache must fall back to JSONL data, and the
// endpoint must have been attempted exactly once first.
#[test]
fn endpoint_timeout_without_stale_falls_through_to_jsonl() {
let transport = FakeTransport::err(io::ErrorKind::TimedOut);
let data = resolve_usage(
None,
None,
&transport,
&ok_creds,
&jsonl_ok,
&now_fn(),
&config(),
)
.expect("ok");
assert_jsonl_matches_ok_fixture(&data);
assert_eq!(
transport.calls.get(),
1,
"endpoint must be attempted before JSONL fallback",
);
}
// A transport timeout with no cache and empty JSONL must surface `Timeout`.
#[test]
fn endpoint_timeout_without_stale_or_jsonl_surfaces_original_error() {
let err = resolve_usage(
None,
None,
&FakeTransport::err(io::ErrorKind::TimedOut),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::Timeout));
}
// A connection-refused transport error with no cache and empty JSONL must
// surface `NetworkError`.
#[test]
fn endpoint_network_error_falls_through_same_as_timeout() {
let err = resolve_usage(
None,
None,
&FakeTransport::err(io::ErrorKind::ConnectionRefused),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::NetworkError));
}
// A 200 response with an unparseable body reaches the JSONL tier; because the
// JSONL source here is empty, the original `ParseError` is what surfaces.
#[test]
fn endpoint_malformed_response_falls_through_to_jsonl() {
let err = resolve_usage(
None,
None,
&FakeTransport::ok(200, "{ not valid", None),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::ParseError));
}
// With neither a cache store nor a lock store, a successful fetch must still
// return endpoint data.
#[test]
fn cascade_tolerates_missing_cache_and_lock_stores() {
let data = resolve_usage(
None,
None,
&FakeTransport::ok(200, SAMPLE_BODY, None),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.expect("ok");
assert!(matches!(data, UsageData::Endpoint(_)));
}
// A lock whose `blocked_until` lies 60 seconds in the past must not stop the
// fetch: the transport is called exactly once.
#[test]
fn expired_lock_does_not_gate_fetch() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&stale_cache_entry(ChronoDuration::minutes(10)))
.unwrap();
let lock = LockStore::new(tmp.path().to_path_buf());
lock.write(&Lock {
blocked_until: Utc::now().timestamp() - 60,
error: None,
})
.unwrap();
let transport = FakeTransport::ok(200, SAMPLE_BODY, None);
let _ = resolve_usage(
Some(&cache),
Some(&lock),
&transport,
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.expect("ok");
assert_eq!(
transport.calls.get(),
1,
"expired lock must not block fetch"
);
}
// An active "RateLimited" lock with nothing cached must surface `RateLimited`
// without resolving credentials or calling the transport.
#[test]
fn active_lock_with_no_cached_data_does_not_hit_endpoint() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
let lock = LockStore::new(tmp.path().to_path_buf());
lock.write(&Lock {
blocked_until: Utc::now().timestamp() + 60,
error: Some("RateLimited".into()),
})
.unwrap();
let cred_calls = Cell::new(0u32);
let credentials = || {
cred_calls.set(cred_calls.get() + 1);
ok_creds()
};
let transport = FakeTransport::ok(200, SAMPLE_BODY, None);
let err = resolve_usage(
Some(&cache),
Some(&lock),
&transport,
&credentials,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::RateLimited { .. }));
assert_eq!(cred_calls.get(), 0, "must not resolve credentials");
assert_eq!(transport.calls.get(), 0, "must not hit endpoint");
}
// An active "RateLimited" lock with nothing cached must return JSONL data
// when it is available, still without calling the transport.
#[test]
fn active_lock_falls_through_to_jsonl_when_available() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
let lock = LockStore::new(tmp.path().to_path_buf());
lock.write(&Lock {
blocked_until: Utc::now().timestamp() + 60,
error: Some("RateLimited".into()),
})
.unwrap();
let transport = FakeTransport::ok(200, SAMPLE_BODY, None);
let data = resolve_usage(
Some(&cache),
Some(&lock),
&transport,
&ok_creds,
&jsonl_ok,
&now_fn(),
&config(),
)
.expect("ok");
assert_jsonl_matches_ok_fixture(&data);
assert_eq!(
transport.calls.get(),
0,
"active lock must still gate the endpoint even with JSONL data"
);
}
// With an active lock and a cache entry that holds an error, the cached
// error ("Unauthorized") surfaces rather than the lock's own "RateLimited",
// and the transport is not called.
#[test]
fn active_lock_serves_cached_error_without_hitting_endpoint() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&CachedUsage::with_error("Unauthorized"))
.unwrap();
let lock = LockStore::new(tmp.path().to_path_buf());
lock.write(&Lock {
blocked_until: Utc::now().timestamp() + 60,
error: Some("RateLimited".into()),
})
.unwrap();
let transport = FakeTransport::ok(200, "", None);
let err = resolve_usage(
Some(&cache),
Some(&lock),
&transport,
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(matches!(err, UsageError::Unauthorized));
assert_eq!(transport.calls.get(), 0);
}
// With an active lock and a cached error, available JSONL data must be
// returned instead of the error, and the transport is not called.
#[test]
fn active_lock_with_cached_error_falls_through_to_jsonl_when_available() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&CachedUsage::with_error("Unauthorized"))
.unwrap();
let lock = LockStore::new(tmp.path().to_path_buf());
lock.write(&Lock {
blocked_until: Utc::now().timestamp() + 60,
error: Some("RateLimited".into()),
})
.unwrap();
let transport = FakeTransport::ok(200, "", None);
let data = resolve_usage(
Some(&cache),
Some(&lock),
&transport,
&ok_creds,
&jsonl_ok,
&now_fn(),
&config(),
)
.expect("ok");
assert_jsonl_matches_ok_fixture(&data);
assert_eq!(transport.calls.get(), 0);
}
// A `MissingField` credential error must surface wrapped as
// `UsageError::Credentials(MissingField)` and report the code "MissingField".
#[test]
fn credential_failure_other_than_missing_preserves_variant_tag() {
let creds_err: Arc<Result<Credentials, CredentialError>> =
Arc::new(Err(CredentialError::MissingField {
path: std::path::PathBuf::from("/x"),
}));
let credentials = || creds_err.clone();
let err = resolve_usage(
None,
None,
&FakeTransport::err(io::ErrorKind::TimedOut),
&credentials,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert!(
matches!(
err,
UsageError::Credentials(CredentialError::MissingField { .. })
),
"expected Credentials(MissingField), got {err:?}",
);
assert_eq!(err.code(), "MissingField", "variant tag must round-trip");
}
// A `SubprocessFailed` credential error must surface with the code
// "SubprocessFailed".
#[test]
fn subprocess_failed_cred_preserves_subprocess_tag() {
let creds_err: Arc<Result<Credentials, CredentialError>> = Arc::new(Err(
CredentialError::SubprocessFailed(io::Error::new(io::ErrorKind::PermissionDenied, "x")),
));
let credentials = || creds_err.clone();
let err = resolve_usage(
None,
None,
&FakeTransport::err(io::ErrorKind::TimedOut),
&credentials,
&jsonl_empty,
&now_fn(),
&config(),
)
.unwrap_err();
assert_eq!(err.code(), "SubprocessFailed");
}
// A `SubprocessFailed` credential error with JSONL data available must
// return the JSONL variant.
#[test]
fn credential_variant_falls_through_to_jsonl_when_available() {
let creds_err: Arc<Result<Credentials, CredentialError>> = Arc::new(Err(
CredentialError::SubprocessFailed(io::Error::new(io::ErrorKind::PermissionDenied, "x")),
));
let credentials = || creds_err.clone();
let data = resolve_usage(
None,
None,
&FakeTransport::err(io::ErrorKind::TimedOut),
&credentials,
&jsonl_ok,
&now_fn(),
&config(),
)
.expect("ok");
assert_jsonl_matches_ok_fixture(&data);
}
// The cache store is rooted beneath a regular file, which is meant to make
// the cache write fail; the fetched endpoint data must still be returned.
#[test]
fn cache_write_failure_does_not_block_returned_data() {
let tmp = TempDir::new().unwrap();
let blocking_file = tmp.path().join("blocked");
std::fs::write(&blocking_file, "x").unwrap();
let cache = CacheStore::new(blocking_file.join("nested"));
let data = resolve_usage(
Some(&cache),
None,
&FakeTransport::ok(200, SAMPLE_BODY, None),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.expect("ok");
assert!(matches!(data, UsageData::Endpoint(_)));
}
// Data served from a fresh cache entry must be the endpoint variant.
#[test]
fn fresh_cache_is_source_endpoint_not_jsonl() {
let tmp = TempDir::new().unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
cache
.write(&CachedUsage::with_data(sample_response()))
.unwrap();
let data = resolve_usage(
Some(&cache),
None,
&FakeTransport::ok(200, "", None),
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.expect("ok");
assert!(matches!(data, UsageData::Endpoint(_)));
}
// A cache entry stamped one hour in the future must be treated as stale: the
// transport is called exactly once.
#[test]
fn clock_skew_future_cached_at_treats_entry_as_stale() {
let tmp = TempDir::new().unwrap();
// NOTE(review): presumably "usage.json" is the file name CacheStore reads — confirm.
let path = tmp.path().join("usage.json");
let mut entry = CachedUsage::with_data(sample_response());
entry.cached_at = Utc::now() + ChronoDuration::hours(1);
std::fs::write(&path, serde_json::to_string(&entry).unwrap()).unwrap();
let cache = CacheStore::new(tmp.path().to_path_buf());
let transport = FakeTransport::ok(200, SAMPLE_BODY, None);
let _ = resolve_usage(
Some(&cache),
None,
&transport,
&ok_creds,
&jsonl_empty,
&now_fn(),
&config(),
)
.expect("ok");
assert_eq!(transport.calls.get(), 1);
}
// Builds a `CacheError::Io` with a fixed path and an I/O cause of `kind`.
fn make_io_error(kind: io::ErrorKind) -> CacheError {
    let path = std::path::PathBuf::from("/test/path");
    let cause = io::Error::new(kind, "test");
    CacheError::Io { path, cause }
}
// Builds a `CacheError::Persist` with a fixed path and an I/O cause of `kind`.
fn make_persist_error(kind: io::ErrorKind) -> CacheError {
    let path = std::path::PathBuf::from("/test/path");
    let cause = io::Error::new(kind, "test");
    CacheError::Persist { path, cause }
}
// An `Io` failure must be classed `Error` and carry the "write failed" prefix.
#[test]
fn classify_persist_error_routes_io_failure_to_error() {
let (class, msg) = classify_persist_error("cache", &make_io_error(io::ErrorKind::NotFound));
assert_eq!(class, PersistLogClass::Error);
assert!(
msg.contains("cascade: cache write failed:"),
"expected loud-signal prefix, got {msg:?}"
);
}
// The `kind` label ("lock") must appear in the classified message.
#[test]
fn classify_persist_error_routes_lock_kind_into_message() {
let (class, msg) =
classify_persist_error("lock", &make_persist_error(io::ErrorKind::OutOfMemory));
assert_eq!(class, PersistLogClass::Error);
assert!(
msg.contains("cascade: lock write failed:"),
"kind label must thread through, got {msg:?}"
);
}
// On unix a `Persist` failure with `PermissionDenied` is still classed `Error`.
#[cfg(unix)]
#[test]
fn classify_persist_error_routes_permission_denied_to_error_on_unix() {
let (class, msg) = classify_persist_error(
"cache",
&make_persist_error(io::ErrorKind::PermissionDenied),
);
assert_eq!(class, PersistLogClass::Error);
assert!(msg.contains("cascade: cache write failed:"));
}
// On Windows a `Persist` failure with `PermissionDenied` is classed `Debug`
// and uses the race-loser wording.
#[cfg(windows)]
#[test]
fn classify_persist_error_routes_persist_permission_denied_to_debug_on_windows() {
let (class, msg) = classify_persist_error(
"cache",
&make_persist_error(io::ErrorKind::PermissionDenied),
);
assert_eq!(class, PersistLogClass::Debug);
assert!(
msg.contains("race-loser") && msg.contains("Windows MoveFileEx"),
"expected race-loser framing, got {msg:?}"
);
}
// On Windows an `Io` (not `Persist`) failure with `PermissionDenied` is still
// classed `Error`.
#[cfg(windows)]
#[test]
fn classify_persist_error_routes_io_permission_denied_to_error_on_windows() {
let (class, _msg) =
classify_persist_error("cache", &make_io_error(io::ErrorKind::PermissionDenied));
assert_eq!(class, PersistLogClass::Error);
}
// The `Debug` class must call the debug closure once and the error closure
// never.
#[test]
fn route_persist_error_dispatches_debug_class_to_debug_closure_only() {
let mut debug_calls = 0;
let mut error_calls = 0;
route_persist_error(
PersistLogClass::Debug,
"msg",
|_| debug_calls += 1,
|_| error_calls += 1,
);
assert_eq!((debug_calls, error_calls), (1, 0));
}
// The `Error` class must call the error closure once and the debug closure
// never.
#[test]
fn route_persist_error_dispatches_error_class_to_error_closure_only() {
let mut debug_calls = 0;
let mut error_calls = 0;
route_persist_error(
PersistLogClass::Error,
"msg",
|_| debug_calls += 1,
|_| error_calls += 1,
);
assert_eq!((debug_calls, error_calls), (0, 1));
}
// The message must reach the selected closure unmodified.
#[test]
fn route_persist_error_passes_msg_through_unchanged() {
let mut received: Option<String> = None;
route_persist_error(
PersistLogClass::Error,
"cascade: cache write failed: disk full",
|_| {},
|s| received = Some(s.to_string()),
);
assert_eq!(
received.as_deref(),
Some("cascade: cache write failed: disk full")
);
}
}