datadog_api/
rate_limit.rs1use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::Mutex;
9use tracing::debug;
10
/// Request rate, in requests per second, used by [`RateLimitConfig::default`]
/// and [`RateLimitConfig::disabled`].
pub const DEFAULT_REQUESTS_PER_SECOND: u32 = 10;
13
/// Settings for client-side request throttling.
#[derive(Clone, Debug)]
pub struct RateLimitConfig {
    /// Sustained number of requests allowed per second.
    pub requests_per_second: u32,
    /// Whether throttling is applied at all. When `false`, the limiter's
    /// `acquire` returns immediately and `try_acquire` always succeeds.
    pub enabled: bool,
}
22
23impl Default for RateLimitConfig {
24 fn default() -> Self {
25 Self {
26 requests_per_second: DEFAULT_REQUESTS_PER_SECOND,
27 enabled: true,
28 }
29 }
30}
31
32impl RateLimitConfig {
33 pub fn new(requests_per_second: u32) -> Self {
35 Self {
36 requests_per_second,
37 enabled: true,
38 }
39 }
40
41 pub fn disabled() -> Self {
43 Self {
44 requests_per_second: DEFAULT_REQUESTS_PER_SECOND,
45 enabled: false,
46 }
47 }
48}
49
50#[derive(Debug)]
54pub struct RateLimiter {
55 config: RateLimitConfig,
56 state: Arc<Mutex<RateLimiterState>>,
57}
58
/// Mutable bookkeeping for the token bucket, guarded by the limiter's mutex.
#[derive(Debug)]
struct RateLimiterState {
    /// Tokens currently in the bucket; fractional while a token is refilling.
    tokens: f64,
    /// Instant at which `tokens` was last brought up to date.
    last_update: Instant,
    /// Upper bound on `tokens` (the burst capacity).
    max_tokens: f64,
    /// Tokens added per elapsed millisecond.
    refill_rate: f64,
}
70
71impl RateLimiter {
72 pub fn new(config: RateLimitConfig) -> Self {
74 let max_tokens = (config.requests_per_second * 2) as f64; let refill_rate = config.requests_per_second as f64 / 1000.0; Self {
78 config,
79 state: Arc::new(Mutex::new(RateLimiterState {
80 tokens: max_tokens, last_update: Instant::now(),
82 max_tokens,
83 refill_rate,
84 })),
85 }
86 }
87
88 pub async fn acquire(&self) {
93 if !self.config.enabled {
94 return;
95 }
96
97 loop {
98 let wait_time = {
99 let mut state = self.state.lock().await;
100
101 let now = Instant::now();
103 let elapsed_ms = now.duration_since(state.last_update).as_millis() as f64;
104 state.tokens = (state.tokens + elapsed_ms * state.refill_rate).min(state.max_tokens);
105 state.last_update = now;
106
107 if state.tokens >= 1.0 {
108 state.tokens -= 1.0;
110 debug!(
111 "Rate limiter: acquired token, {} remaining",
112 state.tokens as u32
113 );
114 return;
115 }
116
117 let tokens_needed = 1.0 - state.tokens;
119 let wait_ms = (tokens_needed / state.refill_rate).ceil() as u64;
120 Duration::from_millis(wait_ms.max(1))
121 };
122
123 debug!("Rate limiter: waiting {:?} for token", wait_time);
124 tokio::time::sleep(wait_time).await;
125 }
126 }
127
128 pub async fn try_acquire(&self) -> bool {
132 if !self.config.enabled {
133 return true;
134 }
135
136 let mut state = self.state.lock().await;
137
138 let now = Instant::now();
140 let elapsed_ms = now.duration_since(state.last_update).as_millis() as f64;
141 state.tokens = (state.tokens + elapsed_ms * state.refill_rate).min(state.max_tokens);
142 state.last_update = now;
143
144 if state.tokens >= 1.0 {
145 state.tokens -= 1.0;
146 true
147 } else {
148 false
149 }
150 }
151
152 pub async fn available_tokens(&self) -> u32 {
154 let state = self.state.lock().await;
155 state.tokens as u32
156 }
157}
158
159impl Clone for RateLimiter {
160 fn clone(&self) -> Self {
161 Self {
162 config: self.config.clone(),
163 state: Arc::clone(&self.state),
164 }
165 }
166}
167
168impl Default for RateLimiter {
169 fn default() -> Self {
170 Self::new(RateLimitConfig::default())
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// A handful of acquisitions from a fresh limiter leaves the bucket below
    /// its initial burst capacity.
    #[tokio::test]
    async fn test_rate_limiter_acquire() {
        let limiter = RateLimiter::new(RateLimitConfig::new(100));

        let mut taken = 0;
        while taken < 10 {
            limiter.acquire().await;
            taken += 1;
        }

        let remaining = limiter.available_tokens().await;
        assert!(remaining < 200);
    }

    /// The burst capacity (twice the rate) can be drained, after which
    /// `try_acquire` refuses.
    #[tokio::test]
    async fn test_rate_limiter_try_acquire() {
        let limiter = RateLimiter::new(RateLimitConfig::new(10));

        for _ in 0..20 {
            let granted = limiter.try_acquire().await;
            assert!(granted);
        }

        let granted = limiter.try_acquire().await;
        assert!(!granted);
    }

    /// A disabled limiter never refuses a request.
    #[tokio::test]
    async fn test_rate_limiter_disabled() {
        let limiter = RateLimiter::new(RateLimitConfig::disabled());

        for _ in 0..100 {
            let granted = limiter.try_acquire().await;
            assert!(granted);
        }
    }

    /// After the bucket has been drained, waiting lets tokens refill.
    #[tokio::test]
    async fn test_rate_limiter_refill() {
        let limiter = RateLimiter::new(RateLimitConfig::new(1000));

        // Drain attempts; individual results are irrelevant here.
        for _ in 0..2000 {
            let _ = limiter.try_acquire().await;
        }

        let pause = Duration::from_millis(10);
        tokio::time::sleep(pause).await;

        let granted = limiter.try_acquire().await;
        assert!(granted);
    }

    /// The default configuration is enabled and uses the default rate.
    #[test]
    fn test_rate_limit_config_default() {
        let RateLimitConfig {
            requests_per_second,
            enabled,
        } = RateLimitConfig::default();

        assert_eq!(requests_per_second, DEFAULT_REQUESTS_PER_SECOND);
        assert!(enabled);
    }

    /// `disabled()` yields a configuration with throttling off.
    #[test]
    fn test_rate_limit_config_disabled() {
        let enabled = RateLimitConfig::disabled().enabled;
        assert!(!enabled);
    }

    /// Tokens consumed through one handle are gone for its clones as well.
    #[tokio::test]
    async fn test_rate_limiter_clone_shares_state() {
        let original = RateLimiter::new(RateLimitConfig::new(10));
        let shared = original.clone();

        for _ in 0..20 {
            let _ = original.try_acquire().await;
        }

        let granted = shared.try_acquire().await;
        assert!(!granted);
    }
}