Skip to main content

volga_rate_limiter/rate_limiter/
fixed_window.rs

1//! Tools and data structures for a fixed-window rate limiter.
2
3use super::{
4    RateLimiter, SystemTimeSource, TimeSource,
5    store::{FixedWindowParams, FixedWindowStore},
6};
7use dashmap::DashMap;
8use std::sync::{
9    Arc,
10    atomic::{AtomicU32, AtomicU64, Ordering::Relaxed},
11};
12use std::time::Duration;
13
/// Per-key bookkeeping record used by the in-memory fixed window store.
///
/// One record exists per partition key and holds two values:
/// - how many requests have been counted in the window being tracked,
/// - when that window started (in seconds).
///
/// Both fields are atomics, so they can be read and updated through a
/// shared reference.
#[derive(Debug)]
struct Entry {
    /// Requests counted so far in the tracked window.
    count: AtomicU32,

    /// Start of the tracked window, in seconds (the `window` value handed
    /// to the store by its caller).
    window_start: AtomicU64,
}
29
30/// In-memory [`FixedWindowStore`] backed by a concurrent hash map.
31///
32/// This is the default store used by [`FixedWindowRateLimiter`].
33/// It holds per-key counters in a `DashMap` and performs lazy eviction.
34#[derive(Debug, Clone)]
35pub struct InMemoryFixedWindowStore {
36    storage: Arc<DashMap<u64, Entry>>,
37}
38
39impl InMemoryFixedWindowStore {
40    /// Creates a new empty in-memory fixed-window store.
41    pub fn new() -> Self {
42        Self {
43            storage: Arc::new(DashMap::new()),
44        }
45    }
46}
47
48impl Default for InMemoryFixedWindowStore {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl FixedWindowStore for InMemoryFixedWindowStore {
55    #[inline]
56    fn check_and_count(&self, params: FixedWindowParams) -> bool {
57        // Destructuring works here (same crate). External implementors must use params.key etc.
58        let FixedWindowParams {
59            key,
60            window,
61            max_requests,
62            now,
63            grace_secs,
64        } = params;
65
66        // Lazy eviction
67        if let Some(entry) = self.storage.get(&key) {
68            let prev_window = entry.window_start.load(Relaxed);
69            if now.saturating_sub(prev_window) > grace_secs {
70                drop(entry);
71                self.storage.remove(&key);
72            }
73        }
74
75        let entry = self.storage.entry(key).or_insert_with(|| Entry {
76            window_start: AtomicU64::new(window),
77            count: AtomicU32::new(0),
78        });
79
80        let prev_window = entry.window_start.load(Relaxed);
81
82        // New window -> reset counter
83        if prev_window != window {
84            entry.window_start.store(window, Relaxed);
85            entry.count.store(0, Relaxed);
86        }
87
88        let prev = entry.count.fetch_add(1, Relaxed);
89        prev < max_requests
90    }
91}
92
93/// A fixed-window rate limiter.
94///
95/// The fixed window algorithm groups requests into discrete, non-overlapping
96/// time windows of a fixed size. For each partition key, a counter is maintained
97/// per window and reset when the window changes.
98///
99/// ## Characteristics
100///
101/// - **Fast and simple**: O(1) operations with minimal bookkeeping.
102/// - **Approximate**: allows bursts at window boundaries.
103/// - **Lock-free hot path**: uses atomic counters and concurrent storage.
104/// - **Lazy eviction**: stale entries are removed opportunistically.
105///
106/// ## Algorithm
107///
108/// For a given `key`:
109///
110/// 1. The current window is calculated as:
111///    `floor(now / window_size) * window_size`.
112/// 2. If the stored window differs from the current one, the counter is reset.
113/// 3. The request counter is incremented atomically.
114/// 4. The request is allowed if the previous counter value was below
115///    `max_requests`.
116///
117/// ## Eviction
118///
119/// Entries are evicted lazily during `check` calls if they have not been
120/// accessed for longer than `eviction_grace_secs`. No background cleanup task
121/// is used.
122///
123/// ## When to use
124///
125/// This limiter is suitable when:
126///
127/// - Performance and simplicity are more important than strict accuracy,
128/// - occasional bursts at window boundaries are acceptable,
129/// - a large number of independent keys is expected.
130///
131/// For stricter enforcement, consider a sliding window implementation.
132#[derive(Debug)]
133pub struct FixedWindowRateLimiter<
134    T: TimeSource = SystemTimeSource,
135    S: FixedWindowStore = InMemoryFixedWindowStore,
136> {
137    store: S,
138    max_requests: u32,
139    window_size_secs: u64,
140    eviction_grace_secs: u64,
141    time_source: T,
142}
143
144impl<T: TimeSource, S: FixedWindowStore> RateLimiter for FixedWindowRateLimiter<T, S> {
145    /// Checks whether the rate limit has been exceeded for the given `key`.
146    ///
147    /// Returns `true` if the request is allowed, or `false` if the rate
148    /// limit has been reached.
149    #[inline]
150    fn check(&self, key: u64) -> bool {
151        let now = self.time_source.now_secs();
152        let window = self.current_window(now);
153        self.store.check_and_count(FixedWindowParams {
154            key,
155            window,
156            max_requests: self.max_requests,
157            now,
158            grace_secs: self.eviction_grace_secs,
159        })
160    }
161}
162
163impl FixedWindowRateLimiter {
164    /// Creates a new fixed window rate limiter using the system clock
165    /// and the default in-memory store.
166    ///
167    /// # Parameters
168    ///
169    /// - `max_requests`: maximum number of requests allowed per window.
170    /// - `window_size`: duration of a single fixed window.
171    ///
172    /// # Panics
173    ///
174    /// Panics if `window_size` is less than 1 second.
175    #[inline]
176    pub fn new(max_requests: u32, window_size: Duration) -> Self {
177        Self::with_time_source(max_requests, window_size, SystemTimeSource)
178    }
179}
180
181impl<T: TimeSource> FixedWindowRateLimiter<T> {
182    /// Creates a [`FixedWindowRateLimiter`] with a custom [`TimeSource`].
183    ///
184    /// This is primarily useful for testing or deterministic simulations.
185    ///
186    /// # Panics
187    ///
188    /// Panics if `window_size` is less than 1 second.
189    #[inline]
190    pub fn with_time_source(max_requests: u32, window_size: Duration, time_source: T) -> Self {
191        Self::with_time_source_and_store(
192            max_requests,
193            window_size,
194            time_source,
195            InMemoryFixedWindowStore::new(),
196        )
197    }
198}
199
200impl<S: FixedWindowStore> FixedWindowRateLimiter<SystemTimeSource, S> {
201    /// Creates a [`FixedWindowRateLimiter`] with a custom [`FixedWindowStore`].
202    ///
203    /// # Panics
204    ///
205    /// Panics if `window_size` is less than 1 second.
206    #[inline]
207    pub fn with_store(max_requests: u32, window_size: Duration, store: S) -> Self {
208        Self::with_time_source_and_store(max_requests, window_size, SystemTimeSource, store)
209    }
210}
211
212impl<T: TimeSource, S: FixedWindowStore> FixedWindowRateLimiter<T, S> {
213    /// Creates a [`FixedWindowRateLimiter`] with a custom [`TimeSource`] and [`FixedWindowStore`].
214    ///
215    /// # Panics
216    ///
217    /// Panics if `window_size` is less than 1 second.
218    #[inline]
219    pub fn with_time_source_and_store(
220        max_requests: u32,
221        window_size: Duration,
222        time_source: T,
223        store: S,
224    ) -> Self {
225        let window_size_secs = window_size.as_secs();
226        assert!(
227            window_size_secs > 0,
228            "window_size must be at least 1 second"
229        );
230        Self {
231            store,
232            max_requests,
233            window_size_secs,
234            eviction_grace_secs: window_size_secs.saturating_mul(2),
235            time_source,
236        }
237    }
238
239    /// Sets the eviction grace period for inactive entries.
240    ///
241    /// Entries that have not been accessed for longer than this duration
242    /// may be removed during subsequent `check` calls.
243    ///
244    /// This method does not perform immediate eviction.
245    #[inline]
246    pub fn set_eviction(&mut self, eviction: Duration) {
247        self.eviction_grace_secs = eviction.as_secs();
248    }
249
250    /// Maximum number of allowed requests per window.
251    #[inline(always)]
252    pub fn max_requests(&self) -> u32 {
253        self.max_requests
254    }
255
256    /// Size of the fixed window in seconds.
257    #[inline(always)]
258    pub fn window_size_secs(&self) -> u64 {
259        self.window_size_secs
260    }
261
262    /// Time after which inactive entries are eligible for eviction.
263    ///
264    /// This value is independent of `window_size_secs` and is used solely to limit memory growth.
265    #[inline(always)]
266    pub fn eviction_grace_secs(&self) -> u64 {
267        self.eviction_grace_secs
268    }
269
270    #[inline]
271    fn current_window(&self, now: u64) -> u64 {
272        (now / self.window_size_secs) * self.window_size_secs
273    }
274}
275
#[cfg(test)]
mod tests {
    use super::super::test_utils::MockTimeSource;
    use super::*;

    /// The first `max_requests` calls in a window pass and the next one is denied.
    #[test]
    fn fixed_window_allows_within_limit() {
        // A mock clock pins every call to the same window, so the expected
        // denial cannot be masked by the wall clock crossing a window boundary.
        let limiter = FixedWindowRateLimiter::with_time_source(
            3,
            Duration::from_secs(10),
            MockTimeSource::new(1000),
        );

        let key = 42;

        assert!(limiter.check(key));
        assert!(limiter.check(key));
        assert!(limiter.check(key));
        assert!(!limiter.check(key)); // 4th denied
    }

    /// Moving the clock into the next window restores the full allowance.
    #[test]
    fn fixed_window_resets_after_window() {
        let time = MockTimeSource::new(1000);
        let limiter =
            FixedWindowRateLimiter::with_time_source(2, Duration::from_secs(1), time.clone());

        let key = 1;

        assert!(limiter.check(key));
        assert!(limiter.check(key));
        assert!(!limiter.check(key));

        time.advance(1);

        assert!(limiter.check(key)); // new window
    }

    /// Exhausting one key's allowance leaves other keys untouched.
    #[test]
    fn fixed_window_isolated_per_key() {
        // Mock clock for the same reason as in `fixed_window_allows_within_limit`.
        let limiter = FixedWindowRateLimiter::with_time_source(
            1,
            Duration::from_secs(10),
            MockTimeSource::new(1000),
        );

        assert!(limiter.check(1));
        assert!(!limiter.check(1));

        assert!(limiter.check(2)); // independent
    }

    /// A user-supplied store receives every `check` call and decides the outcome.
    #[test]
    fn fixed_window_with_custom_store_allows_within_limit() {
        use crate::rate_limiter::store::{FixedWindowParams, FixedWindowStore};
        use std::sync::Arc;
        use std::sync::atomic::{AtomicU32, Ordering::Relaxed};

        struct CountingStore {
            inner: InMemoryFixedWindowStore,
            calls: Arc<AtomicU32>,
        }
        impl FixedWindowStore for CountingStore {
            fn check_and_count(&self, params: FixedWindowParams) -> bool {
                self.calls.fetch_add(1, Relaxed);
                self.inner.check_and_count(params)
            }
        }

        let calls = Arc::new(AtomicU32::new(0));
        let store = CountingStore {
            inner: InMemoryFixedWindowStore::new(),
            calls: calls.clone(),
        };
        // `with_store` always uses the system clock. The window is one hour
        // long so that a boundary crossing between calls is vanishingly unlikely.
        let limiter = FixedWindowRateLimiter::with_store(3, Duration::from_secs(3600), store);

        assert!(limiter.check(1));
        assert!(limiter.check(1));
        assert!(limiter.check(1));
        assert!(!limiter.check(1));
        assert_eq!(calls.load(Relaxed), 4);
    }

    /// Construction rejects a window shorter than one second.
    #[test]
    #[should_panic(expected = "window_size must be at least 1 second")]
    fn fixed_window_panics_on_zero_window_size() {
        let _ = FixedWindowRateLimiter::new(10, Duration::ZERO);
    }

    /// Concurrent callers sharing one key do not exceed the limit (plus slack).
    #[test]
    fn fixed_window_is_thread_safe() {
        use std::sync::Arc;
        use std::thread;

        // System clock with a one-hour window: all 1600 attempts are expected
        // to land in a single window.
        let limiter = Arc::new(FixedWindowRateLimiter::new(1000, Duration::from_secs(3600)));

        let key = 123;

        let mut handles = vec![];

        for _ in 0..8 {
            let limiter = limiter.clone();
            handles.push(thread::spawn(move || {
                let mut allowed = 0;
                for _ in 0..200 {
                    if limiter.check(key) {
                        allowed += 1;
                    }
                }
                allowed
            }));
        }

        let total: u32 = handles.into_iter().map(|h| h.join().unwrap()).sum();

        // <= limit, possible small race allowance is OK
        assert!(total <= 1000 + 8);
    }
}