use core::time::Duration;
use std::collections::VecDeque;
use std::sync::{Mutex, MutexGuard, PoisonError};
use crate::decision::Decision;
use crate::quota::Quota;
/// Converts `now` to whole nanoseconds, saturating at `u64::MAX`.
///
/// `Duration::as_nanos` yields a `u128`; any value too large for a `u64`
/// is clamped to `u64::MAX` rather than wrapped or truncated.
fn nanos(now: Duration) -> u64 {
    match u64::try_from(now.as_nanos()) {
        Ok(ns) => ns,
        Err(_) => u64::MAX,
    }
}
/// Sliding-window-log rate limiter state.
///
/// One timestamp is recorded per admitted permit; a request is admitted only
/// while the number of timestamps still inside the window stays within
/// `limit`.
pub(crate) struct SlidingLog {
    /// Admission timestamps in nanoseconds, one entry per admitted permit.
    /// New entries are appended at the back and expired ones removed from
    /// the front.
    log: Mutex<VecDeque<u64>>,
    /// Window length in nanoseconds (the constructor keeps this at least 1).
    period_ns: u64,
    /// Maximum number of permits that may be live inside one window.
    limit: u32,
}
impl SlidingLog {
    /// Creates an empty log configured from `quota`.
    ///
    /// The window length is `quota.period()` converted to nanoseconds
    /// (saturating at `u64::MAX`) and raised to at least 1 ns; the capacity
    /// is `quota.limit()`.
    pub(crate) fn new(quota: &Quota) -> Self {
        let period_ns = nanos(quota.period()).max(1);
        let limit = quota.limit();
        Self {
            log: Mutex::new(VecDeque::new()),
            period_ns,
            limit,
        }
    }

    /// Tries to admit `n` permits at time `now`.
    ///
    /// * `n == 0` is always allowed and leaves the log untouched.
    /// * A zero limit, or `n` greater than the limit, can never be satisfied
    ///   and is denied with `retry_after` set to `Duration::MAX`.
    /// * Otherwise entries that are a full period old (or older) are dropped
    ///   first. If the remaining entries plus `n` fit within the limit, `n`
    ///   entries stamped `now` are recorded and the request is allowed.
    /// * If they do not fit, the request is denied and `retry_after` is the
    ///   time until enough of the oldest entries have expired to make room.
    ///
    /// Pruning only inspects the front of the log, so entries are expected to
    /// be recorded in non-decreasing time order.
    pub(crate) fn acquire(&self, n: u32, now: Duration) -> Decision {
        if n == 0 {
            return Decision::Allow;
        }
        if self.limit == 0 || n > self.limit {
            return Decision::Deny {
                retry_after: Duration::MAX,
            };
        }

        let now_ns = nanos(now);
        let capacity = self.limit as usize;
        let wanted = n as usize;
        let mut entries = self.lock();

        // Discard the leading run of entries that have aged out of the window.
        let expired = entries
            .iter()
            .take_while(|&&stamp| stamp.saturating_add(self.period_ns) <= now_ns)
            .count();
        drop(entries.drain(..expired));

        let total = entries.len() + wanted;
        if total > capacity {
            // `must_expire` of the oldest entries have to leave the window
            // before the request fits; the last of those decides the wait.
            let must_expire = total - capacity;
            let frees_at = entries[must_expire - 1].saturating_add(self.period_ns);
            return Decision::Deny {
                retry_after: Duration::from_nanos(frees_at.saturating_sub(now_ns)),
            };
        }

        // Growing the deque to `total` appends `wanted` copies of `now_ns`.
        entries.resize(total, now_ns);
        Decision::Allow
    }

    /// Locks the log, recovering the guard if the mutex was poisoned by a
    /// panic in another holder instead of propagating the panic.
    fn lock(&self) -> MutexGuard<'_, VecDeque<u64>> {
        match self.log.lock() {
            Ok(guard) => guard,
            Err(poisoned) => PoisonError::into_inner(poisoned),
        }
    }
}
#[cfg(all(test, not(loom)))]
mod tests {
    use super::SlidingLog;
    use crate::decision::Decision;
    use crate::quota::Quota;
    use core::time::Duration;

    /// Shorthand for a timestamp `ms` milliseconds after the clock origin.
    fn at(ms: u64) -> Duration {
        Duration::from_millis(ms)
    }

    /// The full limit is admitted inside one window; the next request is denied.
    #[test]
    fn test_admits_limit_within_window_then_denies() {
        let sl = SlidingLog::new(&Quota::per_second(3));
        for _ in 0..3 {
            assert_eq!(sl.acquire(1, at(0)), Decision::Allow);
        }
        assert!(sl.acquire(1, at(500)).is_deny());
    }

    /// A slot frees up exactly one period after the oldest admission.
    #[test]
    fn test_admits_again_as_oldest_ages_out() {
        let sl = SlidingLog::new(&Quota::per_second(3));
        assert!(sl.acquire(1, at(0)).is_allow());
        assert!(sl.acquire(1, at(100)).is_allow());
        assert!(sl.acquire(1, at(200)).is_allow());
        assert!(sl.acquire(1, at(999)).is_deny());
        assert!(sl.acquire(1, at(1000)).is_allow());
    }

    /// For a single permit, `retry_after` is the time until the oldest entry expires.
    #[test]
    fn test_retry_after_points_at_oldest_expiry() {
        let sl = SlidingLog::new(&Quota::per_second(2));
        assert!(sl.acquire(1, at(0)).is_allow());
        assert!(sl.acquire(1, at(300)).is_allow());
        match sl.acquire(1, at(500)) {
            Decision::Deny { retry_after } => {
                assert_eq!(retry_after, Duration::from_millis(500));
            }
            other => panic!("expected denial, got {other:?}"),
        }
    }

    /// A burst late in one second still counts against the start of the next.
    #[test]
    fn test_no_boundary_burst() {
        let sl = SlidingLog::new(&Quota::per_second(5));
        for _ in 0..5 {
            assert!(sl.acquire(1, at(900)).is_allow());
        }
        assert!(sl.acquire(1, at(1000)).is_deny());
    }

    /// A request above the limit can never fit and is denied indefinitely.
    #[test]
    fn test_request_larger_than_limit_never_admits() {
        let sl = SlidingLog::new(&Quota::per_second(3));
        assert_eq!(
            sl.acquire(4, at(0)),
            Decision::Deny {
                retry_after: Duration::MAX
            }
        );
    }

    /// Asking for zero permits is allowed even when the window is full, and
    /// does not consume capacity.
    #[test]
    fn test_zero_permits_always_admitted() {
        let sl = SlidingLog::new(&Quota::per_second(3));
        for _ in 0..3 {
            assert!(sl.acquire(1, at(0)).is_allow());
        }
        assert_eq!(sl.acquire(0, at(0)), Decision::Allow);
        assert!(sl.acquire(1, at(0)).is_deny());
    }

    /// A multi-permit request must wait until enough of the oldest entries
    /// have expired, not just the single oldest one.
    #[test]
    fn test_batch_retry_after_waits_for_enough_expiries() {
        let sl = SlidingLog::new(&Quota::per_second(3));
        assert!(sl.acquire(1, at(0)).is_allow());
        assert!(sl.acquire(1, at(200)).is_allow());
        assert!(sl.acquire(1, at(400)).is_allow());
        // Two permits need two slots: the entries from t=0 and t=200 must both
        // expire, so the wait runs until t=1200.
        assert_eq!(
            sl.acquire(2, at(500)),
            Decision::Deny {
                retry_after: Duration::from_millis(700)
            }
        );
        assert!(sl.acquire(2, at(1199)).is_deny());
        assert!(sl.acquire(2, at(1200)).is_allow());
    }
}