Skip to main content

heliosdb_proxy/rate_limit/
sliding_window.rs

1//! Sliding Window Rate Limiter
2//!
3//! Implements a sliding window algorithm for rate limiting over
4//! rolling time periods (e.g., queries per minute).
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9
10use parking_lot::Mutex;
11
/// Sliding window rate limiter
///
/// Tracks events over a rolling time window, allowing precise
/// rate limiting over time periods like "100 queries per minute".
///
/// Every admitted event is stored as its own timestamp, so memory use grows
/// with the number of events currently held in the window.
#[derive(Debug)]
pub struct SlidingWindow {
    /// Length of the rolling window; events older than this no longer count.
    window_size: Duration,

    /// Maximum number of events admitted within one window.
    max_events: u32,

    /// Timestamps of admitted events, in nanoseconds elapsed since `epoch`.
    /// New events are pushed at the back and expired ones are popped from
    /// the front.
    events: Mutex<VecDeque<u64>>,

    /// Reference instant that all stored timestamps are measured from.
    epoch: Instant,

    /// Running total of admitted events (for metrics); unlike `events`, it is
    /// not reduced when events expire.
    total_events: AtomicU64,

    /// Running total of rejected events (for metrics); a rejected batch adds
    /// its whole requested count.
    rejected_events: AtomicU64,
}
36
37impl SlidingWindow {
38    /// Create a new sliding window
39    ///
40    /// # Arguments
41    /// * `window_size` - Duration of the sliding window
42    /// * `max_events` - Maximum events allowed within the window
43    pub fn new(window_size: Duration, max_events: u32) -> Self {
44        Self {
45            window_size,
46            max_events,
47            events: Mutex::new(VecDeque::with_capacity(max_events as usize)),
48            epoch: Instant::now(),
49            total_events: AtomicU64::new(0),
50            rejected_events: AtomicU64::new(0),
51        }
52    }
53
54    /// Create a sliding window for events per second
55    pub fn per_second(max_events: u32) -> Self {
56        Self::new(Duration::from_secs(1), max_events)
57    }
58
59    /// Create a sliding window for events per minute
60    pub fn per_minute(max_events: u32) -> Self {
61        Self::new(Duration::from_secs(60), max_events)
62    }
63
64    /// Create a sliding window for events per hour
65    pub fn per_hour(max_events: u32) -> Self {
66        Self::new(Duration::from_secs(3600), max_events)
67    }
68
69    /// Try to record an event
70    ///
71    /// Returns Ok(()) if event was recorded, Err with wait time if limit exceeded.
72    pub fn try_record(&self) -> Result<(), SlidingWindowExceeded> {
73        self.try_record_n(1)
74    }
75
76    /// Try to record multiple events
77    pub fn try_record_n(&self, count: u32) -> Result<(), SlidingWindowExceeded> {
78        let now = self.epoch.elapsed().as_nanos() as u64;
79        let window_nanos = self.window_size.as_nanos() as u64;
80        let cutoff = now.saturating_sub(window_nanos);
81
82        let mut events = self.events.lock();
83
84        // Remove expired events
85        while let Some(&front) = events.front() {
86            if front < cutoff {
87                events.pop_front();
88            } else {
89                break;
90            }
91        }
92
93        // Check if we have room
94        let current_count = events.len() as u32;
95        if current_count + count > self.max_events {
96            self.rejected_events
97                .fetch_add(count as u64, Ordering::Relaxed);
98
99            let wait_time = if let Some(&oldest) = events.front() {
100                let expires_at = oldest + window_nanos;
101                if expires_at > now {
102                    Duration::from_nanos(expires_at - now)
103                } else {
104                    Duration::ZERO
105                }
106            } else {
107                Duration::ZERO
108            };
109
110            return Err(SlidingWindowExceeded {
111                retry_after: wait_time,
112                current_count,
113                max_count: self.max_events,
114                window_size: self.window_size,
115            });
116        }
117
118        // Record events
119        for _ in 0..count {
120            events.push_back(now);
121        }
122
123        self.total_events.fetch_add(count as u64, Ordering::Relaxed);
124        Ok(())
125    }
126
127    /// Record an event, blocking until allowed (with timeout)
128    pub fn record_blocking(&self, timeout: Duration) -> Result<(), SlidingWindowExceeded> {
129        let deadline = Instant::now() + timeout;
130
131        loop {
132            match self.try_record() {
133                Ok(()) => return Ok(()),
134                Err(exceeded) => {
135                    let now = Instant::now();
136                    if now >= deadline {
137                        return Err(exceeded);
138                    }
139
140                    let wait = exceeded.retry_after.min(deadline - now);
141                    std::thread::sleep(wait);
142                }
143            }
144        }
145    }
146
147    /// Get current event count in window
148    pub fn current_count(&self) -> u32 {
149        let now = self.epoch.elapsed().as_nanos() as u64;
150        let cutoff = now.saturating_sub(self.window_size.as_nanos() as u64);
151
152        let events = self.events.lock();
153        events.iter().filter(|&&t| t >= cutoff).count() as u32
154    }
155
156    /// Get remaining capacity
157    pub fn remaining_capacity(&self) -> u32 {
158        self.max_events.saturating_sub(self.current_count())
159    }
160
161    /// Get window size
162    pub fn window_size(&self) -> Duration {
163        self.window_size
164    }
165
166    /// Get max events
167    pub fn max_events(&self) -> u32 {
168        self.max_events
169    }
170
171    /// Get utilization ratio (0.0 - 1.0)
172    pub fn utilization(&self) -> f64 {
173        self.current_count() as f64 / self.max_events as f64
174    }
175
176    /// Get total events processed
177    pub fn total_events(&self) -> u64 {
178        self.total_events.load(Ordering::Relaxed)
179    }
180
181    /// Get total events rejected
182    pub fn rejected_events(&self) -> u64 {
183        self.rejected_events.load(Ordering::Relaxed)
184    }
185
186    /// Get rejection rate (0.0 - 1.0)
187    pub fn rejection_rate(&self) -> f64 {
188        let total = self.total_events();
189        let rejected = self.rejected_events();
190        let attempted = total + rejected;
191
192        if attempted == 0 {
193            0.0
194        } else {
195            rejected as f64 / attempted as f64
196        }
197    }
198
199    /// Reset the sliding window
200    pub fn reset(&self) {
201        self.events.lock().clear();
202        self.total_events.store(0, Ordering::Relaxed);
203        self.rejected_events.store(0, Ordering::Relaxed);
204    }
205
206    /// Get event rate (events per second)
207    pub fn current_rate(&self) -> f64 {
208        let count = self.current_count();
209        count as f64 / self.window_size.as_secs_f64()
210    }
211
212    /// Estimate time until an event can be recorded
213    pub fn time_until_available(&self) -> Duration {
214        if self.remaining_capacity() > 0 {
215            return Duration::ZERO;
216        }
217
218        let now = self.epoch.elapsed().as_nanos() as u64;
219        let window_nanos = self.window_size.as_nanos() as u64;
220
221        let events = self.events.lock();
222        if let Some(&oldest) = events.front() {
223            let expires_at = oldest + window_nanos;
224            if expires_at > now {
225                return Duration::from_nanos(expires_at - now);
226            }
227        }
228
229        Duration::ZERO
230    }
231
232    /// Update max events (for dynamic limits)
233    pub fn set_max_events(&mut self, max_events: u32) {
234        self.max_events = max_events;
235    }
236
237    /// Update window size (for dynamic limits)
238    pub fn set_window_size(&mut self, window_size: Duration) {
239        self.window_size = window_size;
240    }
241}
242
243impl Clone for SlidingWindow {
244    fn clone(&self) -> Self {
245        Self {
246            window_size: self.window_size,
247            max_events: self.max_events,
248            events: Mutex::new(self.events.lock().clone()),
249            epoch: self.epoch,
250            total_events: AtomicU64::new(self.total_events.load(Ordering::Relaxed)),
251            rejected_events: AtomicU64::new(self.rejected_events.load(Ordering::Relaxed)),
252        }
253    }
254}
255
/// Error returned when sliding window limit is exceeded
///
/// Carries a snapshot of the limiter's state at the moment of rejection so
/// callers can report it or decide how long to back off.
#[derive(Debug, Clone)]
pub struct SlidingWindowExceeded {
    /// Time until the oldest counted event expires and frees a slot;
    /// zero when the window held no events at rejection time.
    pub retry_after: Duration,

    /// Number of events in the window when the request was rejected
    /// (the rejected request itself is not included).
    pub current_count: u32,

    /// Maximum events allowed in the window
    pub max_count: u32,

    /// Length of the window the limit applies to
    pub window_size: Duration,
}
271
272impl std::fmt::Display for SlidingWindowExceeded {
273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274        write!(
275            f,
276            "Sliding window exceeded: {}/{} events in {:?}, retry after {}ms",
277            self.current_count,
278            self.max_count,
279            self.window_size,
280            self.retry_after.as_millis()
281        )
282    }
283}
284
// Marker impl: lets `SlidingWindowExceeded` be used wherever a
// `std::error::Error` is expected; all trait methods keep their defaults.
impl std::error::Error for SlidingWindowExceeded {}
286
/// Unit tests for [`SlidingWindow`].
///
/// Several tests rely on real sleeps with short windows, so they are
/// timing-sensitive by design and use tolerant assertions where noted.
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built window reports its configuration and holds no events.
    #[test]
    fn test_window_creation() {
        let window = SlidingWindow::new(Duration::from_secs(60), 100);
        assert_eq!(window.window_size(), Duration::from_secs(60));
        assert_eq!(window.max_events(), 100);
        assert_eq!(window.current_count(), 0);
    }

    /// `per_second` uses a one-second window.
    #[test]
    fn test_per_second() {
        let window = SlidingWindow::per_second(10);
        assert_eq!(window.window_size(), Duration::from_secs(1));
        assert_eq!(window.max_events(), 10);
    }

    /// `per_minute` uses a sixty-second window.
    #[test]
    fn test_per_minute() {
        let window = SlidingWindow::per_minute(100);
        assert_eq!(window.window_size(), Duration::from_secs(60));
        assert_eq!(window.max_events(), 100);
    }

    /// Recording up to the limit succeeds and is reflected in the count.
    #[test]
    fn test_record_success() {
        let window = SlidingWindow::new(Duration::from_secs(60), 10);

        for i in 0..10 {
            assert!(window.try_record().is_ok(), "Failed on event {}", i);
        }

        assert_eq!(window.current_count(), 10);
    }

    /// The first event past the limit is rejected and the error reports the
    /// window state at rejection time.
    #[test]
    fn test_record_exceeded() {
        let window = SlidingWindow::new(Duration::from_secs(60), 5);

        for _ in 0..5 {
            assert!(window.try_record().is_ok());
        }

        let result = window.try_record();
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert_eq!(err.current_count, 5);
        assert_eq!(err.max_count, 5);
    }

    /// Batches are admitted while they fit and rejected once the window is full.
    #[test]
    fn test_record_n() {
        let window = SlidingWindow::new(Duration::from_secs(60), 10);

        assert!(window.try_record_n(5).is_ok());
        assert_eq!(window.current_count(), 5);

        assert!(window.try_record_n(5).is_ok());
        assert_eq!(window.current_count(), 10);

        // Should fail - would exceed
        assert!(window.try_record_n(1).is_err());
    }

    /// Events stop counting once they are older than the window
    /// (timing-sensitive: 50 ms window, 60 ms sleep).
    #[test]
    fn test_event_expiration() {
        let window = SlidingWindow::new(Duration::from_millis(50), 5);

        // Fill window
        for _ in 0..5 {
            assert!(window.try_record().is_ok());
        }
        assert_eq!(window.current_count(), 5);

        // Should be full
        assert!(window.try_record().is_err());

        // Wait for events to expire
        std::thread::sleep(Duration::from_millis(60));

        // Should be able to record again
        assert!(window.try_record().is_ok());
        // Count should be 1 (only the new event, old ones expired)
        assert!(window.current_count() <= 2); // Allow some timing variance
    }

    /// Remaining capacity shrinks by exactly the number of recorded events.
    #[test]
    fn test_remaining_capacity() {
        let window = SlidingWindow::new(Duration::from_secs(60), 10);

        assert_eq!(window.remaining_capacity(), 10);

        assert!(window.try_record_n(3).is_ok());
        assert_eq!(window.remaining_capacity(), 7);

        assert!(window.try_record_n(7).is_ok());
        assert_eq!(window.remaining_capacity(), 0);
    }

    /// Utilization is the fraction of the limit currently in use.
    #[test]
    fn test_utilization() {
        let window = SlidingWindow::new(Duration::from_secs(60), 10);

        assert!((window.utilization() - 0.0).abs() < 0.01);

        assert!(window.try_record_n(5).is_ok());
        assert!((window.utilization() - 0.5).abs() < 0.01);

        assert!(window.try_record_n(5).is_ok());
        assert!((window.utilization() - 1.0).abs() < 0.01);
    }

    /// Admitted and rejected attempts are tallied in separate counters.
    #[test]
    fn test_total_and_rejected() {
        let window = SlidingWindow::new(Duration::from_secs(60), 3);

        assert!(window.try_record().is_ok());
        assert!(window.try_record().is_ok());
        assert!(window.try_record().is_ok());
        assert!(window.try_record().is_err());
        assert!(window.try_record().is_err());

        assert_eq!(window.total_events(), 3);
        assert_eq!(window.rejected_events(), 2);
    }

    /// Rejection rate is rejected / (admitted + rejected).
    #[test]
    fn test_rejection_rate() {
        let window = SlidingWindow::new(Duration::from_secs(60), 2);

        assert!(window.try_record().is_ok()); // 1 success
        assert!(window.try_record().is_ok()); // 2 success
        assert!(window.try_record().is_err()); // 1 failure
        assert!(window.try_record().is_err()); // 2 failures

        // 2 rejected out of 4 attempts = 50%
        assert!((window.rejection_rate() - 0.5).abs() < 0.01);
    }

    /// `reset` clears both the queued events and the metric counters.
    #[test]
    fn test_reset() {
        let window = SlidingWindow::new(Duration::from_secs(60), 10);

        assert!(window.try_record_n(5).is_ok());
        assert_eq!(window.current_count(), 5);

        window.reset();

        assert_eq!(window.current_count(), 0);
        assert_eq!(window.total_events(), 0);
        assert_eq!(window.rejected_events(), 0);
    }

    /// The rate is the in-window count divided by the window length in seconds.
    #[test]
    fn test_current_rate() {
        let window = SlidingWindow::new(Duration::from_secs(10), 100);

        assert!(window.try_record_n(50).is_ok());

        // 50 events in a 10 second window = 5 events/sec
        let rate = window.current_rate();
        assert!((rate - 5.0).abs() < 0.1);
    }

    /// A full window reports a positive wait no longer than the window itself
    /// (timing-sensitive: assumes the check runs within the 100 ms window).
    #[test]
    fn test_time_until_available() {
        let window = SlidingWindow::new(Duration::from_millis(100), 1);

        // Empty window should be immediately available
        assert_eq!(window.time_until_available(), Duration::ZERO);

        // Fill window
        assert!(window.try_record().is_ok());

        // Should need to wait for expiration
        let wait = window.time_until_available();
        assert!(wait.as_millis() > 0);
        assert!(wait.as_millis() <= 100);
    }

    /// A clone carries over the recorded events and the configuration.
    #[test]
    fn test_clone() {
        let window1 = SlidingWindow::new(Duration::from_secs(60), 10);
        assert!(window1.try_record_n(5).is_ok());

        let window2 = window1.clone();
        assert_eq!(window2.current_count(), 5);
        assert_eq!(window2.max_events(), 10);
    }

    /// Under contention the limit is enforced exactly: 10 threads x 20
    /// attempts = 200 attempts against a limit of 100.
    #[test]
    fn test_concurrent_access() {
        use std::sync::Arc;
        use std::thread;

        let window = Arc::new(SlidingWindow::new(Duration::from_secs(60), 100));
        let mut handles = vec![];

        // Spawn 10 threads, each trying to record 20 events
        for _ in 0..10 {
            let window = Arc::clone(&window);
            handles.push(thread::spawn(move || {
                for _ in 0..20 {
                    let _ = window.try_record();
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // Should have exactly 100 events (limited by max)
        assert_eq!(window.current_count(), 100);
        // Should have 100 rejected (200 attempts - 100 success)
        assert_eq!(window.rejected_events(), 100);
    }

    /// Blocking succeeds when the slot frees up before the timeout
    /// (20 ms window, 50 ms timeout).
    #[test]
    fn test_record_blocking() {
        let window = SlidingWindow::new(Duration::from_millis(20), 1);

        // Record first event
        assert!(window.try_record().is_ok());

        // Should succeed after waiting
        let result = window.record_blocking(Duration::from_millis(50));
        assert!(result.is_ok());
    }

    /// Blocking gives up with an error when the timeout is shorter than the
    /// time until a slot frees up (60 s window, 10 ms timeout).
    #[test]
    fn test_record_blocking_timeout() {
        let window = SlidingWindow::new(Duration::from_secs(60), 1);

        // Fill window
        assert!(window.try_record().is_ok());

        // Should timeout
        let result = window.record_blocking(Duration::from_millis(10));
        assert!(result.is_err());
    }
}
531}