chie_shared/utils/
time_window.rs

1//! Time window utilities for rate limiting, metrics collection, and sliding window analytics.
2
3use std::collections::VecDeque;
4
/// Time window for tracking events within a specific time period.
///
/// All timestamps are caller-supplied millisecond values; the window never
/// reads a clock itself. Expiry walks from the oldest stored entry and stops at
/// the first one that is still inside the window, so timestamps added out of
/// order may linger past their nominal expiry.
///
/// # Examples
///
/// ```
/// use chie_shared::TimeWindow;
///
/// let mut window = TimeWindow::new(1000, 100); // 1 second window, max 100 events
///
/// // Track events at different timestamps
/// window.add_event(1000);
/// window.add_event(1100);
/// window.add_event(1200);
///
/// // Count events within the window
/// assert_eq!(window.count_events(1500), 3);
///
/// // Events older than window size are cleaned up
/// assert_eq!(window.count_events(2500), 0);
///
/// // Check rate limiting
/// let mut limiter = TimeWindow::new(1000, 5);
/// for i in 0..5 {
///     limiter.add_event(1000 + i * 10);
/// }
/// assert!(limiter.would_exceed_limit(1050, 5));
/// ```
#[derive(Debug, Clone)]
pub struct TimeWindow {
    /// Window size in milliseconds.
    window_size_ms: u64,
    /// Event timestamps in milliseconds, oldest at the front.
    events: VecDeque<i64>,
    /// Maximum events to track (prevents unbounded memory growth).
    max_events: usize,
}

impl TimeWindow {
    /// Create a new time window.
    ///
    /// # Arguments
    /// * `window_size_ms` - Window size in milliseconds
    /// * `max_events` - Maximum number of events to track
    pub fn new(window_size_ms: u64, max_events: usize) -> Self {
        Self {
            window_size_ms,
            // Cap the up-front allocation at 1000 slots; the deque grows on
            // demand if `max_events` is larger.
            events: VecDeque::with_capacity(max_events.min(1000)),
            max_events,
        }
    }

    /// Record an event that happened at `timestamp_ms`.
    ///
    /// Expired events are dropped first. If the window already holds
    /// `max_events` entries, the oldest one is evicted to make room. A window
    /// created with `max_events == 0` never stores anything.
    pub fn add_event(&mut self, timestamp_ms: i64) {
        self.cleanup_old_events(timestamp_ms);

        // A zero-capacity window must stay empty; without this guard the
        // eviction below is a no-op on an empty deque and one event sneaks in.
        if self.max_events == 0 {
            return;
        }

        if self.events.len() >= self.max_events {
            self.events.pop_front();
        }

        self.events.push_back(timestamp_ms);
    }

    /// Get the count of events within the window ending at `now_ms`.
    ///
    /// Takes `&mut self` because expired events are removed as a side effect.
    pub fn count_events(&mut self, now_ms: i64) -> usize {
        self.cleanup_old_events(now_ms);
        self.events.len()
    }

    /// Check if adding a new event would exceed the given limit.
    ///
    /// Returns `true` when the number of live events at `now_ms` is already
    /// greater than or equal to `limit`.
    pub fn would_exceed_limit(&mut self, now_ms: i64, limit: usize) -> bool {
        self.cleanup_old_events(now_ms);
        self.events.len() >= limit
    }

    /// Get events per second rate.
    ///
    /// Computed as the live event count divided by the window size in seconds.
    /// A window size of zero therefore yields a non-finite value (infinity, or
    /// NaN when there are no events).
    pub fn events_per_second(&mut self, now_ms: i64) -> f64 {
        let count = self.count_events(now_ms);
        let window_secs = self.window_size_ms as f64 / 1000.0;
        count as f64 / window_secs
    }

    /// Clear all events.
    pub fn clear(&mut self) {
        self.events.clear();
    }

    /// Remove events older than the window.
    ///
    /// An event is kept while `event_time >= now_ms - window_size_ms`.
    fn cleanup_old_events(&mut self, now_ms: i64) {
        // Window sizes beyond i64::MAX are clamped instead of wrapping to a
        // negative value, and the subtraction saturates so timestamps near
        // i64::MIN cannot overflow.
        let window = i64::try_from(self.window_size_ms).unwrap_or(i64::MAX);
        let cutoff = now_ms.saturating_sub(window);

        while matches!(self.events.front(), Some(&oldest) if oldest < cutoff) {
            self.events.pop_front();
        }
    }

    /// Get the age of the oldest event in milliseconds.
    ///
    /// Returns `None` when no events are stored. If `now_ms` is earlier than
    /// the oldest event the age is reported as `0` rather than wrapping around.
    /// This method does not expire old events.
    pub fn oldest_event_age_ms(&self, now_ms: i64) -> Option<u64> {
        self.events.front().map(|&ts| {
            if now_ms >= ts {
                now_ms.abs_diff(ts)
            } else {
                0
            }
        })
    }

    /// Check if the window is full.
    ///
    /// Compares the stored count against `max_events` without expiring
    /// anything first.
    pub fn is_full(&self) -> bool {
        self.events.len() >= self.max_events
    }
}
113
/// Fixed-size time bucket for aggregating metrics over time periods.
///
/// Covers the half-open interval `[start_ms, end_ms)` and accumulates the sum,
/// count, minimum and maximum of the values added to it.
#[derive(Debug, Clone)]
pub struct TimeBucket {
    /// Bucket start time (inclusive).
    pub start_ms: i64,
    /// Bucket end time (exclusive).
    pub end_ms: i64,
    /// Sum of values in this bucket.
    pub sum: f64,
    /// Count of values in this bucket.
    pub count: u64,
    /// Minimum value in this bucket (`f64::MAX` while the bucket is empty).
    pub min: f64,
    /// Maximum value in this bucket (`f64::MIN` while the bucket is empty).
    pub max: f64,
}

impl TimeBucket {
    /// Create a new, empty time bucket spanning `[start_ms, end_ms)`.
    pub fn new(start_ms: i64, end_ms: i64) -> Self {
        Self {
            start_ms,
            end_ms,
            sum: 0.0,
            count: 0,
            // Seed the extremes with sentinels so the first added value
            // replaces both.
            min: f64::MAX,
            max: f64::MIN,
        }
    }

    /// Add a value to the bucket, updating sum, count, min and max.
    pub fn add_value(&mut self, value: f64) {
        self.sum += value;
        self.count += 1;
        self.min = self.min.min(value);
        self.max = self.max.max(value);
    }

    /// Get the average value, or `None` if no values have been added.
    pub fn average(&self) -> Option<f64> {
        if self.count == 0 {
            return None;
        }
        Some(self.sum / self.count as f64)
    }

    /// Check if a timestamp falls within this bucket.
    ///
    /// The start is inclusive and the end is exclusive.
    pub fn contains(&self, timestamp_ms: i64) -> bool {
        (self.start_ms..self.end_ms).contains(&timestamp_ms)
    }

    /// Get the bucket duration in milliseconds.
    ///
    /// Returns `0` for an inverted bucket (`end_ms < start_ms`) instead of
    /// wrapping to a huge value, and cannot overflow for extreme bounds.
    pub fn duration_ms(&self) -> u64 {
        if self.end_ms >= self.start_ms {
            self.end_ms.abs_diff(self.start_ms)
        } else {
            0
        }
    }
}
171
172/// Bucketed time series for aggregating metrics into fixed time intervals.
173#[derive(Debug, Clone)]
174pub struct BucketedTimeSeries {
175    /// Duration of each bucket in milliseconds.
176    bucket_size_ms: u64,
177    /// Maximum number of buckets to retain.
178    max_buckets: usize,
179    /// The buckets, ordered from oldest to newest.
180    buckets: VecDeque<TimeBucket>,
181}
182
183impl BucketedTimeSeries {
184    /// Create a new bucketed time series.
185    ///
186    /// # Arguments
187    /// * `bucket_size_ms` - Size of each bucket in milliseconds
188    /// * `max_buckets` - Maximum number of buckets to retain
189    pub fn new(bucket_size_ms: u64, max_buckets: usize) -> Self {
190        Self {
191            bucket_size_ms,
192            max_buckets,
193            buckets: VecDeque::with_capacity(max_buckets.min(1000)),
194        }
195    }
196
197    /// Add a value at the given timestamp.
198    pub fn add_value(&mut self, timestamp_ms: i64, value: f64) {
199        let bucket_start = (timestamp_ms / self.bucket_size_ms as i64) * self.bucket_size_ms as i64;
200        let bucket_end = bucket_start + self.bucket_size_ms as i64;
201
202        // Find or create the appropriate bucket
203        if let Some(bucket) = self.buckets.iter_mut().find(|b| b.start_ms == bucket_start) {
204            bucket.add_value(value);
205        } else {
206            // Create new bucket
207            let mut new_bucket = TimeBucket::new(bucket_start, bucket_end);
208            new_bucket.add_value(value);
209
210            // Insert in chronological order
211            let insert_pos = self
212                .buckets
213                .iter()
214                .position(|b| b.start_ms > bucket_start)
215                .unwrap_or(self.buckets.len());
216            self.buckets.insert(insert_pos, new_bucket);
217
218            // Remove oldest buckets if we exceed max_buckets
219            while self.buckets.len() > self.max_buckets {
220                self.buckets.pop_front();
221            }
222        }
223    }
224
225    /// Get all buckets.
226    pub fn buckets(&self) -> &VecDeque<TimeBucket> {
227        &self.buckets
228    }
229
230    /// Get buckets within a time range.
231    pub fn buckets_in_range(&self, start_ms: i64, end_ms: i64) -> Vec<&TimeBucket> {
232        self.buckets
233            .iter()
234            .filter(|b| b.end_ms > start_ms && b.start_ms < end_ms)
235            .collect()
236    }
237
238    /// Calculate aggregate statistics across all buckets.
239    pub fn aggregate_stats(&self) -> Option<(f64, f64, f64, u64)> {
240        if self.buckets.is_empty() {
241            return None;
242        }
243
244        let total_sum: f64 = self.buckets.iter().map(|b| b.sum).sum();
245        let total_count: u64 = self.buckets.iter().map(|b| b.count).sum();
246        let min = self.buckets.iter().map(|b| b.min).fold(f64::MAX, f64::min);
247        let max = self.buckets.iter().map(|b| b.max).fold(f64::MIN, f64::max);
248
249        Some((total_sum / total_count as f64, min, max, total_count))
250    }
251
252    /// Clear all buckets.
253    pub fn clear(&mut self) {
254        self.buckets.clear();
255    }
256}
257
258/// Rate limiter using sliding window algorithm.
259///
260/// # Examples
261///
262/// ```
263/// use chie_shared::SlidingWindowRateLimiter;
264///
265/// // Allow 10 requests per second
266/// let mut limiter = SlidingWindowRateLimiter::new(1000, 10);
267///
268/// // First 10 requests should be allowed
269/// for i in 0..10 {
270///     assert!(limiter.try_acquire(1000 + i));
271/// }
272///
273/// // 11th request should be denied
274/// assert!(!limiter.try_acquire(1010));
275///
276/// // After window passes, new requests allowed
277/// assert!(limiter.try_acquire(2100));
278///
279/// // Check remaining capacity
280/// let mut limiter2 = SlidingWindowRateLimiter::new(1000, 5);
281/// limiter2.try_acquire(1000);
282/// limiter2.try_acquire(1001);
283/// assert_eq!(limiter2.remaining_capacity(1002), 3);
284/// ```
285#[derive(Debug, Clone)]
286pub struct SlidingWindowRateLimiter {
287    /// Time window for rate limiting.
288    window: TimeWindow,
289    /// Maximum requests allowed in the window.
290    max_requests: usize,
291}
292
293impl SlidingWindowRateLimiter {
294    /// Create a new rate limiter.
295    ///
296    /// # Arguments
297    /// * `window_ms` - Window size in milliseconds
298    /// * `max_requests` - Maximum requests allowed in the window
299    pub fn new(window_ms: u64, max_requests: usize) -> Self {
300        Self {
301            window: TimeWindow::new(window_ms, max_requests * 2), // Extra capacity for safety
302            max_requests,
303        }
304    }
305
306    /// Check if a request is allowed at the given time.
307    pub fn is_allowed(&mut self, now_ms: i64) -> bool {
308        !self.window.would_exceed_limit(now_ms, self.max_requests)
309    }
310
311    /// Try to consume a token (record a request).
312    /// Returns true if the request is allowed, false if rate limit is exceeded.
313    pub fn try_acquire(&mut self, now_ms: i64) -> bool {
314        if self.is_allowed(now_ms) {
315            self.window.add_event(now_ms);
316            true
317        } else {
318            false
319        }
320    }
321
322    /// Get the current request count in the window.
323    pub fn current_count(&mut self, now_ms: i64) -> usize {
324        self.window.count_events(now_ms)
325    }
326
327    /// Get remaining capacity.
328    pub fn remaining_capacity(&mut self, now_ms: i64) -> usize {
329        let current = self.current_count(now_ms);
330        self.max_requests.saturating_sub(current)
331    }
332
333    /// Reset the rate limiter.
334    pub fn reset(&mut self) {
335        self.window.clear();
336    }
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Events fall out of the window one at a time as the clock advances.
    #[test]
    fn test_time_window_basic() {
        let base = 1000;
        let mut tw = TimeWindow::new(1000, 100); // 1 second window

        for offset in [0, 100, 200] {
            tw.add_event(base + offset);
        }

        // (clock offset, events still inside the 1000ms window)
        let expectations = [
            (500, 3),  // everything is recent
            (1000, 3), // the event at `base` sits exactly on the boundary
            (1100, 2), // event at 1000 expired
            (1200, 1), // events at 1000 and 1100 expired
            (1300, 0), // all events are more than 1000ms old
        ];
        for (offset, expected) in expectations {
            assert_eq!(tw.count_events(base + offset), expected);
        }
    }

    /// The limit check compares the live event count against the given limit.
    #[test]
    fn test_time_window_would_exceed_limit() {
        let base = 1000;
        let mut tw = TimeWindow::new(1000, 10);

        (0..5).for_each(|step| tw.add_event(base + step * 100));

        let at = base + 500;
        assert!(!tw.would_exceed_limit(at, 10));
        assert!(tw.would_exceed_limit(at, 3));
    }

    /// Ten events in a one-second window report roughly ten events per second.
    #[test]
    fn test_time_window_events_per_second() {
        let base = 1000;
        let mut tw = TimeWindow::new(1000, 100);

        (0..10).for_each(|step| tw.add_event(base + step * 50));

        let observed = tw.events_per_second(base + 500);
        assert!(observed >= 9.0 && observed <= 11.0); // ~10 events per second
    }

    /// Once `max_events` is reached, older events are evicted.
    #[test]
    fn test_time_window_max_events() {
        let base = 1000;
        let mut tw = TimeWindow::new(10000, 5); // Small max_events

        (0..10).for_each(|step| tw.add_event(base + step));

        // Only the 5 most recent events survive
        let remaining = tw.count_events(base + 20);
        assert_eq!(remaining, 5);
    }

    /// A bucket tracks count, sum, average, min and max of its values.
    #[test]
    fn test_time_bucket_basic() {
        let mut tb = TimeBucket::new(0, 1000);

        for sample in [10.0, 20.0, 30.0] {
            tb.add_value(sample);
        }

        assert_eq!(tb.count, 3);
        assert_eq!(tb.sum, 60.0);
        assert_eq!(tb.average(), Some(20.0));
        assert_eq!(tb.min, 10.0);
        assert_eq!(tb.max, 30.0);
    }

    /// Bucket membership is inclusive at the start and exclusive at the end.
    #[test]
    fn test_time_bucket_contains() {
        let tb = TimeBucket::new(1000, 2000);

        let cases = [(999, false), (1000, true), (1500, true), (2000, false)];
        for (timestamp, inside) in cases {
            assert_eq!(tb.contains(timestamp), inside);
        }
    }

    /// Values are grouped into one-second buckets and aggregated across them.
    #[test]
    fn test_bucketed_time_series() {
        let mut ts = BucketedTimeSeries::new(1000, 10); // 1 second buckets

        for (timestamp, sample) in [(1000, 10.0), (1500, 20.0), (2000, 30.0), (2500, 40.0)] {
            ts.add_value(timestamp, sample);
        }

        // Two buckets: [1000-2000), [2000-3000)
        assert_eq!(ts.buckets().len(), 2);

        let (avg, min, max, count) = ts.aggregate_stats().expect("Expected aggregate stats");
        assert_eq!(count, 4);
        assert_eq!(min, 10.0);
        assert_eq!(max, 40.0);
        assert_eq!(avg, 25.0);
    }

    /// The series never retains more than `max_buckets` buckets.
    #[test]
    fn test_bucketed_time_series_max_buckets() {
        let mut ts = BucketedTimeSeries::new(1000, 3); // Max 3 buckets

        for step in 0..5 {
            let timestamp = (step * 1000) as i64;
            ts.add_value(timestamp, step as f64);
        }

        // Only the 3 most recent buckets are kept
        assert_eq!(ts.buckets().len(), 3);
    }

    /// Requests beyond the limit are rejected until the window slides past.
    #[test]
    fn test_sliding_window_rate_limiter() {
        let base = 1000;
        let mut rl = SlidingWindowRateLimiter::new(1000, 5); // 5 requests per second

        // The first 5 requests are admitted
        for step in 0..5 {
            let admitted = rl.try_acquire(base + step * 10);
            assert!(admitted);
        }

        // The 6th request is rejected
        let sixth = rl.try_acquire(base + 50);
        assert!(!sixth);

        // Once the window has expired, requests are admitted again
        let later = rl.try_acquire(base + 1100);
        assert!(later);
    }

    /// Remaining capacity shrinks by one for each admitted request.
    #[test]
    fn test_rate_limiter_remaining_capacity() {
        let base = 1000;
        let mut rl = SlidingWindowRateLimiter::new(1000, 10);

        assert_eq!(rl.remaining_capacity(base), 10);

        for offset in [0, 10, 20] {
            rl.try_acquire(base + offset);
        }

        assert_eq!(rl.remaining_capacity(base + 30), 7);
    }

    /// Resetting the limiter discards every recorded request.
    #[test]
    fn test_rate_limiter_reset() {
        let base = 1000;
        let mut rl = SlidingWindowRateLimiter::new(1000, 5);

        for step in 0..5 {
            rl.try_acquire(base + step);
        }

        let probe = base + 10;
        assert_eq!(rl.current_count(probe), 5);

        rl.reset();
        assert_eq!(rl.current_count(probe), 0);
    }
}