ipfrs_storage/
rate_limit.rs

1//! Rate limiting for controlling request rates to backends
2//!
3//! Provides token bucket and leaky bucket algorithms for rate limiting:
4//! - Token bucket: Allows bursts up to capacity
5//! - Leaky bucket: Smooth rate limiting
6//! - Per-operation rate limiting
7//! - Configurable refill rates
8//!
9//! ## Example
10//! ```no_run
11//! use ipfrs_storage::{RateLimiter, RateLimitConfig};
12//! use std::time::Duration;
13//!
14//! #[tokio::main]
15//! async fn main() {
16//!     let config = RateLimitConfig::new(100, Duration::from_secs(1));
17//!     let limiter = RateLimiter::new(config);
18//!
19//!     // Acquire permission to proceed
20//!     limiter.acquire(1).await;
21//!     // Make your API call here
22//! }
23//! ```
24
25use parking_lot::Mutex;
26use serde::{Deserialize, Serialize};
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29use tokio::time::sleep;
30
31/// Rate limiting algorithm
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
33pub enum RateLimitAlgorithm {
34    /// Token bucket - allows bursts up to capacity
35    TokenBucket,
36    /// Leaky bucket - smooth rate limiting
37    LeakyBucket,
38}
39
40/// Rate limiter configuration
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct RateLimitConfig {
43    /// Maximum tokens/requests (capacity)
44    pub capacity: u64,
45    /// Refill rate (tokens per interval)
46    pub refill_rate: u64,
47    /// Refill interval
48    pub refill_interval: Duration,
49    /// Algorithm to use
50    pub algorithm: RateLimitAlgorithm,
51    /// Whether to block or return error when limit exceeded
52    pub block_on_limit: bool,
53}
54
55impl RateLimitConfig {
56    /// Create a new rate limit configuration
57    ///
58    /// # Arguments
59    /// * `capacity` - Maximum tokens (burst size)
60    /// * `refill_interval` - How often to refill tokens
61    pub fn new(capacity: u64, refill_interval: Duration) -> Self {
62        Self {
63            capacity,
64            refill_rate: capacity,
65            refill_interval,
66            algorithm: RateLimitAlgorithm::TokenBucket,
67            block_on_limit: true,
68        }
69    }
70
71    /// Create configuration for requests per second
72    pub fn per_second(requests: u64) -> Self {
73        Self::new(requests, Duration::from_secs(1))
74    }
75
76    /// Create configuration for requests per minute
77    pub fn per_minute(requests: u64) -> Self {
78        Self::new(requests, Duration::from_secs(60))
79    }
80
81    /// Set the refill rate
82    pub fn with_refill_rate(mut self, rate: u64) -> Self {
83        self.refill_rate = rate;
84        self
85    }
86
87    /// Set the algorithm
88    pub fn with_algorithm(mut self, algorithm: RateLimitAlgorithm) -> Self {
89        self.algorithm = algorithm;
90        self
91    }
92
93    /// Set whether to block when limit is exceeded
94    pub fn with_blocking(mut self, block: bool) -> Self {
95        self.block_on_limit = block;
96        self
97    }
98}
99
100/// Internal state for rate limiter
101#[derive(Debug)]
102struct RateLimiterState {
103    /// Available tokens
104    tokens: f64,
105    /// Last refill time
106    last_refill: Instant,
107    /// Total requests made
108    total_requests: u64,
109    /// Requests allowed
110    requests_allowed: u64,
111    /// Requests denied
112    requests_denied: u64,
113}
114
115/// Rate limiter statistics
116#[derive(Debug, Clone, Default, Serialize, Deserialize)]
117pub struct RateLimitStats {
118    /// Total requests attempted
119    pub total_requests: u64,
120    /// Requests allowed through
121    pub requests_allowed: u64,
122    /// Requests denied/delayed
123    pub requests_denied: u64,
124    /// Current available tokens
125    pub available_tokens: u64,
126    /// Utilization percentage (0-100)
127    pub utilization_percent: f64,
128}
129
130/// Token bucket rate limiter
131pub struct RateLimiter {
132    config: RateLimitConfig,
133    state: Arc<Mutex<RateLimiterState>>,
134}
135
136impl RateLimiter {
137    /// Create a new rate limiter
138    pub fn new(config: RateLimitConfig) -> Self {
139        Self {
140            state: Arc::new(Mutex::new(RateLimiterState {
141                tokens: config.capacity as f64,
142                last_refill: Instant::now(),
143                total_requests: 0,
144                requests_allowed: 0,
145                requests_denied: 0,
146            })),
147            config,
148        }
149    }
150
151    /// Acquire tokens from the rate limiter
152    ///
153    /// # Arguments
154    /// * `tokens` - Number of tokens to acquire
155    ///
156    /// # Returns
157    /// True if tokens were acquired, false if rate limit exceeded
158    pub async fn acquire(&self, tokens: u64) -> bool {
159        loop {
160            // Try to acquire tokens
161            let wait_duration = {
162                let mut state = self.state.lock();
163                self.refill_tokens(&mut state);
164
165                state.total_requests += 1;
166
167                if state.tokens >= tokens as f64 {
168                    // Have enough tokens
169                    state.tokens -= tokens as f64;
170                    state.requests_allowed += 1;
171                    return true;
172                } else {
173                    state.requests_denied += 1;
174
175                    if !self.config.block_on_limit {
176                        return false;
177                    }
178
179                    // Calculate how long to wait
180                    let tokens_needed = tokens as f64 - state.tokens;
181                    let tokens_per_ms = self.config.refill_rate as f64
182                        / self.config.refill_interval.as_millis() as f64;
183                    let wait_ms = (tokens_needed / tokens_per_ms).ceil() as u64;
184                    Duration::from_millis(wait_ms.max(1))
185                }
186            };
187
188            // Wait before retrying
189            sleep(wait_duration).await;
190        }
191    }
192
193    /// Try to acquire tokens without blocking
194    pub fn try_acquire(&self, tokens: u64) -> bool {
195        let mut state = self.state.lock();
196        self.refill_tokens(&mut state);
197
198        state.total_requests += 1;
199
200        if state.tokens >= tokens as f64 {
201            state.tokens -= tokens as f64;
202            state.requests_allowed += 1;
203            true
204        } else {
205            state.requests_denied += 1;
206            false
207        }
208    }
209
210    /// Get current statistics
211    pub fn stats(&self) -> RateLimitStats {
212        let mut state = self.state.lock();
213        self.refill_tokens(&mut state);
214
215        RateLimitStats {
216            total_requests: state.total_requests,
217            requests_allowed: state.requests_allowed,
218            requests_denied: state.requests_denied,
219            available_tokens: state.tokens as u64,
220            utilization_percent: if state.total_requests > 0 {
221                (state.requests_allowed as f64 / state.total_requests as f64) * 100.0
222            } else {
223                0.0
224            },
225        }
226    }
227
228    /// Reset the rate limiter
229    pub fn reset(&self) {
230        let mut state = self.state.lock();
231        state.tokens = self.config.capacity as f64;
232        state.last_refill = Instant::now();
233        state.total_requests = 0;
234        state.requests_allowed = 0;
235        state.requests_denied = 0;
236    }
237
238    /// Refill tokens based on elapsed time
239    fn refill_tokens(&self, state: &mut RateLimiterState) {
240        let now = Instant::now();
241        let elapsed = now.duration_since(state.last_refill);
242
243        if elapsed >= self.config.refill_interval {
244            let intervals = elapsed.as_secs_f64() / self.config.refill_interval.as_secs_f64();
245            let tokens_to_add = intervals * self.config.refill_rate as f64;
246
247            state.tokens = (state.tokens + tokens_to_add).min(self.config.capacity as f64);
248            state.last_refill = now;
249        }
250    }
251}
252
253impl Clone for RateLimiter {
254    fn clone(&self) -> Self {
255        Self {
256            config: self.config.clone(),
257            state: Arc::clone(&self.state),
258        }
259    }
260}
261
262#[cfg(test)]
263mod tests {
264    use super::*;
265    use tokio::time::{sleep, Duration};
266
267    #[tokio::test]
268    async fn test_rate_limiter_basic() {
269        let config = RateLimitConfig::new(10, Duration::from_secs(1));
270        let limiter = RateLimiter::new(config);
271
272        // Should be able to acquire up to capacity
273        for _ in 0..10 {
274            assert!(limiter.try_acquire(1));
275        }
276
277        // Next acquisition should fail
278        assert!(!limiter.try_acquire(1));
279
280        let stats = limiter.stats();
281        assert_eq!(stats.requests_allowed, 10);
282        assert_eq!(stats.requests_denied, 1);
283    }
284
285    #[tokio::test]
286    async fn test_rate_limiter_refill() {
287        let config = RateLimitConfig::new(5, Duration::from_millis(100));
288        let limiter = RateLimiter::new(config);
289
290        // Exhaust tokens
291        for _ in 0..5 {
292            assert!(limiter.try_acquire(1));
293        }
294        assert!(!limiter.try_acquire(1));
295
296        // Wait for refill
297        sleep(Duration::from_millis(150)).await;
298
299        // Should be able to acquire again
300        assert!(limiter.try_acquire(1));
301    }
302
303    #[tokio::test]
304    async fn test_rate_limiter_blocking() {
305        let config = RateLimitConfig::new(2, Duration::from_millis(100)).with_blocking(true);
306        let limiter = RateLimiter::new(config);
307
308        // Exhaust tokens
309        limiter.acquire(2).await;
310
311        // This should block and wait for refill
312        let start = Instant::now();
313        limiter.acquire(1).await;
314        let elapsed = start.elapsed();
315
316        // Should have waited at least ~100ms for refill
317        assert!(elapsed >= Duration::from_millis(50));
318    }
319
320    #[tokio::test]
321    async fn test_rate_limiter_stats() {
322        let config = RateLimitConfig::new(10, Duration::from_secs(1));
323        let limiter = RateLimiter::new(config);
324
325        // Make some requests
326        for _ in 0..5 {
327            limiter.try_acquire(1);
328        }
329
330        let stats = limiter.stats();
331        assert_eq!(stats.total_requests, 5);
332        assert_eq!(stats.requests_allowed, 5);
333        assert_eq!(stats.requests_denied, 0);
334        assert_eq!(stats.available_tokens, 5);
335        assert_eq!(stats.utilization_percent, 100.0);
336    }
337
338    #[tokio::test]
339    async fn test_rate_limiter_reset() {
340        let config = RateLimitConfig::new(5, Duration::from_secs(1));
341        let limiter = RateLimiter::new(config);
342
343        // Exhaust tokens
344        for _ in 0..5 {
345            limiter.try_acquire(1);
346        }
347
348        // Reset
349        limiter.reset();
350
351        // Should be able to acquire again
352        assert!(limiter.try_acquire(1));
353
354        let stats = limiter.stats();
355        assert_eq!(stats.total_requests, 1);
356    }
357
358    #[tokio::test]
359    async fn test_rate_limiter_per_second() {
360        let config = RateLimitConfig::per_second(100);
361        let limiter = RateLimiter::new(config);
362
363        assert_eq!(limiter.config.capacity, 100);
364        assert_eq!(limiter.config.refill_interval, Duration::from_secs(1));
365    }
366
367    #[tokio::test]
368    async fn test_rate_limiter_per_minute() {
369        let config = RateLimitConfig::per_minute(1000);
370        let limiter = RateLimiter::new(config);
371
372        assert_eq!(limiter.config.capacity, 1000);
373        assert_eq!(limiter.config.refill_interval, Duration::from_secs(60));
374    }
375}