Skip to main content

synapse_pingora/profiler/
rate_tracker.rs

1//! Sliding window request rate tracking.
2//!
3//! Uses a circular buffer to track request timestamps for rate calculation
4//! within a configurable time window.
5//!
6//! ## Performance
7//! - Record: O(1)
8//! - Current rate: O(n) where n = buffer size (64)
9//! - Memory: ~520 bytes
10
11use serde::{Deserialize, Serialize};
12
13// ============================================================================
14// RateTracker - Sliding window request rate
15// ============================================================================
16
17/// Circular buffer for tracking requests per minute.
18///
19/// Uses 60-second sliding window for rate burst detection.
20/// Memory: ~520 bytes (64 timestamps)
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct RateTracker {
23    /// Circular buffer of request timestamps (ms)
24    /// Skip serde: rebuilt at runtime, arrays > 32 lack default serde support
25    #[serde(skip, default = "default_timestamps")]
26    timestamps: [u64; 64],
27    /// Write index (wraps at 64)
28    write_idx: u8,
29    /// Number of valid entries
30    valid_count: u8,
31}
32
33fn default_timestamps() -> [u64; 64] {
34    [0u64; 64]
35}
36
37impl Default for RateTracker {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl RateTracker {
44    /// Create a new rate tracker.
45    pub fn new() -> Self {
46        Self {
47            timestamps: [0; 64],
48            write_idx: 0,
49            valid_count: 0,
50        }
51    }
52
53    /// Record a request and return current rate (requests per minute).
54    #[inline]
55    pub fn record(&mut self, now_ms: u64) -> f64 {
56        self.timestamps[self.write_idx as usize] = now_ms;
57        self.write_idx = (self.write_idx + 1) % 64;
58        if self.valid_count < 64 {
59            self.valid_count += 1;
60        }
61
62        self.current_rate(now_ms)
63    }
64
65    /// Get current rate without recording (requests per minute).
66    #[inline]
67    pub fn current_rate(&self, now_ms: u64) -> f64 {
68        if self.valid_count == 0 {
69            return 0.0;
70        }
71
72        let window_ms: u64 = 60_000; // 60 seconds
73        let cutoff = now_ms.saturating_sub(window_ms);
74
75        let count = self
76            .timestamps
77            .iter()
78            .take(self.valid_count as usize)
79            .filter(|&&ts| ts > cutoff)
80            .count();
81
82        count as f64 // requests per minute (60s window)
83    }
84
85    /// Get rate for a custom window (in milliseconds).
86    #[inline]
87    pub fn rate_in_window(&self, now_ms: u64, window_ms: u64) -> f64 {
88        if self.valid_count == 0 || window_ms == 0 {
89            return 0.0;
90        }
91
92        let cutoff = now_ms.saturating_sub(window_ms);
93
94        let count = self
95            .timestamps
96            .iter()
97            .take(self.valid_count as usize)
98            .filter(|&&ts| ts > cutoff)
99            .count();
100
101        // Normalize to requests per minute
102        (count as f64 * 60_000.0) / window_ms as f64
103    }
104
105    /// Get the number of requests recorded (up to buffer size).
106    #[inline]
107    pub fn request_count(&self) -> u8 {
108        self.valid_count
109    }
110
111    /// Get the most recent timestamp.
112    #[inline]
113    pub fn last_request_time(&self) -> Option<u64> {
114        if self.valid_count == 0 {
115            return None;
116        }
117        // Write index points to next slot, so previous is most recent
118        let last_idx = if self.write_idx == 0 {
119            (self.valid_count - 1) as usize
120        } else {
121            (self.write_idx - 1) as usize
122        };
123        Some(self.timestamps[last_idx])
124    }
125
126    /// Clear all recorded timestamps.
127    pub fn clear(&mut self) {
128        self.timestamps = [0; 64];
129        self.write_idx = 0;
130        self.valid_count = 0;
131    }
132
133    /// Check if there's a burst (rate exceeds multiplier * baseline).
134    #[inline]
135    pub fn is_burst(&self, now_ms: u64, baseline_rate: f64, multiplier: f64) -> bool {
136        if baseline_rate <= 0.0 {
137            return false;
138        }
139        let current = self.current_rate(now_ms);
140        current > baseline_rate * multiplier
141    }
142}
143
144// ============================================================================
145// Tests
146// ============================================================================
147
148#[cfg(test)]
149mod tests {
150    use super::*;
151
152    #[test]
153    fn test_rate_tracker_new() {
154        let rt = RateTracker::new();
155        assert_eq!(rt.request_count(), 0);
156        assert_eq!(rt.current_rate(1000000), 0.0);
157    }
158
159    #[test]
160    fn test_rate_tracker_record() {
161        let mut rt = RateTracker::new();
162        let base_time = 1000000u64;
163
164        // Record 10 requests in 1 second
165        for i in 0..10 {
166            rt.record(base_time + i * 100);
167        }
168
169        // Rate should be 10 req/min (all within 60s window)
170        let rate = rt.current_rate(base_time + 1000);
171        assert!((rate - 10.0).abs() < 0.1);
172    }
173
174    #[test]
175    fn test_rate_tracker_sliding_window() {
176        let mut rt = RateTracker::new();
177        let base_time = 1000000u64;
178
179        // Record 5 requests at base time
180        for i in 0..5 {
181            rt.record(base_time + i * 100);
182        }
183
184        // Check rate at base time + 500ms
185        let rate = rt.current_rate(base_time + 500);
186        assert_eq!(rate, 5.0);
187
188        // Check rate 61 seconds later - all old requests should be outside window
189        let rate_later = rt.current_rate(base_time + 61_000);
190        assert_eq!(rate_later, 0.0);
191    }
192
193    #[test]
194    fn test_rate_tracker_buffer_wraparound() {
195        let mut rt = RateTracker::new();
196        let base_time = 1000000u64;
197
198        // Record 100 requests (exceeds buffer size of 64)
199        for i in 0..100 {
200            rt.record(base_time + i * 100);
201        }
202
203        // Should have max 64 entries
204        assert_eq!(rt.request_count(), 64);
205
206        // Rate should count requests in window
207        let rate = rt.current_rate(base_time + 10000);
208        assert!(rate > 0.0);
209    }
210
211    #[test]
212    fn test_rate_tracker_last_request_time() {
213        let mut rt = RateTracker::new();
214
215        assert!(rt.last_request_time().is_none());
216
217        rt.record(1000);
218        assert_eq!(rt.last_request_time(), Some(1000));
219
220        rt.record(2000);
221        assert_eq!(rt.last_request_time(), Some(2000));
222
223        rt.record(3000);
224        assert_eq!(rt.last_request_time(), Some(3000));
225    }
226
227    #[test]
228    fn test_rate_tracker_clear() {
229        let mut rt = RateTracker::new();
230
231        for i in 0..10 {
232            rt.record(1000 + i * 100);
233        }
234        assert_eq!(rt.request_count(), 10);
235
236        rt.clear();
237        assert_eq!(rt.request_count(), 0);
238        assert!(rt.last_request_time().is_none());
239    }
240
241    #[test]
242    fn test_rate_tracker_is_burst() {
243        let mut rt = RateTracker::new();
244        let base_time = 1000000u64;
245
246        // Record 20 requests
247        for i in 0..20 {
248            rt.record(base_time + i * 100);
249        }
250
251        // Baseline of 5 req/min, current is 20 req/min
252        // 20 > 5 * 3 = 15, so should be burst
253        assert!(rt.is_burst(base_time + 2000, 5.0, 3.0));
254
255        // 20 > 5 * 5 = 25? No
256        assert!(!rt.is_burst(base_time + 2000, 5.0, 5.0));
257    }
258
259    #[test]
260    fn test_rate_tracker_is_burst_zero_baseline() {
261        let mut rt = RateTracker::new();
262        rt.record(1000);
263
264        // Zero baseline should not trigger burst
265        assert!(!rt.is_burst(1000, 0.0, 2.0));
266    }
267
268    #[test]
269    fn test_rate_in_window() {
270        let mut rt = RateTracker::new();
271        let base_time = 1000000u64;
272
273        // Record 10 requests over 10 seconds (at 0s, 1s, 2s, ..., 9s)
274        for i in 0..10 {
275            rt.record(base_time + i * 1000);
276        }
277
278        // Rate in 10-second window at now=base+10000
279        // cutoff = 10000 - 10000 = 0, filter is ts > 0
280        // Requests at 1s, 2s, ..., 9s = 9 requests (ts=0 excluded)
281        // Rate = 9 * 60000 / 10000 = 54 req/min
282        let rate = rt.rate_in_window(base_time + 10000, 10_000);
283        assert!((rate - 54.0).abs() < 1.0);
284
285        // Rate in 5-second window (requests at 5s, 6s, 7s, 8s, 9s = 5 requests)
286        // cutoff = 10000 - 5000 = 5000, filter is ts > 5000
287        // Requests at 6s, 7s, 8s, 9s = 4 requests
288        let rate_5s = rt.rate_in_window(base_time + 10000, 5_000);
289        // 4 requests * 60000 / 5000 = 48 req/min
290        assert!(rate_5s > 0.0);
291    }
292
293    #[test]
294    fn test_rate_in_window_zero() {
295        let rt = RateTracker::new();
296        assert_eq!(rt.rate_in_window(1000, 0), 0.0);
297        assert_eq!(rt.rate_in_window(1000, 10000), 0.0);
298    }
299
300    #[test]
301    fn test_rate_tracker_timestamps_outside_window() {
302        let mut rt = RateTracker::new();
303
304        // Record requests at time 0
305        for i in 0..5 {
306            rt.record(i * 100);
307        }
308
309        // Check rate way in the future - all timestamps should be outside window
310        let rate = rt.current_rate(120_000); // 2 minutes later
311        assert_eq!(rate, 0.0);
312    }
313
314    #[test]
315    fn test_rate_tracker_exact_boundary() {
316        let mut rt = RateTracker::new();
317
318        // Record at exactly 60 seconds ago
319        rt.record(0);
320
321        // At exactly 60000ms, the request at 0 should be outside the window
322        // (cutoff = 60000 - 60000 = 0, filter is ts > cutoff, so ts > 0)
323        let rate = rt.current_rate(60_000);
324        assert_eq!(rate, 0.0);
325
326        // At 59999ms with saturating_sub:
327        // cutoff = 59999 - 60000 = 0 (saturates to 0)
328        // filter is ts > 0, so request at ts=0 is excluded
329        let rate_earlier = rt.current_rate(59_999);
330        assert_eq!(rate_earlier, 0.0);
331
332        // Test with a request actually inside the window
333        rt.record(1); // request at ts=1
334                      // At 60000ms, cutoff = 0, ts=1 > 0 is true
335        let rate_with_in_window = rt.current_rate(60_000);
336        assert_eq!(rate_with_in_window, 1.0);
337    }
338}