Skip to main content

pingora_limits/
rate.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The rate module defines the [Rate] type that helps estimate the occurrence of events over a
16//! period of time.
17
18use crate::estimator::Estimator;
19use std::hash::Hash;
20use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
21use std::time::{Duration, Instant};
22
/// Input struct to custom functions for calculating rate. Includes the counts
/// from the current interval, previous interval, the configured duration of an
/// interval, and the fraction into the current interval that the sample was
/// taken.
///
/// Ex. If the interval to the Rate instance is `10s`, and the rate calculation
/// is taken at 2 seconds after the start of the current interval, then the
/// fraction of the current interval returned in this struct will be `0.2`
/// meaning 20% of the current interval has elapsed
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct RateComponents {
    /// Count of events recorded during the previous (completed) interval.
    pub prev_samples: isize,
    /// Count of events recorded so far during the current (in-progress) interval.
    pub curr_samples: isize,
    /// The configured duration of one interval.
    pub interval: Duration,
    /// How far into the current interval the sample was taken, as a fraction
    /// (e.g. `0.2` means 20% of the interval has elapsed).
    pub current_interval_fraction: f64,
}
40
41/// A rate calculation function which uses a good estimate of the rate of events over the past
42/// `interval` time.
43///
44/// Specifically, it linearly interpolates between the event counts of the previous and current
45/// periods based on how far into the current period we are, as described in this post:
46/// <https://blog.cloudflare.com/counting-things-a-lot-of-different-things/>
47#[allow(dead_code)]
48pub static PROPORTIONAL_RATE_ESTIMATE_CALC_FN: fn(RateComponents) -> f64 =
49    |rate_info: RateComponents| {
50        let prev = rate_info.prev_samples as f64;
51        let curr = rate_info.curr_samples as f64;
52        let interval_secs = rate_info.interval.as_secs_f64();
53        let interval_fraction = rate_info.current_interval_fraction;
54
55        let weighted_count = prev * (1. - interval_fraction) + curr;
56        weighted_count / interval_secs
57    };
58
/// A stable rate estimator that reports the rate of events per period of `interval` time.
///
/// It counts events for periods of `interval` and returns the average rate of the latest completed
/// period while counting events for the current (partial) period.
pub struct Rate {
    // 2 slots so that we use one to collect the current events and the other to report rate
    red_slot: Estimator,
    blue_slot: Estimator,
    red_or_blue: AtomicBool, // true: the current slot is red, otherwise blue
    // Reference point in time; the millisecond timestamps below are measured from it
    start: Instant,
    // Use u64 below instead of Instant because we want atomic operation
    reset_interval_ms: u64, // the time interval to reset `current` and move it to `previous`
    last_reset_time: AtomicU64, // the timestamp in ms since `start`
    // The interval as given at construction, handed to custom rate functions in `RateComponents`
    interval: Duration,
}
74
// Default Estimator configuration used by `Rate::new`.
// See the inflight module for the meaning of these numbers.
const HASHES: usize = 4;
const SLOTS: usize = 1024; // This value can be lower if interval is short (key cardinality is low)
78
79impl Rate {
80    /// Create a new `Rate` with the given interval.
81    pub fn new(interval: std::time::Duration) -> Self {
82        Rate::new_with_estimator_config(interval, HASHES, SLOTS)
83    }
84
85    /// Create a new `Rate` with the given interval and Estimator config with the given amount of hashes and columns (slots).
86    #[inline]
87    pub fn new_with_estimator_config(
88        interval: std::time::Duration,
89        hashes: usize,
90        slots: usize,
91    ) -> Self {
92        Rate {
93            red_slot: Estimator::new(hashes, slots),
94            blue_slot: Estimator::new(hashes, slots),
95            red_or_blue: AtomicBool::new(true),
96            start: Instant::now(),
97            reset_interval_ms: interval.as_millis() as u64, // should be small not to overflow
98            last_reset_time: AtomicU64::new(0),
99            interval,
100        }
101    }
102
103    fn current(&self, red_or_blue: bool) -> &Estimator {
104        if red_or_blue {
105            &self.red_slot
106        } else {
107            &self.blue_slot
108        }
109    }
110
111    fn previous(&self, red_or_blue: bool) -> &Estimator {
112        if red_or_blue {
113            &self.blue_slot
114        } else {
115            &self.red_slot
116        }
117    }
118
119    fn red_or_blue(&self) -> bool {
120        self.red_or_blue.load(Ordering::SeqCst)
121    }
122
123    /// Return the per second rate estimation.
124    ///
125    /// This is the average rate of the latest completed period of length `interval`.
126    pub fn rate<T: Hash>(&self, key: &T) -> f64 {
127        let past_ms = self.maybe_reset();
128        if past_ms >= self.reset_interval_ms * 2 {
129            // already missed 2 intervals, no data, just report 0 as a short cut
130            return 0f64;
131        }
132
133        self.previous(self.red_or_blue()).get(key) as f64 * 1000.0 / self.reset_interval_ms as f64
134    }
135
136    /// Report new events and return number of events seen so far in the current interval.
137    pub fn observe<T: Hash>(&self, key: &T, events: isize) -> isize {
138        self.maybe_reset();
139        self.current(self.red_or_blue()).incr(key, events)
140    }
141
142    // reset if needed, return the time since last reset for other fn to use
143    fn maybe_reset(&self) -> u64 {
144        // should be short enough not to overflow
145        let now = Instant::now().duration_since(self.start).as_millis() as u64;
146        let last_reset = self.last_reset_time.load(Ordering::SeqCst);
147        let past_ms = now - last_reset;
148
149        if past_ms < self.reset_interval_ms {
150            // no need to reset
151            return past_ms;
152        }
153        let red_or_blue = self.red_or_blue();
154        match self.last_reset_time.compare_exchange(
155            last_reset,
156            now,
157            Ordering::SeqCst,
158            Ordering::Acquire,
159        ) {
160            Ok(_) => {
161                // first clear the previous slot
162                self.previous(red_or_blue).reset();
163                // then flip the flag to tell others to use the reset slot
164                self.red_or_blue.store(!red_or_blue, Ordering::SeqCst);
165                // if current time is beyond 2 intervals, the data stored in the previous slot
166                // is also stale, we should clear that too
167                if now - last_reset >= self.reset_interval_ms * 2 {
168                    // Note that this is the previous one now because we just flipped self.red_or_blue
169                    self.current(red_or_blue).reset();
170                }
171            }
172            Err(new) => {
173                // another thread beats us to it
174                assert!(new >= now - 1000); // double check that the new timestamp looks right
175            }
176        }
177
178        past_ms
179    }
180
181    /// Get the current rate as calculated with the given closure. This closure
182    /// will take an argument containing all the accessible information about
183    /// the rate from this object and allow the caller to make their own
184    /// estimation of rate based on:
185    ///
186    /// 1. The accumulated samples in the current interval (in progress)
187    /// 2. The accumulated samples in the previous interval (completed)
188    /// 3. The size of the interval
189    /// 4. Elapsed fraction of current interval for this sample (0..1)
190    ///
191    pub fn rate_with<F, T, K>(&self, key: &K, mut rate_calc_fn: F) -> T
192    where
193        F: FnMut(RateComponents) -> T,
194        K: Hash,
195    {
196        let past_ms = self.maybe_reset();
197
198        let (prev_samples, curr_samples) = if past_ms >= self.reset_interval_ms * 2 {
199            // already missed 2 intervals, no data, just report 0 as a short cut
200            (0, 0)
201        } else if past_ms >= self.reset_interval_ms {
202            (self.previous(self.red_or_blue()).get(key), 0)
203        } else {
204            let (prev_est, curr_est) = if self.red_or_blue() {
205                (&self.blue_slot, &self.red_slot)
206            } else {
207                (&self.red_slot, &self.blue_slot)
208            };
209
210            (prev_est.get(key), curr_est.get(key))
211        };
212
213        rate_calc_fn(RateComponents {
214            interval: self.interval,
215            prev_samples,
216            curr_samples,
217            current_interval_fraction: (past_ms % self.reset_interval_ms) as f64
218                / self.reset_interval_ms as f64,
219        })
220    }
221}
222
223#[cfg(test)]
224mod tests {
225    use float_cmp::assert_approx_eq;
226
227    use super::*;
228    use std::thread::sleep;
229    use std::time::Duration;
230
    /// Checks `observe()` and `rate()` over a few 1-second intervals: `rate()` reports the
    /// count of the latest completed interval and drops to 0 once no events were observed.
    /// Relies on real sleeping, so it takes about 3 seconds.
    #[test]
    fn test_observe_rate() {
        let r = Rate::new(Duration::from_secs(1));
        let key = 1;

        // second: 0
        let observed = r.observe(&key, 3);
        assert_eq!(observed, 3);
        let observed = r.observe(&key, 2);
        assert_eq!(observed, 5);
        assert_eq!(r.rate(&key), 0f64); // no estimation yet because the interval has not passed

        // second: 1
        sleep(Duration::from_secs(1));
        let observed = r.observe(&key, 4);
        assert_eq!(observed, 4);
        assert_eq!(r.rate(&key), 5f64); // 5 rps

        // second: 2
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate(&key), 4f64);

        // second: 3
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate(&key), 0f64); // no event observed in the past 2 seconds
    }
257
    /// Asserts that 2 numbers are close within a generous margin. These
    /// tests are doing a lot of literal sleeping, so the measured results
    /// can't be accurate or consistent. This function does an assert with a
    /// generous tolerance (`epsilon = 0.15`).
    fn assert_eq_ish(left: f64, right: f64) {
        assert_approx_eq!(f64, left, right, epsilon = 0.15)
    }
265
    /// Checks `rate_with()` with a custom closure that weights the previous interval by 10%
    /// and the current interval by 90%, ignoring the elapsed fraction of the interval.
    /// Relies on real sleeping, so it takes about 3 seconds.
    #[test]
    fn test_observe_rate_custom_90_10() {
        let r = Rate::new(Duration::from_secs(1));
        let key = 1;

        // custom estimate: 10% of the previous count plus 90% of the current count, per second
        let rate_90_10_fn = |rate_info: RateComponents| {
            let prev = rate_info.prev_samples as f64;
            let curr = rate_info.curr_samples as f64;
            (prev * 0.1 + curr * 0.9) / rate_info.interval.as_secs_f64()
        };

        // second: 0
        let observed = r.observe(&key, 3);
        assert_eq!(observed, 3);
        let observed = r.observe(&key, 2);
        assert_eq!(observed, 5);
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 5. * 0.9);

        // second: 1
        sleep(Duration::from_secs(1));
        let observed = r.observe(&key, 4);
        assert_eq!(observed, 4);
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 5. * 0.1 + 4. * 0.9);

        // second: 2
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 4. * 0.1);

        // second: 3
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate_with(&key, rate_90_10_fn), 0f64);
    }
298
    /// Checks `rate_with()` with `PROPORTIONAL_RATE_ESTIMATE_CALC_FN` at several points inside
    /// and across 1-second intervals, and compares against `rate()`, which only looks at the
    /// previous interval. Relies on real sleeping, so results are compared with a tolerance.
    #[test]
    fn test_observe_rate_custom_proportional() {
        let r = Rate::new(Duration::from_secs(1));
        let key = 1;

        // second: 0
        let observed = r.observe(&key, 3);
        assert_eq!(observed, 3);
        let observed = r.observe(&key, 2);
        assert_eq!(observed, 5);
        assert_eq_ish(r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN), 5.);

        // second 0.5
        sleep(Duration::from_secs_f64(0.5));
        assert_eq_ish(r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN), 5.);
        // rate() just looks at the previous interval, ignores current interval
        assert_eq_ish(r.rate(&key), 0.);

        // second: 1
        sleep(Duration::from_secs_f64(0.5));
        let observed = r.observe(&key, 4);
        assert_eq!(observed, 4);
        assert_eq_ish(r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN), 9.);

        // second 1.75
        sleep(Duration::from_secs_f64(0.75));
        assert_eq_ish(
            r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN),
            5. * 0.25 + 4.,
        );

        // second: 2
        sleep(Duration::from_secs_f64(0.25));
        assert_eq_ish(r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN), 4.);
        assert_eq_ish(r.rate(&key), 4.);

        // second: 2.5
        sleep(Duration::from_secs_f64(0.5));
        assert_eq_ish(
            r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN),
            4. / 2.,
        );
        assert_eq_ish(r.rate(&key), 4.);

        // second: 3.5 (a full second after the 2.5 mark; more than one interval without events)
        sleep(Duration::from_secs(1));
        assert_eq!(r.rate_with(&key, PROPORTIONAL_RATE_ESTIMATE_CALC_FN), 0f64);
    }
347}