Skip to main content

datasynth_core/rate_limit/
limiter.rs

1//! Token bucket rate limiter implementation.
2//!
3//! Provides a token bucket algorithm for rate limiting with support for
4//! burst capacity and multiple backpressure strategies.
5
6use std::collections::VecDeque;
7use std::time::{Duration, Instant};
8
9use serde::{Deserialize, Serialize};
10
/// Configuration for rate limiting.
///
/// Consumed by [`RateLimiter::new`]: the limiter starts with a full bucket of
/// `burst_size` tokens and refills it at `entities_per_second`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimitConfig {
    /// Target entities per second.
    ///
    /// Should be a positive, finite number: the limiter divides by this value
    /// when computing how long to wait for the next token.
    pub entities_per_second: f64,
    /// Burst size (maximum tokens in bucket).
    ///
    /// Also the number of tokens a freshly created or reset limiter holds.
    pub burst_size: u32,
    /// Backpressure strategy when rate is exceeded.
    pub backpressure: RateLimitBackpressure,
    /// Whether rate limiting is enabled.
    ///
    /// When `false`, every acquisition proceeds immediately.
    pub enabled: bool,
}
23
24impl Default for RateLimitConfig {
25    fn default() -> Self {
26        Self {
27            entities_per_second: 1000.0,
28            burst_size: 100,
29            backpressure: RateLimitBackpressure::Block,
30            enabled: true,
31        }
32    }
33}
34
/// Backpressure strategy when rate limit is exceeded.
///
/// Selects what [`RateLimiter::acquire`] does when the bucket holds less than
/// one token. Variant names are (de)serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RateLimitBackpressure {
    /// Block until tokens are available.
    ///
    /// The calling thread sleeps until the next token has accrued. This is
    /// the default strategy.
    #[default]
    Block,
    /// Drop excess items.
    ///
    /// The acquisition returns [`RateLimitAction::Dropped`] without waiting.
    Drop,
    /// Buffer items up to a limit, then block.
    ///
    /// Buffered entries are released by [`RateLimiter::process_buffer`].
    Buffer {
        /// Maximum number of items to buffer.
        max_buffered: usize,
    },
}
50
/// Result of a rate limit acquisition.
///
/// Every payload is a plain integer, so the type is `Copy` and supports full
/// equality (`Eq`) in addition to `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RateLimitAction {
    /// Request can proceed immediately.
    Proceed,
    /// Request was dropped due to rate limiting.
    Dropped,
    /// Request was buffered.
    Buffered {
        /// Position in buffer (1-based: the buffer length right after the
        /// request was enqueued).
        position: usize,
    },
    /// Request waited before proceeding.
    Waited {
        /// Time waited in milliseconds (sub-millisecond waits report `0`).
        wait_time_ms: u64,
    },
}
69
/// Statistics for the rate limiter.
///
/// A snapshot is obtained through [`RateLimiter::stats`], which overwrites
/// `current_tokens` and `buffer_size` with the limiter's live values before
/// returning the copy.
#[derive(Debug, Clone, Default)]
pub struct RateLimiterStats {
    /// Total acquisitions attempted.
    pub total_acquisitions: u64,
    /// Acquisitions that proceeded immediately.
    pub immediate_proceeds: u64,
    /// Acquisitions that required waiting.
    pub waits: u64,
    /// Acquisitions that were dropped.
    pub drops: u64,
    /// Acquisitions that were buffered.
    pub buffers: u64,
    /// Total wait time in milliseconds.
    ///
    /// Each wait is truncated to whole milliseconds before being added.
    pub total_wait_time_ms: u64,
    /// Current tokens available.
    pub current_tokens: f64,
    /// Current buffer size.
    pub buffer_size: usize,
}
90
91/// Token bucket rate limiter.
92///
93/// Implements the token bucket algorithm for rate limiting:
94/// - Tokens are added at a steady rate up to a maximum (burst) capacity
95/// - Each operation consumes one token
96/// - If no tokens are available, behavior depends on backpressure strategy
97pub struct RateLimiter {
98    config: RateLimitConfig,
99    /// Current number of tokens in the bucket.
100    tokens: f64,
101    /// Time of last token refill.
102    last_refill: Instant,
103    /// Buffer for items waiting due to rate limiting.
104    buffer: VecDeque<Instant>,
105    /// Statistics.
106    stats: RateLimiterStats,
107}
108
109impl RateLimiter {
110    /// Creates a new rate limiter with the given configuration.
111    pub fn new(config: RateLimitConfig) -> Self {
112        Self {
113            tokens: config.burst_size as f64,
114            last_refill: Instant::now(),
115            buffer: VecDeque::new(),
116            stats: RateLimiterStats {
117                current_tokens: config.burst_size as f64,
118                ..Default::default()
119            },
120            config,
121        }
122    }
123
124    /// Creates a rate limiter with a simple rate (entities per second).
125    pub fn with_rate(entities_per_second: f64) -> Self {
126        Self::new(RateLimitConfig {
127            entities_per_second,
128            ..Default::default()
129        })
130    }
131
132    /// Creates a disabled rate limiter (always allows).
133    pub fn disabled() -> Self {
134        Self::new(RateLimitConfig {
135            enabled: false,
136            ..Default::default()
137        })
138    }
139
140    /// Acquires a token, blocking if necessary.
141    ///
142    /// Returns the action taken to acquire the token.
143    pub fn acquire(&mut self) -> RateLimitAction {
144        if !self.config.enabled {
145            self.stats.total_acquisitions += 1;
146            self.stats.immediate_proceeds += 1;
147            return RateLimitAction::Proceed;
148        }
149
150        self.stats.total_acquisitions += 1;
151        self.refill_tokens();
152
153        if self.tokens >= 1.0 {
154            self.tokens -= 1.0;
155            self.stats.current_tokens = self.tokens;
156            self.stats.immediate_proceeds += 1;
157            return RateLimitAction::Proceed;
158        }
159
160        // No tokens available, apply backpressure strategy
161        match self.config.backpressure {
162            RateLimitBackpressure::Block => {
163                let wait_time = self.wait_for_token();
164                self.stats.waits += 1;
165                self.stats.total_wait_time_ms += wait_time;
166                RateLimitAction::Waited {
167                    wait_time_ms: wait_time,
168                }
169            }
170            RateLimitBackpressure::Drop => {
171                self.stats.drops += 1;
172                RateLimitAction::Dropped
173            }
174            RateLimitBackpressure::Buffer { max_buffered } => {
175                if self.buffer.len() < max_buffered {
176                    self.buffer.push_back(Instant::now());
177                    self.stats.buffers += 1;
178                    self.stats.buffer_size = self.buffer.len();
179                    RateLimitAction::Buffered {
180                        position: self.buffer.len(),
181                    }
182                } else {
183                    // Buffer full, block
184                    let wait_time = self.wait_for_token();
185                    self.stats.waits += 1;
186                    self.stats.total_wait_time_ms += wait_time;
187                    RateLimitAction::Waited {
188                        wait_time_ms: wait_time,
189                    }
190                }
191            }
192        }
193    }
194
195    /// Tries to acquire a token without blocking.
196    ///
197    /// Returns `Some(action)` if a token was acquired, `None` if rate limited.
198    pub fn try_acquire(&mut self) -> Option<RateLimitAction> {
199        if !self.config.enabled {
200            self.stats.total_acquisitions += 1;
201            self.stats.immediate_proceeds += 1;
202            return Some(RateLimitAction::Proceed);
203        }
204
205        self.refill_tokens();
206
207        if self.tokens >= 1.0 {
208            self.tokens -= 1.0;
209            self.stats.current_tokens = self.tokens;
210            self.stats.total_acquisitions += 1;
211            self.stats.immediate_proceeds += 1;
212            Some(RateLimitAction::Proceed)
213        } else {
214            None
215        }
216    }
217
218    /// Acquires a token with a timeout.
219    ///
220    /// Returns the action taken, or `None` if the timeout was exceeded.
221    pub fn acquire_timeout(&mut self, timeout: Duration) -> Option<RateLimitAction> {
222        if !self.config.enabled {
223            self.stats.total_acquisitions += 1;
224            self.stats.immediate_proceeds += 1;
225            return Some(RateLimitAction::Proceed);
226        }
227
228        self.stats.total_acquisitions += 1;
229        self.refill_tokens();
230
231        if self.tokens >= 1.0 {
232            self.tokens -= 1.0;
233            self.stats.current_tokens = self.tokens;
234            self.stats.immediate_proceeds += 1;
235            return Some(RateLimitAction::Proceed);
236        }
237
238        // Calculate time needed for one token
239        let tokens_needed = 1.0 - self.tokens;
240        let time_needed = Duration::from_secs_f64(tokens_needed / self.config.entities_per_second);
241
242        if time_needed > timeout {
243            // Timeout exceeded
244            match self.config.backpressure {
245                RateLimitBackpressure::Drop => {
246                    self.stats.drops += 1;
247                    Some(RateLimitAction::Dropped)
248                }
249                _ => None,
250            }
251        } else {
252            std::thread::sleep(time_needed);
253            self.refill_tokens();
254            self.tokens -= 1.0;
255            self.stats.current_tokens = self.tokens;
256            self.stats.waits += 1;
257            self.stats.total_wait_time_ms += time_needed.as_millis() as u64;
258            Some(RateLimitAction::Waited {
259                wait_time_ms: time_needed.as_millis() as u64,
260            })
261        }
262    }
263
264    /// Returns the current statistics.
265    pub fn stats(&self) -> RateLimiterStats {
266        let mut stats = self.stats.clone();
267        stats.current_tokens = self.tokens;
268        stats.buffer_size = self.buffer.len();
269        stats
270    }
271
272    /// Resets the rate limiter to initial state.
273    pub fn reset(&mut self) {
274        self.tokens = self.config.burst_size as f64;
275        self.last_refill = Instant::now();
276        self.buffer.clear();
277        self.stats = RateLimiterStats {
278            current_tokens: self.tokens,
279            ..Default::default()
280        };
281    }
282
283    /// Returns the current number of available tokens.
284    pub fn available_tokens(&self) -> f64 {
285        self.tokens
286    }
287
288    /// Returns the configuration.
289    pub fn config(&self) -> &RateLimitConfig {
290        &self.config
291    }
292
293    /// Updates the rate limit.
294    pub fn set_rate(&mut self, entities_per_second: f64) {
295        self.config.entities_per_second = entities_per_second;
296    }
297
298    /// Enables or disables the rate limiter.
299    pub fn set_enabled(&mut self, enabled: bool) {
300        self.config.enabled = enabled;
301    }
302
303    /// Refills tokens based on elapsed time.
304    fn refill_tokens(&mut self) {
305        let now = Instant::now();
306        let elapsed = now.duration_since(self.last_refill);
307        let new_tokens = elapsed.as_secs_f64() * self.config.entities_per_second;
308
309        self.tokens = (self.tokens + new_tokens).min(self.config.burst_size as f64);
310        self.last_refill = now;
311    }
312
313    /// Waits until a token is available.
314    fn wait_for_token(&mut self) -> u64 {
315        let tokens_needed = 1.0 - self.tokens;
316        let wait_secs = tokens_needed / self.config.entities_per_second;
317        let wait_duration = Duration::from_secs_f64(wait_secs);
318
319        std::thread::sleep(wait_duration);
320
321        self.refill_tokens();
322        self.tokens -= 1.0;
323        self.stats.current_tokens = self.tokens;
324
325        wait_duration.as_millis() as u64
326    }
327
328    /// Processes the buffer, releasing items as tokens become available.
329    pub fn process_buffer(&mut self) -> Vec<Duration> {
330        self.refill_tokens();
331
332        let mut wait_times = Vec::new();
333
334        while !self.buffer.is_empty() && self.tokens >= 1.0 {
335            if let Some(enqueue_time) = self.buffer.pop_front() {
336                let wait_time = enqueue_time.elapsed();
337                wait_times.push(wait_time);
338                self.tokens -= 1.0;
339            }
340        }
341
342        self.stats.buffer_size = self.buffer.len();
343        self.stats.current_tokens = self.tokens;
344
345        wait_times
346    }
347}
348
/// A rate-limited wrapper for any iterator.
///
/// Owns both the wrapped iterator and the [`RateLimiter`] that paces it; the
/// limiter is consulted once per call to `next`.
pub struct RateLimitedIterator<I> {
    /// The wrapped iterator that supplies the items.
    inner: I,
    /// Limiter consulted on every `next` call.
    limiter: RateLimiter,
}
354
355impl<I> RateLimitedIterator<I> {
356    /// Creates a new rate-limited iterator.
357    pub fn new(inner: I, limiter: RateLimiter) -> Self {
358        Self { inner, limiter }
359    }
360
361    /// Creates a rate-limited iterator with a simple rate.
362    pub fn with_rate(inner: I, entities_per_second: f64) -> Self {
363        Self::new(inner, RateLimiter::with_rate(entities_per_second))
364    }
365
366    /// Returns the limiter statistics.
367    pub fn stats(&self) -> RateLimiterStats {
368        self.limiter.stats()
369    }
370}
371
372impl<I: Iterator> Iterator for RateLimitedIterator<I> {
373    type Item = I::Item;
374
375    fn next(&mut self) -> Option<Self::Item> {
376        self.limiter.acquire();
377        self.inner.next()
378    }
379}
380
381/// Extension trait to add rate limiting to any iterator.
382pub trait RateLimitExt: Iterator + Sized {
383    /// Applies rate limiting to this iterator.
384    fn rate_limit(self, entities_per_second: f64) -> RateLimitedIterator<Self> {
385        RateLimitedIterator::with_rate(self, entities_per_second)
386    }
387
388    /// Applies rate limiting with custom config.
389    fn rate_limit_with(self, config: RateLimitConfig) -> RateLimitedIterator<Self> {
390        RateLimitedIterator::new(self, RateLimiter::new(config))
391    }
392}
393
394impl<I: Iterator> RateLimitExt for I {}
395
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds an enabled limiter with the given rate, burst and strategy.
    fn limiter_with(rate: f64, burst: u32, backpressure: RateLimitBackpressure) -> RateLimiter {
        RateLimiter::new(RateLimitConfig {
            entities_per_second: rate,
            burst_size: burst,
            backpressure,
            enabled: true,
        })
    }

    #[test]
    fn test_rate_limiter_immediate_proceed() {
        let mut bucket = limiter_with(1000.0, 10, RateLimitBackpressure::Block);

        // The burst capacity covers the first ten acquisitions.
        let mut granted = 0;
        while granted < 10 {
            assert_eq!(bucket.acquire(), RateLimitAction::Proceed);
            granted += 1;
        }

        let snapshot = bucket.stats();
        assert_eq!(snapshot.total_acquisitions, 10);
        assert_eq!(snapshot.immediate_proceeds, 10);
    }

    #[test]
    fn test_rate_limiter_blocking() {
        let mut bucket = limiter_with(1000.0, 1, RateLimitBackpressure::Block);

        // The single burst token is handed out straight away...
        assert_eq!(bucket.acquire(), RateLimitAction::Proceed);

        // ...and the following request has to wait for a refill.
        let second = bucket.acquire();
        assert!(matches!(second, RateLimitAction::Waited { .. }));
    }

    #[test]
    fn test_rate_limiter_drop() {
        let mut bucket = limiter_with(10.0, 1, RateLimitBackpressure::Drop);

        // The single burst token is handed out straight away...
        assert_eq!(bucket.acquire(), RateLimitAction::Proceed);

        // ...and the next request is dropped (no time to refill).
        assert_eq!(bucket.acquire(), RateLimitAction::Dropped);

        assert_eq!(bucket.stats().drops, 1);
    }

    #[test]
    fn test_rate_limiter_buffer() {
        let strategy = RateLimitBackpressure::Buffer { max_buffered: 5 };
        let mut bucket = limiter_with(10.0, 1, strategy);

        // The single burst token is handed out straight away...
        assert_eq!(bucket.acquire(), RateLimitAction::Proceed);

        // ...and the next request lands in the first buffer slot.
        let second = bucket.acquire();
        assert!(matches!(second, RateLimitAction::Buffered { position: 1 }));

        let snapshot = bucket.stats();
        assert_eq!(snapshot.buffers, 1);
        assert_eq!(snapshot.buffer_size, 1);
    }

    #[test]
    fn test_rate_limiter_try_acquire() {
        let mut bucket = limiter_with(10.0, 1, RateLimitBackpressure::Block);

        // The burst token is available...
        assert!(bucket.try_acquire().is_some());

        // ...but nothing has refilled yet for a second attempt.
        assert!(bucket.try_acquire().is_none());
    }

    #[test]
    fn test_rate_limiter_disabled() {
        let mut bucket = RateLimiter::disabled();

        // A disabled limiter never holds anything back.
        let mut remaining = 100;
        while remaining > 0 {
            assert_eq!(bucket.acquire(), RateLimitAction::Proceed);
            remaining -= 1;
        }
    }

    #[test]
    fn test_rate_limiter_reset() {
        let mut bucket = limiter_with(10.0, 5, RateLimitBackpressure::Block);

        // Drain the whole burst.
        (0..5).for_each(|_| {
            bucket.acquire();
        });

        assert!(bucket.available_tokens() < 1.0);

        bucket.reset();

        assert_eq!(bucket.available_tokens(), 5.0);
    }

    #[test]
    fn test_rate_limited_iterator() {
        let config = RateLimitConfig {
            entities_per_second: 10000.0,
            burst_size: 100,
            ..Default::default()
        };

        let source = vec![1, 2, 3, 4, 5];
        let paced: Vec<_> = source.into_iter().rate_limit_with(config).collect();

        assert_eq!(paced, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_rate_limiter_refill() {
        // 100 per second = 1 token every 10ms
        let mut bucket = limiter_with(100.0, 10, RateLimitBackpressure::Block);

        // Drain the whole burst.
        (0..10).for_each(|_| {
            bucket.try_acquire();
        });
        assert!(bucket.available_tokens() < 1.0);

        // Wait for refill (25ms should give ~2 tokens)
        std::thread::sleep(Duration::from_millis(25));

        // At least one token must be back by now.
        assert!(bucket.try_acquire().is_some());
    }

    #[test]
    fn test_rate_limit_config_default() {
        let defaults = RateLimitConfig::default();
        assert!(defaults.enabled);
        assert_eq!(defaults.entities_per_second, 1000.0);
        assert_eq!(defaults.burst_size, 100);
    }
}