pingora_limits/
rate.rs

1// Copyright 2024 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The rate module defines the [Rate] type that helps estimate the occurrence of events over a
16//! period of time.
17
18use crate::estimator::Estimator;
19use std::hash::Hash;
20use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
21use std::time::{Duration, Instant};
22
/// The raw inputs handed to a custom rate function (see [`Rate::rate_with`]).
///
/// It carries the sample count of the previous (completed) interval and of the
/// current (in-progress) interval, the configured interval length, and how far
/// into the current interval the sample was taken, expressed as a fraction.
///
/// Example: with a `10s` interval, a sample taken 2 seconds after the start of
/// the current interval has a `current_interval_fraction` of `0.2`, meaning 20%
/// of the current interval has elapsed.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct RateComponents {
    /// Samples accumulated during the previous, completed interval.
    pub prev_samples: isize,
    /// Samples accumulated so far during the current, in-progress interval.
    pub curr_samples: isize,
    /// The configured length of one interval.
    pub interval: Duration,
    /// Elapsed fraction (`0..1`) of the current interval when the sample was taken.
    pub current_interval_fraction: f64,
}
40
41/// A stable rate estimator that reports the rate of events in the past `interval` time.
42/// It returns the average rate between `interval` * 2 and `interval` while collecting the events
43/// happening between `interval` and now.
44///
45/// This estimator ignores events that happen less than once per `interval` time.
46pub struct Rate {
47    // 2 slots so that we use one to collect the current events and the other to report rate
48    red_slot: Estimator,
49    blue_slot: Estimator,
50    red_or_blue: AtomicBool, // true: the current slot is red, otherwise blue
51    start: Instant,
52    // Use u64 below instead of Instant because we want atomic operation
53    reset_interval_ms: u64, // the time interval to reset `current` and move it to `previous`
54    last_reset_time: AtomicU64, // the timestamp in ms since `start`
55    interval: Duration,
56}
57
// Default estimator dimensions used by `Rate::new`. See the inflight module for the
// reasoning behind these numbers.

/// Default number of hashes per estimator.
const HASHES: usize = 4;
/// Default number of columns (slots) per estimator. This can be lower if the interval is
/// short (key cardinality is low).
const SLOTS: usize = 1 << 10;
61
62impl Rate {
63    /// Create a new `Rate` with the given interval.
64    pub fn new(interval: std::time::Duration) -> Self {
65        Rate::new_with_estimator_config(interval, HASHES, SLOTS)
66    }
67
68    /// Create a new `Rate` with the given interval and Estimator config with the given amount of hashes and columns (slots).
69    #[inline]
70    pub fn new_with_estimator_config(
71        interval: std::time::Duration,
72        hashes: usize,
73        slots: usize,
74    ) -> Self {
75        Rate {
76            red_slot: Estimator::new(hashes, slots),
77            blue_slot: Estimator::new(hashes, slots),
78            red_or_blue: AtomicBool::new(true),
79            start: Instant::now(),
80            reset_interval_ms: interval.as_millis() as u64, // should be small not to overflow
81            last_reset_time: AtomicU64::new(0),
82            interval,
83        }
84    }
85
86    fn current(&self, red_or_blue: bool) -> &Estimator {
87        if red_or_blue {
88            &self.red_slot
89        } else {
90            &self.blue_slot
91        }
92    }
93
94    fn previous(&self, red_or_blue: bool) -> &Estimator {
95        if red_or_blue {
96            &self.blue_slot
97        } else {
98            &self.red_slot
99        }
100    }
101
102    fn red_or_blue(&self) -> bool {
103        self.red_or_blue.load(Ordering::SeqCst)
104    }
105
106    /// Return the per second rate estimation.
107    pub fn rate<T: Hash>(&self, key: &T) -> f64 {
108        let past_ms = self.maybe_reset();
109        if past_ms >= self.reset_interval_ms * 2 {
110            // already missed 2 intervals, no data, just report 0 as a short cut
111            return 0f64;
112        }
113
114        self.previous(self.red_or_blue()).get(key) as f64 / self.reset_interval_ms as f64 * 1000.0
115    }
116
117    /// Report new events and return number of events seen so far in the current interval.
118    pub fn observe<T: Hash>(&self, key: &T, events: isize) -> isize {
119        self.maybe_reset();
120        self.current(self.red_or_blue()).incr(key, events)
121    }
122
123    // reset if needed, return the time since last reset for other fn to use
124    fn maybe_reset(&self) -> u64 {
125        // should be short enough not to overflow
126        let now = Instant::now().duration_since(self.start).as_millis() as u64;
127        let last_reset = self.last_reset_time.load(Ordering::SeqCst);
128        let past_ms = now - last_reset;
129
130        if past_ms < self.reset_interval_ms {
131            // no need to reset
132            return past_ms;
133        }
134        let red_or_blue = self.red_or_blue();
135        match self.last_reset_time.compare_exchange(
136            last_reset,
137            now,
138            Ordering::SeqCst,
139            Ordering::Acquire,
140        ) {
141            Ok(_) => {
142                // first clear the previous slot
143                self.previous(red_or_blue).reset();
144                // then flip the flag to tell others to use the reset slot
145                self.red_or_blue.store(!red_or_blue, Ordering::SeqCst);
146                // if current time is beyond 2 intervals, the data stored in the previous slot
147                // is also stale, we should clear that too
148                if now - last_reset >= self.reset_interval_ms * 2 {
149                    // Note that this is the previous one now because we just flipped self.red_or_blue
150                    self.current(red_or_blue).reset();
151                }
152            }
153            Err(new) => {
154                // another thread beats us to it
155                assert!(new >= now - 1000); // double check that the new timestamp looks right
156            }
157        }
158
159        past_ms
160    }
161
162    /// Get the current rate as calculated with the given closure. This closure
163    /// will take an argument containing all the accessible information about
164    /// the rate from this object and allow the caller to make their own
165    /// estimation of rate based on:
166    ///
167    /// 1. The accumulated samples in the current interval (in progress)
168    /// 2. The accumulated samples in the previous interval (completed)
169    /// 3. The size of the interval
170    /// 4. Elapsed fraction of current interval for this sample (0..1)
171    ///
172    pub fn rate_with<F, T, K>(&self, key: &K, mut rate_calc_fn: F) -> T
173    where
174        F: FnMut(RateComponents) -> T,
175        K: Hash,
176    {
177        let past_ms = self.maybe_reset();
178
179        let (prev_samples, curr_samples) = if past_ms >= self.reset_interval_ms * 2 {
180            // already missed 2 intervals, no data, just report 0 as a short cut
181            (0, 0)
182        } else if past_ms >= self.reset_interval_ms {
183            (self.previous(self.red_or_blue()).get(key), 0)
184        } else {
185            let (prev_est, curr_est) = if self.red_or_blue() {
186                (&self.blue_slot, &self.red_slot)
187            } else {
188                (&self.red_slot, &self.blue_slot)
189            };
190
191            (prev_est.get(key), curr_est.get(key))
192        };
193
194        rate_calc_fn(RateComponents {
195            interval: self.interval,
196            prev_samples,
197            curr_samples,
198            current_interval_fraction: (past_ms % self.reset_interval_ms) as f64
199                / self.reset_interval_ms as f64,
200        })
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use float_cmp::assert_approx_eq;
    use std::thread::sleep;
    use std::time::Duration;

    /// Loose float comparison for the sleep-driven tests below. Real sleeps make
    /// the measured elapsed fraction imprecise, so a generous absolute tolerance
    /// is used instead of exact equality.
    fn assert_close(actual: f64, expected: f64) {
        assert_approx_eq!(f64, actual, expected, epsilon = 0.15)
    }

    /// `rate()` reports the previous full interval's event count per second.
    #[test]
    fn test_observe_rate() {
        let limiter = Rate::new(Duration::from_secs(1));
        let id = 1;

        // t = 0s: counts accumulate, but no interval has completed yet
        assert_eq!(limiter.observe(&id, 3), 3);
        assert_eq!(limiter.observe(&id, 2), 5);
        assert_eq!(limiter.rate(&id), 0f64);

        // t = 1s: the first interval (5 events) is now reported
        sleep(Duration::from_secs(1));
        assert_eq!(limiter.observe(&id, 4), 4);
        assert_eq!(limiter.rate(&id), 5f64);

        // t = 2s: the second interval (4 events) is now reported
        sleep(Duration::from_secs(1));
        assert_eq!(limiter.rate(&id), 4f64);

        // t = 3s: nothing was observed during the past 2 seconds
        sleep(Duration::from_secs(1));
        assert_eq!(limiter.rate(&id), 0f64);
    }

    /// A custom rate function weighting the in-progress interval at 90%.
    #[test]
    fn test_observe_rate_custom_90_10() {
        let limiter = Rate::new(Duration::from_secs(1));
        let id = 1;

        let weighted_90_10 = |c: RateComponents| {
            let (prev, curr) = (c.prev_samples as f64, c.curr_samples as f64);
            (prev * 0.1 + curr * 0.9) / c.interval.as_secs_f64()
        };

        // t = 0s
        assert_eq!(limiter.observe(&id, 3), 3);
        assert_eq!(limiter.observe(&id, 2), 5);
        assert_eq!(limiter.rate_with(&id, weighted_90_10), 5. * 0.9);

        // t = 1s
        sleep(Duration::from_secs(1));
        assert_eq!(limiter.observe(&id, 4), 4);
        assert_eq!(limiter.rate_with(&id, weighted_90_10), 5. * 0.1 + 4. * 0.9);

        // t = 2s
        sleep(Duration::from_secs(1));
        assert_eq!(limiter.rate_with(&id, weighted_90_10), 4. * 0.1);

        // t = 3s
        sleep(Duration::from_secs(1));
        assert_eq!(limiter.rate_with(&id, weighted_90_10), 0f64);
    }

    /// The sliding-window weighting described in
    /// https://blog.cloudflare.com/counting-things-a-lot-of-different-things/
    #[test]
    fn test_observe_rate_custom_proportional() {
        let limiter = Rate::new(Duration::from_secs(1));
        let id = 1;

        let proportional = |c: RateComponents| {
            let elapsed = c.current_interval_fraction;
            let weighted =
                c.prev_samples as f64 * (1. - elapsed) + c.curr_samples as f64 * elapsed;
            weighted / c.interval.as_secs_f64()
        };

        // t = 0s
        assert_eq!(limiter.observe(&id, 3), 3);
        assert_eq!(limiter.observe(&id, 2), 5);
        assert_close(limiter.rate_with(&id, proportional), 0.);

        // t = 0.5s
        sleep(Duration::from_millis(500));
        assert_close(limiter.rate_with(&id, proportional), 5. * 0.5);

        // t = 1s
        sleep(Duration::from_millis(500));
        assert_eq!(limiter.observe(&id, 4), 4);
        assert_close(limiter.rate_with(&id, proportional), 5.);

        // t = 1.75s
        sleep(Duration::from_millis(750));
        assert_close(limiter.rate_with(&id, proportional), 5. * 0.25 + 4. * 0.75);

        // t = 2s
        sleep(Duration::from_millis(250));
        assert_close(limiter.rate_with(&id, proportional), 4.);

        // t = 3s
        sleep(Duration::from_secs(1));
        assert_eq!(limiter.rate_with(&id, proportional), 0f64);
    }
}