synapse_pingora/profiler/
rate_tracker.rs1use serde::{Deserialize, Serialize};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct RateTracker {
23 #[serde(skip, default = "default_timestamps")]
26 timestamps: [u64; 64],
27 write_idx: u8,
29 valid_count: u8,
31}
32
33fn default_timestamps() -> [u64; 64] {
34 [0u64; 64]
35}
36
37impl Default for RateTracker {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl RateTracker {
44 pub fn new() -> Self {
46 Self {
47 timestamps: [0; 64],
48 write_idx: 0,
49 valid_count: 0,
50 }
51 }
52
53 #[inline]
55 pub fn record(&mut self, now_ms: u64) -> f64 {
56 self.timestamps[self.write_idx as usize] = now_ms;
57 self.write_idx = (self.write_idx + 1) % 64;
58 if self.valid_count < 64 {
59 self.valid_count += 1;
60 }
61
62 self.current_rate(now_ms)
63 }
64
65 #[inline]
67 pub fn current_rate(&self, now_ms: u64) -> f64 {
68 if self.valid_count == 0 {
69 return 0.0;
70 }
71
72 let window_ms: u64 = 60_000; let cutoff = now_ms.saturating_sub(window_ms);
74
75 let count = self
76 .timestamps
77 .iter()
78 .take(self.valid_count as usize)
79 .filter(|&&ts| ts > cutoff)
80 .count();
81
82 count as f64 }
84
85 #[inline]
87 pub fn rate_in_window(&self, now_ms: u64, window_ms: u64) -> f64 {
88 if self.valid_count == 0 || window_ms == 0 {
89 return 0.0;
90 }
91
92 let cutoff = now_ms.saturating_sub(window_ms);
93
94 let count = self
95 .timestamps
96 .iter()
97 .take(self.valid_count as usize)
98 .filter(|&&ts| ts > cutoff)
99 .count();
100
101 (count as f64 * 60_000.0) / window_ms as f64
103 }
104
105 #[inline]
107 pub fn request_count(&self) -> u8 {
108 self.valid_count
109 }
110
111 #[inline]
113 pub fn last_request_time(&self) -> Option<u64> {
114 if self.valid_count == 0 {
115 return None;
116 }
117 let last_idx = if self.write_idx == 0 {
119 (self.valid_count - 1) as usize
120 } else {
121 (self.write_idx - 1) as usize
122 };
123 Some(self.timestamps[last_idx])
124 }
125
126 pub fn clear(&mut self) {
128 self.timestamps = [0; 64];
129 self.write_idx = 0;
130 self.valid_count = 0;
131 }
132
133 #[inline]
135 pub fn is_burst(&self, now_ms: u64, baseline_rate: f64, multiplier: f64) -> bool {
136 if baseline_rate <= 0.0 {
137 return false;
138 }
139 let current = self.current_rate(now_ms);
140 current > baseline_rate * multiplier
141 }
142}
143
144#[cfg(test)]
149mod tests {
150 use super::*;
151
152 #[test]
153 fn test_rate_tracker_new() {
154 let rt = RateTracker::new();
155 assert_eq!(rt.request_count(), 0);
156 assert_eq!(rt.current_rate(1000000), 0.0);
157 }
158
159 #[test]
160 fn test_rate_tracker_record() {
161 let mut rt = RateTracker::new();
162 let base_time = 1000000u64;
163
164 for i in 0..10 {
166 rt.record(base_time + i * 100);
167 }
168
169 let rate = rt.current_rate(base_time + 1000);
171 assert!((rate - 10.0).abs() < 0.1);
172 }
173
174 #[test]
175 fn test_rate_tracker_sliding_window() {
176 let mut rt = RateTracker::new();
177 let base_time = 1000000u64;
178
179 for i in 0..5 {
181 rt.record(base_time + i * 100);
182 }
183
184 let rate = rt.current_rate(base_time + 500);
186 assert_eq!(rate, 5.0);
187
188 let rate_later = rt.current_rate(base_time + 61_000);
190 assert_eq!(rate_later, 0.0);
191 }
192
193 #[test]
194 fn test_rate_tracker_buffer_wraparound() {
195 let mut rt = RateTracker::new();
196 let base_time = 1000000u64;
197
198 for i in 0..100 {
200 rt.record(base_time + i * 100);
201 }
202
203 assert_eq!(rt.request_count(), 64);
205
206 let rate = rt.current_rate(base_time + 10000);
208 assert!(rate > 0.0);
209 }
210
211 #[test]
212 fn test_rate_tracker_last_request_time() {
213 let mut rt = RateTracker::new();
214
215 assert!(rt.last_request_time().is_none());
216
217 rt.record(1000);
218 assert_eq!(rt.last_request_time(), Some(1000));
219
220 rt.record(2000);
221 assert_eq!(rt.last_request_time(), Some(2000));
222
223 rt.record(3000);
224 assert_eq!(rt.last_request_time(), Some(3000));
225 }
226
227 #[test]
228 fn test_rate_tracker_clear() {
229 let mut rt = RateTracker::new();
230
231 for i in 0..10 {
232 rt.record(1000 + i * 100);
233 }
234 assert_eq!(rt.request_count(), 10);
235
236 rt.clear();
237 assert_eq!(rt.request_count(), 0);
238 assert!(rt.last_request_time().is_none());
239 }
240
241 #[test]
242 fn test_rate_tracker_is_burst() {
243 let mut rt = RateTracker::new();
244 let base_time = 1000000u64;
245
246 for i in 0..20 {
248 rt.record(base_time + i * 100);
249 }
250
251 assert!(rt.is_burst(base_time + 2000, 5.0, 3.0));
254
255 assert!(!rt.is_burst(base_time + 2000, 5.0, 5.0));
257 }
258
259 #[test]
260 fn test_rate_tracker_is_burst_zero_baseline() {
261 let mut rt = RateTracker::new();
262 rt.record(1000);
263
264 assert!(!rt.is_burst(1000, 0.0, 2.0));
266 }
267
268 #[test]
269 fn test_rate_in_window() {
270 let mut rt = RateTracker::new();
271 let base_time = 1000000u64;
272
273 for i in 0..10 {
275 rt.record(base_time + i * 1000);
276 }
277
278 let rate = rt.rate_in_window(base_time + 10000, 10_000);
283 assert!((rate - 54.0).abs() < 1.0);
284
285 let rate_5s = rt.rate_in_window(base_time + 10000, 5_000);
289 assert!(rate_5s > 0.0);
291 }
292
293 #[test]
294 fn test_rate_in_window_zero() {
295 let rt = RateTracker::new();
296 assert_eq!(rt.rate_in_window(1000, 0), 0.0);
297 assert_eq!(rt.rate_in_window(1000, 10000), 0.0);
298 }
299
300 #[test]
301 fn test_rate_tracker_timestamps_outside_window() {
302 let mut rt = RateTracker::new();
303
304 for i in 0..5 {
306 rt.record(i * 100);
307 }
308
309 let rate = rt.current_rate(120_000); assert_eq!(rate, 0.0);
312 }
313
314 #[test]
315 fn test_rate_tracker_exact_boundary() {
316 let mut rt = RateTracker::new();
317
318 rt.record(0);
320
321 let rate = rt.current_rate(60_000);
324 assert_eq!(rate, 0.0);
325
326 let rate_earlier = rt.current_rate(59_999);
330 assert_eq!(rate_earlier, 0.0);
331
332 rt.record(1); let rate_with_in_window = rt.current_rate(60_000);
336 assert_eq!(rate_with_in_window, 1.0);
337 }
338}