use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::lockorder::{RankedMutex, rank};
use super::fetch::{
FetchError, UsageInfo, fetch_raw, iso_to_epoch_secs, load_disk_cache, now_epoch_secs, now_ms,
write_disk_cache,
};
#[cfg(test)]
#[allow(unused_imports)]
use super::fetch::UsageWindow;
const TICK_INTERVAL: Duration = Duration::from_secs(1);
pub(crate) const NORMAL_INTERVAL_MS: u64 = 35_000;
pub(crate) const LEARNED_FLOOR_MS: u64 = 10_000;
pub(crate) const LEARNED_CEILING_MS: u64 = 300_000;
pub(crate) const LEARNED_STEP_MS: u64 = 5_000;
pub(crate) const LEARNED_QUIET_RESET_MS: u64 = 5 * 60 * 1_000;
const DEFAULT_FALLBACK_THRESHOLD: f64 = 95.0;
const NEAR_THRESHOLD_MARGIN: f64 = 5.0;
const CACHE_HIT_EPSILON: f64 = 1e-9;
pub(crate) const SERVER_CACHE_TTL_ESTIMATE_MS: u64 = 25_000;
const _: () = assert!(
NORMAL_INTERVAL_MS > SERVER_CACHE_TTL_ESTIMATE_MS,
"NORMAL_INTERVAL_MS must exceed SERVER_CACHE_TTL_ESTIMATE_MS for cache-hit elimination to terminate",
);
const _: () = assert!(
NORMAL_INTERVAL_MS.saturating_sub(2 * LEARNED_STEP_MS) >= SERVER_CACHE_TTL_ESTIMATE_MS,
"two bump_downs from NORMAL must stay at or above the server cache TTL, or the learner oscillates around TTL with a non-zero steady-state cache-hit rate",
);
pub(crate) type UsageStore = Arc<RankedMutex<HashMap<String, UsageInfo>, { rank::USAGE_STORE }>>;
pub(crate) type StatusStore =
Arc<RankedMutex<HashMap<String, FetchStatus>, { rank::USAGE_STATUS }>>;
pub(crate) type TokenList = Arc<RankedMutex<Vec<TokenEntry>, { rank::TOKENS }>>;
pub(crate) type LastFetchedAt = Arc<RankedMutex<HashMap<String, u64>, { rank::LAST_FETCHED }>>;
pub(crate) type RefetchQueue = Arc<RankedMutex<HashSet<String>, { rank::REFETCH_QUEUE }>>;
pub(crate) type LearnedIntervals = Arc<RankedMutex<HashMap<String, u64>, { rank::LEARNED }>>;
pub(crate) type ConsecutiveOk = Arc<RankedMutex<HashMap<String, u32>, { rank::OK_COUNT }>>;
pub(crate) type ConsecutiveCacheHit = Arc<RankedMutex<HashMap<String, u32>, { rank::CACHE_HIT }>>;
pub(crate) type Last429At = Arc<RankedMutex<HashMap<String, u64>, { rank::LAST_429 }>>;
pub(crate) type PendingAutoStart = Arc<RankedMutex<HashSet<String>, { rank::PENDING_AUTO_START }>>;
pub(crate) type PendingWindowRotation =
Arc<RankedMutex<HashMap<String, i64>, { rank::PENDING_WINDOW_ROTATION }>>;
pub(crate) type LastRotatedWindow =
Arc<RankedMutex<HashMap<String, i64>, { rank::LAST_ROTATED_WINDOW }>>;
pub(crate) type PendingSwitch = Arc<RankedMutex<HashSet<String>, { rank::PENDING_SWITCH }>>;
pub(crate) type PendingSwitchOff = Arc<RankedMutex<bool, { rank::PENDING_SWITCH_OFF }>>;
#[derive(Clone)]
pub(crate) struct TokenEntry {
pub(crate) name: String,
pub(crate) access_token: String,
pub(crate) refresh_token: Option<String>,
pub(crate) fallback_threshold: f64,
}
pub(crate) type NextRefreshPerProfile =
Arc<RankedMutex<HashMap<String, u64>, { rank::NEXT_REFRESH }>>;
pub(crate) type ActivityStore =
Arc<RankedMutex<HashMap<String, ProfileActivity>, { rank::ACTIVITY }>>;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ProfileActivity {
Idle,
Fetching,
Refreshing,
Switching,
#[allow(dead_code)]
Starting,
AutoStarting,
}
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ActivityKind {
Fetching,
Refreshing,
Switching,
Starting,
AutoStarting,
}
impl ActivityKind {
#[allow(dead_code)]
pub(crate) fn as_activity(self) -> ProfileActivity {
match self {
ActivityKind::Fetching => ProfileActivity::Fetching,
ActivityKind::Refreshing => ProfileActivity::Refreshing,
ActivityKind::Switching => ProfileActivity::Switching,
ActivityKind::Starting => ProfileActivity::Starting,
ActivityKind::AutoStarting => ProfileActivity::AutoStarting,
}
}
}
#[derive(Debug)]
pub(crate) struct OpResult {
pub(crate) name: String,
pub(crate) kind: ActivityKind,
pub(crate) outcome: anyhow::Result<()>,
}
pub(crate) type OpResultSender = Sender<OpResult>;
pub(crate) type OpResultReceiver = Receiver<OpResult>;
#[derive(Debug)]
pub(crate) enum StartupSignal {
ReconcileDone,
ReconcileNeedsPrompt { active: String },
BootstrapDone,
}
pub(crate) type StartupSender = Sender<StartupSignal>;
pub(crate) type StartupReceiver = Receiver<StartupSignal>;
pub(crate) fn mark_activity(store: &ActivityStore, name: &str, activity: ProfileActivity) {
if let Ok(mut g) = store.lock() {
if matches!(activity, ProfileActivity::Idle) {
g.remove(name);
} else {
g.insert(name.to_string(), activity);
}
}
}
pub(crate) fn clear_activity(store: &ActivityStore, name: &str) {
if let Ok(mut g) = store.lock() {
g.remove(name);
}
}
pub(crate) fn is_idle(store: &ActivityStore, name: &str) -> bool {
match store.lock() {
Ok(g) => !g.contains_key(name),
Err(_) => false,
}
}
pub(crate) fn any_busy(store: &ActivityStore) -> bool {
match store.lock() {
Ok(g) => !g.is_empty(),
Err(_) => true,
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FetchStatus {
Fresh,
Cached,
Failed,
RateLimited,
}
pub(crate) type RotatedTokens = (String, Option<String>);
fn load_cached_with_status(name: &str, status: FetchStatus) -> (Option<UsageInfo>, FetchStatus) {
match load_disk_cache(name) {
Some(info) => (Some(info), status),
None => (None, FetchStatus::Failed),
}
}
fn fetch_with_rotation(
config: &crate::profile::ConfigHandle,
name: &str,
access_token: &str,
refresh_token: Option<&str>,
refetch: &RefetchQueue,
activity: &ActivityStore,
) -> (Option<UsageInfo>, FetchStatus, Option<RotatedTokens>) {
let saw_429 = match fetch_raw(access_token) {
Ok(info) => return (Some(info), FetchStatus::Fresh, None),
Err(FetchError::Status(429)) => true,
Err(FetchError::Status(401)) => false,
Err(_) => {
let (info, status) = load_cached_with_status(name, FetchStatus::Cached);
return (info, status, None);
}
};
let fallback_status = if saw_429 {
FetchStatus::RateLimited
} else {
FetchStatus::Cached
};
let Some(rt) = refresh_token else {
let (info, status) = load_cached_with_status(name, fallback_status);
return (info, status, None);
};
let Ok(_rotation_guard) = crate::runtime::RotationGuard::acquire(name) else {
let (info, status) = load_cached_with_status(name, fallback_status);
return (info, status, None);
};
if crate::runtime::has_live_session(name) {
let (info, status) = load_cached_with_status(name, fallback_status);
return (info, status, None);
}
mark_activity(activity, name, ProfileActivity::Refreshing);
let refresh_result = crate::oauth::refresh(rt);
mark_activity(activity, name, ProfileActivity::Fetching);
let tok = match refresh_result {
Ok(t) => t,
Err(_) => {
let (info, status) = load_cached_with_status(name, fallback_status);
return (info, status, None);
}
};
let access = tok.access_token.clone();
let refresh = tok.refresh_token.clone();
if !crate::oauth::apply_rotated_tokens_locked(config, name, tok, None) {
let (info, status) = load_cached_with_status(name, fallback_status);
return (info, status, None);
}
let rotated: Option<RotatedTokens> = Some((access.clone(), Some(refresh)));
match fetch_raw(&access) {
Ok(info) => {
let status = if saw_429 {
FetchStatus::RateLimited
} else {
FetchStatus::Fresh
};
(Some(info), status, rotated)
}
Err(FetchError::Status(429)) => {
let (info, _) = load_cached_with_status(name, FetchStatus::RateLimited);
(info, FetchStatus::RateLimited, rotated)
}
Err(_) => {
if let Ok(mut q) = refetch.lock() {
q.insert(name.to_string());
}
let (info, status) = load_cached_with_status(name, fallback_status);
(info, status, rotated)
}
}
}
fn five_hour_utilization(info: &UsageInfo) -> Option<f64> {
info.five_hour.as_ref().map(|w| w.utilization)
}
static JITTER_COUNTER: AtomicU64 = AtomicU64::new(0);
fn jitter_seed() -> u64 {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let counter = JITTER_COUNTER.fetch_add(1, Ordering::Relaxed);
let mixed = nanos ^ counter.wrapping_mul(0x9E37_79B9_7F4A_7C15);
mixed
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1_442_695_040_888_963_407)
}
pub(crate) fn bump_up(current: u64) -> u64 {
let raised = current.saturating_mul(3) / 2;
let margin = raised / 10;
if raised.saturating_add(margin) >= LEARNED_CEILING_MS {
return LEARNED_CEILING_MS;
}
let jitter = if margin == 0 {
0i64
} else {
i64::try_from(jitter_seed() % (margin * 2 + 1)).unwrap_or(0)
- i64::try_from(margin).unwrap_or(0)
};
((raised as i64 + jitter).max(LEARNED_FLOOR_MS as i64) as u64).min(LEARNED_CEILING_MS)
}
pub(crate) fn bump_down(current: u64) -> u64 {
current
.saturating_sub(LEARNED_STEP_MS)
.max(LEARNED_FLOOR_MS)
}
fn detect_cache_hit(
status: FetchStatus,
elapsed_ms: u64,
prev_util: Option<f64>,
new_util: Option<f64>,
) -> bool {
if !matches!(status, FetchStatus::Fresh) {
return false;
}
if elapsed_ms >= SERVER_CACHE_TTL_ESTIMATE_MS {
return false;
}
match (prev_util, new_util) {
(Some(a), Some(b)) => (a - b).abs() < CACHE_HIT_EPSILON,
_ => false,
}
}
fn interval_for(
entry: &TokenEntry,
last_5h: Option<f64>,
learned_intervals: &LearnedIntervals,
) -> u64 {
let near = entry.fallback_threshold > NEAR_THRESHOLD_MARGIN
&& matches!(last_5h, Some(u) if u >= entry.fallback_threshold - NEAR_THRESHOLD_MARGIN);
if near {
return LEARNED_FLOOR_MS;
}
learned_intervals
.lock()
.ok()
.and_then(|m| m.get(&entry.name).copied())
.unwrap_or(NORMAL_INTERVAL_MS)
}
#[allow(clippy::too_many_arguments)]
fn update_learner(
name: &str,
status: FetchStatus,
cache_hit: bool,
util_changed: bool,
learned: &LearnedIntervals,
ok_count: &ConsecutiveOk,
cache_hit_count: &ConsecutiveCacheHit,
last_429: &Last429At,
) {
let now = now_ms();
let (Ok(mut learned_g), Ok(mut ok_g), Ok(mut ch_g), Ok(mut l429_g)) = (
learned.lock(),
ok_count.lock(),
cache_hit_count.lock(),
last_429.lock(),
) else {
return;
};
if matches!(status, FetchStatus::Fresh)
&& let Some(&t429) = l429_g.get(name)
&& t429 != 0
&& now.saturating_sub(t429) >= LEARNED_QUIET_RESET_MS
{
let current = learned_g.get(name).copied().unwrap_or(NORMAL_INTERVAL_MS);
if current > NORMAL_INTERVAL_MS {
learned_g.insert(name.to_string(), NORMAL_INTERVAL_MS);
}
l429_g.remove(name);
}
match status {
FetchStatus::RateLimited => {
let current = learned_g.get(name).copied().unwrap_or(NORMAL_INTERVAL_MS);
learned_g.insert(name.to_string(), bump_up(current));
ok_g.insert(name.to_string(), 0);
ch_g.insert(name.to_string(), 0);
l429_g.insert(name.to_string(), now);
}
FetchStatus::Fresh if cache_hit => {
let hits = ch_g.get(name).copied().unwrap_or(0) + 1;
if hits >= 2 {
let current = learned_g.get(name).copied().unwrap_or(NORMAL_INTERVAL_MS);
let target =
(SERVER_CACHE_TTL_ESTIMATE_MS + LEARNED_STEP_MS).min(NORMAL_INTERVAL_MS);
let bumped = current.max(target);
learned_g.insert(name.to_string(), bumped);
ch_g.insert(name.to_string(), 0);
} else {
ch_g.insert(name.to_string(), hits);
}
}
FetchStatus::Fresh if util_changed => {
let count = ok_g.get(name).copied().unwrap_or(0) + 1;
if count >= 2 {
let current = learned_g.get(name).copied().unwrap_or(NORMAL_INTERVAL_MS);
learned_g.insert(name.to_string(), bump_down(current));
ok_g.insert(name.to_string(), 0);
} else {
ok_g.insert(name.to_string(), count);
}
ch_g.insert(name.to_string(), 0);
}
FetchStatus::Fresh => {}
FetchStatus::Cached | FetchStatus::Failed => {}
}
}
struct FetchOutcome {
name: String,
info: Option<UsageInfo>,
status: FetchStatus,
rotated: Option<RotatedTokens>,
}
fn run_fetch(
config: &crate::profile::ConfigHandle,
entry: TokenEntry,
refetch: &RefetchQueue,
activity: &ActivityStore,
) -> FetchOutcome {
let (info, status, rotated) = fetch_with_rotation(
config,
&entry.name,
&entry.access_token,
entry.refresh_token.as_deref(),
refetch,
activity,
);
FetchOutcome {
name: entry.name,
info,
status,
rotated,
}
}
#[allow(clippy::too_many_arguments)]
fn apply_outcome(
outcome: FetchOutcome,
store: &UsageStore,
status: &StatusStore,
last_fetched: &LastFetchedAt,
learned: &LearnedIntervals,
ok_count: &ConsecutiveOk,
cache_hit_count: &ConsecutiveCacheHit,
last_429: &Last429At,
) {
let now = now_ms();
let is_fresh = matches!(
outcome.status,
FetchStatus::Fresh | FetchStatus::RateLimited
);
if is_fresh && let Some(info) = &outcome.info {
write_disk_cache(&outcome.name, info);
}
let (prev_util, new_util): (Option<f64>, Option<f64>) = if let Ok(mut s) = store.lock() {
let prev = s.get(&outcome.name).and_then(five_hour_utilization);
let new_u = outcome.info.as_ref().and_then(five_hour_utilization);
if let Some(info) = &outcome.info {
if is_fresh || !s.contains_key(&outcome.name) {
s.insert(outcome.name.clone(), info.clone());
}
}
(prev, new_u)
} else {
(None, outcome.info.as_ref().and_then(five_hour_utilization))
};
let elapsed_ms: u64 = last_fetched
.lock()
.ok()
.and_then(|m| m.get(&outcome.name).copied())
.map(|prev| now.saturating_sub(prev))
.unwrap_or(u64::MAX);
let cache_hit = detect_cache_hit(outcome.status, elapsed_ms, prev_util, new_util);
let util_changed = matches!(
(prev_util, new_util),
(Some(a), Some(b)) if (a - b).abs() >= CACHE_HIT_EPSILON
);
if let Ok(mut st) = status.lock() {
st.insert(outcome.name.clone(), outcome.status);
}
if let Ok(mut lf) = last_fetched.lock() {
lf.insert(outcome.name.clone(), now);
}
update_learner(
&outcome.name,
outcome.status,
cache_hit,
util_changed,
learned,
ok_count,
cache_hit_count,
last_429,
);
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn fetch_all_into(
config: &crate::profile::ConfigHandle,
tokens: &[TokenEntry],
store: &UsageStore,
status: &StatusStore,
last_fetched: &LastFetchedAt,
refetch: &RefetchQueue,
activity: &ActivityStore,
learned: &LearnedIntervals,
ok_count: &ConsecutiveOk,
cache_hit_count: &ConsecutiveCacheHit,
last_429: &Last429At,
) {
if tokens.is_empty() {
return;
}
for entry in tokens {
mark_activity(activity, &entry.name, ProfileActivity::Fetching);
}
let handles: Vec<_> = tokens
.iter()
.cloned()
.map(|entry| {
let name = entry.name.clone();
let config = Arc::clone(config);
let refetch = Arc::clone(refetch);
let activity = Arc::clone(activity);
let h = std::thread::spawn(move || run_fetch(&config, entry, &refetch, &activity));
(name, h)
})
.collect();
for (name, h) in handles {
match h.join() {
Ok(outcome) => {
clear_activity(activity, &outcome.name);
apply_outcome(
outcome,
store,
status,
last_fetched,
learned,
ok_count,
cache_hit_count,
last_429,
);
}
Err(_) => {
clear_activity(activity, &name);
}
}
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn_refresher(
config: crate::profile::ConfigHandle,
tokens: TokenList,
store: UsageStore,
status: StatusStore,
next_refresh_per_profile: NextRefreshPerProfile,
activity: ActivityStore,
last_fetched: LastFetchedAt,
pending_window_rotation: PendingWindowRotation,
last_rotated_window: LastRotatedWindow,
pending_switch: PendingSwitch,
pending_switch_off: PendingSwitchOff,
refetch_queue: RefetchQueue,
learned: LearnedIntervals,
ok_count: ConsecutiveOk,
cache_hit_count: ConsecutiveCacheHit,
last_429: Last429At,
) {
std::thread::spawn(move || {
loop {
std::thread::sleep(TICK_INTERVAL);
let snapshot: Vec<TokenEntry> = match tokens.lock() {
Ok(t) => t.clone(),
Err(_) => continue,
};
if snapshot.is_empty() {
continue;
}
let forced: HashSet<String> = refetch_queue
.lock()
.ok()
.map(|mut q| std::mem::take(&mut *q))
.unwrap_or_default();
let now = now_ms();
let (mut due, _soonest_next, mut per_profile_next) =
partition_due(&snapshot, now, &store, &last_fetched, &learned, &activity);
if !forced.is_empty() {
let switching: HashSet<String> = match activity.lock() {
Ok(a) => a
.iter()
.filter(|(_, v)| {
matches!(v, ProfileActivity::Switching | ProfileActivity::Refreshing)
})
.map(|(n, _)| n.clone())
.collect(),
Err(_) => snapshot.iter().map(|e| e.name.clone()).collect(),
};
let mut extras: Vec<TokenEntry> = Vec::with_capacity(forced.len());
for entry in snapshot.iter().filter(|e| {
forced.contains(&e.name)
&& !switching.contains(&e.name)
&& !due.iter().any(|d| d.name == e.name)
}) {
per_profile_next.insert(entry.name.clone(), now);
extras.push(entry.clone());
}
due.extend(extras);
}
if let Ok(mut nrpp) = next_refresh_per_profile.lock() {
nrpp.clone_from(&per_profile_next);
}
let pre_fetch_resets: HashMap<String, i64> = match store.lock() {
Ok(st) => snapshot
.iter()
.filter_map(|entry| {
let resets_at_str = st
.get(&entry.name)
.and_then(|u| u.five_hour.as_ref())
.and_then(|w| w.resets_at.as_deref())?;
Some((entry.name.clone(), iso_to_epoch_secs(resets_at_str)?))
})
.collect(),
Err(_) => HashMap::new(),
};
scan_expired_windows(
&pre_fetch_resets,
&activity,
&last_rotated_window,
&pending_window_rotation,
);
if due.is_empty() {
continue;
}
for entry in &due {
mark_activity(&activity, &entry.name, ProfileActivity::Fetching);
}
let handles: Vec<_> = due
.into_iter()
.map(|entry| {
let name = entry.name.clone();
let config = Arc::clone(&config);
let refetch_queue = Arc::clone(&refetch_queue);
let activity = Arc::clone(&activity);
let h = std::thread::spawn(move || {
run_fetch(&config, entry, &refetch_queue, &activity)
});
(name, h)
})
.collect();
for (name, h) in handles {
match h.join() {
Ok(outcome) => {
clear_activity(&activity, &outcome.name);
if let Some((new_access, new_refresh)) = &outcome.rotated
&& let Ok(mut t) = tokens.lock()
&& let Some(entry) = t.iter_mut().find(|e| e.name == outcome.name)
{
entry.access_token = new_access.clone();
entry.refresh_token = new_refresh.clone();
}
apply_outcome(
outcome,
&store,
&status,
&last_fetched,
&learned,
&ok_count,
&cache_hit_count,
&last_429,
);
}
Err(_) => {
clear_activity(&activity, &name);
}
}
}
let (_, _, per_profile_after) = partition_due(
&snapshot,
now_ms(),
&store,
&last_fetched,
&learned,
&activity,
);
if let Ok(mut nrpp) = next_refresh_per_profile.lock() {
nrpp.clone_from(&per_profile_after);
}
scan_auto_switch(
&config,
&store,
&activity,
&pending_switch,
&pending_switch_off,
);
}
});
}
fn scan_auto_switch(
config: &crate::profile::ConfigHandle,
store: &UsageStore,
activity: &ActivityStore,
pending_switch: &PendingSwitch,
pending_switch_off: &PendingSwitchOff,
) {
{
let Ok(p) = pending_switch.lock() else { return };
if !p.is_empty() {
return;
}
}
{
let Ok(off) = pending_switch_off.lock() else {
return;
};
if *off {
return;
}
}
{
let Ok(a) = activity.lock() else { return };
if a.values().any(|v| matches!(v, ProfileActivity::Switching)) {
return;
}
}
let snapshot = {
let cfg = match config.lock() {
Ok(c) => c,
Err(_) => return,
};
crate::fallback::snapshot_chain(&cfg)
};
let Some(snapshot) = snapshot else {
return;
};
match crate::fallback::next_auto_switch_target(&snapshot, store) {
Some(crate::fallback::SwitchAction::To(name)) => {
if let Ok(mut p) = pending_switch.lock() {
p.insert(name);
}
}
Some(crate::fallback::SwitchAction::Off) => {
if let Ok(mut off) = pending_switch_off.lock() {
*off = true;
}
}
None => {}
}
}
fn scan_expired_windows(
pre_fetch_resets: &HashMap<String, i64>,
activity: &ActivityStore,
last_rotated_window: &LastRotatedWindow,
pending: &PendingWindowRotation,
) {
let now = now_epoch_secs();
let candidates: Vec<(String, i64)> = pre_fetch_resets
.iter()
.filter_map(|(name, &resets_at)| {
if now < resets_at + 5 {
return None;
}
Some((name.clone(), resets_at))
})
.collect();
if candidates.is_empty() {
return;
}
let candidates: Vec<(String, i64)> = {
let Ok(act) = activity.lock() else { return };
candidates
.into_iter()
.filter(|(name, _)| {
!matches!(
act.get(name),
Some(ProfileActivity::Switching | ProfileActivity::Refreshing)
)
})
.collect()
};
if candidates.is_empty() {
return;
}
let to_enqueue: Vec<(String, i64)> = {
let Ok(lrw) = last_rotated_window.lock() else {
return;
};
candidates
.into_iter()
.filter(|(name, resets_at)| lrw.get(name).copied().unwrap_or(0) != *resets_at)
.collect()
};
if to_enqueue.is_empty() {
return;
}
let Ok(mut pend) = pending.lock() else {
return;
};
for (name, resets_at) in to_enqueue {
pend.insert(name, resets_at);
}
}
fn partition_due(
snapshot: &[TokenEntry],
now: u64,
store: &UsageStore,
last_fetched: &LastFetchedAt,
learned: &LearnedIntervals,
activity: &ActivityStore,
) -> (Vec<TokenEntry>, u64, HashMap<String, u64>) {
let Ok(lf) = last_fetched.lock() else {
return (Vec::new(), now + NORMAL_INTERVAL_MS, HashMap::new());
};
let Ok(st) = store.lock() else {
return (Vec::new(), now + NORMAL_INTERVAL_MS, HashMap::new());
};
let act = activity.lock();
let mut due = Vec::new();
let mut soonest_next = now + NORMAL_INTERVAL_MS;
let mut per_profile = HashMap::with_capacity(snapshot.len());
for entry in snapshot {
let last = lf.get(&entry.name).copied().unwrap_or(0);
let last_5h = st.get(&entry.name).and_then(five_hour_utilization);
let interval = interval_for(entry, last_5h, learned);
let next = last.saturating_add(interval);
per_profile.insert(entry.name.clone(), next);
let excluded = match act.as_ref() {
Ok(a) => matches!(
a.get(&entry.name),
Some(ProfileActivity::Switching | ProfileActivity::Refreshing)
),
Err(_) => true,
};
if excluded {
continue;
}
if now >= next {
due.push(entry.clone());
} else if next < soonest_next {
soonest_next = next;
}
}
(due, soonest_next, per_profile)
}
pub(crate) const fn default_fallback_threshold() -> f64 {
DEFAULT_FALLBACK_THRESHOLD
}
#[cfg(test)]
#[path = "../../tests/inline/learned_cadence.rs"]
mod tests;