use core::time::Duration;
use std::collections::VecDeque;
use std::sync::{Mutex, MutexGuard, PoisonError};
use clock_lib::{Clock, Monotonic, SystemClock};
use crate::decision::Decision;
#[cfg(feature = "runtime")]
use crate::error::ThrottleError;
use crate::limiter::Limiter;
/// One admitted request, as recorded in the limiter's log.
#[derive(Clone, Copy)]
struct Grant {
/// Time of admission, in milliseconds elapsed since the limiter's epoch.
at_ms: u64,
/// Number of units (the request's cost) this grant consumed.
count: u32,
}
/// Mutable limiter state: the grants still inside the window plus their total.
struct Log {
/// Recorded grants; new grants are pushed at the back and expired ones are
/// popped from the front.
grants: VecDeque<Grant>,
/// Running sum of `count` over all entries currently in `grants`.
used: u32,
}
/// Sliding-window-log rate limiter, generic over its time source `C`.
///
/// Each admitted request with a non-zero cost is recorded together with its
/// admission time; a request is admitted only while the units recorded within
/// the trailing `window` stay at or below `limit`. The log lives behind a
/// [`Mutex`], so all operations take `&self`.
pub struct SlidingWindowLog<C = SystemClock>
where
C: Clock,
{
/// Maximum number of units that may be admitted within one `window`.
limit: u32,
/// Length of the sliding window.
window: Duration,
/// Grants still inside the window, guarded for shared access.
log: Mutex<Log>,
/// Time source used to timestamp grants.
clock: C,
/// Clock reading taken at construction; all timestamps are measured
/// relative to it.
epoch: Monotonic,
}
/// Constructors for limiters driven by the default [`SystemClock`].
impl SlidingWindowLog<SystemClock> {
    /// Creates a limiter that admits at most `limit` units within any
    /// trailing `window`, timed by a fresh [`SystemClock`].
    #[must_use]
    pub fn new(limit: u32, window: Duration) -> Self {
        let clock = SystemClock::new();
        Self::with_clock_inner(limit, window, clock)
    }

    /// Creates a limiter that admits at most `rate` units per one-second
    /// window; shorthand for [`SlidingWindowLog::new`] with a 1 s window.
    #[must_use]
    pub fn per_second(rate: u32) -> Self {
        let one_second = Duration::from_secs(1);
        Self::new(rate, one_second)
    }
}
/// Clock-generic core of the limiter: construction, bookkeeping helpers and
/// the synchronous admission API.
impl<C> SlidingWindowLog<C>
where
    C: Clock + Clone,
{
    /// Builds a limiter with an empty log. The clock's reading at this moment
    /// becomes the epoch against which all later timestamps are measured.
    fn with_clock_inner(limit: u32, window: Duration, clock: C) -> Self {
        let epoch = clock.now();
        Self {
            limit,
            window,
            log: Mutex::new(Log {
                grants: VecDeque::new(),
                used: 0,
            }),
            clock,
            epoch,
        }
    }

    /// Swaps the time source, returning a limiter with the same `limit` and
    /// `window` driven by `clock`.
    ///
    /// The existing grant log is not carried over: the returned limiter starts
    /// empty, with its epoch read from the new clock.
    #[must_use]
    pub fn with_clock<C2>(self, clock: C2) -> SlidingWindowLog<C2>
    where
        C2: Clock + Clone,
    {
        SlidingWindowLog::with_clock_inner(self.limit, self.window, clock)
    }

    /// Locks the log. A poisoned mutex is recovered (its guard is taken as-is)
    /// instead of propagating the panic to every later caller.
    #[inline]
    fn lock(&self) -> MutexGuard<'_, Log> {
        self.log.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Milliseconds elapsed on the clock since the epoch, clamped to
    /// `u64::MAX` if the value does not fit.
    #[inline]
    fn now_ms(&self) -> u64 {
        let elapsed = self.clock.now().saturating_duration_since(self.epoch);
        u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
    }

    /// The window length in whole milliseconds, clamped to `u64::MAX`.
    #[inline]
    fn window_ms(&self) -> u64 {
        u64::try_from(self.window.as_millis()).unwrap_or(u64::MAX)
    }

    /// Drops every grant at the front of the log whose window has fully
    /// elapsed (`at_ms + window_ms <= now_ms`) and returns its units to the
    /// budget. Stops at the first grant that is still live, so it relies on
    /// the log being ordered oldest-first.
    fn prune(log: &mut Log, now_ms: u64, window_ms: u64) {
        while let Some(front) = log.grants.front() {
            if front.at_ms.saturating_add(window_ms) <= now_ms {
                log.used = log.used.saturating_sub(front.count);
                let _ = log.grants.pop_front();
            } else {
                break;
            }
        }
    }

    /// Computes how long to wait until at least `needed` units have expired.
    ///
    /// Walks the log oldest-first, accumulating the units each grant will
    /// free, and returns the time until the grant that completes `needed`
    /// leaves the window. Falls back to one full window if the log cannot
    /// free that many units.
    fn wait_for(log: &Log, now_ms: u64, window_ms: u64, needed: u32) -> Duration {
        let mut freed = 0u32;
        for grant in &log.grants {
            freed = freed.saturating_add(grant.count);
            if freed >= needed {
                let ready_at = grant.at_ms.saturating_add(window_ms);
                return Duration::from_millis(ready_at.saturating_sub(now_ms));
            }
        }
        Duration::from_millis(window_ms)
    }

    /// Returns how many units must expire before `cost` fits under `limit`
    /// given `used` units already consumed; `0` means it fits right now.
    ///
    /// Written with subtraction only: the naive `used + cost` overflows `u32`
    /// once `limit` exceeds `u32::MAX / 2`, which would panic in debug builds
    /// and wrongly admit the request in release builds.
    #[inline]
    fn shortfall(limit: u32, used: u32, cost: u32) -> u32 {
        let headroom = limit.saturating_sub(used);
        cost.saturating_sub(headroom)
    }

    /// Attempts to admit a request of `cost` units and records it on success.
    ///
    /// Returns `Decision::Impossible` when `cost` exceeds the limit,
    /// `Decision::Acquired` when the request was admitted (a zero cost is
    /// always admitted and never logged), and `Decision::Retry` with a
    /// suggested wait otherwise.
    fn decide(&self, cost: u32) -> Decision {
        if cost > self.limit {
            return Decision::Impossible;
        }
        if cost == 0 {
            return Decision::Acquired;
        }
        let window_ms = self.window_ms();
        let mut log = self.lock();
        // Read the clock only after taking the lock: otherwise two threads
        // could append grants out of timestamp order, and `prune`/`wait_for`
        // both assume the log is ordered oldest-first.
        let now_ms = self.now_ms();
        Self::prune(&mut log, now_ms, window_ms);

        let needed = Self::shortfall(self.limit, log.used, cost);
        if needed > 0 {
            return Decision::Retry {
                after: Self::wait_for(&log, now_ms, window_ms, needed),
            };
        }

        // `shortfall == 0` means `cost <= limit - used`, so this cannot overflow.
        log.used += cost;
        log.grants.push_back(Grant {
            at_ms: now_ms,
            count: cost,
        });
        Decision::Acquired
    }

    /// Tries to admit a single unit; returns `true` if it was admitted.
    #[inline]
    #[must_use]
    pub fn try_acquire(&self) -> bool {
        self.decide(1).is_acquired()
    }

    /// Tries to admit `cost` units at once; returns `true` if admitted.
    #[inline]
    #[must_use]
    pub fn try_acquire_with_cost(&self, cost: u32) -> bool {
        self.decide(cost).is_acquired()
    }

    /// Reports what an acquisition of `cost` units would return right now,
    /// without recording a grant. Expired grants are still pruned as a side
    /// effect.
    #[must_use]
    pub fn peek(&self, cost: u32) -> Decision {
        if cost > self.limit {
            return Decision::Impossible;
        }
        if cost == 0 {
            return Decision::Acquired;
        }
        let now_ms = self.now_ms();
        let window_ms = self.window_ms();
        let mut log = self.lock();
        Self::prune(&mut log, now_ms, window_ms);

        match Self::shortfall(self.limit, log.used, cost) {
            0 => Decision::Acquired,
            needed => Decision::Retry {
                after: Self::wait_for(&log, now_ms, window_ms, needed),
            },
        }
    }

    /// Number of units that could be admitted right now, after pruning
    /// expired grants.
    #[must_use]
    pub fn available(&self) -> u32 {
        let now_ms = self.now_ms();
        let window_ms = self.window_ms();
        let mut log = self.lock();
        Self::prune(&mut log, now_ms, window_ms);
        self.limit.saturating_sub(log.used)
    }

    /// The configured limit: the most units admissible within one window.
    #[inline]
    #[must_use]
    pub fn capacity(&self) -> u32 {
        self.limit
    }
}
/// Asynchronous acquisition, available with the `runtime` feature.
#[cfg(feature = "runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
impl<C> SlidingWindowLog<C>
where
    C: Clock + Clone,
{
    /// Waits until a single unit has been admitted.
    ///
    /// # Errors
    ///
    /// Returns [`ThrottleError::CostExceedsCapacity`] if one unit can never
    /// fit under the configured limit.
    pub async fn acquire(&self) -> Result<(), ThrottleError> {
        self.acquire_with_cost(1).await
    }

    /// Waits until `cost` units have been admitted, sleeping for the
    /// suggested retry interval between attempts.
    ///
    /// # Errors
    ///
    /// Returns [`ThrottleError::CostExceedsCapacity`] if `cost` exceeds the
    /// configured limit and therefore can never be admitted.
    pub async fn acquire_with_cost(&self, cost: u32) -> Result<(), ThrottleError> {
        loop {
            // The log's lock is released inside `decide`, before any await.
            let after = match self.decide(cost) {
                Decision::Acquired => break Ok(()),
                Decision::Impossible => {
                    break Err(ThrottleError::CostExceedsCapacity {
                        cost,
                        capacity: self.limit,
                    });
                }
                Decision::Retry { after } => after,
            };
            crate::rt::sleep(after).await;
        }
    }
}
/// Exposes the limiter through the crate's [`Limiter`] trait by forwarding
/// each trait method to the inherent method that implements it.
impl<C> Limiter for SlidingWindowLog<C>
where
    C: Clock + Clone + Send + Sync,
{
    /// Forwards to the inherent, non-recording `peek`.
    #[inline]
    fn peek(&self, cost: u32) -> Decision {
        SlidingWindowLog::<C>::peek(self, cost)
    }

    /// Attempts to admit `cost` units, recording the grant on success.
    #[inline]
    fn acquire_cost(&self, cost: u32) -> Decision {
        SlidingWindowLog::<C>::decide(self, cost)
    }

    /// Forwards to the inherent `available`.
    #[inline]
    fn available(&self) -> u32 {
        SlidingWindowLog::<C>::available(self)
    }

    /// Returns the configured limit.
    #[inline]
    fn capacity(&self) -> u32 {
        SlidingWindowLog::<C>::capacity(self)
    }
}
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used, clippy::expect_used)]

    use super::SlidingWindowLog;
    use crate::limiter::Limiter;
    use clock_lib::ManualClock;
    use core::time::Duration;
    use std::sync::Arc;

    /// Window length shared by every test.
    const ONE_SECOND: Duration = Duration::from_secs(1);

    /// Compile-time probe: only instantiable for `Send + Sync` types.
    fn assert_send_sync<T: Send + Sync>() {}

    /// Builds a one-second-window limiter driven by a manually advanced
    /// clock, returning the clock handle alongside it.
    fn manual_limiter(limit: u32) -> (Arc<ManualClock>, SlidingWindowLog<Arc<ManualClock>>) {
        let clock = Arc::new(ManualClock::new());
        let limiter = SlidingWindowLog::new(limit, ONE_SECOND).with_clock(Arc::clone(&clock));
        (clock, limiter)
    }

    #[test]
    fn test_is_send_sync() {
        assert_send_sync::<SlidingWindowLog>();
    }

    #[test]
    fn test_admits_up_to_limit_then_refuses() {
        let limiter = SlidingWindowLog::new(3, ONE_SECOND);
        for _ in 0..3 {
            assert!(limiter.try_acquire());
        }
        assert!(!limiter.try_acquire());
        assert_eq!(limiter.available(), 0);
    }

    #[test]
    fn test_window_slides_exactly() {
        let (clock, limiter) = manual_limiter(2);

        // First grant at t = 0 ms.
        assert!(limiter.try_acquire());

        // Second grant at t = 600 ms fills the window.
        clock.advance(Duration::from_millis(600));
        assert!(limiter.try_acquire());
        assert!(!limiter.try_acquire());

        // At t = 1001 ms only the first grant has expired: one slot frees up.
        clock.advance(Duration::from_millis(401));
        assert!(limiter.try_acquire());
        assert!(!limiter.try_acquire());
    }

    #[test]
    fn test_no_boundary_burst() {
        let (clock, limiter) = manual_limiter(3);

        // Use the whole budget at t = 900 ms.
        clock.advance(Duration::from_millis(900));
        for _ in 0..3 {
            assert!(limiter.try_acquire());
        }

        // At t = 1100 ms those grants are only 200 ms old, so nothing is free.
        clock.advance(Duration::from_millis(200));
        assert!(!limiter.try_acquire());
    }

    #[test]
    fn test_cost_aware_and_impossible() {
        let limiter = SlidingWindowLog::new(5, ONE_SECOND);
        assert!(limiter.try_acquire_with_cost(4));
        // Only one unit is left: 4 no longer fits, 1 still does.
        assert!(!limiter.try_acquire_with_cost(4));
        assert!(limiter.try_acquire_with_cost(1));

        // A cost above the limit can never be satisfied.
        let fresh = SlidingWindowLog::new(5, ONE_SECOND);
        assert_eq!(fresh.peek(9), crate::Decision::Impossible);
    }

    #[test]
    fn test_peek_does_not_record() {
        let limiter = SlidingWindowLog::new(2, ONE_SECOND);
        assert!(limiter.peek(2).is_acquired());
        // Peeking must leave the full budget untouched.
        assert_eq!(limiter.available(), 2);
    }

    #[test]
    fn test_retry_after_points_to_oldest_expiry() {
        let (_clock, limiter) = manual_limiter(1);
        assert!(limiter.try_acquire());

        // The clock has not moved, so the only grant expires a full window from now.
        let after = limiter
            .peek(1)
            .retry_after()
            .expect("should suggest a wait");
        assert_eq!(after, ONE_SECOND);
    }

    #[test]
    fn test_works_as_a_limiter_trait_object() {
        let limiter = SlidingWindowLog::new(2, ONE_SECOND);
        let dyn_limiter: &dyn Limiter = &limiter;
        assert_eq!(dyn_limiter.capacity(), 2);
        assert!(dyn_limiter.acquire_cost(1).is_acquired());
        assert_eq!(dyn_limiter.available(), 1);
    }
}