// datasynth_core/rate_limit/limiter.rs

use std::collections::VecDeque;
use std::time::{Duration, Instant};

use serde::{Deserialize, Serialize};

11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct RateLimitConfig {
14 pub entities_per_second: f64,
16 pub burst_size: u32,
18 pub backpressure: RateLimitBackpressure,
20 pub enabled: bool,
22}
23
24impl Default for RateLimitConfig {
25 fn default() -> Self {
26 Self {
27 entities_per_second: 1000.0,
28 burst_size: 100,
29 backpressure: RateLimitBackpressure::Block,
30 enabled: true,
31 }
32 }
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
37#[serde(rename_all = "snake_case")]
38pub enum RateLimitBackpressure {
39 #[default]
41 Block,
42 Drop,
44 Buffer {
46 max_buffered: usize,
48 },
49}
50
/// Outcome of a single acquisition attempt.
#[derive(Debug, Clone, PartialEq)]
pub enum RateLimitAction {
    /// A token was available (or the limiter is disabled); continue at once.
    Proceed,
    /// No token was available and the acquisition was rejected.
    Dropped,
    /// No token was available and the acquisition was queued.
    Buffered {
        /// Length of the queue right after this acquisition was appended.
        position: usize,
    },
    /// The caller slept until a token became available.
    Waited {
        /// Length of the sleep, in whole milliseconds.
        wait_time_ms: u64,
    },
}

/// Counters describing what a `RateLimiter` has done so far.
#[derive(Debug, Clone, Default)]
pub struct RateLimiterStats {
    /// Acquisition attempts counted by the limiter.
    pub total_acquisitions: u64,
    /// Attempts that proceeded without waiting.
    pub immediate_proceeds: u64,
    /// Attempts that slept for a token.
    pub waits: u64,
    /// Attempts that were rejected.
    pub drops: u64,
    /// Attempts that were queued.
    pub buffers: u64,
    /// Sum of all sleeps, in whole milliseconds.
    pub total_wait_time_ms: u64,
    /// Tokens left in the bucket when the stats were last updated.
    pub current_tokens: f64,
    /// Number of queued acquisitions when the stats were last updated.
    pub buffer_size: usize,
}

91pub struct RateLimiter {
98 config: RateLimitConfig,
99 tokens: f64,
101 last_refill: Instant,
103 buffer: VecDeque<Instant>,
105 stats: RateLimiterStats,
107}
108
109impl RateLimiter {
110 pub fn new(config: RateLimitConfig) -> Self {
112 Self {
113 tokens: config.burst_size as f64,
114 last_refill: Instant::now(),
115 buffer: VecDeque::new(),
116 stats: RateLimiterStats {
117 current_tokens: config.burst_size as f64,
118 ..Default::default()
119 },
120 config,
121 }
122 }
123
124 pub fn with_rate(entities_per_second: f64) -> Self {
126 Self::new(RateLimitConfig {
127 entities_per_second,
128 ..Default::default()
129 })
130 }
131
132 pub fn disabled() -> Self {
134 Self::new(RateLimitConfig {
135 enabled: false,
136 ..Default::default()
137 })
138 }
139
140 pub fn acquire(&mut self) -> RateLimitAction {
144 if !self.config.enabled {
145 self.stats.total_acquisitions += 1;
146 self.stats.immediate_proceeds += 1;
147 return RateLimitAction::Proceed;
148 }
149
150 self.stats.total_acquisitions += 1;
151 self.refill_tokens();
152
153 if self.tokens >= 1.0 {
154 self.tokens -= 1.0;
155 self.stats.current_tokens = self.tokens;
156 self.stats.immediate_proceeds += 1;
157 return RateLimitAction::Proceed;
158 }
159
160 match self.config.backpressure {
162 RateLimitBackpressure::Block => {
163 let wait_time = self.wait_for_token();
164 self.stats.waits += 1;
165 self.stats.total_wait_time_ms += wait_time;
166 RateLimitAction::Waited {
167 wait_time_ms: wait_time,
168 }
169 }
170 RateLimitBackpressure::Drop => {
171 self.stats.drops += 1;
172 RateLimitAction::Dropped
173 }
174 RateLimitBackpressure::Buffer { max_buffered } => {
175 if self.buffer.len() < max_buffered {
176 self.buffer.push_back(Instant::now());
177 self.stats.buffers += 1;
178 self.stats.buffer_size = self.buffer.len();
179 RateLimitAction::Buffered {
180 position: self.buffer.len(),
181 }
182 } else {
183 let wait_time = self.wait_for_token();
185 self.stats.waits += 1;
186 self.stats.total_wait_time_ms += wait_time;
187 RateLimitAction::Waited {
188 wait_time_ms: wait_time,
189 }
190 }
191 }
192 }
193 }
194
195 pub fn try_acquire(&mut self) -> Option<RateLimitAction> {
199 if !self.config.enabled {
200 self.stats.total_acquisitions += 1;
201 self.stats.immediate_proceeds += 1;
202 return Some(RateLimitAction::Proceed);
203 }
204
205 self.refill_tokens();
206
207 if self.tokens >= 1.0 {
208 self.tokens -= 1.0;
209 self.stats.current_tokens = self.tokens;
210 self.stats.total_acquisitions += 1;
211 self.stats.immediate_proceeds += 1;
212 Some(RateLimitAction::Proceed)
213 } else {
214 None
215 }
216 }
217
218 pub fn acquire_timeout(&mut self, timeout: Duration) -> Option<RateLimitAction> {
222 if !self.config.enabled {
223 self.stats.total_acquisitions += 1;
224 self.stats.immediate_proceeds += 1;
225 return Some(RateLimitAction::Proceed);
226 }
227
228 self.stats.total_acquisitions += 1;
229 self.refill_tokens();
230
231 if self.tokens >= 1.0 {
232 self.tokens -= 1.0;
233 self.stats.current_tokens = self.tokens;
234 self.stats.immediate_proceeds += 1;
235 return Some(RateLimitAction::Proceed);
236 }
237
238 let tokens_needed = 1.0 - self.tokens;
240 let time_needed = Duration::from_secs_f64(tokens_needed / self.config.entities_per_second);
241
242 if time_needed > timeout {
243 match self.config.backpressure {
245 RateLimitBackpressure::Drop => {
246 self.stats.drops += 1;
247 Some(RateLimitAction::Dropped)
248 }
249 _ => None,
250 }
251 } else {
252 std::thread::sleep(time_needed);
253 self.refill_tokens();
254 self.tokens -= 1.0;
255 self.stats.current_tokens = self.tokens;
256 self.stats.waits += 1;
257 self.stats.total_wait_time_ms += time_needed.as_millis() as u64;
258 Some(RateLimitAction::Waited {
259 wait_time_ms: time_needed.as_millis() as u64,
260 })
261 }
262 }
263
264 pub fn stats(&self) -> RateLimiterStats {
266 let mut stats = self.stats.clone();
267 stats.current_tokens = self.tokens;
268 stats.buffer_size = self.buffer.len();
269 stats
270 }
271
272 pub fn reset(&mut self) {
274 self.tokens = self.config.burst_size as f64;
275 self.last_refill = Instant::now();
276 self.buffer.clear();
277 self.stats = RateLimiterStats {
278 current_tokens: self.tokens,
279 ..Default::default()
280 };
281 }
282
283 pub fn available_tokens(&self) -> f64 {
285 self.tokens
286 }
287
288 pub fn config(&self) -> &RateLimitConfig {
290 &self.config
291 }
292
293 pub fn set_rate(&mut self, entities_per_second: f64) {
295 self.config.entities_per_second = entities_per_second;
296 }
297
298 pub fn set_enabled(&mut self, enabled: bool) {
300 self.config.enabled = enabled;
301 }
302
303 fn refill_tokens(&mut self) {
305 let now = Instant::now();
306 let elapsed = now.duration_since(self.last_refill);
307 let new_tokens = elapsed.as_secs_f64() * self.config.entities_per_second;
308
309 self.tokens = (self.tokens + new_tokens).min(self.config.burst_size as f64);
310 self.last_refill = now;
311 }
312
313 fn wait_for_token(&mut self) -> u64 {
315 let tokens_needed = 1.0 - self.tokens;
316 let wait_secs = tokens_needed / self.config.entities_per_second;
317 let wait_duration = Duration::from_secs_f64(wait_secs);
318
319 std::thread::sleep(wait_duration);
320
321 self.refill_tokens();
322 self.tokens -= 1.0;
323 self.stats.current_tokens = self.tokens;
324
325 wait_duration.as_millis() as u64
326 }
327
328 pub fn process_buffer(&mut self) -> Vec<Duration> {
330 self.refill_tokens();
331
332 let mut wait_times = Vec::new();
333
334 while !self.buffer.is_empty() && self.tokens >= 1.0 {
335 if let Some(enqueue_time) = self.buffer.pop_front() {
336 let wait_time = enqueue_time.elapsed();
337 wait_times.push(wait_time);
338 self.tokens -= 1.0;
339 }
340 }
341
342 self.stats.buffer_size = self.buffer.len();
343 self.stats.current_tokens = self.tokens;
344
345 wait_times
346 }
347}
348
349pub struct RateLimitedIterator<I> {
351 inner: I,
352 limiter: RateLimiter,
353}
354
355impl<I> RateLimitedIterator<I> {
356 pub fn new(inner: I, limiter: RateLimiter) -> Self {
358 Self { inner, limiter }
359 }
360
361 pub fn with_rate(inner: I, entities_per_second: f64) -> Self {
363 Self::new(inner, RateLimiter::with_rate(entities_per_second))
364 }
365
366 pub fn stats(&self) -> RateLimiterStats {
368 self.limiter.stats()
369 }
370}
371
372impl<I: Iterator> Iterator for RateLimitedIterator<I> {
373 type Item = I::Item;
374
375 fn next(&mut self) -> Option<Self::Item> {
376 self.limiter.acquire();
377 self.inner.next()
378 }
379}
380
381pub trait RateLimitExt: Iterator + Sized {
383 fn rate_limit(self, entities_per_second: f64) -> RateLimitedIterator<Self> {
385 RateLimitedIterator::with_rate(self, entities_per_second)
386 }
387
388 fn rate_limit_with(self, config: RateLimitConfig) -> RateLimitedIterator<Self> {
390 RateLimitedIterator::new(self, RateLimiter::new(config))
391 }
392}
393
394impl<I: Iterator> RateLimitExt for I {}
395
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A full bucket serves `burst_size` acquisitions without waiting.
    #[test]
    fn test_rate_limiter_immediate_proceed() {
        let config = RateLimitConfig {
            entities_per_second: 1000.0,
            burst_size: 10,
            ..Default::default()
        };
        let mut limiter = RateLimiter::new(config);

        for _ in 0..10 {
            let action = limiter.acquire();
            assert_eq!(action, RateLimitAction::Proceed);
        }

        let stats = limiter.stats();
        assert_eq!(stats.total_acquisitions, 10);
        assert_eq!(stats.immediate_proceeds, 10);
    }

    /// With the `Block` policy an empty bucket makes the caller wait.
    #[test]
    fn test_rate_limiter_blocking() {
        let config = RateLimitConfig {
            entities_per_second: 1000.0,
            burst_size: 1,
            backpressure: RateLimitBackpressure::Block,
            ..Default::default()
        };
        let mut limiter = RateLimiter::new(config);

        // The single burst token is available immediately.
        let action1 = limiter.acquire();
        assert_eq!(action1, RateLimitAction::Proceed);

        // The bucket is now empty, so this call has to sleep.
        let action2 = limiter.acquire();
        assert!(matches!(action2, RateLimitAction::Waited { .. }));
    }

    /// With the `Drop` policy an empty bucket rejects the acquisition.
    #[test]
    fn test_rate_limiter_drop() {
        let config = RateLimitConfig {
            entities_per_second: 10.0,
            burst_size: 1,
            backpressure: RateLimitBackpressure::Drop,
            ..Default::default()
        };
        let mut limiter = RateLimiter::new(config);

        let action1 = limiter.acquire();
        assert_eq!(action1, RateLimitAction::Proceed);

        let action2 = limiter.acquire();
        assert_eq!(action2, RateLimitAction::Dropped);

        let stats = limiter.stats();
        assert_eq!(stats.drops, 1);
    }

    /// With the `Buffer` policy an empty bucket queues the acquisition.
    #[test]
    fn test_rate_limiter_buffer() {
        let config = RateLimitConfig {
            entities_per_second: 10.0,
            burst_size: 1,
            backpressure: RateLimitBackpressure::Buffer { max_buffered: 5 },
            ..Default::default()
        };
        let mut limiter = RateLimiter::new(config);

        let action1 = limiter.acquire();
        assert_eq!(action1, RateLimitAction::Proceed);

        let action2 = limiter.acquire();
        assert!(matches!(action2, RateLimitAction::Buffered { position: 1 }));

        let stats = limiter.stats();
        assert_eq!(stats.buffers, 1);
        assert_eq!(stats.buffer_size, 1);
    }

    /// `try_acquire` never sleeps: it fails once the bucket is empty.
    #[test]
    fn test_rate_limiter_try_acquire() {
        let config = RateLimitConfig {
            entities_per_second: 10.0,
            burst_size: 1,
            ..Default::default()
        };
        let mut limiter = RateLimiter::new(config);

        // First attempt uses the burst token.
        assert!(limiter.try_acquire().is_some());

        // Second attempt finds the bucket empty.
        assert!(limiter.try_acquire().is_none());
    }

    /// A disabled limiter lets everything through.
    #[test]
    fn test_rate_limiter_disabled() {
        let mut limiter = RateLimiter::disabled();

        for _ in 0..100 {
            let action = limiter.acquire();
            assert_eq!(action, RateLimitAction::Proceed);
        }
    }

    /// `reset` restores a full bucket.
    #[test]
    fn test_rate_limiter_reset() {
        let config = RateLimitConfig {
            entities_per_second: 10.0,
            burst_size: 5,
            ..Default::default()
        };
        let mut limiter = RateLimiter::new(config);

        // Drain the bucket.
        for _ in 0..5 {
            limiter.acquire();
        }

        assert!(limiter.available_tokens() < 1.0);

        limiter.reset();

        assert_eq!(limiter.available_tokens(), 5.0);
    }

    /// The iterator adaptor yields every item in order.
    #[test]
    fn test_rate_limited_iterator() {
        let items = vec![1, 2, 3, 4, 5];
        let rate_limited: Vec<_> = items
            .into_iter()
            .rate_limit_with(RateLimitConfig {
                entities_per_second: 10000.0,
                burst_size: 100,
                ..Default::default()
            })
            .collect();

        assert_eq!(rate_limited, vec![1, 2, 3, 4, 5]);
    }

    /// Tokens come back as time passes.
    #[test]
    fn test_rate_limiter_refill() {
        let config = RateLimitConfig {
            entities_per_second: 100.0,
            burst_size: 10,
            ..Default::default()
        };
        let mut limiter = RateLimiter::new(config);

        // Drain the bucket without sleeping.
        for _ in 0..10 {
            limiter.try_acquire();
        }
        assert!(limiter.available_tokens() < 1.0);

        // At 100 tokens per second, 25 ms refills about 2.5 tokens.
        std::thread::sleep(Duration::from_millis(25));

        assert!(limiter.try_acquire().is_some());
    }

    /// The default configuration is enabled at 1000/s with bursts of 100.
    #[test]
    fn test_rate_limit_config_default() {
        let config = RateLimitConfig::default();
        assert!(config.enabled);
        assert_eq!(config.entities_per_second, 1000.0);
        assert_eq!(config.burst_size, 100);
    }
}