heliosdb_proxy/rate_limit/
sliding_window.rs1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9
10use parking_lot::Mutex;
11
12#[derive(Debug)]
17pub struct SlidingWindow {
18 window_size: Duration,
20
21 max_events: u32,
23
24 events: Mutex<VecDeque<u64>>,
26
27 epoch: Instant,
29
30 total_events: AtomicU64,
32
33 rejected_events: AtomicU64,
35}
36
37impl SlidingWindow {
38 pub fn new(window_size: Duration, max_events: u32) -> Self {
44 Self {
45 window_size,
46 max_events,
47 events: Mutex::new(VecDeque::with_capacity(max_events as usize)),
48 epoch: Instant::now(),
49 total_events: AtomicU64::new(0),
50 rejected_events: AtomicU64::new(0),
51 }
52 }
53
54 pub fn per_second(max_events: u32) -> Self {
56 Self::new(Duration::from_secs(1), max_events)
57 }
58
59 pub fn per_minute(max_events: u32) -> Self {
61 Self::new(Duration::from_secs(60), max_events)
62 }
63
64 pub fn per_hour(max_events: u32) -> Self {
66 Self::new(Duration::from_secs(3600), max_events)
67 }
68
69 pub fn try_record(&self) -> Result<(), SlidingWindowExceeded> {
73 self.try_record_n(1)
74 }
75
76 pub fn try_record_n(&self, count: u32) -> Result<(), SlidingWindowExceeded> {
78 let now = self.epoch.elapsed().as_nanos() as u64;
79 let window_nanos = self.window_size.as_nanos() as u64;
80 let cutoff = now.saturating_sub(window_nanos);
81
82 let mut events = self.events.lock();
83
84 while let Some(&front) = events.front() {
86 if front < cutoff {
87 events.pop_front();
88 } else {
89 break;
90 }
91 }
92
93 let current_count = events.len() as u32;
95 if current_count + count > self.max_events {
96 self.rejected_events
97 .fetch_add(count as u64, Ordering::Relaxed);
98
99 let wait_time = if let Some(&oldest) = events.front() {
100 let expires_at = oldest + window_nanos;
101 if expires_at > now {
102 Duration::from_nanos(expires_at - now)
103 } else {
104 Duration::ZERO
105 }
106 } else {
107 Duration::ZERO
108 };
109
110 return Err(SlidingWindowExceeded {
111 retry_after: wait_time,
112 current_count,
113 max_count: self.max_events,
114 window_size: self.window_size,
115 });
116 }
117
118 for _ in 0..count {
120 events.push_back(now);
121 }
122
123 self.total_events.fetch_add(count as u64, Ordering::Relaxed);
124 Ok(())
125 }
126
127 pub fn record_blocking(&self, timeout: Duration) -> Result<(), SlidingWindowExceeded> {
129 let deadline = Instant::now() + timeout;
130
131 loop {
132 match self.try_record() {
133 Ok(()) => return Ok(()),
134 Err(exceeded) => {
135 let now = Instant::now();
136 if now >= deadline {
137 return Err(exceeded);
138 }
139
140 let wait = exceeded.retry_after.min(deadline - now);
141 std::thread::sleep(wait);
142 }
143 }
144 }
145 }
146
147 pub fn current_count(&self) -> u32 {
149 let now = self.epoch.elapsed().as_nanos() as u64;
150 let cutoff = now.saturating_sub(self.window_size.as_nanos() as u64);
151
152 let events = self.events.lock();
153 events.iter().filter(|&&t| t >= cutoff).count() as u32
154 }
155
156 pub fn remaining_capacity(&self) -> u32 {
158 self.max_events.saturating_sub(self.current_count())
159 }
160
161 pub fn window_size(&self) -> Duration {
163 self.window_size
164 }
165
166 pub fn max_events(&self) -> u32 {
168 self.max_events
169 }
170
171 pub fn utilization(&self) -> f64 {
173 self.current_count() as f64 / self.max_events as f64
174 }
175
176 pub fn total_events(&self) -> u64 {
178 self.total_events.load(Ordering::Relaxed)
179 }
180
181 pub fn rejected_events(&self) -> u64 {
183 self.rejected_events.load(Ordering::Relaxed)
184 }
185
186 pub fn rejection_rate(&self) -> f64 {
188 let total = self.total_events();
189 let rejected = self.rejected_events();
190 let attempted = total + rejected;
191
192 if attempted == 0 {
193 0.0
194 } else {
195 rejected as f64 / attempted as f64
196 }
197 }
198
199 pub fn reset(&self) {
201 self.events.lock().clear();
202 self.total_events.store(0, Ordering::Relaxed);
203 self.rejected_events.store(0, Ordering::Relaxed);
204 }
205
206 pub fn current_rate(&self) -> f64 {
208 let count = self.current_count();
209 count as f64 / self.window_size.as_secs_f64()
210 }
211
212 pub fn time_until_available(&self) -> Duration {
214 if self.remaining_capacity() > 0 {
215 return Duration::ZERO;
216 }
217
218 let now = self.epoch.elapsed().as_nanos() as u64;
219 let window_nanos = self.window_size.as_nanos() as u64;
220
221 let events = self.events.lock();
222 if let Some(&oldest) = events.front() {
223 let expires_at = oldest + window_nanos;
224 if expires_at > now {
225 return Duration::from_nanos(expires_at - now);
226 }
227 }
228
229 Duration::ZERO
230 }
231
232 pub fn set_max_events(&mut self, max_events: u32) {
234 self.max_events = max_events;
235 }
236
237 pub fn set_window_size(&mut self, window_size: Duration) {
239 self.window_size = window_size;
240 }
241}
242
243impl Clone for SlidingWindow {
244 fn clone(&self) -> Self {
245 Self {
246 window_size: self.window_size,
247 max_events: self.max_events,
248 events: Mutex::new(self.events.lock().clone()),
249 epoch: self.epoch,
250 total_events: AtomicU64::new(self.total_events.load(Ordering::Relaxed)),
251 rejected_events: AtomicU64::new(self.rejected_events.load(Ordering::Relaxed)),
252 }
253 }
254}
255
/// Error returned when recording events would exceed a sliding window's limit.
///
/// Implements `PartialEq`/`Eq` so callers and tests can compare rejections
/// by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SlidingWindowExceeded {
    /// Suggested delay before retrying, derived by the limiter from when
    /// recorded events leave the window. Zero when no recorded event is
    /// holding the request back.
    pub retry_after: Duration,

    /// Number of events counted in the window when the request was rejected.
    pub current_count: u32,

    /// The limiter's configured maximum number of events per window.
    pub max_count: u32,

    /// The limiter's configured window length.
    pub window_size: Duration,
}
271
272impl std::fmt::Display for SlidingWindowExceeded {
273 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274 write!(
275 f,
276 "Sliding window exceeded: {}/{} events in {:?}, retry after {}ms",
277 self.current_count,
278 self.max_count,
279 self.window_size,
280 self.retry_after.as_millis()
281 )
282 }
283}
284
285impl std::error::Error for SlidingWindowExceeded {}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// A new window reports its configuration and holds no events.
    #[test]
    fn test_window_creation() {
        let one_minute = Duration::from_secs(60);
        let limiter = SlidingWindow::new(one_minute, 100);

        assert_eq!(limiter.window_size(), one_minute);
        assert_eq!(limiter.max_events(), 100);
        assert_eq!(limiter.current_count(), 0);
    }

    /// `per_second` uses a one-second window.
    #[test]
    fn test_per_second() {
        let limiter = SlidingWindow::per_second(10);

        assert_eq!(limiter.window_size(), Duration::from_secs(1));
        assert_eq!(limiter.max_events(), 10);
    }

    /// `per_minute` uses a sixty-second window.
    #[test]
    fn test_per_minute() {
        let limiter = SlidingWindow::per_minute(100);

        assert_eq!(limiter.window_size(), Duration::from_secs(60));
        assert_eq!(limiter.max_events(), 100);
    }

    /// Events up to the limit are all admitted.
    #[test]
    fn test_record_success() {
        let limiter = SlidingWindow::new(Duration::from_secs(60), 10);

        (0..10).for_each(|i| {
            assert!(limiter.try_record().is_ok(), "Failed on event {}", i);
        });

        assert_eq!(limiter.current_count(), 10);
    }

    /// The first event past the limit is rejected with the window's state.
    #[test]
    fn test_record_exceeded() {
        let limiter = SlidingWindow::new(Duration::from_secs(60), 5);

        assert!((0..5).all(|_| limiter.try_record().is_ok()));

        let rejection = limiter.try_record().unwrap_err();
        assert_eq!(rejection.current_count, 5);
        assert_eq!(rejection.max_count, 5);
    }

    /// Batches are admitted until the limit is reached.
    #[test]
    fn test_record_n() {
        let limiter = SlidingWindow::new(Duration::from_secs(60), 10);

        for expected_total in [5u32, 10] {
            assert!(limiter.try_record_n(5).is_ok());
            assert_eq!(limiter.current_count(), expected_total);
        }

        assert!(limiter.try_record_n(1).is_err());
    }

    /// Capacity returns once the recorded events have aged out of the window.
    #[test]
    fn test_event_expiration() {
        let limiter = SlidingWindow::new(Duration::from_millis(50), 5);

        assert!((0..5).all(|_| limiter.try_record().is_ok()));
        assert_eq!(limiter.current_count(), 5);

        assert!(limiter.try_record().is_err());

        std::thread::sleep(Duration::from_millis(60));

        assert!(limiter.try_record().is_ok());
        assert!(limiter.current_count() <= 2);
    }

    /// Remaining capacity shrinks as events are recorded.
    #[test]
    fn test_remaining_capacity() {
        let limiter = SlidingWindow::new(Duration::from_secs(60), 10);
        assert_eq!(limiter.remaining_capacity(), 10);

        for (batch, expected_remaining) in [(3u32, 7u32), (7, 0)] {
            assert!(limiter.try_record_n(batch).is_ok());
            assert_eq!(limiter.remaining_capacity(), expected_remaining);
        }
    }

    /// Utilization tracks the filled fraction of the window.
    #[test]
    fn test_utilization() {
        let limiter = SlidingWindow::new(Duration::from_secs(60), 10);
        assert!((limiter.utilization() - 0.0).abs() < 0.01);

        for expected in [0.5f64, 1.0] {
            assert!(limiter.try_record_n(5).is_ok());
            assert!((limiter.utilization() - expected).abs() < 0.01);
        }
    }

    /// Accepted and rejected attempts are counted separately.
    #[test]
    fn test_total_and_rejected() {
        let limiter = SlidingWindow::new(Duration::from_secs(60), 3);

        assert!((0..3).all(|_| limiter.try_record().is_ok()));
        assert!((0..2).all(|_| limiter.try_record().is_err()));

        assert_eq!(limiter.total_events(), 3);
        assert_eq!(limiter.rejected_events(), 2);
    }

    /// Two accepted plus two rejected attempts give a 50% rejection rate.
    #[test]
    fn test_rejection_rate() {
        let limiter = SlidingWindow::new(Duration::from_secs(60), 2);

        assert!(limiter.try_record().is_ok());
        assert!(limiter.try_record().is_ok());
        assert!(limiter.try_record().is_err());
        assert!(limiter.try_record().is_err());

        let rate = limiter.rejection_rate();
        assert!((rate - 0.5).abs() < 0.01);
    }

    /// `reset` clears both the recorded events and the counters.
    #[test]
    fn test_reset() {
        let limiter = SlidingWindow::new(Duration::from_secs(60), 10);

        assert!(limiter.try_record_n(5).is_ok());
        assert_eq!(limiter.current_count(), 5);

        limiter.reset();

        assert_eq!(limiter.current_count(), 0);
        assert_eq!(limiter.total_events(), 0);
        assert_eq!(limiter.rejected_events(), 0);
    }

    /// Fifty events in a ten-second window average five per second.
    #[test]
    fn test_current_rate() {
        let limiter = SlidingWindow::new(Duration::from_secs(10), 100);

        assert!(limiter.try_record_n(50).is_ok());

        let events_per_second = limiter.current_rate();
        assert!((events_per_second - 5.0).abs() < 0.1);
    }

    /// A full window reports a positive wait no longer than the window.
    #[test]
    fn test_time_until_available() {
        let limiter = SlidingWindow::new(Duration::from_millis(100), 1);

        assert_eq!(limiter.time_until_available(), Duration::ZERO);

        assert!(limiter.try_record().is_ok());

        let wait_ms = limiter.time_until_available().as_millis();
        assert!(wait_ms > 0);
        assert!(wait_ms <= 100);
    }

    /// A clone carries over the recorded events and the configuration.
    #[test]
    fn test_clone() {
        let original = SlidingWindow::new(Duration::from_secs(60), 10);
        assert!(original.try_record_n(5).is_ok());

        let copy = original.clone();
        assert_eq!(copy.current_count(), 5);
        assert_eq!(copy.max_events(), 10);
    }

    /// 200 attempts from ten threads against a limit of 100: exactly half
    /// are admitted and half are rejected.
    #[test]
    fn test_concurrent_access() {
        use std::sync::Arc;
        use std::thread;

        let shared = Arc::new(SlidingWindow::new(Duration::from_secs(60), 100));

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let limiter = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..20 {
                        let _ = limiter.try_record();
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(shared.current_count(), 100);
        assert_eq!(shared.rejected_events(), 100);
    }

    /// Blocking succeeds when the window frees up before the timeout.
    #[test]
    fn test_record_blocking() {
        let limiter = SlidingWindow::new(Duration::from_millis(20), 1);

        assert!(limiter.try_record().is_ok());

        let outcome = limiter.record_blocking(Duration::from_millis(50));
        assert!(outcome.is_ok());
    }

    /// Blocking gives up when the timeout is shorter than the wait.
    #[test]
    fn test_record_blocking_timeout() {
        let limiter = SlidingWindow::new(Duration::from_secs(60), 1);

        assert!(limiter.try_record().is_ok());

        let outcome = limiter.record_blocking(Duration::from_millis(10));
        assert!(outcome.is_err());
    }
}