Skip to main content

libdd_common/
rate_limiter.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};
5
/// Common interface of rate limiters that admit a bounded number of hits per interval.
pub trait Limiter {
    /// Takes the limit per interval, i.e. the maximum number of hits allowed per interval.
    /// Returns false if the limit is exceeded, otherwise true.
    fn inc(&self, limit: u32) -> bool;
    /// Returns the effective rate per interval.
    /// Note: The rate is only guaranteed to be accurate immediately after a call to inc().
    fn rate(&self) -> f64;
    /// Updates the rate and returns it.
    fn update_rate(&self) -> f64;
}
16
/// A thread-safe limiter built on Atomics.
/// Its base unit is in seconds, i.e. the minimum allowed rate is 1 per second.
/// Internally the limiter works with the timestamps returned by `now()`, scaled so that one
/// second equals `TIME_PER_SECOND` units (nanoseconds on unix).
/// NOTE(review): this comment previously stated "milliseconds on windows"; the windows branch of
/// `now()` derives its value from the performance counter - confirm the effective unit there.
/// The implementation is a sliding window: every time the limiter is increased, the amount of time
/// that has passed is also refilled.
#[repr(C)]
pub struct LocalLimiter {
    // Current fill level, scaled by `granularity`: every hit adds `granularity`, and elapsed time
    // multiplied by the limit is subtracted again on each update.
    hit_count: AtomicI64,
    // Timestamp (as returned by `now()`) of the most recent update.
    last_update: AtomicU64,
    // Limit passed to the most recent successful `inc()`.
    last_limit: AtomicU32,
    // Length of one interval in `now()` units, i.e. `TIME_PER_SECOND` times the number of seconds.
    granularity: i64,
}
30
/// Number of `now()` time units that make up one second.
const TIME_PER_SECOND: i64 = 1_000_000_000; // nanoseconds
32
33fn now() -> u64 {
34    #[cfg(windows)]
35    let now = unsafe {
36        static FREQUENCY: AtomicU64 = AtomicU64::new(0);
37
38        let mut frequency = FREQUENCY.load(Ordering::Relaxed);
39        if frequency == 0 {
40            windows_sys::Win32::System::Performance::QueryPerformanceFrequency(
41                &mut frequency as *mut u64 as *mut i64,
42            );
43            FREQUENCY.store(frequency, Ordering::Relaxed);
44        }
45
46        let mut perf_counter = 0;
47        windows_sys::Win32::System::Performance::QueryPerformanceCounter(&mut perf_counter);
48        perf_counter as u64 * frequency / TIME_PER_SECOND as u64
49    };
50    #[cfg(not(windows))]
51    let now = {
52        let mut ts: libc::timespec = libc::timespec {
53            tv_sec: 0,
54            tv_nsec: 0,
55        };
56        unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
57        // tv_sec is i32 on 32bit architecture
58        // https://sourceware.org/bugzilla/show_bug.cgi?id=16437
59        #[cfg(target_pointer_width = "32")]
60        {
61            (ts.tv_sec as i64 * TIME_PER_SECOND + ts.tv_nsec as i64) as u64
62        }
63        #[cfg(target_pointer_width = "64")]
64        {
65            (ts.tv_sec * TIME_PER_SECOND + ts.tv_nsec) as u64
66        }
67    };
68    now
69}
70
71impl Default for LocalLimiter {
72    fn default() -> Self {
73        LocalLimiter {
74            hit_count: Default::default(),
75            last_update: AtomicU64::from(now()),
76            last_limit: Default::default(),
77            granularity: TIME_PER_SECOND,
78        }
79    }
80}
81
82impl LocalLimiter {
83    /// Allows setting a custom time granularity. The default() implementation is 1 second.
84    pub fn with_granularity(seconds: u32) -> LocalLimiter {
85        let mut limiter = LocalLimiter::default();
86        limiter.granularity *= seconds as i64;
87        limiter
88    }
89
90    /// Resets, with a given granularity.
91    pub fn reset(&mut self, seconds: u32) {
92        self.last_update.store(now(), Ordering::Relaxed);
93        self.hit_count.store(0, Ordering::Relaxed);
94        self.last_limit.store(0, Ordering::Relaxed);
95        self.granularity = TIME_PER_SECOND * seconds as i64;
96    }
97
98    fn update(&self, limit: u32, inc: i64) -> i64 {
99        let now = now();
100        let last = self.last_update.swap(now, Ordering::SeqCst);
101        // Make sure reducing the limit doesn't stall for a long time
102        let clear_limit = limit.max(self.last_limit.load(Ordering::Relaxed));
103        let clear_counter = (now as i64 - last as i64) * (clear_limit as i64);
104        let subtract = clear_counter - inc;
105        let mut previous_hits = self.hit_count.fetch_sub(subtract, Ordering::SeqCst);
106        // Handle where the limiter goes below zero
107        if previous_hits < subtract {
108            let add = clear_counter - previous_hits.max(0);
109            self.hit_count.fetch_add(add, Ordering::Acquire);
110            previous_hits += add - clear_counter;
111        }
112        previous_hits
113    }
114}
115
116impl Limiter for LocalLimiter {
117    fn inc(&self, limit: u32) -> bool {
118        let previous_hits = self.update(limit, self.granularity);
119        if previous_hits / self.granularity >= limit as i64 {
120            self.hit_count
121                .fetch_sub(self.granularity, Ordering::Acquire);
122            false
123        } else {
124            // We don't care about race conditions here:
125            // If the last limit was high enough to increase the previous_hits, we are anyway close
126            // to a number realistic to decrease the count quickly; i.e. we won't stall the limiter
127            // indefinitely when switching from a high to a low limit.
128            self.last_limit.store(limit, Ordering::Relaxed);
129            true
130        }
131    }
132
133    fn rate(&self) -> f64 {
134        let last_limit = self.last_limit.load(Ordering::Relaxed);
135        let hit_count = self.hit_count.load(Ordering::Relaxed);
136        (hit_count as f64 / (last_limit as i64 * self.granularity) as f64).clamp(0., 1.)
137    }
138
139    fn update_rate(&self) -> f64 {
140        self.update(0, self.granularity);
141        self.rate()
142    }
143}
144
#[cfg(test)]
mod tests {
    use crate::rate_limiter::{Limiter, LocalLimiter, TIME_PER_SECOND};
    use std::sync::atomic::Ordering;
    use std::thread::sleep;
    use std::time::Duration;

    /// Walks a single limiter through filling up, being rejected, recovering after simulated
    /// elapsed time, and switching between higher and lower limits.
    /// The test depends on real time passing between the calls, hence the short sleeps.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_rate_limiter() {
        let limiter = LocalLimiter::default();
        // Two are allowed, then one more because a small amount of time passed since the first one
        assert!(limiter.inc(2));
        // Work around floating point precision issues
        assert!(limiter.rate() > 0.49999 && limiter.rate() <= 0.5);
        // Add a minimal amount of time to ensure the test doesn't run faster than timer precision
        sleep(Duration::from_micros(100));
        assert!(limiter.inc(2));
        // We're close to 1, but not quite, due to the minimal time passed
        assert!(limiter.rate() > 0.5 && limiter.rate() < 1.);
        sleep(Duration::from_micros(100));
        assert!(limiter.inc(2));
        // Rate capped at 1
        assert_eq!(1., limiter.rate());
        sleep(Duration::from_micros(100));
        assert!(!limiter.inc(2));
        sleep(Duration::from_micros(100));
        assert!(!limiter.inc(2));
        sleep(Duration::from_micros(100));

        // Simulate 3 seconds passing by moving last_update back. At a limit of 2 this refills
        // more than the counter holds, i.e. we're going into negative territory. The next
        // increment will reset to zero.
        limiter
            .last_update
            .fetch_sub(3 * TIME_PER_SECOND as u64, Ordering::Relaxed);
        assert!(limiter.inc(2));
        // Work around floating point precision issues
        assert!(limiter.rate() > 0.49999 && limiter.rate() <= 0.5); // We're starting from scratch
        sleep(Duration::from_micros(100));
        assert!(limiter.inc(2));
        sleep(Duration::from_micros(100));
        assert!(limiter.inc(2));
        sleep(Duration::from_micros(100));
        assert!(!limiter.inc(2));
        sleep(Duration::from_micros(100));

        // Test change to higher value
        assert!(limiter.inc(3));
        sleep(Duration::from_micros(100));
        assert!(!limiter.inc(3));

        // Then change to lower value - but we have no capacity
        assert!(!limiter.inc(1));

        // The counter is around 4 (because last limit was 3)
        // We're keeping the highest successful limit stored, thus subtracting 3 twice
        // (2 simulated seconds at the stored limit of 3) will reset it
        limiter
            .last_update
            .fetch_sub(2 * TIME_PER_SECOND as u64, Ordering::Relaxed);

        // And now 1 succeeds again.
        assert!(limiter.inc(1));
    }
}