1use crate::estimator::Estimator;
19use std::hash::Hash;
20use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
21use std::time::{Duration, Instant};
22
/// The raw inputs handed to the closure passed to `Rate::rate_with`, from which the
/// closure computes its own notion of a rate.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct RateComponents {
    /// Count reported for the key by the previous interval's estimator
    /// (`0` when that data is considered expired).
    pub prev_samples: isize,
    /// Count reported for the key by the current interval's estimator
    /// (`0` when an interval boundary has just been crossed or the data is expired).
    pub curr_samples: isize,
    /// The interval length the `Rate` was constructed with.
    pub interval: Duration,
    /// Position inside the current interval, computed as
    /// `(elapsed_ms % interval_ms) / interval_ms`.
    pub current_interval_fraction: f64,
}
40
/// A per-key event rate tracker backed by two [`Estimator`] slots ("red" and "blue")
/// that alternate between holding the current interval and the previous one.
pub struct Rate {
    /// Estimator that is "current" while `red_or_blue` is `true`.
    red_slot: Estimator,
    /// Estimator that is "current" while `red_or_blue` is `false`.
    blue_slot: Estimator,
    /// Selects which slot receives new events: `true` = red, `false` = blue.
    red_or_blue: AtomicBool,
    /// Creation time; every timestamp below is milliseconds elapsed since this instant.
    start: Instant,
    /// Interval length in milliseconds (derived from `interval`).
    reset_interval_ms: u64,
    /// Milliseconds since `start` at which the slots were last rotated; starts at 0.
    last_reset_time: AtomicU64,
    /// The interval as supplied by the caller; passed through in `RateComponents`.
    interval: Duration,
}
57
58const HASHES: usize = 4;
60const SLOTS: usize = 1024; impl Rate {
63 pub fn new(interval: std::time::Duration) -> Self {
65 Rate::new_with_estimator_config(interval, HASHES, SLOTS)
66 }
67
68 #[inline]
70 pub fn new_with_estimator_config(
71 interval: std::time::Duration,
72 hashes: usize,
73 slots: usize,
74 ) -> Self {
75 Rate {
76 red_slot: Estimator::new(hashes, slots),
77 blue_slot: Estimator::new(hashes, slots),
78 red_or_blue: AtomicBool::new(true),
79 start: Instant::now(),
80 reset_interval_ms: interval.as_millis() as u64, last_reset_time: AtomicU64::new(0),
82 interval,
83 }
84 }
85
86 fn current(&self, red_or_blue: bool) -> &Estimator {
87 if red_or_blue {
88 &self.red_slot
89 } else {
90 &self.blue_slot
91 }
92 }
93
94 fn previous(&self, red_or_blue: bool) -> &Estimator {
95 if red_or_blue {
96 &self.blue_slot
97 } else {
98 &self.red_slot
99 }
100 }
101
102 fn red_or_blue(&self) -> bool {
103 self.red_or_blue.load(Ordering::SeqCst)
104 }
105
106 pub fn rate<T: Hash>(&self, key: &T) -> f64 {
108 let past_ms = self.maybe_reset();
109 if past_ms >= self.reset_interval_ms * 2 {
110 return 0f64;
112 }
113
114 self.previous(self.red_or_blue()).get(key) as f64 / self.reset_interval_ms as f64 * 1000.0
115 }
116
117 pub fn observe<T: Hash>(&self, key: &T, events: isize) -> isize {
119 self.maybe_reset();
120 self.current(self.red_or_blue()).incr(key, events)
121 }
122
123 fn maybe_reset(&self) -> u64 {
125 let now = Instant::now().duration_since(self.start).as_millis() as u64;
127 let last_reset = self.last_reset_time.load(Ordering::SeqCst);
128 let past_ms = now - last_reset;
129
130 if past_ms < self.reset_interval_ms {
131 return past_ms;
133 }
134 let red_or_blue = self.red_or_blue();
135 match self.last_reset_time.compare_exchange(
136 last_reset,
137 now,
138 Ordering::SeqCst,
139 Ordering::Acquire,
140 ) {
141 Ok(_) => {
142 self.previous(red_or_blue).reset();
144 self.red_or_blue.store(!red_or_blue, Ordering::SeqCst);
146 if now - last_reset >= self.reset_interval_ms * 2 {
149 self.current(red_or_blue).reset();
151 }
152 }
153 Err(new) => {
154 assert!(new >= now - 1000); }
157 }
158
159 past_ms
160 }
161
162 pub fn rate_with<F, T, K>(&self, key: &K, mut rate_calc_fn: F) -> T
173 where
174 F: FnMut(RateComponents) -> T,
175 K: Hash,
176 {
177 let past_ms = self.maybe_reset();
178
179 let (prev_samples, curr_samples) = if past_ms >= self.reset_interval_ms * 2 {
180 (0, 0)
182 } else if past_ms >= self.reset_interval_ms {
183 (self.previous(self.red_or_blue()).get(key), 0)
184 } else {
185 let (prev_est, curr_est) = if self.red_or_blue() {
186 (&self.blue_slot, &self.red_slot)
187 } else {
188 (&self.red_slot, &self.blue_slot)
189 };
190
191 (prev_est.get(key), curr_est.get(key))
192 };
193
194 rate_calc_fn(RateComponents {
195 interval: self.interval,
196 prev_samples,
197 curr_samples,
198 current_interval_fraction: (past_ms % self.reset_interval_ms) as f64
199 / self.reset_interval_ms as f64,
200 })
201 }
202}
203
// These tests depend on wall-clock time: they sleep across interval boundaries and
// assume each call lands in the interval noted in the comments.
#[cfg(test)]
mod tests {
    use float_cmp::assert_approx_eq;

    use super::*;
    use std::thread::sleep;
    use std::time::Duration;

    // `rate` reports the previous interval only, so each batch shows up one second late.
    #[test]
    fn test_observe_rate() {
        let r = Rate::new(Duration::from_secs(1));
        let key = 1;

        // second 0
        let observed = r.observe(&key, 3);
        assert_eq!(observed, 3);
        let observed = r.observe(&key, 2);
        assert_eq!(observed, 5);
        // no completed interval yet, so nothing to report
        assert_eq!(r.rate(&key), 0f64);

        // second 1
        sleep(Duration::from_secs(1));
        let observed = r.observe(&key, 4);
        assert_eq!(observed, 4);
        // the 5 events of second 0
        assert_eq!(r.rate(&key), 5f64);

        // second 2
        sleep(Duration::from_secs(1));
        // the 4 events of second 1
        assert_eq!(r.rate(&key), 4f64);

        // second 3
        sleep(Duration::from_secs(1));
        // nothing was observed during second 2
        assert_eq!(r.rate(&key), 0f64);
    }

    // Compares two floats with a generous tolerance (0.15); used where the expected
    // value depends on how precisely the sleeps line up with interval boundaries.
    fn assert_eq_ish(left: f64, right: f64) {
        assert_approx_eq!(f64, left, right, epsilon = 0.15)
    }

    // Custom rate: 10% weight on the previous interval, 90% on the current one.
    #[test]
    fn test_observe_rate_custom_90_10() {
        let r = Rate::new(Duration::from_secs(1));
        let key = 1;

        let rate_90_10_fn = |rate_info: RateComponents| {
            let prev = rate_info.prev_samples as f64;
            let curr = rate_info.curr_samples as f64;
            (prev * 0.1 + curr * 0.9) / rate_info.interval.as_secs_f64()
        };

        // second 0
        let observed = r.observe(&key, 3);
        assert_eq!(observed, 3);
        let observed = r.observe(&key, 2);
        assert_eq!(observed, 5);
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 5. * 0.9);

        // second 1
        sleep(Duration::from_secs(1));
        let observed = r.observe(&key, 4);
        assert_eq!(observed, 4);
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 5. * 0.1 + 4. * 0.9);

        // second 2
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 4. * 0.1);

        // second 3
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 0f64);
    }

    // Custom rate: weights the previous and current intervals by how far the current
    // interval has progressed (`current_interval_fraction`).
    #[test]
    fn test_observe_rate_custom_proportional() {
        let r = Rate::new(Duration::from_secs(1));
        let key = 1;

        let rate_prop_fn = |rate_info: RateComponents| {
            let prev = rate_info.prev_samples as f64;
            let curr = rate_info.curr_samples as f64;
            let interval_secs = rate_info.interval.as_secs_f64();
            let interval_fraction = rate_info.current_interval_fraction;

            let weighted_count = prev * (1. - interval_fraction) + curr * interval_fraction;
            weighted_count / interval_secs
        };

        // second 0
        let observed = r.observe(&key, 3);
        assert_eq!(observed, 3);
        let observed = r.observe(&key, 2);
        assert_eq!(observed, 5);
        assert_eq_ish(r.rate_with(&key, rate_prop_fn), 0.);

        // second 0.5
        sleep(Duration::from_secs_f64(0.5));
        assert_eq_ish(r.rate_with(&key, rate_prop_fn), 5. * 0.5);

        // second 1
        sleep(Duration::from_secs_f64(0.5));
        let observed = r.observe(&key, 4);
        assert_eq!(observed, 4);
        assert_eq_ish(r.rate_with(&key, rate_prop_fn), 5.);

        // second 1.75
        sleep(Duration::from_secs_f64(0.75));
        assert_eq_ish(r.rate_with(&key, rate_prop_fn), 5. * 0.25 + 4. * 0.75);

        // second 2
        sleep(Duration::from_secs_f64(0.25));
        assert_eq_ish(r.rate_with(&key, rate_prop_fn), 4.);

        // second 3
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate_with(&key, rate_prop_fn), 0f64);
    }
}