volga_rate_limiter/rate_limiter/
fixed_window.rs1use super::{
4 RateLimiter, SystemTimeSource, TimeSource,
5 store::{FixedWindowParams, FixedWindowStore},
6};
7use dashmap::DashMap;
8use std::sync::{
9 Arc,
10 atomic::{AtomicU32, AtomicU64, Ordering::Relaxed},
11};
12use std::time::Duration;
13
14#[derive(Debug)]
22struct Entry {
23 count: AtomicU32,
25
26 window_start: AtomicU64,
28}
29
30#[derive(Debug, Clone)]
35pub struct InMemoryFixedWindowStore {
36 storage: Arc<DashMap<u64, Entry>>,
37}
38
39impl InMemoryFixedWindowStore {
40 pub fn new() -> Self {
42 Self {
43 storage: Arc::new(DashMap::new()),
44 }
45 }
46}
47
48impl Default for InMemoryFixedWindowStore {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl FixedWindowStore for InMemoryFixedWindowStore {
55 #[inline]
56 fn check_and_count(&self, params: FixedWindowParams) -> bool {
57 let FixedWindowParams {
59 key,
60 window,
61 max_requests,
62 now,
63 grace_secs,
64 } = params;
65
66 if let Some(entry) = self.storage.get(&key) {
68 let prev_window = entry.window_start.load(Relaxed);
69 if now.saturating_sub(prev_window) > grace_secs {
70 drop(entry);
71 self.storage.remove(&key);
72 }
73 }
74
75 let entry = self.storage.entry(key).or_insert_with(|| Entry {
76 window_start: AtomicU64::new(window),
77 count: AtomicU32::new(0),
78 });
79
80 let prev_window = entry.window_start.load(Relaxed);
81
82 if prev_window != window {
84 entry.window_start.store(window, Relaxed);
85 entry.count.store(0, Relaxed);
86 }
87
88 let prev = entry.count.fetch_add(1, Relaxed);
89 prev < max_requests
90 }
91}
92
93#[derive(Debug)]
133pub struct FixedWindowRateLimiter<
134 T: TimeSource = SystemTimeSource,
135 S: FixedWindowStore = InMemoryFixedWindowStore,
136> {
137 store: S,
138 max_requests: u32,
139 window_size_secs: u64,
140 eviction_grace_secs: u64,
141 time_source: T,
142}
143
144impl<T: TimeSource, S: FixedWindowStore> RateLimiter for FixedWindowRateLimiter<T, S> {
145 #[inline]
150 fn check(&self, key: u64) -> bool {
151 let now = self.time_source.now_secs();
152 let window = self.current_window(now);
153 self.store.check_and_count(FixedWindowParams {
154 key,
155 window,
156 max_requests: self.max_requests,
157 now,
158 grace_secs: self.eviction_grace_secs,
159 })
160 }
161}
162
163impl FixedWindowRateLimiter {
164 #[inline]
176 pub fn new(max_requests: u32, window_size: Duration) -> Self {
177 Self::with_time_source(max_requests, window_size, SystemTimeSource)
178 }
179}
180
181impl<T: TimeSource> FixedWindowRateLimiter<T> {
182 #[inline]
190 pub fn with_time_source(max_requests: u32, window_size: Duration, time_source: T) -> Self {
191 Self::with_time_source_and_store(
192 max_requests,
193 window_size,
194 time_source,
195 InMemoryFixedWindowStore::new(),
196 )
197 }
198}
199
200impl<S: FixedWindowStore> FixedWindowRateLimiter<SystemTimeSource, S> {
201 #[inline]
207 pub fn with_store(max_requests: u32, window_size: Duration, store: S) -> Self {
208 Self::with_time_source_and_store(max_requests, window_size, SystemTimeSource, store)
209 }
210}
211
212impl<T: TimeSource, S: FixedWindowStore> FixedWindowRateLimiter<T, S> {
213 #[inline]
219 pub fn with_time_source_and_store(
220 max_requests: u32,
221 window_size: Duration,
222 time_source: T,
223 store: S,
224 ) -> Self {
225 let window_size_secs = window_size.as_secs();
226 assert!(
227 window_size_secs > 0,
228 "window_size must be at least 1 second"
229 );
230 Self {
231 store,
232 max_requests,
233 window_size_secs,
234 eviction_grace_secs: window_size_secs.saturating_mul(2),
235 time_source,
236 }
237 }
238
239 #[inline]
246 pub fn set_eviction(&mut self, eviction: Duration) {
247 self.eviction_grace_secs = eviction.as_secs();
248 }
249
250 #[inline(always)]
252 pub fn max_requests(&self) -> u32 {
253 self.max_requests
254 }
255
256 #[inline(always)]
258 pub fn window_size_secs(&self) -> u64 {
259 self.window_size_secs
260 }
261
262 #[inline(always)]
266 pub fn eviction_grace_secs(&self) -> u64 {
267 self.eviction_grace_secs
268 }
269
270 #[inline]
271 fn current_window(&self, now: u64) -> u64 {
272 (now / self.window_size_secs) * self.window_size_secs
273 }
274}
275
276#[cfg(test)]
277mod tests {
278 use super::super::test_utils::MockTimeSource;
279 use super::*;
280
281 #[test]
282 fn fixed_window_allows_within_limit() {
283 let limiter = FixedWindowRateLimiter::new(3, Duration::from_secs(10));
284
285 let key = 42;
286
287 assert!(limiter.check(key));
288 assert!(limiter.check(key));
289 assert!(limiter.check(key));
290 assert!(!limiter.check(key)); }
292
293 #[test]
294 fn fixed_window_resets_after_window() {
295 let time = MockTimeSource::new(1000);
296 let limiter =
297 FixedWindowRateLimiter::with_time_source(2, Duration::from_secs(1), time.clone());
298
299 let key = 1;
300
301 assert!(limiter.check(key));
302 assert!(limiter.check(key));
303 assert!(!limiter.check(key));
304
305 time.advance(1);
306
307 assert!(limiter.check(key)); }
309
310 #[test]
311 fn fixed_window_isolated_per_key() {
312 let limiter = FixedWindowRateLimiter::new(1, Duration::from_secs(10));
313
314 assert!(limiter.check(1));
315 assert!(!limiter.check(1));
316
317 assert!(limiter.check(2)); }
319
320 #[test]
321 fn fixed_window_with_custom_store_allows_within_limit() {
322 use crate::rate_limiter::store::{FixedWindowParams, FixedWindowStore};
323 use std::sync::Arc;
324 use std::sync::atomic::{AtomicU32, Ordering::Relaxed};
325
326 struct CountingStore {
327 inner: InMemoryFixedWindowStore,
328 calls: Arc<AtomicU32>,
329 }
330 impl FixedWindowStore for CountingStore {
331 fn check_and_count(&self, params: FixedWindowParams) -> bool {
332 self.calls.fetch_add(1, Relaxed);
333 self.inner.check_and_count(params)
334 }
335 }
336
337 let calls = Arc::new(AtomicU32::new(0));
338 let store = CountingStore {
339 inner: InMemoryFixedWindowStore::new(),
340 calls: calls.clone(),
341 };
342 let limiter = FixedWindowRateLimiter::with_store(3, Duration::from_secs(10), store);
343
344 assert!(limiter.check(1));
345 assert!(limiter.check(1));
346 assert!(limiter.check(1));
347 assert!(!limiter.check(1));
348 assert_eq!(calls.load(Relaxed), 4);
349 }
350
351 #[test]
352 #[should_panic(expected = "window_size must be at least 1 second")]
353 fn fixed_window_panics_on_zero_window_size() {
354 let _ = FixedWindowRateLimiter::new(10, Duration::ZERO);
355 }
356
357 #[test]
358 fn fixed_window_is_thread_safe() {
359 use std::sync::Arc;
360 use std::thread;
361
362 let limiter = Arc::new(FixedWindowRateLimiter::new(1000, Duration::from_secs(10)));
363
364 let key = 123;
365
366 let mut handles = vec![];
367
368 for _ in 0..8 {
369 let limiter = limiter.clone();
370 handles.push(thread::spawn(move || {
371 let mut allowed = 0;
372 for _ in 0..200 {
373 if limiter.check(key) {
374 allowed += 1;
375 }
376 }
377 allowed
378 }));
379 }
380
381 let total: u32 = handles.into_iter().map(|h| h.join().unwrap()).sum();
382
383 assert!(total <= 1000 + 8);
385 }
386}