Skip to main content

ringkernel_core/
rate_limiting.rs

1//! Rate limiting for enterprise workloads.
2//!
3//! This module provides rate limiting capabilities for controlling request rates
4//! to GPU kernels and system resources, supporting both global and per-tenant limits.
5//!
6//! # Features
7//!
8//! - Token bucket, sliding window, fixed window, and leaky bucket algorithms
9//! - Per-tenant rate limiting
10//! - Global rate limiting
11//! - Configurable burst capacity
12//! - Real-time statistics
13//!
14//! # Example
15//!
16//! ```ignore
17//! use ringkernel_core::rate_limiting::{RateLimiter, RateLimitConfig};
18//!
19//! let config = RateLimitConfig::default()
20//!     .with_requests_per_second(100)
21//!     .with_burst_size(50);
22//!
23//! let limiter = RateLimiter::new(config);
24//!
25//! if limiter.check_tenant("tenant_1").is_ok() {
26//!     // Request allowed
27//! }
28//! ```
29
30use std::collections::HashMap;
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use parking_lot::RwLock;
36
37// ============================================================================
38// RATE LIMIT ERRORS
39// ============================================================================
40
/// Failure modes reported by the rate limiting subsystem.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RateLimitError {
    /// The request was rejected because the configured budget is used up.
    RateLimitExceeded {
        /// Suggested wait before the caller tries again.
        retry_after: Duration,
        /// Usage counted against the limit at the moment of rejection.
        current_count: u64,
        /// The limit that was enforced.
        limit: u64,
    },
    /// No tenant is registered under the given identifier.
    TenantNotFound(String),
    /// Rate limiting is switched off in the configuration.
    Disabled,
}

impl std::fmt::Display for RateLimitError {
    /// Renders a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Disabled => write!(f, "Rate limiter is disabled"),
            Self::TenantNotFound(tenant) => write!(f, "Tenant not found: {}", tenant),
            Self::RateLimitExceeded {
                retry_after: wait,
                current_count: used,
                limit: allowed,
            } => write!(
                f,
                "Rate limit exceeded: {}/{} requests, retry after {:?}",
                used, allowed, wait
            ),
        }
    }
}

// No underlying cause is carried, so the default trait methods suffice.
impl std::error::Error for RateLimitError {}

/// Shorthand for results whose error type is [`RateLimitError`].
pub type RateLimitResult<T> = std::result::Result<T, RateLimitError>;
83
84// ============================================================================
85// RATE LIMIT ALGORITHMS
86// ============================================================================
87
/// Strategy used to decide whether a request fits within the rate limit.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RateLimitAlgorithm {
    /// Token bucket (the default).
    ///
    /// The bucket gains tokens at a steady rate and every request spends
    /// one, so short bursts up to the bucket capacity are permitted.
    #[default]
    TokenBucket,
    /// Sliding window.
    ///
    /// Requests are counted over a window that moves with the current
    /// time, which smooths out the edges that fixed windows have.
    SlidingWindow,
    /// Fixed window.
    ///
    /// Requests are counted per consecutive, non-overlapping window.
    /// Simple, but a burst can straddle the boundary between two windows.
    FixedWindow,
    /// Leaky bucket.
    ///
    /// The bucket drains at a constant rate; requests that do not fit
    /// are queued or rejected.
    LeakyBucket,
}
113
114// ============================================================================
115// RATE LIMIT CONFIGURATION
116// ============================================================================
117
118/// Configuration for rate limiting.
119#[derive(Debug, Clone)]
120pub struct RateLimitConfig {
121    /// Maximum requests per second.
122    pub requests_per_second: u64,
123    /// Burst capacity (for token bucket).
124    pub burst_size: u64,
125    /// Window size for sliding/fixed window algorithms.
126    pub window_size: Duration,
127    /// Algorithm to use.
128    pub algorithm: RateLimitAlgorithm,
129    /// Whether rate limiting is enabled.
130    pub enabled: bool,
131    /// Whether to track per-tenant limits.
132    pub per_tenant: bool,
133    /// Default quota for new tenants.
134    pub default_tenant_quota: u64,
135}
136
137impl Default for RateLimitConfig {
138    fn default() -> Self {
139        Self {
140            requests_per_second: 1000,
141            burst_size: 100,
142            window_size: Duration::from_secs(1),
143            algorithm: RateLimitAlgorithm::TokenBucket,
144            enabled: true,
145            per_tenant: true,
146            default_tenant_quota: 100,
147        }
148    }
149}
150
151impl RateLimitConfig {
152    /// Create a new configuration with default values.
153    pub fn new() -> Self {
154        Self::default()
155    }
156
157    /// Set the requests per second limit.
158    pub fn with_requests_per_second(mut self, rps: u64) -> Self {
159        self.requests_per_second = rps;
160        self
161    }
162
163    /// Set the burst size.
164    pub fn with_burst_size(mut self, size: u64) -> Self {
165        self.burst_size = size;
166        self
167    }
168
169    /// Set the window size.
170    pub fn with_window_size(mut self, size: Duration) -> Self {
171        self.window_size = size;
172        self
173    }
174
175    /// Set the algorithm.
176    pub fn with_algorithm(mut self, algorithm: RateLimitAlgorithm) -> Self {
177        self.algorithm = algorithm;
178        self
179    }
180
181    /// Enable or disable rate limiting.
182    pub fn with_enabled(mut self, enabled: bool) -> Self {
183        self.enabled = enabled;
184        self
185    }
186
187    /// Enable or disable per-tenant limiting.
188    pub fn with_per_tenant(mut self, per_tenant: bool) -> Self {
189        self.per_tenant = per_tenant;
190        self
191    }
192
193    /// Set the default tenant quota.
194    pub fn with_default_tenant_quota(mut self, quota: u64) -> Self {
195        self.default_tenant_quota = quota;
196        self
197    }
198
199    /// Create a strict rate limit configuration.
200    pub fn strict(rps: u64) -> Self {
201        Self {
202            requests_per_second: rps,
203            burst_size: rps / 10, // 10% burst
204            window_size: Duration::from_secs(1),
205            algorithm: RateLimitAlgorithm::SlidingWindow,
206            enabled: true,
207            per_tenant: true,
208            default_tenant_quota: rps / 10,
209        }
210    }
211
212    /// Create a permissive rate limit configuration.
213    pub fn permissive(rps: u64) -> Self {
214        Self {
215            requests_per_second: rps,
216            burst_size: rps * 2, // 200% burst
217            window_size: Duration::from_secs(1),
218            algorithm: RateLimitAlgorithm::TokenBucket,
219            enabled: true,
220            per_tenant: false,
221            default_tenant_quota: rps,
222        }
223    }
224}
225
226// ============================================================================
227// TOKEN BUCKET
228// ============================================================================
229
230/// Token bucket rate limiter state.
231#[derive(Debug)]
232struct TokenBucket {
233    /// Current number of tokens.
234    tokens: AtomicU64,
235    /// Maximum tokens (burst capacity).
236    capacity: u64,
237    /// Tokens added per second.
238    refill_rate: u64,
239    /// Last refill time.
240    last_refill: RwLock<Instant>,
241}
242
243impl TokenBucket {
244    fn new(capacity: u64, refill_rate: u64) -> Self {
245        Self {
246            tokens: AtomicU64::new(capacity),
247            capacity,
248            refill_rate,
249            last_refill: RwLock::new(Instant::now()),
250        }
251    }
252
253    fn try_acquire(&self) -> RateLimitResult<()> {
254        self.refill();
255
256        let current = self.tokens.load(Ordering::Acquire);
257        if current == 0 {
258            // Calculate retry after based on refill rate
259            let retry_after = if self.refill_rate > 0 {
260                Duration::from_secs_f64(1.0 / self.refill_rate as f64)
261            } else {
262                Duration::from_secs(1)
263            };
264
265            return Err(RateLimitError::RateLimitExceeded {
266                retry_after,
267                current_count: self.capacity - current,
268                limit: self.capacity,
269            });
270        }
271
272        // Try to decrement
273        loop {
274            let current = self.tokens.load(Ordering::Acquire);
275            if current == 0 {
276                let retry_after = if self.refill_rate > 0 {
277                    Duration::from_secs_f64(1.0 / self.refill_rate as f64)
278                } else {
279                    Duration::from_secs(1)
280                };
281                return Err(RateLimitError::RateLimitExceeded {
282                    retry_after,
283                    current_count: self.capacity,
284                    limit: self.capacity,
285                });
286            }
287
288            if self
289                .tokens
290                .compare_exchange(current, current - 1, Ordering::Release, Ordering::Acquire)
291                .is_ok()
292            {
293                return Ok(());
294            }
295        }
296    }
297
298    fn refill(&self) {
299        let now = Instant::now();
300        let mut last = self.last_refill.write();
301        let elapsed = now.duration_since(*last);
302
303        // Calculate tokens to add
304        let tokens_to_add = (elapsed.as_secs_f64() * self.refill_rate as f64) as u64;
305
306        if tokens_to_add > 0 {
307            let current = self.tokens.load(Ordering::Acquire);
308            let new_tokens = (current + tokens_to_add).min(self.capacity);
309            self.tokens.store(new_tokens, Ordering::Release);
310            *last = now;
311        }
312    }
313
314    #[allow(dead_code)]
315    fn available_tokens(&self) -> u64 {
316        self.refill();
317        self.tokens.load(Ordering::Acquire)
318    }
319}
320
321// ============================================================================
322// SLIDING WINDOW
323// ============================================================================
324
325/// Sliding window rate limiter state.
326#[derive(Debug)]
327struct SlidingWindow {
328    /// Request timestamps in the current window.
329    requests: RwLock<Vec<Instant>>,
330    /// Window size.
331    window_size: Duration,
332    /// Maximum requests per window.
333    limit: u64,
334}
335
336impl SlidingWindow {
337    fn new(window_size: Duration, limit: u64) -> Self {
338        Self {
339            requests: RwLock::new(Vec::with_capacity(limit as usize)),
340            window_size,
341            limit,
342        }
343    }
344
345    fn try_acquire(&self) -> RateLimitResult<()> {
346        let now = Instant::now();
347        let window_start = now - self.window_size;
348
349        let mut requests = self.requests.write();
350
351        // Remove old requests
352        requests.retain(|&t| t > window_start);
353
354        if requests.len() as u64 >= self.limit {
355            // Find oldest request in window to calculate retry time
356            let oldest = requests.iter().min().copied().unwrap_or(now);
357            let retry_after = oldest + self.window_size - now;
358
359            return Err(RateLimitError::RateLimitExceeded {
360                retry_after,
361                current_count: requests.len() as u64,
362                limit: self.limit,
363            });
364        }
365
366        requests.push(now);
367        Ok(())
368    }
369
370    #[allow(dead_code)]
371    fn current_count(&self) -> u64 {
372        let now = Instant::now();
373        let window_start = now - self.window_size;
374
375        let requests = self.requests.read();
376        requests.iter().filter(|&&t| t > window_start).count() as u64
377    }
378}
379
380// ============================================================================
381// FIXED WINDOW
382// ============================================================================
383
384/// Fixed window rate limiter state.
385#[derive(Debug)]
386struct FixedWindow {
387    /// Request count in current window.
388    count: AtomicU64,
389    /// Window start time.
390    window_start: RwLock<Instant>,
391    /// Window size.
392    window_size: Duration,
393    /// Maximum requests per window.
394    limit: u64,
395}
396
397impl FixedWindow {
398    fn new(window_size: Duration, limit: u64) -> Self {
399        Self {
400            count: AtomicU64::new(0),
401            window_start: RwLock::new(Instant::now()),
402            window_size,
403            limit,
404        }
405    }
406
407    fn try_acquire(&self) -> RateLimitResult<()> {
408        let now = Instant::now();
409
410        // Check if we need to reset the window
411        {
412            let start = *self.window_start.read();
413            if now.duration_since(start) >= self.window_size {
414                let mut start_write = self.window_start.write();
415                // Double-check after acquiring write lock
416                if now.duration_since(*start_write) >= self.window_size {
417                    *start_write = now;
418                    self.count.store(0, Ordering::Release);
419                }
420            }
421        }
422
423        // Try to increment count
424        loop {
425            let current = self.count.load(Ordering::Acquire);
426            if current >= self.limit {
427                let start = *self.window_start.read();
428                let retry_after = (start + self.window_size).saturating_duration_since(now);
429
430                return Err(RateLimitError::RateLimitExceeded {
431                    retry_after,
432                    current_count: current,
433                    limit: self.limit,
434                });
435            }
436
437            if self
438                .count
439                .compare_exchange(current, current + 1, Ordering::Release, Ordering::Acquire)
440                .is_ok()
441            {
442                return Ok(());
443            }
444        }
445    }
446
447    #[allow(dead_code)]
448    fn current_count(&self) -> u64 {
449        self.count.load(Ordering::Acquire)
450    }
451}
452
453// ============================================================================
454// LEAKY BUCKET
455// ============================================================================
456
457/// Leaky bucket rate limiter state.
458#[derive(Debug)]
459struct LeakyBucket {
460    /// Current water level (pending requests).
461    level: AtomicU64,
462    /// Maximum capacity.
463    capacity: u64,
464    /// Leak rate (requests per second).
465    leak_rate: u64,
466    /// Last leak time.
467    last_leak: RwLock<Instant>,
468}
469
470impl LeakyBucket {
471    fn new(capacity: u64, leak_rate: u64) -> Self {
472        Self {
473            level: AtomicU64::new(0),
474            capacity,
475            leak_rate,
476            last_leak: RwLock::new(Instant::now()),
477        }
478    }
479
480    fn try_acquire(&self) -> RateLimitResult<()> {
481        self.leak();
482
483        loop {
484            let current = self.level.load(Ordering::Acquire);
485            if current >= self.capacity {
486                let retry_after = if self.leak_rate > 0 {
487                    Duration::from_secs_f64(1.0 / self.leak_rate as f64)
488                } else {
489                    Duration::from_secs(1)
490                };
491
492                return Err(RateLimitError::RateLimitExceeded {
493                    retry_after,
494                    current_count: current,
495                    limit: self.capacity,
496                });
497            }
498
499            if self
500                .level
501                .compare_exchange(current, current + 1, Ordering::Release, Ordering::Acquire)
502                .is_ok()
503            {
504                return Ok(());
505            }
506        }
507    }
508
509    fn leak(&self) {
510        let now = Instant::now();
511        let mut last = self.last_leak.write();
512        let elapsed = now.duration_since(*last);
513
514        let leaked = (elapsed.as_secs_f64() * self.leak_rate as f64) as u64;
515
516        if leaked > 0 {
517            let current = self.level.load(Ordering::Acquire);
518            let new_level = current.saturating_sub(leaked);
519            self.level.store(new_level, Ordering::Release);
520            *last = now;
521        }
522    }
523
524    #[allow(dead_code)]
525    fn current_level(&self) -> u64 {
526        self.leak();
527        self.level.load(Ordering::Acquire)
528    }
529}
530
531// ============================================================================
532// RATE LIMITER
533// ============================================================================
534
535/// Internal limiter state.
536enum LimiterState {
537    TokenBucket(TokenBucket),
538    SlidingWindow(SlidingWindow),
539    FixedWindow(FixedWindow),
540    LeakyBucket(LeakyBucket),
541}
542
543impl LimiterState {
544    fn try_acquire(&self) -> RateLimitResult<()> {
545        match self {
546            Self::TokenBucket(b) => b.try_acquire(),
547            Self::SlidingWindow(w) => w.try_acquire(),
548            Self::FixedWindow(w) => w.try_acquire(),
549            Self::LeakyBucket(b) => b.try_acquire(),
550        }
551    }
552}
553
554/// Per-tenant rate limiter entry.
555struct TenantLimiter {
556    state: LimiterState,
557    quota: u64,
558}
559
560/// Global rate limiter.
561///
562/// Provides rate limiting for GPU kernel requests with support for
563/// multiple algorithms and per-tenant limits.
564pub struct RateLimiter {
565    config: RateLimitConfig,
566    /// Global limiter state.
567    global: LimiterState,
568    /// Per-tenant limiters.
569    tenants: RwLock<HashMap<String, TenantLimiter>>,
570    /// Statistics.
571    stats: RateLimiterStats,
572}
573
574impl RateLimiter {
575    /// Create a new rate limiter with the given configuration.
576    pub fn new(config: RateLimitConfig) -> Self {
577        let global = Self::create_global_limiter(&config);
578
579        Self {
580            config,
581            global,
582            tenants: RwLock::new(HashMap::new()),
583            stats: RateLimiterStats::default(),
584        }
585    }
586
587    fn create_global_limiter(config: &RateLimitConfig) -> LimiterState {
588        let limit = config.requests_per_second;
589        match config.algorithm {
590            RateLimitAlgorithm::TokenBucket => {
591                LimiterState::TokenBucket(TokenBucket::new(config.burst_size, limit))
592            }
593            RateLimitAlgorithm::SlidingWindow => {
594                LimiterState::SlidingWindow(SlidingWindow::new(config.window_size, limit))
595            }
596            RateLimitAlgorithm::FixedWindow => {
597                LimiterState::FixedWindow(FixedWindow::new(config.window_size, limit))
598            }
599            RateLimitAlgorithm::LeakyBucket => {
600                LimiterState::LeakyBucket(LeakyBucket::new(config.burst_size, limit))
601            }
602        }
603    }
604
605    fn create_tenant_limiter(config: &RateLimitConfig, quota: u64) -> LimiterState {
606        // For tenant limiters, the quota is both the capacity and the refill rate
607        match config.algorithm {
608            RateLimitAlgorithm::TokenBucket => {
609                // Tenant bucket: capacity = quota (allows up to quota requests)
610                // refill_rate = quota (refills at quota per second)
611                LimiterState::TokenBucket(TokenBucket::new(quota, quota))
612            }
613            RateLimitAlgorithm::SlidingWindow => {
614                LimiterState::SlidingWindow(SlidingWindow::new(config.window_size, quota))
615            }
616            RateLimitAlgorithm::FixedWindow => {
617                LimiterState::FixedWindow(FixedWindow::new(config.window_size, quota))
618            }
619            RateLimitAlgorithm::LeakyBucket => {
620                LimiterState::LeakyBucket(LeakyBucket::new(quota, quota))
621            }
622        }
623    }
624
625    /// Check if a request should be allowed (global limit).
626    pub fn check(&self) -> RateLimitResult<()> {
627        if !self.config.enabled {
628            return Err(RateLimitError::Disabled);
629        }
630
631        self.stats.total_requests.fetch_add(1, Ordering::Relaxed);
632
633        match self.global.try_acquire() {
634            Ok(()) => {
635                self.stats.allowed_requests.fetch_add(1, Ordering::Relaxed);
636                Ok(())
637            }
638            Err(e) => {
639                self.stats.rejected_requests.fetch_add(1, Ordering::Relaxed);
640                Err(e)
641            }
642        }
643    }
644
645    /// Check if a request should be allowed for a specific tenant.
646    pub fn check_tenant(&self, tenant_id: &str) -> RateLimitResult<()> {
647        if !self.config.enabled {
648            return Err(RateLimitError::Disabled);
649        }
650
651        self.stats.total_requests.fetch_add(1, Ordering::Relaxed);
652
653        // First check global limit
654        if let Err(e) = self.global.try_acquire() {
655            self.stats.rejected_requests.fetch_add(1, Ordering::Relaxed);
656            return Err(e);
657        }
658
659        // Then check tenant limit if per-tenant is enabled
660        if self.config.per_tenant {
661            let tenants = self.tenants.read();
662            if let Some(limiter) = tenants.get(tenant_id) {
663                match limiter.state.try_acquire() {
664                    Ok(()) => {
665                        self.stats.allowed_requests.fetch_add(1, Ordering::Relaxed);
666                        Ok(())
667                    }
668                    Err(e) => {
669                        self.stats.rejected_requests.fetch_add(1, Ordering::Relaxed);
670                        Err(e)
671                    }
672                }
673            } else {
674                // Tenant not registered, use default quota
675                drop(tenants);
676                self.register_tenant(tenant_id, self.config.default_tenant_quota);
677                self.check_tenant(tenant_id)
678            }
679        } else {
680            self.stats.allowed_requests.fetch_add(1, Ordering::Relaxed);
681            Ok(())
682        }
683    }
684
685    /// Register a tenant with a custom quota.
686    pub fn register_tenant(&self, tenant_id: &str, quota: u64) {
687        let limiter = TenantLimiter {
688            state: Self::create_tenant_limiter(&self.config, quota),
689            quota,
690        };
691
692        let mut tenants = self.tenants.write();
693        tenants.insert(tenant_id.to_string(), limiter);
694    }
695
696    /// Update a tenant's quota.
697    pub fn update_tenant_quota(&self, tenant_id: &str, quota: u64) -> RateLimitResult<()> {
698        let mut tenants = self.tenants.write();
699        if let Some(limiter) = tenants.get_mut(tenant_id) {
700            limiter.quota = quota;
701            limiter.state = Self::create_tenant_limiter(&self.config, quota);
702            Ok(())
703        } else {
704            Err(RateLimitError::TenantNotFound(tenant_id.to_string()))
705        }
706    }
707
708    /// Remove a tenant.
709    pub fn remove_tenant(&self, tenant_id: &str) -> bool {
710        let mut tenants = self.tenants.write();
711        tenants.remove(tenant_id).is_some()
712    }
713
714    /// Get the number of registered tenants.
715    pub fn tenant_count(&self) -> usize {
716        self.tenants.read().len()
717    }
718
719    /// Get the current statistics.
720    pub fn stats(&self) -> RateLimiterStatsSnapshot {
721        RateLimiterStatsSnapshot {
722            total_requests: self.stats.total_requests.load(Ordering::Relaxed),
723            allowed_requests: self.stats.allowed_requests.load(Ordering::Relaxed),
724            rejected_requests: self.stats.rejected_requests.load(Ordering::Relaxed),
725            tenant_count: self.tenant_count(),
726        }
727    }
728
729    /// Reset statistics.
730    pub fn reset_stats(&self) {
731        self.stats.total_requests.store(0, Ordering::Relaxed);
732        self.stats.allowed_requests.store(0, Ordering::Relaxed);
733        self.stats.rejected_requests.store(0, Ordering::Relaxed);
734    }
735
736    /// Get the configuration.
737    pub fn config(&self) -> &RateLimitConfig {
738        &self.config
739    }
740
741    /// Check if rate limiting is enabled.
742    pub fn is_enabled(&self) -> bool {
743        self.config.enabled
744    }
745}
746
747// ============================================================================
748// STATISTICS
749// ============================================================================
750
/// Live request counters; all of them start at zero.
#[derive(Debug, Default)]
struct RateLimiterStats {
    /// Requests that were checked.
    total_requests: AtomicU64,
    /// Requests that were let through.
    allowed_requests: AtomicU64,
    /// Requests that were turned away.
    rejected_requests: AtomicU64,
}
758
/// Point-in-time copy of the rate limiter's counters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimiterStatsSnapshot {
    /// How many requests were checked in total.
    pub total_requests: u64,
    /// How many of them were allowed.
    pub allowed_requests: u64,
    /// How many of them were rejected.
    pub rejected_requests: u64,
    /// How many tenants were registered when the snapshot was taken.
    pub tenant_count: usize,
}

impl RateLimiterStatsSnapshot {
    /// Fraction of requests that were rejected; `0.0` when none were checked.
    pub fn rejection_rate(&self) -> f64 {
        match self.total_requests {
            0 => 0.0,
            total => self.rejected_requests as f64 / total as f64,
        }
    }

    /// Fraction of requests that were allowed; `1.0` when none were checked.
    pub fn acceptance_rate(&self) -> f64 {
        match self.total_requests {
            0 => 1.0,
            total => self.allowed_requests as f64 / total as f64,
        }
    }
}
791
792// ============================================================================
793// RATE LIMIT GUARD (RAII)
794// ============================================================================
795
796/// RAII guard for rate-limited operations.
797///
798/// Automatically tracks the completion of rate-limited operations.
799pub struct RateLimitGuard<'a> {
800    limiter: &'a RateLimiter,
801    tenant_id: Option<String>,
802    _started: Instant,
803}
804
805impl<'a> RateLimitGuard<'a> {
806    /// Create a new guard after successfully acquiring a rate limit token.
807    fn new(limiter: &'a RateLimiter, tenant_id: Option<String>) -> Self {
808        Self {
809            limiter,
810            tenant_id,
811            _started: Instant::now(),
812        }
813    }
814
815    /// Get the tenant ID if this is a tenant-scoped guard.
816    pub fn tenant_id(&self) -> Option<&str> {
817        self.tenant_id.as_deref()
818    }
819}
820
821impl<'a> Drop for RateLimitGuard<'a> {
822    fn drop(&mut self) {
823        // Could track completion time here for metrics
824        let _ = self.limiter;
825    }
826}
827
828/// Extension trait for acquiring guards.
829pub trait RateLimiterExt {
830    /// Try to acquire a rate limit guard.
831    fn try_acquire(&self) -> RateLimitResult<RateLimitGuard<'_>>;
832
833    /// Try to acquire a tenant-scoped rate limit guard.
834    fn try_acquire_tenant(&self, tenant_id: &str) -> RateLimitResult<RateLimitGuard<'_>>;
835}
836
837impl RateLimiterExt for RateLimiter {
838    fn try_acquire(&self) -> RateLimitResult<RateLimitGuard<'_>> {
839        self.check()?;
840        Ok(RateLimitGuard::new(self, None))
841    }
842
843    fn try_acquire_tenant(&self, tenant_id: &str) -> RateLimitResult<RateLimitGuard<'_>> {
844        self.check_tenant(tenant_id)?;
845        Ok(RateLimitGuard::new(self, Some(tenant_id.to_string())))
846    }
847}
848
849// ============================================================================
850// RATE LIMITER BUILDER
851// ============================================================================
852
853/// Builder for creating rate limiters.
854pub struct RateLimiterBuilder {
855    config: RateLimitConfig,
856    tenants: Vec<(String, u64)>,
857}
858
859impl RateLimiterBuilder {
860    /// Create a new builder with default configuration.
861    pub fn new() -> Self {
862        Self {
863            config: RateLimitConfig::default(),
864            tenants: Vec::new(),
865        }
866    }
867
868    /// Set the requests per second limit.
869    pub fn with_requests_per_second(mut self, rps: u64) -> Self {
870        self.config.requests_per_second = rps;
871        self
872    }
873
874    /// Set the burst size.
875    pub fn with_burst_size(mut self, size: u64) -> Self {
876        self.config.burst_size = size;
877        self
878    }
879
880    /// Set the algorithm.
881    pub fn with_algorithm(mut self, algorithm: RateLimitAlgorithm) -> Self {
882        self.config.algorithm = algorithm;
883        self
884    }
885
886    /// Set the window size.
887    pub fn with_window_size(mut self, size: Duration) -> Self {
888        self.config.window_size = size;
889        self
890    }
891
892    /// Enable or disable rate limiting.
893    pub fn with_enabled(mut self, enabled: bool) -> Self {
894        self.config.enabled = enabled;
895        self
896    }
897
898    /// Enable or disable per-tenant limiting.
899    pub fn with_per_tenant(mut self, per_tenant: bool) -> Self {
900        self.config.per_tenant = per_tenant;
901        self
902    }
903
904    /// Add a tenant with a specific quota.
905    pub fn with_tenant(mut self, tenant_id: impl Into<String>, quota: u64) -> Self {
906        self.tenants.push((tenant_id.into(), quota));
907        self
908    }
909
910    /// Build the rate limiter.
911    pub fn build(self) -> RateLimiter {
912        let limiter = RateLimiter::new(self.config);
913
914        for (tenant_id, quota) in self.tenants {
915            limiter.register_tenant(&tenant_id, quota);
916        }
917
918        limiter
919    }
920}
921
922impl Default for RateLimiterBuilder {
923    fn default() -> Self {
924        Self::new()
925    }
926}
927
928// ============================================================================
929// SHARED RATE LIMITER
930// ============================================================================
931
932/// Thread-safe, shareable rate limiter.
933pub type SharedRateLimiter = Arc<RateLimiter>;
934
935/// Create a shared rate limiter.
936pub fn shared_rate_limiter(config: RateLimitConfig) -> SharedRateLimiter {
937    Arc::new(RateLimiter::new(config))
938}
939
940// ============================================================================
941// TESTS
942// ============================================================================
943
#[cfg(test)]
mod tests {
    use super::*;

    /// Fails the calling test unless `outcome` is a
    /// `RateLimitError::RateLimitExceeded` rejection.
    #[track_caller]
    fn assert_rate_limited<T>(outcome: Result<T, RateLimitError>) {
        assert!(matches!(
            outcome,
            Err(RateLimitError::RateLimitExceeded { .. })
        ));
    }

    // Defaults: 1000 req/s, burst of 100, enabled, per-tenant limiting on.
    #[test]
    fn test_rate_limit_config_default() {
        let defaults = RateLimitConfig::default();
        assert_eq!(defaults.requests_per_second, 1000);
        assert_eq!(defaults.burst_size, 100);
        assert!(defaults.enabled);
        assert!(defaults.per_tenant);
    }

    // Each `with_*` setter must be reflected in the resulting config.
    #[test]
    fn test_rate_limit_config_builder() {
        let built = RateLimitConfig::new()
            .with_requests_per_second(500)
            .with_burst_size(50)
            .with_algorithm(RateLimitAlgorithm::SlidingWindow);

        let RateLimitConfig {
            requests_per_second,
            burst_size,
            algorithm,
            ..
        } = &built;

        assert_eq!(*requests_per_second, 500);
        assert_eq!(*burst_size, 50);
        assert_eq!(*algorithm, RateLimitAlgorithm::SlidingWindow);
    }

    // The `strict` preset: given rate, burst of 10, sliding-window algorithm.
    #[test]
    fn test_rate_limit_config_strict() {
        let strict = RateLimitConfig::strict(100);

        let RateLimitConfig {
            requests_per_second,
            burst_size,
            algorithm,
            ..
        } = &strict;

        assert_eq!(*requests_per_second, 100);
        assert_eq!(*burst_size, 10);
        assert_eq!(*algorithm, RateLimitAlgorithm::SlidingWindow);
    }

    #[test]
    fn test_rate_limiter_allows_within_limit() {
        let limiter = RateLimiter::new(
            RateLimitConfig::new()
                .with_requests_per_second(100)
                .with_burst_size(10),
        );

        // Five requests stay inside the configured burst of ten.
        (0..5).for_each(|_| assert!(limiter.check().is_ok()));
    }

    #[test]
    fn test_rate_limiter_rejects_over_limit() {
        let limiter = RateLimiter::new(
            RateLimitConfig::new()
                .with_requests_per_second(100)
                .with_burst_size(5)
                .with_algorithm(RateLimitAlgorithm::TokenBucket),
        );

        // Drain the whole bucket first...
        (0..5).for_each(|_| assert!(limiter.check().is_ok()));

        // ...so that the next request is turned away.
        assert_rate_limited(limiter.check());
    }

    // A disabled limiter reports `Disabled` from `check`.
    #[test]
    fn test_rate_limiter_disabled() {
        let limiter = RateLimiter::new(RateLimitConfig::new().with_enabled(false));

        assert!(matches!(limiter.check(), Err(RateLimitError::Disabled)));
    }

    #[test]
    fn test_rate_limiter_tenant() {
        let limiter = RateLimiter::new(
            RateLimitConfig::new()
                .with_requests_per_second(1000)
                .with_burst_size(100)
                .with_per_tenant(true)
                .with_default_tenant_quota(5),
        );

        // The tenant is never registered explicitly; its first request does that.
        (0..5).for_each(|_| assert!(limiter.check_tenant("tenant_1").is_ok()));

        // The default tenant quota of five is now used up.
        assert_rate_limited(limiter.check_tenant("tenant_1"));

        // A second tenant has its own, untouched allowance.
        assert!(limiter.check_tenant("tenant_2").is_ok());
    }

    #[test]
    fn test_rate_limiter_register_tenant() {
        let limiter = RateLimiter::new(
            RateLimitConfig::new()
                .with_requests_per_second(1000)
                .with_burst_size(100),
        );
        limiter.register_tenant("tenant_1", 10);

        assert_eq!(limiter.tenant_count(), 1);

        // The explicitly registered quota of ten is fully usable.
        (0..10).for_each(|_| assert!(limiter.check_tenant("tenant_1").is_ok()));
    }

    #[test]
    fn test_rate_limiter_stats() {
        let limiter = RateLimiter::new(
            RateLimitConfig::new()
                .with_requests_per_second(100)
                .with_burst_size(5),
        );

        // Six attempts against a burst of five: the last one should be rejected.
        for _attempt in 0..6 {
            let _ = limiter.check();
        }

        let snapshot = limiter.stats();
        assert_eq!(snapshot.total_requests, 6);
        assert_eq!(snapshot.allowed_requests, 5);
        assert_eq!(snapshot.rejected_requests, 1);
    }

    #[test]
    fn test_rate_limiter_stats_snapshot() {
        let snapshot = RateLimiterStatsSnapshot {
            total_requests: 100,
            allowed_requests: 80,
            rejected_requests: 20,
            tenant_count: 5,
        };

        let rejection_error = (snapshot.rejection_rate() - 0.2).abs();
        let acceptance_error = (snapshot.acceptance_rate() - 0.8).abs();

        assert!(rejection_error < f64::EPSILON);
        assert!(acceptance_error < f64::EPSILON);
    }

    #[test]
    fn test_sliding_window() {
        let limiter = RateLimiter::new(
            RateLimitConfig::new()
                .with_algorithm(RateLimitAlgorithm::SlidingWindow)
                .with_requests_per_second(5)
                .with_window_size(Duration::from_secs(1)),
        );

        (0..5).for_each(|_| assert!(limiter.check().is_ok()));

        assert_rate_limited(limiter.check());
    }

    #[test]
    fn test_fixed_window() {
        let limiter = RateLimiter::new(
            RateLimitConfig::new()
                .with_algorithm(RateLimitAlgorithm::FixedWindow)
                .with_requests_per_second(5)
                .with_window_size(Duration::from_secs(1)),
        );

        (0..5).for_each(|_| assert!(limiter.check().is_ok()));

        assert_rate_limited(limiter.check());
    }

    #[test]
    fn test_leaky_bucket() {
        let limiter = RateLimiter::new(
            RateLimitConfig::new()
                .with_algorithm(RateLimitAlgorithm::LeakyBucket)
                .with_requests_per_second(100)
                .with_burst_size(5),
        );

        (0..5).for_each(|_| assert!(limiter.check().is_ok()));

        assert_rate_limited(limiter.check());
    }

    // Tenants queued on the builder are present on the built limiter.
    #[test]
    fn test_rate_limiter_builder() {
        let builder = RateLimiterBuilder::new()
            .with_requests_per_second(500)
            .with_burst_size(50)
            .with_tenant("tenant_1", 100)
            .with_tenant("tenant_2", 200);
        let limiter = builder.build();

        assert_eq!(limiter.tenant_count(), 2);
        assert_eq!(limiter.config().requests_per_second, 500);
    }

    #[test]
    fn test_update_tenant_quota() {
        let limiter = RateLimiterBuilder::new().with_tenant("tenant_1", 5).build();

        // Spend the initial quota of five.
        (0..5).for_each(|_| assert!(limiter.check_tenant("tenant_1").is_ok()));

        // One more request must be refused.
        assert!(limiter.check_tenant("tenant_1").is_err());

        // Raise the quota to ten.
        assert!(limiter.update_tenant_quota("tenant_1", 10).is_ok());

        // The tenant can issue ten requests again.
        (0..10).for_each(|_| assert!(limiter.check_tenant("tenant_1").is_ok()));
    }

    #[test]
    fn test_remove_tenant() {
        let limiter = RateLimiterBuilder::new()
            .with_tenant("tenant_1", 100)
            .build();

        assert_eq!(limiter.tenant_count(), 1);
        assert!(limiter.remove_tenant("tenant_1"));
        assert_eq!(limiter.tenant_count(), 0);
        // Removing the same tenant a second time reports that nothing was removed.
        assert!(!limiter.remove_tenant("tenant_1"));
    }

    #[test]
    fn test_rate_limit_guard() {
        let limiter = RateLimiter::new(RateLimitConfig::new().with_burst_size(10));

        // A guard acquired without a tenant carries no tenant id.
        let global_guard = limiter.try_acquire().unwrap();
        assert!(global_guard.tenant_id().is_none());
        drop(global_guard);

        limiter.register_tenant("tenant_1", 10);

        // A tenant guard reports the tenant it was acquired for.
        let tenant_guard = limiter.try_acquire_tenant("tenant_1").unwrap();
        assert_eq!(tenant_guard.tenant_id(), Some("tenant_1"));
        drop(tenant_guard);
    }
}