libdd_common/
rate_limiter.rs1use std::sync::atomic::{AtomicI64, AtomicU32, AtomicU64, Ordering};
5
/// Common interface of the rate limiters in this module.
pub trait Limiter {
    /// Registers one hit against `limit`, the number of hits allowed per interval.
    ///
    /// Returns `true` when the hit is accepted and `false` when the limit is exceeded.
    fn inc(&self, limit: u32) -> bool;

    /// Returns the current usage of the limiter.
    ///
    /// [`LocalLimiter`] reports it as a fraction of the last accepted limit, clamped to
    /// `0.0..=1.0`.
    fn rate(&self) -> f64;

    /// Refreshes the limiter's internal state and then returns the rate.
    fn update_rate(&self) -> f64;
}
16
/// Rate limiter whose mutable state lives in atomics, so it is driven through `&self`.
///
/// The layout is pinned with `#[repr(C)]`; do not reorder the fields.
#[repr(C)]
pub struct LocalLimiter {
    /// Fill level of the limiter. `inc` adds `granularity` per hit and `update` subtracts
    /// `elapsed time * limit` as time passes.
    hit_count: AtomicI64,
    /// Timestamp (as returned by `now()`) stored by the most recent `update` call.
    last_update: AtomicU64,
    /// Limit passed to the most recent accepted `inc` call; 0 until one is accepted.
    last_limit: AtomicU32,
    /// Length of the limiting interval in `now()` units, i.e. `TIME_PER_SECOND` times the
    /// number of seconds.
    granularity: i64,
}
30
31const TIME_PER_SECOND: i64 = 1_000_000_000; fn now() -> u64 {
34 #[cfg(windows)]
35 let now = unsafe {
36 static FREQUENCY: AtomicU64 = AtomicU64::new(0);
37
38 let mut frequency = FREQUENCY.load(Ordering::Relaxed);
39 if frequency == 0 {
40 windows_sys::Win32::System::Performance::QueryPerformanceFrequency(
41 &mut frequency as *mut u64 as *mut i64,
42 );
43 FREQUENCY.store(frequency, Ordering::Relaxed);
44 }
45
46 let mut perf_counter = 0;
47 windows_sys::Win32::System::Performance::QueryPerformanceCounter(&mut perf_counter);
48 perf_counter as u64 * frequency / TIME_PER_SECOND as u64
49 };
50 #[cfg(not(windows))]
51 let now = {
52 let mut ts: libc::timespec = libc::timespec {
53 tv_sec: 0,
54 tv_nsec: 0,
55 };
56 unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts) };
57 #[cfg(target_pointer_width = "32")]
60 {
61 (ts.tv_sec as i64 * TIME_PER_SECOND + ts.tv_nsec as i64) as u64
62 }
63 #[cfg(target_pointer_width = "64")]
64 {
65 (ts.tv_sec * TIME_PER_SECOND + ts.tv_nsec) as u64
66 }
67 };
68 now
69}
70
71impl Default for LocalLimiter {
72 fn default() -> Self {
73 LocalLimiter {
74 hit_count: Default::default(),
75 last_update: AtomicU64::from(now()),
76 last_limit: Default::default(),
77 granularity: TIME_PER_SECOND,
78 }
79 }
80}
81
82impl LocalLimiter {
83 pub fn with_granularity(seconds: u32) -> LocalLimiter {
85 let mut limiter = LocalLimiter::default();
86 limiter.granularity *= seconds as i64;
87 limiter
88 }
89
90 pub fn reset(&mut self, seconds: u32) {
92 self.last_update.store(now(), Ordering::Relaxed);
93 self.hit_count.store(0, Ordering::Relaxed);
94 self.last_limit.store(0, Ordering::Relaxed);
95 self.granularity = TIME_PER_SECOND * seconds as i64;
96 }
97
98 fn update(&self, limit: u32, inc: i64) -> i64 {
99 let now = now();
100 let last = self.last_update.swap(now, Ordering::SeqCst);
101 let clear_limit = limit.max(self.last_limit.load(Ordering::Relaxed));
103 let clear_counter = (now as i64 - last as i64) * (clear_limit as i64);
104 let subtract = clear_counter - inc;
105 let mut previous_hits = self.hit_count.fetch_sub(subtract, Ordering::SeqCst);
106 if previous_hits < subtract {
108 let add = clear_counter - previous_hits.max(0);
109 self.hit_count.fetch_add(add, Ordering::Acquire);
110 previous_hits += add - clear_counter;
111 }
112 previous_hits
113 }
114}
115
116impl Limiter for LocalLimiter {
117 fn inc(&self, limit: u32) -> bool {
118 let previous_hits = self.update(limit, self.granularity);
119 if previous_hits / self.granularity >= limit as i64 {
120 self.hit_count
121 .fetch_sub(self.granularity, Ordering::Acquire);
122 false
123 } else {
124 self.last_limit.store(limit, Ordering::Relaxed);
129 true
130 }
131 }
132
133 fn rate(&self) -> f64 {
134 let last_limit = self.last_limit.load(Ordering::Relaxed);
135 let hit_count = self.hit_count.load(Ordering::Relaxed);
136 (hit_count as f64 / (last_limit as i64 * self.granularity) as f64).clamp(0., 1.)
137 }
138
139 fn update_rate(&self) -> f64 {
140 self.update(0, self.granularity);
141 self.rate()
142 }
143}
144
#[cfg(test)]
mod tests {
    use crate::rate_limiter::{Limiter, LocalLimiter, TIME_PER_SECOND};
    use std::sync::atomic::Ordering;
    use std::thread::sleep;
    use std::time::Duration;

    /// Walks a one-second limiter through filling up, being rejected, refilling after
    /// simulated idle time, and switching between limits.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_rate_limiter() {
        let limiter = LocalLimiter::default();
        // Lets a little real time pass between hits.
        let pause = || sleep(Duration::from_micros(100));
        // Simulates idle time by moving the stored last-update timestamp into the past.
        let rewind = |seconds: u64| {
            limiter
                .last_update
                .fetch_sub(seconds * TIME_PER_SECOND as u64, Ordering::Relaxed);
        };

        // First hit at a limit of 2: half of the budget is used.
        assert!(limiter.inc(2));
        assert!(limiter.rate() > 0.49999 && limiter.rate() <= 0.5);
        pause();
        // Second hit: more than half, but the elapsed time has drained a little.
        assert!(limiter.inc(2));
        assert!(limiter.rate() > 0.5 && limiter.rate() < 1.);
        pause();
        // Third hit is still accepted and the reported rate is capped at 1.
        assert!(limiter.inc(2));
        assert_eq!(1., limiter.rate());
        pause();
        // Further hits are rejected.
        assert!(!limiter.inc(2));
        pause();
        assert!(!limiter.inc(2));
        pause();

        // After three simulated idle seconds the same pattern repeats.
        rewind(3);
        assert!(limiter.inc(2));
        assert!(limiter.rate() > 0.49999 && limiter.rate() <= 0.5);
        pause();
        assert!(limiter.inc(2));
        pause();
        assert!(limiter.inc(2));
        pause();
        assert!(!limiter.inc(2));
        pause();

        // Raising the limit to 3 lets one more hit through, then rejects again.
        assert!(limiter.inc(3));
        pause();
        assert!(!limiter.inc(3));

        // Lowering the limit to 1 is rejected while the counter is still full.
        assert!(!limiter.inc(1));

        // Two simulated idle seconds drain the counter, so a hit at limit 1 is accepted.
        rewind(2);

        assert!(limiter.inc(1));
    }
}