Skip to main content

libdd_common/
rate_limiter.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};
5
/// Common interface of rate limiters that admit at most `limit` hits per interval.
pub trait Limiter {
    /// Records one hit against the given limit per interval.
    ///
    /// The limit is supplied on every call, so it may change between calls.
    /// Returns `false` if the limit is exceeded, otherwise `true`.
    fn inc(&self, limit: u32) -> bool;
    /// Returns the effective rate per interval.
    ///
    /// Note: The rate is only guaranteed to be accurate immediately after a call to `inc()`.
    fn rate(&self) -> f64;
    /// Updates the rate to account for the current time and returns it.
    fn update_rate(&self) -> f64;
}
16
/// A thread-safe limiter built on Atomics.
/// Its base unit is in seconds, i.e. the minimum allowed rate is 1 per second.
/// Internally the limiter works with the system time granularity, i.e. nanoseconds on unix and
/// milliseconds on windows.
/// NOTE(review): `TIME_PER_SECOND` is defined as nanoseconds for every platform, so the
/// "milliseconds on windows" remark should be confirmed against `now()`.
/// The implementation is a sliding window: every time the limiter is increased, the amount of time
/// that has passed is also refilled.
#[repr(C)]
pub struct LocalLimiter {
    /// Accumulated hits, scaled by `granularity`: every `inc()` adds `granularity`, and elapsed
    /// time multiplied by the limit is subtracted again on each update.
    hit_count: AtomicI64,
    /// Timestamp (as returned by `now()`) of the most recent update.
    last_update: AtomicU64,
    /// Limit passed to the most recent successful `inc()`; 0 until one succeeds.
    last_limit: AtomicU32,
    /// Length of one rate interval in clock units (`TIME_PER_SECOND` times the number of seconds).
    granularity: i64,
}
30
/// Number of internal clock units per second; also the granularity of a default `LocalLimiter`.
const TIME_PER_SECOND: i64 = 1_000_000_000; // nanoseconds
32
/// When set to a non-zero value, `now()` returns this instead of the real clock.
/// This allows tests to control time deterministically, avoiding flakiness from
/// wall-clock timing on CI machines.
/// A value of 0 means "no mock": `now()` falls through to the platform clock.
/// This is a single process-wide static, so a value stored by one test is visible to every
/// thread that calls `now()` while it is set.
#[cfg(test)]
static MOCK_NOW: AtomicU64 = AtomicU64::new(0);
38
39fn now() -> u64 {
40    #[cfg(test)]
41    {
42        let mock = MOCK_NOW.load(Ordering::Relaxed);
43        if mock != 0 {
44            return mock;
45        }
46    }
47    #[cfg(windows)]
48    let now = unsafe {
49        static FREQUENCY: AtomicU64 = AtomicU64::new(0);
50
51        let mut frequency = FREQUENCY.load(Ordering::Relaxed);
52        if frequency == 0 {
53            windows_sys::Win32::System::Performance::QueryPerformanceFrequency(
54                &mut frequency as *mut u64 as *mut i64,
55            );
56            FREQUENCY.store(frequency, Ordering::Relaxed);
57        }
58
59        let mut perf_counter = 0;
60        windows_sys::Win32::System::Performance::QueryPerformanceCounter(&mut perf_counter);
61        perf_counter as u64 * frequency / TIME_PER_SECOND as u64
62    };
63    #[cfg(not(windows))]
64    let now = {
65        let mut ts: libc::timespec = libc::timespec {
66            tv_sec: 0,
67            tv_nsec: 0,
68        };
69        unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
70        // tv_sec is i32 on 32bit architecture
71        // https://sourceware.org/bugzilla/show_bug.cgi?id=16437
72        #[cfg(target_pointer_width = "32")]
73        {
74            (ts.tv_sec as i64 * TIME_PER_SECOND + ts.tv_nsec as i64) as u64
75        }
76        #[cfg(target_pointer_width = "64")]
77        {
78            (ts.tv_sec * TIME_PER_SECOND + ts.tv_nsec) as u64
79        }
80    };
81    now
82}
83
84impl Default for LocalLimiter {
85    fn default() -> Self {
86        LocalLimiter {
87            hit_count: Default::default(),
88            last_update: AtomicU64::from(now()),
89            last_limit: Default::default(),
90            granularity: TIME_PER_SECOND,
91        }
92    }
93}
94
95impl LocalLimiter {
96    /// Allows setting a custom time granularity. The default() implementation is 1 second.
97    pub fn with_granularity(seconds: u32) -> LocalLimiter {
98        let mut limiter = LocalLimiter::default();
99        limiter.granularity *= seconds as i64;
100        limiter
101    }
102
103    /// Resets, with a given granularity.
104    pub fn reset(&mut self, seconds: u32) {
105        self.last_update.store(now(), Ordering::Relaxed);
106        self.hit_count.store(0, Ordering::Relaxed);
107        self.last_limit.store(0, Ordering::Relaxed);
108        self.granularity = TIME_PER_SECOND * seconds as i64;
109    }
110
111    fn update(&self, limit: u32, inc: i64) -> i64 {
112        let now = now();
113        let last = self.last_update.swap(now, Ordering::SeqCst);
114        // Make sure reducing the limit doesn't stall for a long time
115        let clear_limit = limit.max(self.last_limit.load(Ordering::Relaxed));
116        let clear_counter = (now as i64 - last as i64) * (clear_limit as i64);
117        let subtract = clear_counter - inc;
118        let mut previous_hits = self.hit_count.fetch_sub(subtract, Ordering::SeqCst);
119        // Handle where the limiter goes below zero
120        if previous_hits < subtract {
121            let add = clear_counter - previous_hits.max(0);
122            self.hit_count.fetch_add(add, Ordering::Acquire);
123            previous_hits += add - clear_counter;
124        }
125        previous_hits
126    }
127}
128
129impl Limiter for LocalLimiter {
130    fn inc(&self, limit: u32) -> bool {
131        let previous_hits = self.update(limit, self.granularity);
132        if previous_hits / self.granularity >= limit as i64 {
133            self.hit_count
134                .fetch_sub(self.granularity, Ordering::Acquire);
135            false
136        } else {
137            // We don't care about race conditions here:
138            // If the last limit was high enough to increase the previous_hits, we are anyway close
139            // to a number realistic to decrease the count quickly; i.e. we won't stall the limiter
140            // indefinitely when switching from a high to a low limit.
141            self.last_limit.store(limit, Ordering::Relaxed);
142            true
143        }
144    }
145
146    fn rate(&self) -> f64 {
147        let last_limit = self.last_limit.load(Ordering::Relaxed);
148        let hit_count = self.hit_count.load(Ordering::Relaxed);
149        (hit_count as f64 / (last_limit as i64 * self.granularity) as f64).clamp(0., 1.)
150    }
151
152    fn update_rate(&self) -> f64 {
153        self.update(0, self.granularity);
154        self.rate()
155    }
156}
157
#[cfg(test)]
mod tests {
    use crate::rate_limiter::{now, Limiter, LocalLimiter, MOCK_NOW, TIME_PER_SECOND};
    use std::sync::atomic::Ordering;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes every test that reads or writes the process-wide `MOCK_NOW`. The test harness
    /// runs tests on parallel threads, so without this a test relying on the real clock could
    /// observe the mocked time set by another test.
    static CLOCK_LOCK: Mutex<()> = Mutex::new(());

    /// Holds `CLOCK_LOCK` for the duration of a test and restores the real clock when dropped,
    /// including when the test panics.
    struct ClockGuard {
        _lock: MutexGuard<'static, ()>,
    }

    impl ClockGuard {
        fn acquire() -> Self {
            // A poisoned lock only means another test failed; the `()` payload cannot be corrupt.
            let lock = CLOCK_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            set_mock_time(0);
            ClockGuard { _lock: lock }
        }
    }

    impl Drop for ClockGuard {
        fn drop(&mut self) {
            // Runs before the lock field is released, so the next test starts on the real clock.
            set_mock_time(0);
        }
    }

    fn set_mock_time(nanos: u64) {
        MOCK_NOW.store(nanos, Ordering::Relaxed);
    }

    fn advance_mock_time(nanos: u64) {
        MOCK_NOW.fetch_add(nanos, Ordering::Relaxed);
    }

    /// A small time tick (100ns) used to simulate minimal time passing between operations.
    const TICK: u64 = 100;

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_rate_limiter() {
        let _clock = ClockGuard::acquire();

        // Use mock time for deterministic behavior — real wall-clock sleeps are flaky on CI.
        set_mock_time(1_000_000_000);

        let limiter = LocalLimiter::default();

        // First inc uses 1 of 2 slots: rate is exactly 0.5
        assert!(limiter.inc(2));
        assert_eq!(0.5, limiter.rate());

        // Second inc: rate approaches 1.0 but not quite (tiny time elapsed)
        advance_mock_time(TICK);
        assert!(limiter.inc(2));
        assert!(limiter.rate() > 0.5 && limiter.rate() < 1.);

        // Third inc fills the bucket: rate clamps to 1.0
        advance_mock_time(TICK);
        assert!(limiter.inc(2));
        assert_eq!(1., limiter.rate());

        // Over limit — both rejected
        advance_mock_time(TICK);
        assert!(!limiter.inc(2));
        advance_mock_time(TICK);
        assert!(!limiter.inc(2));

        // 3 seconds pass — capacity fully refills, hit count goes negative then resets to zero
        advance_mock_time(3 * TIME_PER_SECOND as u64);
        assert!(limiter.inc(2));
        assert_eq!(0.5, limiter.rate()); // Starting from scratch

        advance_mock_time(TICK);
        assert!(limiter.inc(2));
        advance_mock_time(TICK);
        assert!(limiter.inc(2));
        advance_mock_time(TICK);
        assert!(!limiter.inc(2));

        // Test change to higher limit
        advance_mock_time(TICK);
        assert!(limiter.inc(3));
        advance_mock_time(TICK);
        assert!(!limiter.inc(3));

        // Change to lower limit — no capacity available
        assert!(!limiter.inc(1));

        // 2 seconds pass — the counter resets (last successful limit was 3, so subtracting
        // 3 per second twice clears it)
        advance_mock_time(2 * TIME_PER_SECOND as u64);

        // Now 1 succeeds again
        assert!(limiter.inc(1));

        // The mock clock is reset to 0 when `_clock` is dropped.
    }

    /// Validates the real clock implementation (the guard keeps MOCK_NOW at 0, so `now()` hits
    /// the actual platform clock).
    // We normally shouldn't test private functions directly, but is necessary here since
    // now() is mocked for the other tests.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_now_monotonic() {
        let _clock = ClockGuard::acquire();

        let t1 = now();
        assert!(t1 > 0);
        let t2 = now();
        assert!(t2 >= t1);
    }
}