1use crate::estimator::Estimator;
19use std::hash::Hash;
20use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
21use std::time::{Duration, Instant};
22
/// Raw inputs handed to a custom rate calculation function by `Rate::rate_with`.
///
/// The struct is `#[non_exhaustive]` so that further fields can be added later without
/// breaking code outside this crate that reads these values.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct RateComponents {
    /// Number of samples recorded for the key during the previous interval.
    pub prev_samples: isize,
    /// Number of samples recorded for the key so far in the current interval.
    pub curr_samples: isize,
    /// Length of one sampling interval, as configured on the `Rate`.
    pub interval: Duration,
    /// Share of the current interval that has already elapsed: the elapsed milliseconds
    /// modulo the interval length, divided by the interval length.
    pub current_interval_fraction: f64,
}
40
41#[allow(dead_code)]
48pub static PROPORTIONAL_RATE_ESTIMATE_CALC_FN: fn(RateComponents) -> f64 =
49 |rate_info: RateComponents| {
50 let prev = rate_info.prev_samples as f64;
51 let curr = rate_info.curr_samples as f64;
52 let interval_secs = rate_info.interval.as_secs_f64();
53 let interval_fraction = rate_info.current_interval_fraction;
54
55 let weighted_count = prev * (1. - interval_fraction) + curr;
56 weighted_count / interval_secs
57 };
58
59pub struct Rate {
64 red_slot: Estimator,
66 blue_slot: Estimator,
67 red_or_blue: AtomicBool, start: Instant,
69 reset_interval_ms: u64, last_reset_time: AtomicU64, interval: Duration,
73}
74
75const HASHES: usize = 4;
77const SLOTS: usize = 1024; impl Rate {
80 pub fn new(interval: std::time::Duration) -> Self {
82 Rate::new_with_estimator_config(interval, HASHES, SLOTS)
83 }
84
85 #[inline]
87 pub fn new_with_estimator_config(
88 interval: std::time::Duration,
89 hashes: usize,
90 slots: usize,
91 ) -> Self {
92 Rate {
93 red_slot: Estimator::new(hashes, slots),
94 blue_slot: Estimator::new(hashes, slots),
95 red_or_blue: AtomicBool::new(true),
96 start: Instant::now(),
97 reset_interval_ms: interval.as_millis() as u64, last_reset_time: AtomicU64::new(0),
99 interval,
100 }
101 }
102
103 fn current(&self, red_or_blue: bool) -> &Estimator {
104 if red_or_blue {
105 &self.red_slot
106 } else {
107 &self.blue_slot
108 }
109 }
110
111 fn previous(&self, red_or_blue: bool) -> &Estimator {
112 if red_or_blue {
113 &self.blue_slot
114 } else {
115 &self.red_slot
116 }
117 }
118
119 fn red_or_blue(&self) -> bool {
120 self.red_or_blue.load(Ordering::SeqCst)
121 }
122
123 pub fn rate<T: Hash>(&self, key: &T) -> f64 {
127 let past_ms = self.maybe_reset();
128 if past_ms >= self.reset_interval_ms * 2 {
129 return 0f64;
131 }
132
133 self.previous(self.red_or_blue()).get(key) as f64 * 1000.0 / self.reset_interval_ms as f64
134 }
135
136 pub fn observe<T: Hash>(&self, key: &T, events: isize) -> isize {
138 self.maybe_reset();
139 self.current(self.red_or_blue()).incr(key, events)
140 }
141
142 fn maybe_reset(&self) -> u64 {
144 let now = Instant::now().duration_since(self.start).as_millis() as u64;
146 let last_reset = self.last_reset_time.load(Ordering::SeqCst);
147 let past_ms = now - last_reset;
148
149 if past_ms < self.reset_interval_ms {
150 return past_ms;
152 }
153 let red_or_blue = self.red_or_blue();
154 match self.last_reset_time.compare_exchange(
155 last_reset,
156 now,
157 Ordering::SeqCst,
158 Ordering::Acquire,
159 ) {
160 Ok(_) => {
161 self.previous(red_or_blue).reset();
163 self.red_or_blue.store(!red_or_blue, Ordering::SeqCst);
165 if now - last_reset >= self.reset_interval_ms * 2 {
168 self.current(red_or_blue).reset();
170 }
171 }
172 Err(new) => {
173 assert!(new >= now - 1000); }
176 }
177
178 past_ms
179 }
180
181 pub fn rate_with<F, T, K>(&self, key: &K, mut rate_calc_fn: F) -> T
192 where
193 F: FnMut(RateComponents) -> T,
194 K: Hash,
195 {
196 let past_ms = self.maybe_reset();
197
198 let (prev_samples, curr_samples) = if past_ms >= self.reset_interval_ms * 2 {
199 (0, 0)
201 } else if past_ms >= self.reset_interval_ms {
202 (self.previous(self.red_or_blue()).get(key), 0)
203 } else {
204 let (prev_est, curr_est) = if self.red_or_blue() {
205 (&self.blue_slot, &self.red_slot)
206 } else {
207 (&self.red_slot, &self.blue_slot)
208 };
209
210 (prev_est.get(key), curr_est.get(key))
211 };
212
213 rate_calc_fn(RateComponents {
214 interval: self.interval,
215 prev_samples,
216 curr_samples,
217 current_interval_fraction: (past_ms % self.reset_interval_ms) as f64
218 / self.reset_interval_ms as f64,
219 })
220 }
221}
222
// Timing-based tests: each one sleeps across interval boundaries of a one-second `Rate`.
#[cfg(test)]
mod tests {
    use float_cmp::assert_approx_eq;

    use super::*;
    use std::thread::sleep;
    use std::time::Duration;

    #[test]
    fn test_observe_rate() {
        let r = Rate::new(Duration::from_secs(1));
        let key = 1;

        // second: 0
        let observed = r.observe(&key, 3);
        assert_eq!(observed, 3);
        let observed = r.observe(&key, 2);
        assert_eq!(observed, 5);
        // `rate()` only reads the previous interval, which is still empty.
        assert_eq!(r.rate(&key), 0f64);

        // second: 1
        sleep(Duration::from_secs(1));
        let observed = r.observe(&key, 4);
        assert_eq!(observed, 4);
        // The 5 events of second 0 are now the previous interval.
        assert_eq!(r.rate(&key), 5f64);

        // second: 2
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate(&key), 4f64);

        // second: 3
        sleep(Duration::from_secs(1));
        // Nothing was observed during second 2.
        assert_eq!(r.rate(&key), 0f64);
    }

    /// Asserts that `left` and `right` differ by at most 0.15.
    // NOTE(review): the tolerance presumably absorbs sleep jitter in the proportional
    // estimate; confirm if it ever needs tuning.
    fn assert_eq_ish(left: f64, right: f64) {
        assert_approx_eq!(f64, left, right, epsilon = 0.15)
    }

    #[test]
    fn test_observe_rate_custom_90_10() {
        let r = Rate::new(Duration::from_secs(1));
        let key = 1;

        // Custom estimate: 10% of the previous interval plus 90% of the current one.
        let rate_90_10_fn = |rate_info: RateComponents| {
            let prev = rate_info.prev_samples as f64;
            let curr = rate_info.curr_samples as f64;
            (prev * 0.1 + curr * 0.9) / rate_info.interval.as_secs_f64()
        };

        // second: 0
        let observed = r.observe(&key, 3);
        assert_eq!(observed, 3);
        let observed = r.observe(&key, 2);
        assert_eq!(observed, 5);
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 5. * 0.9);

        // second: 1
        sleep(Duration::from_secs(1));
        let observed = r.observe(&key, 4);
        assert_eq!(observed, 4);
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 5. * 0.1 + 4. * 0.9);

        // second: 2
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 4. * 0.1);

        // second: 3
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 0f64);
    }

    #[test]
    fn test_observe_rate_custom_proportional() {
        let r = Rate::new(Duration::from_secs(1));
        let key = 1;

        // second: 0.0
        let observed = r.observe(&key, 3);
        assert_eq!(observed, 3);
        let observed = r.observe(&key, 2);
        assert_eq!(observed, 5);
        assert_eq_ish(r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN), 5.);

        // second: 0.5
        sleep(Duration::from_secs_f64(0.5));
        assert_eq_ish(r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN), 5.);
        // `rate()` ignores the interval in progress.
        assert_eq_ish(r.rate(&key), 0.);

        // second: 1.0
        sleep(Duration::from_secs_f64(0.5));
        let observed = r.observe(&key, 4);
        assert_eq!(observed, 4);
        // Previous interval counted in full (fraction ~0) plus the 4 new events.
        assert_eq_ish(r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN), 9.);

        // second: 1.75
        sleep(Duration::from_secs_f64(0.75));
        assert_eq_ish(
            r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN),
            5. * 0.25 + 4.,
        );

        // second: 2.0
        sleep(Duration::from_secs_f64(0.25));
        assert_eq_ish(r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN), 4.);
        assert_eq_ish(r.rate(&key), 4.);

        // second: 2.5
        sleep(Duration::from_secs_f64(0.5));
        assert_eq_ish(
            r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN),
            4. / 2.,
        );
        assert_eq_ish(r.rate(&key), 4.);

        // second: 3.5
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN), 0f64);
    }
}