Skip to main content

throttle_net/
sliding.rs

1//! An exact sliding-window-log limiter.
2//!
3//! [`Throttle`](crate::Throttle) is a token bucket: smooth and cheap, but it
4//! permits a full burst at any instant. A [`SlidingWindowLog`] is the exact
5//! alternative — it records the timestamp of every grant and admits a request
6//! only if fewer than `limit` units were granted in the trailing `window`. No
7//! boundary burst, at the cost of remembering recent grants.
8//!
9//! It implements [`Limiter`](crate::Limiter), so it composes everywhere the
10//! token bucket does (hybrid, per-key, layered, behind a circuit breaker), and it
11//! offers the same waiting [`acquire`](SlidingWindowLog::acquire) surface.
12
13use core::time::Duration;
14use std::collections::VecDeque;
15use std::sync::{Mutex, MutexGuard, PoisonError};
16
17use clock_lib::{Clock, Monotonic, SystemClock};
18
19use crate::decision::Decision;
20#[cfg(feature = "runtime")]
21use crate::error::ThrottleError;
22use crate::limiter::Limiter;
23
24/// One grant in the log: when it happened and how many units it took. The whole
25/// `count` leaves the window together when `at_ms + window` passes.
26#[derive(Clone, Copy)]
27struct Grant {
28    at_ms: u64,
29    count: u32,
30}
31
32/// The mutable log, guarded by a mutex.
33struct Log {
34    /// Grants in arrival order (oldest at the front).
35    grants: VecDeque<Grant>,
36    /// Sum of `count` across `grants`, kept in step to avoid re-summing.
37    used: u32,
38}
39
40/// An exact sliding-window-log rate limiter: at most `limit` units in any
41/// trailing window of `window`.
42///
43/// Construct with [`new`](Self::new) (or [`per_second`](Self::per_second)), then
44/// use the non-blocking [`try_acquire`](Self::try_acquire) / [`peek`](Self::peek)
45/// or the waiting [`acquire`](Self::acquire). Time comes from an injectable
46/// [`Clock`].
47///
48/// # Examples
49///
50/// ```
51/// use std::time::Duration;
52/// use throttle_net::SlidingWindowLog;
53///
54/// // At most 5 requests in any 1-second window.
55/// let limiter = SlidingWindowLog::new(5, Duration::from_secs(1));
56/// for _ in 0..5 {
57///     assert!(limiter.try_acquire());
58/// }
59/// assert!(!limiter.try_acquire()); // the 6th in this window is refused
60/// ```
61pub struct SlidingWindowLog<C = SystemClock>
62where
63    C: Clock,
64{
65    limit: u32,
66    window: Duration,
67    log: Mutex<Log>,
68    clock: C,
69    epoch: Monotonic,
70}
71
72impl SlidingWindowLog<SystemClock> {
73    /// Creates a limiter admitting at most `limit` units per trailing `window`.
74    ///
75    /// A `limit` of `0` admits nothing; a zero `window` makes every grant expire
76    /// immediately (so it behaves as a per-instant limit of `limit`).
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// use std::time::Duration;
82    /// use throttle_net::SlidingWindowLog;
83    ///
84    /// let limiter = SlidingWindowLog::new(100, Duration::from_secs(60)); // 100/min
85    /// assert_eq!(limiter.capacity(), 100);
86    /// ```
87    #[must_use]
88    pub fn new(limit: u32, window: Duration) -> Self {
89        Self::with_clock_inner(limit, window, SystemClock::new())
90    }
91
92    /// Creates a limiter admitting at most `rate` units in any one-second window.
93    ///
94    /// # Examples
95    ///
96    /// ```
97    /// use throttle_net::SlidingWindowLog;
98    ///
99    /// let limiter = SlidingWindowLog::per_second(50);
100    /// assert!(limiter.try_acquire());
101    /// ```
102    #[must_use]
103    pub fn per_second(rate: u32) -> Self {
104        Self::new(rate, Duration::from_secs(1))
105    }
106}
107
108impl<C> SlidingWindowLog<C>
109where
110    C: Clock + Clone,
111{
112    fn with_clock_inner(limit: u32, window: Duration, clock: C) -> Self {
113        let epoch = clock.now();
114        Self {
115            limit,
116            window,
117            log: Mutex::new(Log {
118                grants: VecDeque::new(),
119                used: 0,
120            }),
121            clock,
122            epoch,
123        }
124    }
125
126    /// Replaces the time source, for deterministic tests with a
127    /// [`ManualClock`](clock_lib::ManualClock). The log is reset.
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// use std::sync::Arc;
133    /// use std::time::Duration;
134    /// use clock_lib::ManualClock;
135    /// use throttle_net::SlidingWindowLog;
136    ///
137    /// let clock = Arc::new(ManualClock::new());
138    /// let limiter = SlidingWindowLog::new(2, Duration::from_secs(1)).with_clock(clock.clone());
139    ///
140    /// assert!(limiter.try_acquire());
141    /// assert!(limiter.try_acquire());
142    /// assert!(!limiter.try_acquire());
143    /// clock.advance(Duration::from_secs(1)); // the window slides past both grants
144    /// assert!(limiter.try_acquire());
145    /// ```
146    #[must_use]
147    pub fn with_clock<C2>(self, clock: C2) -> SlidingWindowLog<C2>
148    where
149        C2: Clock + Clone,
150    {
151        SlidingWindowLog::with_clock_inner(self.limit, self.window, clock)
152    }
153
154    #[inline]
155    fn lock(&self) -> MutexGuard<'_, Log> {
156        self.log.lock().unwrap_or_else(PoisonError::into_inner)
157    }
158
159    #[inline]
160    fn now_ms(&self) -> u64 {
161        let elapsed = self.clock.now().saturating_duration_since(self.epoch);
162        u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
163    }
164
165    #[inline]
166    fn window_ms(&self) -> u64 {
167        u64::try_from(self.window.as_millis()).unwrap_or(u64::MAX)
168    }
169
170    /// Drops grants that have left the trailing window ending at `now_ms`.
171    ///
172    /// A grant made at `at_ms` occupies the window until `at_ms + window_ms`; once
173    /// that moment has arrived it no longer counts. Grants are ordered oldest-first,
174    /// so pruning stops at the first one still inside the window.
175    fn prune(log: &mut Log, now_ms: u64, window_ms: u64) {
176        while let Some(front) = log.grants.front() {
177            if front.at_ms.saturating_add(window_ms) <= now_ms {
178                log.used = log.used.saturating_sub(front.count);
179                let _ = log.grants.pop_front();
180            } else {
181                break;
182            }
183        }
184    }
185
186    /// Wait until enough of the oldest grants expire to free `needed` units.
187    fn wait_for(log: &Log, now_ms: u64, window_ms: u64, needed: u32) -> Duration {
188        let mut freed = 0u32;
189        for grant in &log.grants {
190            freed = freed.saturating_add(grant.count);
191            if freed >= needed {
192                let ready_at = grant.at_ms.saturating_add(window_ms);
193                return Duration::from_millis(ready_at.saturating_sub(now_ms));
194            }
195        }
196        // Should not happen (capacity is checked first), but never wait forever.
197        Duration::from_millis(window_ms)
198    }
199
200    /// The shared decision core. Records the grant on success.
201    fn decide(&self, cost: u32) -> Decision {
202        if cost > self.limit {
203            return Decision::Impossible;
204        }
205        if cost == 0 {
206            return Decision::Acquired;
207        }
208        let now_ms = self.now_ms();
209        let window_ms = self.window_ms();
210        let mut log = self.lock();
211        Self::prune(&mut log, now_ms, window_ms);
212        if log.used + cost <= self.limit {
213            log.used += cost;
214            log.grants.push_back(Grant {
215                at_ms: now_ms,
216                count: cost,
217            });
218            Decision::Acquired
219        } else {
220            let needed = log.used + cost - self.limit;
221            Decision::Retry {
222                after: Self::wait_for(&log, now_ms, window_ms, needed),
223            }
224        }
225    }
226
227    /// Attempts to take one unit without waiting.
228    #[inline]
229    #[must_use]
230    pub fn try_acquire(&self) -> bool {
231        self.decide(1).is_acquired()
232    }
233
234    /// Attempts to take `cost` units without waiting (all-or-nothing).
235    #[inline]
236    #[must_use]
237    pub fn try_acquire_with_cost(&self, cost: u32) -> bool {
238        self.decide(cost).is_acquired()
239    }
240
241    /// Reports whether `cost` units would be admitted now, without recording.
242    #[must_use]
243    pub fn peek(&self, cost: u32) -> Decision {
244        if cost > self.limit {
245            return Decision::Impossible;
246        }
247        if cost == 0 {
248            return Decision::Acquired;
249        }
250        let now_ms = self.now_ms();
251        let window_ms = self.window_ms();
252        let mut log = self.lock();
253        Self::prune(&mut log, now_ms, window_ms);
254        if log.used + cost <= self.limit {
255            Decision::Acquired
256        } else {
257            let needed = log.used + cost - self.limit;
258            Decision::Retry {
259                after: Self::wait_for(&log, now_ms, window_ms, needed),
260            }
261        }
262    }
263
264    /// Units still admissible in the current window.
265    #[must_use]
266    pub fn available(&self) -> u32 {
267        let now_ms = self.now_ms();
268        let window_ms = self.window_ms();
269        let mut log = self.lock();
270        Self::prune(&mut log, now_ms, window_ms);
271        self.limit.saturating_sub(log.used)
272    }
273
274    /// The window limit (the most units admissible at once).
275    #[inline]
276    #[must_use]
277    pub fn capacity(&self) -> u32 {
278        self.limit
279    }
280}
281
282#[cfg(feature = "runtime")]
283#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
284impl<C> SlidingWindowLog<C>
285where
286    C: Clock + Clone,
287{
288    /// Takes one unit, waiting until the window has room.
289    ///
290    /// # Errors
291    ///
292    /// Returns [`ThrottleError::CostExceedsCapacity`] when the limit is zero.
293    pub async fn acquire(&self) -> Result<(), ThrottleError> {
294        self.acquire_with_cost(1).await
295    }
296
297    /// Takes `cost` units, waiting until the window has room.
298    ///
299    /// # Errors
300    ///
301    /// Returns [`ThrottleError::CostExceedsCapacity`] when `cost` exceeds the
302    /// limit, so it can never be admitted.
303    pub async fn acquire_with_cost(&self, cost: u32) -> Result<(), ThrottleError> {
304        loop {
305            match self.decide(cost) {
306                Decision::Acquired => return Ok(()),
307                Decision::Impossible => {
308                    return Err(ThrottleError::CostExceedsCapacity {
309                        cost,
310                        capacity: self.limit,
311                    });
312                }
313                Decision::Retry { after } => crate::rt::sleep(after).await,
314            }
315        }
316    }
317}
318
319impl<C> Limiter for SlidingWindowLog<C>
320where
321    C: Clock + Clone + Send + Sync,
322{
323    #[inline]
324    fn peek(&self, cost: u32) -> Decision {
325        SlidingWindowLog::peek(self, cost)
326    }
327
328    #[inline]
329    fn acquire_cost(&self, cost: u32) -> Decision {
330        self.decide(cost)
331    }
332
333    #[inline]
334    fn available(&self) -> u32 {
335        SlidingWindowLog::available(self)
336    }
337
338    #[inline]
339    fn capacity(&self) -> u32 {
340        self.limit
341    }
342}
343
344#[cfg(test)]
345mod tests {
346    #![allow(clippy::unwrap_used, clippy::expect_used)]
347
348    use super::SlidingWindowLog;
349    use crate::limiter::Limiter;
350    use clock_lib::ManualClock;
351    use core::time::Duration;
352    use std::sync::Arc;
353
354    fn assert_send_sync<T: Send + Sync>() {}
355
356    #[test]
357    fn test_is_send_sync() {
358        assert_send_sync::<SlidingWindowLog>();
359    }
360
361    #[test]
362    fn test_admits_up_to_limit_then_refuses() {
363        let limiter = SlidingWindowLog::new(3, Duration::from_secs(1));
364        assert!(limiter.try_acquire());
365        assert!(limiter.try_acquire());
366        assert!(limiter.try_acquire());
367        assert!(!limiter.try_acquire());
368        assert_eq!(limiter.available(), 0);
369    }
370
371    #[test]
372    fn test_window_slides_exactly() {
373        let clock = Arc::new(ManualClock::new());
374        let limiter = SlidingWindowLog::new(2, Duration::from_secs(1)).with_clock(clock.clone());
375
376        assert!(limiter.try_acquire()); // t=0
377        clock.advance(Duration::from_millis(600));
378        assert!(limiter.try_acquire()); // t=600
379        assert!(!limiter.try_acquire()); // 2 in the last 1s
380
381        // At t=1001 the first grant (t=0) has left the window, but the second
382        // (t=600) has not — so exactly one slot opens.
383        clock.advance(Duration::from_millis(401));
384        assert!(limiter.try_acquire());
385        assert!(!limiter.try_acquire());
386    }
387
388    #[test]
389    fn test_no_boundary_burst() {
390        // Unlike a fixed window, the sliding log never admits 2x the limit across
391        // a boundary: 3 at the end of one window blocks 3 at the start of the next
392        // until they age out.
393        let clock = Arc::new(ManualClock::new());
394        let limiter = SlidingWindowLog::new(3, Duration::from_secs(1)).with_clock(clock.clone());
395
396        clock.advance(Duration::from_millis(900));
397        for _ in 0..3 {
398            assert!(limiter.try_acquire()); // 3 grants at t=900
399        }
400        clock.advance(Duration::from_millis(200)); // t=1100, new "fixed" window
401        assert!(!limiter.try_acquire()); // still 3 within the trailing 1s
402    }
403
404    #[test]
405    fn test_cost_aware_and_impossible() {
406        let limiter = SlidingWindowLog::new(5, Duration::from_secs(1));
407        assert!(limiter.try_acquire_with_cost(4));
408        assert!(!limiter.try_acquire_with_cost(4)); // only 1 left
409        assert!(limiter.try_acquire_with_cost(1));
410        // A cost beyond the limit can never be admitted.
411        assert_eq!(
412            SlidingWindowLog::new(5, Duration::from_secs(1)).peek(9),
413            crate::Decision::Impossible
414        );
415    }
416
417    #[test]
418    fn test_peek_does_not_record() {
419        let limiter = SlidingWindowLog::new(2, Duration::from_secs(1));
420        assert!(limiter.peek(2).is_acquired());
421        assert_eq!(limiter.available(), 2); // peek took nothing
422    }
423
424    #[test]
425    fn test_retry_after_points_to_oldest_expiry() {
426        let clock = Arc::new(ManualClock::new());
427        let limiter = SlidingWindowLog::new(1, Duration::from_secs(1)).with_clock(clock.clone());
428        assert!(limiter.try_acquire()); // t=0
429        let after = limiter
430            .peek(1)
431            .retry_after()
432            .expect("should suggest a wait");
433        // The single grant expires at t=1s, so the wait is ~1s.
434        assert_eq!(after, Duration::from_secs(1));
435    }
436
437    #[test]
438    fn test_works_as_a_limiter_trait_object() {
439        let limiter = SlidingWindowLog::new(2, Duration::from_secs(1));
440        let dyn_limiter: &dyn Limiter = &limiter;
441        assert_eq!(dyn_limiter.capacity(), 2);
442        assert!(dyn_limiter.acquire_cost(1).is_acquired());
443        assert_eq!(dyn_limiter.available(), 1);
444    }
445}