use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use astrid_core::PrincipalId;
use dashmap::DashMap;
use parking_lot::Mutex;
#[derive(Clone, Default)]
pub struct FuelLedger {
inner: Arc<DashMap<PrincipalId, AtomicU64>>,
}
impl FuelLedger {
pub fn charge(&self, principal: &PrincipalId, fuel: u64) {
if fuel == 0 {
return;
}
if let Some(counter) = self.inner.get(principal) {
Self::saturating_add(&counter, fuel);
return;
}
Self::saturating_add(&self.inner.entry(principal.clone()).or_default(), fuel);
}
fn saturating_add(counter: &AtomicU64, fuel: u64) {
let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
Some(v.saturating_add(fuel))
});
}
#[must_use]
pub fn total(&self, principal: &PrincipalId) -> u64 {
self.inner
.get(principal)
.map_or(0, |counter| counter.load(Ordering::Relaxed))
}
}
struct FuelWindow {
window_start: Instant,
fuel_in_window: u64,
}
const WINDOW: Duration = Duration::from_secs(1);
const PRUNE_THRESHOLD: usize = 1000;
const PRUNE_INTERVAL: Duration = Duration::from_secs(60);
#[derive(Clone)]
pub struct FuelRateLimiter {
inner: Arc<DashMap<PrincipalId, Mutex<FuelWindow>>>,
last_prune: Arc<Mutex<Instant>>,
}
impl Default for FuelRateLimiter {
fn default() -> Self {
Self {
inner: Arc::new(DashMap::new()),
last_prune: Arc::new(Mutex::new(Instant::now())),
}
}
}
impl FuelRateLimiter {
fn roll(window: &mut FuelWindow, now: Instant) {
if now.saturating_duration_since(window.window_start) >= WINDOW {
window.window_start = now;
window.fuel_in_window = 0;
}
}
#[must_use]
pub fn over_budget(
&self,
principal: &PrincipalId,
max_fuel_per_sec: u64,
now: Instant,
) -> bool {
if max_fuel_per_sec == 0 {
return false;
}
let Some(cell) = self.inner.get(principal) else {
return false;
};
let mut window = cell.lock();
Self::roll(&mut window, now);
window.fuel_in_window > max_fuel_per_sec
}
pub fn record(&self, principal: &PrincipalId, fuel: u64, now: Instant) {
if fuel == 0 {
return;
}
if let Some(cell) = self.inner.get(principal) {
let mut window = cell.lock();
Self::roll(&mut window, now);
window.fuel_in_window = window.fuel_in_window.saturating_add(fuel);
} else {
let cell = self.inner.entry(principal.clone()).or_insert_with(|| {
Mutex::new(FuelWindow {
window_start: now,
fuel_in_window: 0,
})
});
let mut window = cell.lock();
Self::roll(&mut window, now);
window.fuel_in_window = window.fuel_in_window.saturating_add(fuel);
}
self.maybe_prune(now);
}
fn maybe_prune(&self, now: Instant) {
if self.inner.len() <= PRUNE_THRESHOLD {
return;
}
let Some(mut last) = self.last_prune.try_lock() else {
return;
};
if now.saturating_duration_since(*last) < PRUNE_INTERVAL {
return;
}
*last = now;
self.inner.retain(|_, cell| {
match cell.try_lock() {
Some(window) => now.saturating_duration_since(window.window_start) < WINDOW,
None => true,
}
});
}
}
#[cfg(test)]
mod tests {
use super::*;
fn pid(s: &str) -> PrincipalId {
PrincipalId::new(s).expect("valid principal id")
}
#[test]
fn charge_sums_per_principal() {
let ledger = FuelLedger::default();
let a = pid("alice");
ledger.charge(&a, 100);
ledger.charge(&a, 50);
assert_eq!(
ledger.inner.get(&a).unwrap().load(Ordering::Relaxed),
150,
"repeat charges accumulate"
);
}
#[test]
fn distinct_principals_are_independent() {
let ledger = FuelLedger::default();
let (a, b) = (pid("alice"), pid("bob"));
ledger.charge(&a, 100);
ledger.charge(&b, 7);
assert_eq!(ledger.inner.get(&a).unwrap().load(Ordering::Relaxed), 100);
assert_eq!(ledger.inner.get(&b).unwrap().load(Ordering::Relaxed), 7);
}
#[test]
fn clones_share_one_aggregate() {
let engine_a = FuelLedger::default();
let engine_b = engine_a.clone();
let p = pid("alice");
engine_a.charge(&p, 40);
engine_b.charge(&p, 2);
assert_eq!(
engine_a.inner.get(&p).unwrap().load(Ordering::Relaxed),
42,
"two engines sharing one ledger sum cross-capsule"
);
}
#[test]
fn total_sums_charges_and_defaults_to_zero() {
let ledger = FuelLedger::default();
let a = pid("alice");
ledger.charge(&a, 100);
ledger.charge(&a, 50);
assert_eq!(ledger.total(&a), 150, "total returns the cumulative sum");
assert_eq!(
ledger.total(&pid("absent")),
0,
"a never-charged principal reads as zero, not an error"
);
}
#[test]
fn zero_charge_is_a_noop() {
let ledger = FuelLedger::default();
let p = pid("alice");
ledger.charge(&p, 0);
assert!(
ledger.inner.get(&p).is_none(),
"a zero charge must not even create an entry"
);
}
#[test]
fn charge_saturates_instead_of_wrapping() {
let ledger = FuelLedger::default();
let p = pid("alice");
ledger.charge(&p, u64::MAX);
ledger.charge(&p, 10);
assert_eq!(
ledger.inner.get(&p).unwrap().load(Ordering::Relaxed),
u64::MAX,
"saturating add pins at u64::MAX"
);
}
const BUDGET: u64 = 1_000;
#[test]
fn rate_over_budget_after_recording_past_budget() {
let rl = FuelRateLimiter::default();
let p = pid("alice");
let t0 = Instant::now();
rl.record(&p, BUDGET, t0);
assert!(
!rl.over_budget(&p, BUDGET, t0),
"spending exactly the budget is allowed"
);
rl.record(&p, 1, t0);
assert!(
rl.over_budget(&p, BUDGET, t0),
"one unit past the budget must deny within the same window"
);
}
#[test]
fn rate_under_budget_is_admitted() {
let rl = FuelRateLimiter::default();
let p = pid("alice");
let t0 = Instant::now();
rl.record(&p, BUDGET / 2, t0);
assert!(
!rl.over_budget(&p, BUDGET, t0),
"a principal under budget must be admitted"
);
}
#[test]
fn rate_cold_principal_is_admitted() {
let rl = FuelRateLimiter::default();
let p = pid("ghost");
let t0 = Instant::now();
assert!(
!rl.over_budget(&p, BUDGET, t0),
"a cold principal must be admitted"
);
assert!(
rl.inner.get(&p).is_none(),
"over_budget must not create a window for a cold principal"
);
}
#[test]
fn rate_window_self_heals_across_the_second() {
let rl = FuelRateLimiter::default();
let p = pid("alice");
let t0 = Instant::now();
rl.record(&p, BUDGET * 10, t0);
assert!(
rl.over_budget(&p, BUDGET, t0),
"way over budget within the window must deny"
);
let t1 = t0 + Duration::from_millis(1_001);
assert!(
!rl.over_budget(&p, BUDGET, t1),
"after the window rolls, the principal must be admitted again (no permanent brick)"
);
}
#[test]
fn rate_zero_budget_is_always_admitted() {
let rl = FuelRateLimiter::default();
let p = pid("unbounded");
let t0 = Instant::now();
rl.record(&p, u64::MAX, t0);
assert!(
!rl.over_budget(&p, 0, t0),
"a zero (unlimited) budget must never deny"
);
}
#[test]
fn rate_distinct_principals_have_independent_windows() {
let rl = FuelRateLimiter::default();
let (a, b) = (pid("alice"), pid("bob"));
let t0 = Instant::now();
rl.record(&a, BUDGET * 5, t0);
assert!(rl.over_budget(&a, BUDGET, t0), "alice over budget");
assert!(
!rl.over_budget(&b, BUDGET, t0),
"bob's window is independent of alice's"
);
rl.record(&b, BUDGET / 4, t0);
assert!(!rl.over_budget(&b, BUDGET, t0), "bob still under budget");
}
#[test]
fn rate_zero_record_is_a_noop() {
let rl = FuelRateLimiter::default();
let p = pid("alice");
rl.record(&p, 0, Instant::now());
assert!(
rl.inner.get(&p).is_none(),
"a zero record must not create a window entry"
);
}
#[test]
fn rate_clones_share_one_map() {
let a = FuelRateLimiter::default();
let b = a.clone();
let p = pid("alice");
let t0 = Instant::now();
a.record(&p, BUDGET, t0);
b.record(&p, 1, t0);
assert!(
a.over_budget(&p, BUDGET, t0),
"two handles sharing one map sum cross-capsule into one window"
);
}
#[test]
fn rate_lazy_prune_caps_the_map() {
let rl = FuelRateLimiter::default();
let base = Instant::now();
*rl.last_prune.lock() = base;
for i in 0..(PRUNE_THRESHOLD + 50) {
rl.inner.insert(
pid(&format!("ghost-{i}")),
Mutex::new(FuelWindow {
window_start: base,
fuel_in_window: 1,
}),
);
}
assert!(
rl.inner.len() > PRUNE_THRESHOLD,
"map seeded over threshold"
);
let now = base + Duration::from_secs(120);
rl.record(&pid("live"), BUDGET, now);
assert!(
rl.inner.len() < PRUNE_THRESHOLD,
"lazy prune must drop stale windows and bound the map (len = {})",
rl.inner.len()
);
assert!(
rl.inner.contains_key(&pid("live")),
"the freshly-recorded principal survives the prune"
);
}
#[test]
#[ignore = "real-clock variant; the deterministic injected-now tests are the contract"]
fn rate_window_self_heals_real_clock() {
let rl = FuelRateLimiter::default();
let p = pid("alice");
let t0 = Instant::now();
rl.record(&p, BUDGET * 10, t0);
assert!(rl.over_budget(&p, BUDGET, Instant::now()));
std::thread::sleep(Duration::from_millis(1_100));
assert!(
!rl.over_budget(&p, BUDGET, Instant::now()),
"after a real second the window must reset"
);
}
}