datadog_api/
rate_limit.rs

1//! Client-side rate limiting for Datadog API requests
2//!
3//! Implements a token bucket algorithm to prevent hitting Datadog's rate limits.
4//! The limiter tracks requests per second and delays when limits are approached.
5
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::Mutex;
9use tracing::debug;
10
/// Default requests per second limit (conservative estimate for Datadog API).
///
/// Used by [`RateLimitConfig::default`] and also stored in the configuration
/// returned by [`RateLimitConfig::disabled`].
pub const DEFAULT_REQUESTS_PER_SECOND: u32 = 10;
13
/// Rate limiter configuration.
///
/// A plain value type: it can be cloned freely and compared for equality,
/// which makes it easy to assert on in tests and to detect config changes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum sustained requests per second.
    ///
    /// The limiter built from this config additionally permits bursts of up
    /// to twice this value before throttling.
    pub requests_per_second: u32,
    /// Whether rate limiting is enabled.
    ///
    /// When `false`, the limiter grants every request immediately and never
    /// consumes tokens.
    pub enabled: bool,
}
22
23impl Default for RateLimitConfig {
24    fn default() -> Self {
25        Self {
26            requests_per_second: DEFAULT_REQUESTS_PER_SECOND,
27            enabled: true,
28        }
29    }
30}
31
32impl RateLimitConfig {
33    /// Create a new rate limit config with custom requests per second
34    pub fn new(requests_per_second: u32) -> Self {
35        Self {
36            requests_per_second,
37            enabled: true,
38        }
39    }
40
41    /// Disable rate limiting
42    pub fn disabled() -> Self {
43        Self {
44            requests_per_second: DEFAULT_REQUESTS_PER_SECOND,
45            enabled: false,
46        }
47    }
48}
49
/// Token bucket rate limiter
///
/// Allows bursts up to the bucket size, then rate limits to the configured RPS.
///
/// The bucket itself lives behind an `Arc<Mutex<_>>`, so every clone of a
/// limiter draws from the same pool of tokens.
#[derive(Debug)]
pub struct RateLimiter {
    /// Settings supplied at construction; `enabled` is consulted on every
    /// acquire attempt.
    config: RateLimitConfig,
    /// Shared, mutex-guarded bucket state (token balance and refill timing).
    state: Arc<Mutex<RateLimiterState>>,
}
58
/// Mutable token-bucket bookkeeping held inside a [`RateLimiter`]'s mutex.
#[derive(Debug)]
struct RateLimiterState {
    /// Available tokens (fractional to allow smooth rate limiting).
    /// One whole token is consumed per granted request.
    tokens: f64,
    /// Last time tokens were updated; refill is credited for the time
    /// elapsed since this instant.
    last_update: Instant,
    /// Maximum tokens (bucket size = 2x RPS for burst allowance).
    /// The balance is capped at this value after every refill.
    max_tokens: f64,
    /// Token refill rate per millisecond (RPS / 1000).
    refill_rate: f64,
}
70
71impl RateLimiter {
72    /// Create a new rate limiter with the given configuration
73    pub fn new(config: RateLimitConfig) -> Self {
74        let max_tokens = (config.requests_per_second * 2) as f64; // Allow 2-second burst
75        let refill_rate = config.requests_per_second as f64 / 1000.0; // tokens per ms
76
77        Self {
78            config,
79            state: Arc::new(Mutex::new(RateLimiterState {
80                tokens: max_tokens, // Start with full bucket
81                last_update: Instant::now(),
82                max_tokens,
83                refill_rate,
84            })),
85        }
86    }
87
88    /// Acquire a token, waiting if necessary
89    ///
90    /// Returns immediately if a token is available, otherwise waits
91    /// until a token becomes available.
92    pub async fn acquire(&self) {
93        if !self.config.enabled {
94            return;
95        }
96
97        loop {
98            let wait_time = {
99                let mut state = self.state.lock().await;
100
101                // Refill tokens based on elapsed time
102                let now = Instant::now();
103                let elapsed_ms = now.duration_since(state.last_update).as_millis() as f64;
104                state.tokens = (state.tokens + elapsed_ms * state.refill_rate).min(state.max_tokens);
105                state.last_update = now;
106
107                if state.tokens >= 1.0 {
108                    // Token available, consume it
109                    state.tokens -= 1.0;
110                    debug!(
111                        "Rate limiter: acquired token, {} remaining",
112                        state.tokens as u32
113                    );
114                    return;
115                }
116
117                // Calculate wait time for next token
118                let tokens_needed = 1.0 - state.tokens;
119                let wait_ms = (tokens_needed / state.refill_rate).ceil() as u64;
120                Duration::from_millis(wait_ms.max(1))
121            };
122
123            debug!("Rate limiter: waiting {:?} for token", wait_time);
124            tokio::time::sleep(wait_time).await;
125        }
126    }
127
128    /// Try to acquire a token without waiting
129    ///
130    /// Returns true if a token was acquired, false if rate limited.
131    pub async fn try_acquire(&self) -> bool {
132        if !self.config.enabled {
133            return true;
134        }
135
136        let mut state = self.state.lock().await;
137
138        // Refill tokens
139        let now = Instant::now();
140        let elapsed_ms = now.duration_since(state.last_update).as_millis() as f64;
141        state.tokens = (state.tokens + elapsed_ms * state.refill_rate).min(state.max_tokens);
142        state.last_update = now;
143
144        if state.tokens >= 1.0 {
145            state.tokens -= 1.0;
146            true
147        } else {
148            false
149        }
150    }
151
152    /// Get current available tokens (for monitoring)
153    pub async fn available_tokens(&self) -> u32 {
154        let state = self.state.lock().await;
155        state.tokens as u32
156    }
157}
158
159impl Clone for RateLimiter {
160    fn clone(&self) -> Self {
161        Self {
162            config: self.config.clone(),
163            state: Arc::clone(&self.state),
164        }
165    }
166}
167
168impl Default for RateLimiter {
169    fn default() -> Self {
170        Self::new(RateLimitConfig::default())
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// Make `attempts` non-blocking acquisitions, ignoring whether each one
    /// succeeds. Used to empty a bucket before asserting on it.
    async fn drain(limiter: &RateLimiter, attempts: usize) {
        for _ in 0..attempts {
            let _ = limiter.try_acquire().await;
        }
    }

    #[tokio::test]
    async fn test_rate_limiter_acquire() {
        let config = RateLimitConfig::new(100);
        let limiter = RateLimiter::new(config);

        // A fresh bucket is full, so none of these should need to wait.
        let mut acquired = 0;
        while acquired < 10 {
            limiter.acquire().await;
            acquired += 1;
        }

        // The bucket opened with 200 tokens (2x RPS); some must be gone now.
        let remaining = limiter.available_tokens().await;
        assert!(remaining < 200);
    }

    #[tokio::test]
    async fn test_rate_limiter_try_acquire() {
        let config = RateLimitConfig::new(10);
        let limiter = RateLimiter::new(config);

        // The whole burst allowance (2 x 10) should be granted.
        for _ in 0..20 {
            let granted = limiter.try_acquire().await;
            assert!(granted);
        }

        // With the bucket empty, the next attempt must be refused.
        let granted = limiter.try_acquire().await;
        assert!(!granted);
    }

    #[tokio::test]
    async fn test_rate_limiter_disabled() {
        let config = RateLimitConfig::disabled();
        let limiter = RateLimiter::new(config);

        // A disabled limiter never refuses a request.
        for _ in 0..100 {
            let granted = limiter.try_acquire().await;
            assert!(granted);
        }
    }

    #[tokio::test]
    async fn test_rate_limiter_refill() {
        // 1000 RPS = 1 token per ms
        let config = RateLimitConfig::new(1000);
        let limiter = RateLimiter::new(config);

        // Empty the bucket.
        drain(&limiter, 2000).await;

        // Give the bucket time to refill.
        let pause = Duration::from_millis(10);
        tokio::time::sleep(pause).await;

        // At least one token should be back.
        let granted = limiter.try_acquire().await;
        assert!(granted);
    }

    #[test]
    fn test_rate_limit_config_default() {
        let RateLimitConfig {
            requests_per_second,
            enabled,
        } = RateLimitConfig::default();

        assert_eq!(requests_per_second, DEFAULT_REQUESTS_PER_SECOND);
        assert!(enabled);
    }

    #[test]
    fn test_rate_limit_config_disabled() {
        let enabled = RateLimitConfig::disabled().enabled;
        assert!(!enabled);
    }

    #[tokio::test]
    async fn test_rate_limiter_clone_shares_state() {
        let original = RateLimiter::new(RateLimitConfig::new(10));
        let handle = original.clone();

        // Empty the bucket through the original handle.
        drain(&original, 20).await;

        // The clone must observe the same empty bucket.
        let granted = handle.try_acquire().await;
        assert!(!granted);
    }
}