1use std::collections::VecDeque;
4
5#[derive(Debug, Clone)]
33pub struct TimeWindow {
34 window_size_ms: u64,
36 events: VecDeque<i64>,
38 max_events: usize,
40}
41
42impl TimeWindow {
43 pub fn new(window_size_ms: u64, max_events: usize) -> Self {
49 Self {
50 window_size_ms,
51 events: VecDeque::with_capacity(max_events.min(1000)),
52 max_events,
53 }
54 }
55
56 pub fn add_event(&mut self, timestamp_ms: i64) {
58 self.cleanup_old_events(timestamp_ms);
59
60 if self.events.len() >= self.max_events {
61 self.events.pop_front();
62 }
63
64 self.events.push_back(timestamp_ms);
65 }
66
67 pub fn count_events(&mut self, now_ms: i64) -> usize {
69 self.cleanup_old_events(now_ms);
70 self.events.len()
71 }
72
73 pub fn would_exceed_limit(&mut self, now_ms: i64, limit: usize) -> bool {
75 self.cleanup_old_events(now_ms);
76 self.events.len() >= limit
77 }
78
79 pub fn events_per_second(&mut self, now_ms: i64) -> f64 {
81 let count = self.count_events(now_ms);
82 let window_secs = self.window_size_ms as f64 / 1000.0;
83 count as f64 / window_secs
84 }
85
86 pub fn clear(&mut self) {
88 self.events.clear();
89 }
90
91 fn cleanup_old_events(&mut self, now_ms: i64) {
93 let cutoff = now_ms - self.window_size_ms as i64;
94 while let Some(&event_time) = self.events.front() {
95 if event_time < cutoff {
96 self.events.pop_front();
97 } else {
98 break;
99 }
100 }
101 }
102
103 pub fn oldest_event_age_ms(&self, now_ms: i64) -> Option<u64> {
105 self.events.front().map(|&ts| (now_ms - ts) as u64)
106 }
107
108 pub fn is_full(&self) -> bool {
110 self.events.len() >= self.max_events
111 }
112}
113
/// Running statistics for values that fall in the half-open time range
/// `[start_ms, end_ms)`.
#[derive(Debug, Clone)]
pub struct TimeBucket {
    /// Inclusive start of the bucket, in milliseconds.
    pub start_ms: i64,
    /// Exclusive end of the bucket, in milliseconds.
    pub end_ms: i64,
    /// Sum of all values added so far.
    pub sum: f64,
    /// Number of values added so far.
    pub count: u64,
    /// Smallest value added so far (`f64::MAX` while the bucket is empty).
    pub min: f64,
    /// Largest value added so far (`f64::MIN` while the bucket is empty).
    pub max: f64,
}

impl TimeBucket {
    /// Creates an empty bucket covering `[start_ms, end_ms)`.
    pub fn new(start_ms: i64, end_ms: i64) -> Self {
        Self {
            start_ms,
            end_ms,
            sum: 0.0,
            count: 0,
            // Sentinels chosen so the first added value replaces both.
            min: f64::MAX,
            max: f64::MIN,
        }
    }

    /// Folds `value` into the running sum, count, minimum and maximum.
    pub fn add_value(&mut self, value: f64) {
        self.sum += value;
        self.count += 1;
        self.min = self.min.min(value);
        self.max = self.max.max(value);
    }

    /// Returns the mean of the added values, or `None` when the bucket is
    /// empty.
    pub fn average(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }

    /// Returns `true` when `timestamp_ms` lies in `[start_ms, end_ms)`.
    pub fn contains(&self, timestamp_ms: i64) -> bool {
        timestamp_ms >= self.start_ms && timestamp_ms < self.end_ms
    }

    /// Returns the length of the bucket in milliseconds.
    ///
    /// An inverted bucket (`end_ms < start_ms`) has a duration of `0`.
    pub fn duration_ms(&self) -> u64 {
        if self.end_ms >= self.start_ms {
            // `abs_diff` cannot overflow, even for spans wider than `i64::MAX`.
            self.end_ms.abs_diff(self.start_ms)
        } else {
            0
        }
    }
}
171
172#[derive(Debug, Clone)]
174pub struct BucketedTimeSeries {
175 bucket_size_ms: u64,
177 max_buckets: usize,
179 buckets: VecDeque<TimeBucket>,
181}
182
183impl BucketedTimeSeries {
184 pub fn new(bucket_size_ms: u64, max_buckets: usize) -> Self {
190 Self {
191 bucket_size_ms,
192 max_buckets,
193 buckets: VecDeque::with_capacity(max_buckets.min(1000)),
194 }
195 }
196
197 pub fn add_value(&mut self, timestamp_ms: i64, value: f64) {
199 let bucket_start = (timestamp_ms / self.bucket_size_ms as i64) * self.bucket_size_ms as i64;
200 let bucket_end = bucket_start + self.bucket_size_ms as i64;
201
202 if let Some(bucket) = self.buckets.iter_mut().find(|b| b.start_ms == bucket_start) {
204 bucket.add_value(value);
205 } else {
206 let mut new_bucket = TimeBucket::new(bucket_start, bucket_end);
208 new_bucket.add_value(value);
209
210 let insert_pos = self
212 .buckets
213 .iter()
214 .position(|b| b.start_ms > bucket_start)
215 .unwrap_or(self.buckets.len());
216 self.buckets.insert(insert_pos, new_bucket);
217
218 while self.buckets.len() > self.max_buckets {
220 self.buckets.pop_front();
221 }
222 }
223 }
224
225 pub fn buckets(&self) -> &VecDeque<TimeBucket> {
227 &self.buckets
228 }
229
230 pub fn buckets_in_range(&self, start_ms: i64, end_ms: i64) -> Vec<&TimeBucket> {
232 self.buckets
233 .iter()
234 .filter(|b| b.end_ms > start_ms && b.start_ms < end_ms)
235 .collect()
236 }
237
238 pub fn aggregate_stats(&self) -> Option<(f64, f64, f64, u64)> {
240 if self.buckets.is_empty() {
241 return None;
242 }
243
244 let total_sum: f64 = self.buckets.iter().map(|b| b.sum).sum();
245 let total_count: u64 = self.buckets.iter().map(|b| b.count).sum();
246 let min = self.buckets.iter().map(|b| b.min).fold(f64::MAX, f64::min);
247 let max = self.buckets.iter().map(|b| b.max).fold(f64::MIN, f64::max);
248
249 Some((total_sum / total_count as f64, min, max, total_count))
250 }
251
252 pub fn clear(&mut self) {
254 self.buckets.clear();
255 }
256}
257
258#[derive(Debug, Clone)]
286pub struct SlidingWindowRateLimiter {
287 window: TimeWindow,
289 max_requests: usize,
291}
292
293impl SlidingWindowRateLimiter {
294 pub fn new(window_ms: u64, max_requests: usize) -> Self {
300 Self {
301 window: TimeWindow::new(window_ms, max_requests * 2), max_requests,
303 }
304 }
305
306 pub fn is_allowed(&mut self, now_ms: i64) -> bool {
308 !self.window.would_exceed_limit(now_ms, self.max_requests)
309 }
310
311 pub fn try_acquire(&mut self, now_ms: i64) -> bool {
314 if self.is_allowed(now_ms) {
315 self.window.add_event(now_ms);
316 true
317 } else {
318 false
319 }
320 }
321
322 pub fn current_count(&mut self, now_ms: i64) -> usize {
324 self.window.count_events(now_ms)
325 }
326
327 pub fn remaining_capacity(&mut self, now_ms: i64) -> usize {
329 let current = self.current_count(now_ms);
330 self.max_requests.saturating_sub(current)
331 }
332
333 pub fn reset(&mut self) {
335 self.window.clear();
336 }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Events remain counted until they are strictly older than the window.
    #[test]
    fn test_time_window_basic() {
        let t0 = 1000;
        let mut win = TimeWindow::new(1000, 100);

        for step in 0..3 {
            win.add_event(t0 + step * 100);
        }

        // All three events are well inside the one-second window.
        assert_eq!(win.count_events(t0 + 500), 3);

        // As "now" advances, events fall out one at a time; an event exactly
        // one window old is still counted.
        let expectations: [(i64, usize); 4] = [(1000, 3), (1100, 2), (1200, 1), (1300, 0)];
        for &(delta, expected) in expectations.iter() {
            assert_eq!(win.count_events(t0 + delta), expected);
        }
    }

    /// The limit check compares the live event count against the given limit.
    #[test]
    fn test_time_window_would_exceed_limit() {
        let t0 = 1000;
        let mut win = TimeWindow::new(1000, 10);

        for step in 0..5 {
            win.add_event(t0 + step * 100);
        }

        let probe = t0 + 500;
        assert!(!win.would_exceed_limit(probe, 10));
        assert!(win.would_exceed_limit(probe, 3));
    }

    /// Ten events inside a one-second window give roughly ten per second.
    #[test]
    fn test_time_window_events_per_second() {
        let t0 = 1000;
        let mut win = TimeWindow::new(1000, 100);

        for step in 0..10 {
            win.add_event(t0 + step * 50);
        }

        let observed = win.events_per_second(t0 + 500);
        assert!((9.0..=11.0).contains(&observed));
    }

    /// The window never retains more than `max_events` entries.
    #[test]
    fn test_time_window_max_events() {
        let t0 = 1000;
        let mut win = TimeWindow::new(10000, 5);

        for step in 0..10 {
            win.add_event(t0 + step);
        }

        assert_eq!(win.count_events(t0 + 20), 5);
    }

    /// A bucket tracks count, sum, average, minimum and maximum.
    #[test]
    fn test_time_bucket_basic() {
        let mut stats = TimeBucket::new(0, 1000);

        stats.add_value(10.0);
        stats.add_value(20.0);
        stats.add_value(30.0);

        assert_eq!(stats.count, 3);
        assert_eq!(stats.sum, 60.0);
        assert_eq!(stats.average(), Some(20.0));
        assert_eq!(stats.min, 10.0);
        assert_eq!(stats.max, 30.0);
    }

    /// Bucket ranges include the start and exclude the end.
    #[test]
    fn test_time_bucket_contains() {
        let span = TimeBucket::new(1000, 2000);

        assert!(!span.contains(999));
        assert!(span.contains(1000));
        assert!(span.contains(1500));
        assert!(!span.contains(2000));
    }

    /// Values are grouped per bucket and aggregated across buckets.
    #[test]
    fn test_bucketed_time_series() {
        let mut ts = BucketedTimeSeries::new(1000, 10);

        ts.add_value(1000, 10.0);
        ts.add_value(1500, 20.0);
        ts.add_value(2000, 30.0);
        ts.add_value(2500, 40.0);

        // Two values per one-second bucket.
        assert_eq!(ts.buckets().len(), 2);

        let (avg, min, max, count) = ts.aggregate_stats().expect("Expected aggregate stats");
        assert_eq!(count, 4);
        assert_eq!(min, 10.0);
        assert_eq!(max, 40.0);
        assert_eq!(avg, 25.0);
    }

    /// The series is capped at `max_buckets`.
    #[test]
    fn test_bucketed_time_series_max_buckets() {
        let mut ts = BucketedTimeSeries::new(1000, 3);

        for i in 0..5 {
            ts.add_value((i * 1000) as i64, i as f64);
        }

        assert_eq!(ts.buckets().len(), 3);
    }

    /// The limiter admits up to the limit, rejects the next request, and
    /// admits again once earlier requests have left the window.
    #[test]
    fn test_sliding_window_rate_limiter() {
        let t0 = 1000;
        let mut rl = SlidingWindowRateLimiter::new(1000, 5);

        assert!((0..5).all(|k| rl.try_acquire(t0 + k * 10)));

        // Sixth request inside the same window is refused.
        assert!(!rl.try_acquire(t0 + 50));

        // All earlier requests have expired by now.
        assert!(rl.try_acquire(t0 + 1100));
    }

    /// Remaining capacity shrinks by one for each admitted request.
    #[test]
    fn test_rate_limiter_remaining_capacity() {
        let t0 = 1000;
        let mut rl = SlidingWindowRateLimiter::new(1000, 10);

        assert_eq!(rl.remaining_capacity(t0), 10);

        rl.try_acquire(t0);
        rl.try_acquire(t0 + 10);
        rl.try_acquire(t0 + 20);

        assert_eq!(rl.remaining_capacity(t0 + 30), 7);
    }

    /// Resetting discards every recorded request.
    #[test]
    fn test_rate_limiter_reset() {
        let t0 = 1000;
        let mut rl = SlidingWindowRateLimiter::new(1000, 5);

        for k in 0..5 {
            rl.try_acquire(t0 + k);
        }
        assert_eq!(rl.current_count(t0 + 10), 5);

        rl.reset();
        assert_eq!(rl.current_count(t0 + 10), 0);
    }
}