use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
pub trait Clock: Send + Sync + 'static {
fn now(&self) -> Instant;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct SystemClock;
impl Clock for SystemClock {
fn now(&self) -> Instant {
Instant::now()
}
}
#[derive(Debug)]
pub struct ManualClock {
now: Mutex<Instant>,
}
impl ManualClock {
#[must_use]
pub fn new() -> Arc<Self> {
Arc::new(Self {
now: Mutex::new(Instant::now()),
})
}
pub fn advance(&self, d: Duration) {
let mut t = self.now.lock();
*t += d;
}
}
impl Clock for ManualClock {
fn now(&self) -> Instant {
*self.now.lock()
}
}
#[must_use]
pub fn cost_of(path: &str) -> u32 {
let path = path.split('?').next().unwrap_or(path);
if path.starts_with("/evaluate") || path == "/pulse" || path.starts_with("/pulse?") {
2
} else if path == "/v2/status" {
3
} else {
1
}
}
pub const DEFAULT_CAPACITY: u32 = 60;
pub const DEFAULT_REFILL_PER_SECOND: f64 = 1.0;
#[derive(Debug)]
struct State {
tokens: f64,
last_refill: Instant,
}
struct Inner {
capacity: u32,
refill_per_second: f64,
clock: Arc<dyn Clock>,
state: Mutex<State>,
}
impl std::fmt::Debug for Inner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Inner")
.field("capacity", &self.capacity)
.field("refill_per_second", &self.refill_per_second)
.field("state", &self.state)
.finish_non_exhaustive()
}
}
#[derive(Clone)]
pub struct RateBudget {
inner: Arc<Inner>,
}
impl std::fmt::Debug for RateBudget {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let snap = self.snapshot();
f.debug_struct("RateBudget")
.field("capacity", &snap.capacity)
.field("refill_per_second", &snap.refill_per_second)
.field("tokens", &snap.tokens)
.finish()
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct BudgetSnapshot {
pub capacity: u32,
pub refill_per_second: f64,
pub tokens: u32,
}
impl BudgetSnapshot {
#[must_use]
pub fn headroom(&self) -> f64 {
if self.capacity == 0 {
0.0
} else {
f64::from(self.tokens) / f64::from(self.capacity)
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Exhausted {
pub retry_after: Duration,
}
impl RateBudget {
#[must_use]
pub fn default_system() -> Self {
Self::with_clock(
DEFAULT_CAPACITY,
DEFAULT_REFILL_PER_SECOND,
Arc::new(SystemClock),
)
}
#[must_use]
pub fn with_clock(capacity: u32, refill_per_second: f64, clock: Arc<dyn Clock>) -> Self {
assert!(capacity > 0, "rate-budget capacity must be > 0");
assert!(
refill_per_second.is_finite() && !refill_per_second.is_sign_negative(),
"rate-budget refill must be a finite, non-negative float (got {refill_per_second})"
);
let now = clock.now();
let state = State {
tokens: f64::from(capacity),
last_refill: now,
};
Self {
inner: Arc::new(Inner {
capacity,
refill_per_second,
clock,
state: Mutex::new(state),
}),
}
}
pub fn try_consume(&self, cost: u32) -> Result<(), Exhausted> {
let cost_f = f64::from(cost);
let mut state = self.inner.state.lock();
self.refill_locked(&mut state);
if state.tokens >= cost_f {
state.tokens -= cost_f;
return Ok(());
}
if cost_f > f64::from(self.inner.capacity) {
return Err(Exhausted {
retry_after: Duration::MAX,
});
}
let deficit = cost_f - state.tokens;
let retry = if self.inner.refill_per_second > 0.0 {
let secs = (deficit / self.inner.refill_per_second).ceil();
let candidate = Duration::try_from_secs_f64(secs).unwrap_or(Duration::MAX);
candidate.max(Duration::from_secs(1))
} else {
Duration::MAX
};
Err(Exhausted { retry_after: retry })
}
pub fn refund(&self, cost: u32) {
let mut state = self.inner.state.lock();
state.tokens = (state.tokens + f64::from(cost)).min(f64::from(self.inner.capacity));
}
#[must_use]
pub fn snapshot(&self) -> BudgetSnapshot {
let mut state = self.inner.state.lock();
self.refill_locked(&mut state);
BudgetSnapshot {
capacity: self.inner.capacity,
refill_per_second: self.inner.refill_per_second,
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
tokens: state.tokens.floor().max(0.0).min(f64::from(u32::MAX)) as u32,
}
}
pub fn reset_to_full(&self) {
let mut state = self.inner.state.lock();
state.tokens = f64::from(self.inner.capacity);
state.last_refill = self.inner.clock.now();
}
fn refill_locked(&self, state: &mut State) {
let now = self.inner.clock.now();
let elapsed = now.duration_since(state.last_refill);
if elapsed.is_zero() {
return;
}
let accrual = elapsed.as_secs_f64() * self.inner.refill_per_second;
state.tokens = (state.tokens + accrual).min(f64::from(self.inner.capacity));
state.last_refill = now;
}
}
#[cfg(test)]
mod tests {
use super::*;
fn bucket(cap: u32, refill: f64) -> (RateBudget, Arc<ManualClock>) {
let clock = ManualClock::new();
let b = RateBudget::with_clock(cap, refill, clock.clone());
(b, clock)
}
#[test]
fn costs_follow_spec_table() {
assert_eq!(cost_of("/status"), 1);
assert_eq!(cost_of("/risk"), 1);
assert_eq!(cost_of("/positions"), 1);
assert_eq!(cost_of("/brief"), 1);
assert_eq!(cost_of("/regime"), 1);
assert_eq!(cost_of("/operator/state"), 1);
assert_eq!(cost_of("/operator/events"), 1);
assert_eq!(cost_of("/approaching"), 1);
assert_eq!(cost_of("/rejections"), 1);
assert_eq!(cost_of("/hl/status"), 1);
assert_eq!(cost_of("/hl/status?symbol=BTC"), 1);
assert_eq!(cost_of("/hl/account"), 1);
assert_eq!(cost_of("/hl/reconcile"), 1);
assert_eq!(cost_of("/live/cockpit"), 1);
assert_eq!(cost_of("/live/certification"), 1);
assert_eq!(cost_of("/live/canary-policy"), 1);
assert_eq!(cost_of("/runtime/parity"), 1);
assert_eq!(cost_of("/market/quote?symbol=BTC"), 1);
assert_eq!(cost_of("/evaluate/BTC"), 2);
assert_eq!(cost_of("/evaluate/BTC?side=long"), 2);
assert_eq!(cost_of("/pulse"), 2);
assert_eq!(cost_of("/pulse?limit=50"), 2);
assert_eq!(cost_of("/v2/status"), 3);
}
#[test]
fn new_bucket_is_full() {
let (b, _clock) = bucket(10, 1.0);
assert_eq!(b.snapshot().tokens, 10);
}
#[test]
fn consume_debits_tokens() {
let (b, _clock) = bucket(10, 0.0);
assert!(b.try_consume(3).is_ok());
assert_eq!(b.snapshot().tokens, 7);
assert!(b.try_consume(7).is_ok());
assert_eq!(b.snapshot().tokens, 0);
}
#[test]
fn consume_exhaustion_returns_floored_retry_after() {
let (b, _clock) = bucket(10, 0.0);
assert!(b.try_consume(10).is_ok());
let err = b.try_consume(1).unwrap_err();
assert_eq!(err.retry_after, Duration::MAX);
}
#[test]
fn consume_exhaustion_countdown_rounds_up() {
let (b, _clock) = bucket(10, 1.0);
b.try_consume(10).unwrap();
let err = b.try_consume(3).unwrap_err();
assert_eq!(err.retry_after, Duration::from_secs(3));
}
#[test]
fn consume_exhaustion_fractional_deficit_rounds_up() {
let (b, _clock) = bucket(10, 2.0);
b.try_consume(10).unwrap();
let err = b.try_consume(3).unwrap_err();
assert_eq!(err.retry_after, Duration::from_secs(2));
}
#[test]
fn refill_accrues_over_clock_advance() {
let (b, clock) = bucket(10, 1.0);
b.try_consume(10).unwrap();
clock.advance(Duration::from_secs(5));
assert_eq!(b.snapshot().tokens, 5);
}
#[test]
fn refill_caps_at_capacity() {
let (b, clock) = bucket(10, 1.0);
b.try_consume(2).unwrap();
clock.advance(Duration::from_secs(1000));
assert_eq!(b.snapshot().tokens, 10);
}
#[test]
fn sub_second_accrual_does_not_floor_to_zero() {
let (b, clock) = bucket(10, 10.0); b.try_consume(10).unwrap();
clock.advance(Duration::from_millis(100)); clock.advance(Duration::from_millis(100)); assert_eq!(b.snapshot().tokens, 2);
}
#[test]
fn refund_restores_tokens_without_exceeding_capacity() {
let (b, _clock) = bucket(10, 0.0);
b.try_consume(5).unwrap();
b.refund(5);
assert_eq!(b.snapshot().tokens, 10);
b.refund(5);
assert_eq!(b.snapshot().tokens, 10);
}
#[test]
fn reset_to_full_refills_the_bucket() {
let (b, _clock) = bucket(10, 0.0);
b.try_consume(10).unwrap();
assert_eq!(b.snapshot().tokens, 0);
b.reset_to_full();
assert_eq!(b.snapshot().tokens, 10);
}
#[test]
fn headroom_bands_are_legible() {
let snap = BudgetSnapshot {
capacity: 60,
refill_per_second: 1.0,
tokens: 60,
};
assert!((snap.headroom() - 1.0).abs() < f64::EPSILON);
let half = BudgetSnapshot { tokens: 30, ..snap };
assert!((half.headroom() - 0.5).abs() < f64::EPSILON);
let empty = BudgetSnapshot { tokens: 0, ..snap };
assert!(empty.headroom().abs() < f64::EPSILON);
}
#[test]
fn cost_above_capacity_returns_permanent_exhaustion() {
let (b, _clock) = bucket(60, 1.0);
let err = b.try_consume(61).unwrap_err();
assert_eq!(err.retry_after, Duration::MAX);
}
#[test]
#[should_panic(expected = "capacity must be > 0")]
fn zero_capacity_panics_at_construction() {
let _ = RateBudget::with_clock(0, 1.0, Arc::new(SystemClock));
}
#[test]
#[should_panic(expected = "refill must be a finite, non-negative float")]
fn negative_refill_panics_at_construction() {
let _ = RateBudget::with_clock(10, -1.0, Arc::new(SystemClock));
}
}