// libdd_common/rate_limiter.rs
use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};

/// Common interface of rate limiters that decide, hit by hit, whether a limit is exceeded.
pub trait Limiter {
    /// Records one hit against `limit`, the number of hits allowed per interval.
    ///
    /// Returns `true` when the hit is accepted and `false` when the limit is exceeded.
    fn inc(&self, limit: u32) -> bool;

    /// Returns the current usage as a fraction of the limit.
    ///
    /// This only reads state; it does not account for time elapsed since the last update.
    fn rate(&self) -> f64;

    /// Brings the limiter state up to date and then returns the same value as [`Limiter::rate`].
    fn update_rate(&self) -> f64;
}

/// A rate limiter built on atomics, so it can be driven through a shared reference.
///
/// Hits are stored scaled by `granularity`: every accepted or attempted hit adds `granularity`
/// to `hit_count`, and elapsed time multiplied by the limit is subtracted again on each update.
#[repr(C)]
pub struct LocalLimiter {
    /// Accumulated hits, scaled by `granularity`.
    hit_count: AtomicI64,
    /// Timestamp (as returned by `now()`) of the most recent update.
    last_update: AtomicU64,
    /// Limit passed to the most recent accepted `inc` call; 0 until a hit has been accepted.
    last_limit: AtomicU32,
    /// Length of the limiting interval, expressed in `now()` time units.
    granularity: i64,
}

/// Number of `now()` time units (nanoseconds) in one second.
const TIME_PER_SECOND: i64 = 1_000_000_000;

/// Test-only clock override: while non-zero, `now()` returns this value instead of reading the
/// real clock.
#[cfg(test)]
static MOCK_NOW: AtomicU64 = AtomicU64::new(0);

39fn now() -> u64 {
40 #[cfg(test)]
41 {
42 let mock = MOCK_NOW.load(Ordering::Relaxed);
43 if mock != 0 {
44 return mock;
45 }
46 }
47 #[cfg(windows)]
48 let now = unsafe {
49 static FREQUENCY: AtomicU64 = AtomicU64::new(0);
50
51 let mut frequency = FREQUENCY.load(Ordering::Relaxed);
52 if frequency == 0 {
53 windows_sys::Win32::System::Performance::QueryPerformanceFrequency(
54 &mut frequency as *mut u64 as *mut i64,
55 );
56 FREQUENCY.store(frequency, Ordering::Relaxed);
57 }
58
59 let mut perf_counter = 0;
60 windows_sys::Win32::System::Performance::QueryPerformanceCounter(&mut perf_counter);
61 perf_counter as u64 * frequency / TIME_PER_SECOND as u64
62 };
63 #[cfg(not(windows))]
64 let now = {
65 let mut ts: libc::timespec = libc::timespec {
66 tv_sec: 0,
67 tv_nsec: 0,
68 };
69 unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
70 #[cfg(target_pointer_width = "32")]
73 {
74 (ts.tv_sec as i64 * TIME_PER_SECOND + ts.tv_nsec as i64) as u64
75 }
76 #[cfg(target_pointer_width = "64")]
77 {
78 (ts.tv_sec * TIME_PER_SECOND + ts.tv_nsec) as u64
79 }
80 };
81 now
82}
83
84impl Default for LocalLimiter {
85 fn default() -> Self {
86 LocalLimiter {
87 hit_count: Default::default(),
88 last_update: AtomicU64::from(now()),
89 last_limit: Default::default(),
90 granularity: TIME_PER_SECOND,
91 }
92 }
93}
94
95impl LocalLimiter {
96 pub fn with_granularity(seconds: u32) -> LocalLimiter {
98 let mut limiter = LocalLimiter::default();
99 limiter.granularity *= seconds as i64;
100 limiter
101 }
102
103 pub fn reset(&mut self, seconds: u32) {
105 self.last_update.store(now(), Ordering::Relaxed);
106 self.hit_count.store(0, Ordering::Relaxed);
107 self.last_limit.store(0, Ordering::Relaxed);
108 self.granularity = TIME_PER_SECOND * seconds as i64;
109 }
110
111 fn update(&self, limit: u32, inc: i64) -> i64 {
112 let now = now();
113 let last = self.last_update.swap(now, Ordering::SeqCst);
114 let clear_limit = limit.max(self.last_limit.load(Ordering::Relaxed));
116 let clear_counter = (now as i64 - last as i64) * (clear_limit as i64);
117 let subtract = clear_counter - inc;
118 let mut previous_hits = self.hit_count.fetch_sub(subtract, Ordering::SeqCst);
119 if previous_hits < subtract {
121 let add = clear_counter - previous_hits.max(0);
122 self.hit_count.fetch_add(add, Ordering::Acquire);
123 previous_hits += add - clear_counter;
124 }
125 previous_hits
126 }
127}
128
129impl Limiter for LocalLimiter {
130 fn inc(&self, limit: u32) -> bool {
131 let previous_hits = self.update(limit, self.granularity);
132 if previous_hits / self.granularity >= limit as i64 {
133 self.hit_count
134 .fetch_sub(self.granularity, Ordering::Acquire);
135 false
136 } else {
137 self.last_limit.store(limit, Ordering::Relaxed);
142 true
143 }
144 }
145
146 fn rate(&self) -> f64 {
147 let last_limit = self.last_limit.load(Ordering::Relaxed);
148 let hit_count = self.hit_count.load(Ordering::Relaxed);
149 (hit_count as f64 / (last_limit as i64 * self.granularity) as f64).clamp(0., 1.)
150 }
151
152 fn update_rate(&self) -> f64 {
153 self.update(0, self.granularity);
154 self.rate()
155 }
156}
157
#[cfg(test)]
mod tests {
    use crate::rate_limiter::{now, Limiter, LocalLimiter, MOCK_NOW, TIME_PER_SECOND};
    use std::sync::atomic::Ordering;

    /// Pins the mocked clock to `nanos`; 0 hands control back to the real clock.
    fn set_mock_time(nanos: u64) {
        MOCK_NOW.store(nanos, Ordering::Relaxed);
    }

    /// Moves the mocked clock forward by `nanos`.
    fn advance_mock_time(nanos: u64) {
        MOCK_NOW.fetch_add(nanos, Ordering::Relaxed);
    }

    /// A small time step, negligible compared to the one-second interval.
    const TICK: u64 = 100;

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_rate_limiter() {
        // Start from a known, non-zero mocked timestamp.
        set_mock_time(1_000_000_000);

        let limiter = LocalLimiter::default();

        // First hit out of two allowed: half of the budget is used.
        assert!(limiter.inc(2));
        assert_eq!(0.5, limiter.rate());

        // Second hit: only a tiny amount has drained, so usage is just below full.
        advance_mock_time(TICK);
        assert!(limiter.inc(2));
        assert!(limiter.rate() > 0.5 && limiter.rate() < 1.);

        // Third hit is still accepted and saturates the reported rate.
        advance_mock_time(TICK);
        assert!(limiter.inc(2));
        assert_eq!(1., limiter.rate());

        // Further hits are rejected while the budget is exhausted.
        advance_mock_time(TICK);
        assert!(!limiter.inc(2));
        advance_mock_time(TICK);
        assert!(!limiter.inc(2));

        // After a long pause the budget is available again.
        advance_mock_time(3 * TIME_PER_SECOND as u64);
        assert!(limiter.inc(2));
        assert_eq!(0.5, limiter.rate());

        advance_mock_time(TICK);
        assert!(limiter.inc(2));
        advance_mock_time(TICK);
        assert!(limiter.inc(2));
        advance_mock_time(TICK);
        assert!(!limiter.inc(2));

        // A higher limit admits one more hit, then rejects again.
        advance_mock_time(TICK);
        assert!(limiter.inc(3));
        advance_mock_time(TICK);
        assert!(!limiter.inc(3));

        // A lower limit is rejected right away.
        assert!(!limiter.inc(1));

        advance_mock_time(2 * TIME_PER_SECOND as u64);

        // Once enough time has passed, the lower limit accepts a hit again.
        assert!(limiter.inc(1));

        // Hand the clock back to the real time source.
        set_mock_time(0);
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_now_monotonic() {
        let t1 = now();
        assert!(t1 > 0);
        let t2 = now();
        assert!(t2 >= t1);
    }
}