use std::sync::Arc;
use std::time::Duration;
use jiff::Timestamp;
use super::cache::{CacheError, CacheStore, CachedUsage, Lock, LockStore};
use super::credentials::Credentials;
use super::error::{CredentialError, JsonlError, UsageError};
use super::fetcher::{self, UsageTransport};
use super::jsonl::{self, JsonlAggregate};
use super::usage::{FiveHourWindow, JsonlUsage, SevenDayWindow, UsageApiResponse, UsageData};
/// Default lifetime of a cached usage response (180 seconds); used as the
/// default value of `UsageCascadeConfig::cache_duration`.
pub const DEFAULT_CACHE_DURATION: Duration = Duration::from_secs(180);
/// Backoff (30 seconds) that `backoff_for_error` applies to every failure
/// other than a rate limit.
pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);
/// Backoff (300 seconds) that `backoff_for_error` applies to a rate-limit
/// error that carries no `retry_after` value.
pub const DEFAULT_RATE_LIMIT_BACKOFF: Duration = Duration::from_secs(300);
/// Default API base URL; used as the default value of
/// `UsageCascadeConfig::api_base_url`.
pub const DEFAULT_API_BASE_URL: &str = "https://api.anthropic.com";
/// Settings consumed by `resolve_usage`.
#[derive(Debug, Clone)]
pub struct UsageCascadeConfig {
/// Base URL passed to `fetcher::fetch_usage`.
pub api_base_url: String,
/// Timeout passed to `fetcher::fetch_usage`.
pub timeout: Duration,
/// How long a cache entry counts as fresh. `resolve_usage` also uses it as
/// the length of the lock it writes after a successful fetch.
pub cache_duration: Duration,
}
/// Builds a configuration from the module defaults: `DEFAULT_API_BASE_URL`,
/// `fetcher::DEFAULT_TIMEOUT`, and `DEFAULT_CACHE_DURATION`.
impl Default for UsageCascadeConfig {
    fn default() -> Self {
        let api_base_url = String::from(DEFAULT_API_BASE_URL);
        let timeout = fetcher::DEFAULT_TIMEOUT;
        let cache_duration = DEFAULT_CACHE_DURATION;
        Self {
            api_base_url,
            timeout,
            cache_duration,
        }
    }
}
/// Resolves usage data by walking a fixed cascade of sources.
///
/// Order of precedence:
/// 1. A cache entry that is still fresh (per `config.cache_duration`) and
///    carries data is returned immediately.
/// 2. While the lock is active (`blocked_until` is later than the current
///    second) no fetch is attempted: cached data is served unless the lock was
///    written for an `"Unauthorized"` failure; otherwise the JSONL fallback is
///    tried, keeping the cached error or the lock's error as the error to
///    surface.
/// 3. Credentials are loaded; if that fails the JSONL fallback is tried.
/// 4. The endpoint is fetched. Success refreshes cache and lock. A failure
///    writes a backoff lock and then falls back to stale cache (except for
///    `Unauthorized`, which clears the cache instead) and finally to JSONL.
///
/// # Parameters
/// - `cache` / `lock`: optional stores; `None` disables the respective
///   read/write steps. Read failures are logged by `read_cache` / `read_lock`
///   and treated as "no entry"; write failures are logged and ignored.
/// - `transport`: passed through to `fetcher::fetch_usage`.
/// - `credentials`: called only when neither a fresh cache hit nor an active
///   lock short-circuits the cascade.
/// - `jsonl`: called only when a JSONL fallback is actually attempted.
/// - `now`: called exactly once; the returned timestamp is used for every
///   freshness, lock, and backoff computation in this call.
/// - `config`: base URL, timeout, and cache duration.
///
/// # Errors
/// Returns the error of the step that failed, but only when the JSONL
/// fallback produces no usable data (see `jsonl_or`).
pub fn resolve_usage(
cache: Option<&CacheStore>,
lock: Option<&LockStore>,
transport: &dyn UsageTransport,
credentials: &dyn Fn() -> Arc<Result<Credentials, CredentialError>>,
jsonl: &dyn Fn() -> Result<JsonlAggregate, JsonlError>,
now: &dyn Fn() -> Timestamp,
config: &UsageCascadeConfig,
) -> Result<UsageData, UsageError> {
// Snapshot both stores first, then sample the clock once.
let cache_entry = read_cache(cache);
let lock_entry = read_lock(lock);
let now_ts = now();
// Step 1: a fresh cache entry that holds data wins outright.
if let Some(entry) = &cache_entry {
if is_fresh(entry, now_ts, config.cache_duration) {
if let Some(data) = entry.data.clone() {
return Ok(cached_to_usage_data(data));
}
}
}
// Step 2: an active lock means "do not hit the endpoint yet".
let lock_active = lock_entry
.as_ref()
.is_some_and(|l| l.blocked_until > now_ts.as_second());
if lock_active {
let lock_error = lock_entry.as_ref().and_then(|l| l.error.as_deref());
let lock_from_401 = lock_error == Some("Unauthorized");
if let Some(entry) = &cache_entry {
// Cached data (even if stale) is served unless the lock records a 401.
if !lock_from_401 {
if let Some(data) = entry.data.clone() {
return Ok(cached_to_usage_data(data));
}
}
// A cache entry that recorded an error takes precedence over the lock's error.
if let Some(cached) = &entry.error {
return jsonl_or(jsonl, now_ts, usage_error_from_code(&cached.code));
}
}
// NOTE(review): the lock written after a successful fetch carries no error
// code, so an active lock with a missing cache entry reports `RateLimited`
// here — confirm that is intended.
let lock_err = lock_error
.map(usage_error_from_code)
.unwrap_or(UsageError::RateLimited { retry_after: None });
return jsonl_or(jsonl, now_ts, lock_err);
}
// Step 3: load credentials; failures go straight to the JSONL fallback
// without writing a lock.
let creds_arc = credentials();
let creds = match &*creds_arc {
Ok(c) => c.clone(),
Err(CredentialError::NoCredentials) => {
return jsonl_or(jsonl, now_ts, UsageError::NoCredentials)
}
Err(other) => {
return jsonl_or(jsonl, now_ts, UsageError::Credentials(other.clone()));
}
};
// Step 4: query the endpoint.
match fetcher::fetch_usage(transport, &config.api_base_url, &creds, config.timeout) {
Ok(response) => {
// Persist the response and block further fetches for `cache_duration`.
write_cache(cache, CachedUsage::with_data(response.clone()));
write_lock(
lock,
Lock {
blocked_until: add_secs(now_ts.as_second(), config.cache_duration),
error: None,
},
);
Ok(UsageData::Endpoint(response.into_endpoint_usage()))
}
Err(UsageError::Unauthorized) => {
// On a 401 the cache is cleared rather than served; a failed clear is
// logged and otherwise ignored.
if let Some(c) = cache {
if let Err(e) = c.clear() {
crate::lsm_error!(
"cascade: cache clear after 401 failed; peers may serve revoked-token data until TTL expires: {e}"
);
}
}
write_failure_lock(lock, now_ts, &UsageError::Unauthorized);
jsonl_or(jsonl, now_ts, UsageError::Unauthorized)
}
Err(err) => {
// Any other failure: record a backoff lock, then prefer stale cached
// data over the JSONL fallback.
write_failure_lock(lock, now_ts, &err);
if let Some(entry) = &cache_entry {
if let Some(data) = entry.data.clone() {
return Ok(cached_to_usage_data(data));
}
}
jsonl_or(jsonl, now_ts, err)
}
}
}
/// Tries the JSONL fallback and surfaces `fallback` only if it yields nothing.
///
/// Invokes `jsonl`, converts its result with `build_jsonl_usage`, and returns
/// `Ok(UsageData::Jsonl(..))` when usage data was produced; otherwise returns
/// `Err(fallback)`.
fn jsonl_or(
    jsonl: &dyn Fn() -> Result<JsonlAggregate, JsonlError>,
    now: Timestamp,
    fallback: UsageError,
) -> Result<UsageData, UsageError> {
    let aggregate = jsonl();
    build_jsonl_usage(aggregate, now)
        .map(UsageData::Jsonl)
        .ok_or(fallback)
}
/// Converts the outcome of a JSONL aggregation into `JsonlUsage`.
///
/// Returns `None` for every aggregation error. `NoEntries` and
/// `DirectoryMissing` are silent; any other error is logged as a warning
/// first. On success the five-hour block's start is clamped so it is never
/// later than `now` floored with `jsonl::floor_to_grain(now, 3600)`.
fn build_jsonl_usage(
    result: Result<JsonlAggregate, JsonlError>,
    now: Timestamp,
) -> Option<JsonlUsage> {
    let aggregate = match result {
        Ok(aggregate) => aggregate,
        Err(other) => {
            // "Nothing to read" is an expected condition and is not worth a warning.
            let expected_absence =
                matches!(other, JsonlError::NoEntries | JsonlError::DirectoryMissing);
            if !expected_absence {
                crate::lsm_warn!(
                    "cascade: JSONL fallback unavailable ({other}); surfacing endpoint error"
                );
            }
            return None;
        }
    };

    let hour_floor = jsonl::floor_to_grain(now, 3600);
    let five_hour = aggregate
        .five_hour
        .as_ref()
        .map(|block| FiveHourWindow::new(block.token_counts, block.start.min(hour_floor)));

    Some(JsonlUsage::new(
        five_hour,
        SevenDayWindow::new(aggregate.seven_day.token_counts),
    ))
}
/// Reads the cache entry, if a cache store is configured.
///
/// Returns `None` when no store was supplied, when the store has no entry, or
/// when the read fails (the failure is reported via `log_cache_read_failure`).
fn read_cache(cache: Option<&CacheStore>) -> Option<CachedUsage> {
    let store = cache?;
    match store.read() {
        Err(read_err) => {
            log_cache_read_failure("cache", &read_err);
            None
        }
        Ok(entry) => entry,
    }
}
/// Reads the lock entry, if a lock store is configured.
///
/// Returns `None` when no store was supplied, when the store has no entry, or
/// when the read fails (the failure is reported via `log_cache_read_failure`).
fn read_lock(lock: Option<&LockStore>) -> Option<Lock> {
    let store = lock?;
    match store.read() {
        Err(read_err) => {
            log_cache_read_failure("lock", &read_err);
            None
        }
        Ok(entry) => entry,
    }
}
/// Logs a failed cache/lock read at a level chosen from the underlying I/O
/// error kind.
///
/// `NotFound` and `UnexpectedEof` are logged at debug level and described as
/// a miss; every other kind is logged as a warning. `kind` is the label that
/// appears in the message (callers in this file pass `"cache"` or `"lock"`).
fn log_cache_read_failure(kind: &str, err: &super::cache::CacheError) {
    use std::io::ErrorKind;

    // Both error variants carry the originating I/O error in `cause`.
    let cause = match err {
        super::cache::CacheError::Io { cause, .. }
        | super::cache::CacheError::Persist { cause, .. } => cause,
    };

    let is_plain_miss = matches!(
        cause.kind(),
        ErrorKind::NotFound | ErrorKind::UnexpectedEof
    );
    if is_plain_miss {
        crate::lsm_debug!("cascade: {kind} read failed: {err}; treating as miss");
    } else {
        crate::lsm_warn!("cascade: {kind} read failed: {err}");
    }
}
/// Best-effort write of `entry` to the cache store.
///
/// Does nothing when no store is configured. A failed write is reported via
/// `log_persist_error` and not propagated.
fn write_cache(cache: Option<&CacheStore>, entry: CachedUsage) {
    let Some(store) = cache else {
        return;
    };
    if let Err(write_err) = store.write(&entry) {
        log_persist_error("cache", &write_err);
    }
}
/// Best-effort write of `entry` to the lock store.
///
/// Does nothing when no store is configured. A failed write is reported via
/// `log_persist_error` and not propagated.
fn write_lock(lock: Option<&LockStore>, entry: Lock) {
    let Some(store) = lock else {
        return;
    };
    if let Err(write_err) = store.write(&entry) {
        log_persist_error("lock", &write_err);
    }
}
/// Log level chosen for a failed cache/lock write; produced by
/// `classify_persist_error` and consumed by `route_persist_error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PersistLogClass {
/// Genuine write failure; routed to the error sink.
Error,
/// Failure classified as a transient persist race; routed to the debug sink.
Debug,
}
fn classify_persist_error(kind: &str, err: &CacheError) -> (PersistLogClass, String) {
if is_transient_persist_race(err) {
(
PersistLogClass::Debug,
format!("cascade: {kind} write race-loser (Windows MoveFileEx): {err}"),
)
} else {
(
PersistLogClass::Error,
format!("cascade: {kind} write failed: {err}"),
)
}
}
/// Delivers `msg` to exactly one of the two sinks, selected by `class`.
///
/// `on_debug` receives the message for `PersistLogClass::Debug`, `on_error`
/// for `PersistLogClass::Error`; the other sink is dropped without being
/// called.
fn route_persist_error<D, E>(class: PersistLogClass, msg: &str, on_debug: D, on_error: E)
where
    D: FnOnce(&str),
    E: FnOnce(&str),
{
    // Exhaustive on purpose: a new class must pick a sink explicitly.
    match class {
        PersistLogClass::Error => on_error(msg),
        PersistLogClass::Debug => on_debug(msg),
    }
}
/// Logs a failed cache/lock write at the level chosen by
/// `classify_persist_error`: debug for transient races, error otherwise.
fn log_persist_error(kind: &str, err: &CacheError) {
    let (class, msg) = classify_persist_error(kind, err);
    let debug_sink = |s: &str| crate::lsm_debug!("{s}");
    let error_sink = |s: &str| crate::lsm_error!("{s}");
    route_persist_error(class, &msg, debug_sink, error_sink);
}
/// Windows build: reports whether a write failure should be treated as a
/// transient persist race.
///
/// True only for `CacheError::Persist` whose I/O cause has kind
/// `PermissionDenied`.
// NOTE(review): presumably this is what a writer that loses a concurrent
// rename observes on Windows — confirm against the store implementation.
#[cfg(windows)]
fn is_transient_persist_race(err: &CacheError) -> bool {
    match err {
        CacheError::Persist { cause, .. } => {
            cause.kind() == std::io::ErrorKind::PermissionDenied
        }
        _ => false,
    }
}
/// Non-Windows build: no write failure is ever classified as a transient
/// persist race, so this always returns `false` and ignores `_err`.
#[cfg(not(windows))]
fn is_transient_persist_race(_err: &CacheError) -> bool {
false
}
/// Records a backoff lock for a failed fetch.
///
/// The lock expires `backoff_for_error(err)` after `now_ts` (whole seconds)
/// and stores `err.code()` as its error. Writing is best-effort, see
/// `write_lock`.
fn write_failure_lock(lock: Option<&LockStore>, now_ts: Timestamp, err: &UsageError) {
    let blocked_until = add_secs(now_ts.as_second(), backoff_for_error(err));
    let error = Some(err.code().to_string());
    write_lock(
        lock,
        Lock {
            blocked_until,
            error,
        },
    );
}
/// Picks how long to back off after `err`.
///
/// A rate limit uses its own `retry_after` when present and
/// `DEFAULT_RATE_LIMIT_BACKOFF` otherwise; every other error uses
/// `DEFAULT_ERROR_TTL`.
fn backoff_for_error(err: &UsageError) -> Duration {
    if let UsageError::RateLimited { retry_after } = err {
        (*retry_after).unwrap_or(DEFAULT_RATE_LIMIT_BACKOFF)
    } else {
        DEFAULT_ERROR_TTL
    }
}
/// Adds the whole seconds of `secs` to `base_ts`, saturating at the `i64`
/// bounds instead of overflowing.
///
/// The sub-second part of `secs` is discarded. A second count that does not
/// fit in `i64` is treated as `i64::MAX`.
fn add_secs(base_ts: i64, secs: Duration) -> i64 {
    let delta = match i64::try_from(secs.as_secs()) {
        Ok(whole_seconds) => whole_seconds,
        Err(_) => i64::MAX,
    };
    base_ts.saturating_add(delta)
}
/// Rebuilds a `UsageError` from a stored error-code string.
///
/// Recognised codes map to the variant of the same name (`"RateLimited"`
/// comes back without a `retry_after`). Any other string, including codes
/// this function does not list, maps to `UsageError::NetworkError`.
fn usage_error_from_code(code: &str) -> UsageError {
    match code {
        "Unauthorized" => UsageError::Unauthorized,
        "RateLimited" => UsageError::RateLimited { retry_after: None },
        "Timeout" => UsageError::Timeout,
        "ParseError" => UsageError::ParseError,
        "NoCredentials" => UsageError::NoCredentials,
        _ => UsageError::NetworkError,
    }
}
/// Reports whether `entry` is younger than `ttl` at time `now`.
///
/// The age is `now.duration_since(entry.cached_at)` converted to a
/// `std::time::Duration`. If that conversion fails the entry is treated as
/// not fresh.
// NOTE(review): the conversion presumably fails when `cached_at` is later
// than `now` (negative age) — confirm against the jiff documentation.
fn is_fresh(entry: &CachedUsage, now: Timestamp, ttl: Duration) -> bool {
    let age = now.duration_since(entry.cached_at);
    Duration::try_from(age).is_ok_and(|elapsed| elapsed < ttl)
}
/// Turns cached payload data back into endpoint-shaped `UsageData`.
///
/// The payload is converted into a `UsageApiResponse` and then wrapped as
/// `UsageData::Endpoint` via `into_endpoint_usage`.
fn cached_to_usage_data(data: super::cache::CachedData) -> UsageData {
    let endpoint_usage = Into::<UsageApiResponse>::into(data).into_endpoint_usage();
    UsageData::Endpoint(endpoint_usage)
}
// Unit tests for this module live in a separate `tests` module file and are
// compiled only for test builds.
#[cfg(test)]
mod tests;