// fraiseql_auth/rate_limiting.rs

//! Rate limiting for brute-force and abuse protection.
//!
//! Provides [`KeyedRateLimiter`] — a per-key sliding-window counter backed by
//! a `Mutex<HashMap>` — and [`RateLimiters`], a pre-built set of limiters for
//! each authentication endpoint.
//!
//! # Threading Model
//!
//! All rate limiting operations are **atomic** with respect to concurrent access:
//! - Each call to `check()` holds a lock for its entire duration
//! - Check-and-update operations cannot be interleaved with other threads
//! - This prevents race conditions where multiple threads simultaneously exceed limits
//! - The lock is held while reading current time, reading record, and updating counter
//! - This ensures that the decision to allow/deny a request is consistent

15use std::{
16    collections::HashMap,
17    sync::{
18        Arc, Mutex,
19        atomic::{AtomicU64, Ordering},
20    },
21    time::{SystemTime, UNIX_EPOCH},
22};
23
24use crate::error::{AuthError, Result};
25
/// Rate limit configuration for authentication endpoints (sliding-window algorithm).
///
/// Uses a per-key sliding-window counter for brute-force protection on
/// authentication endpoints (login, token refresh, callback).
///
/// Distinct from `fraiseql_server::middleware::RateLimitConfig`, which uses
/// a token-bucket algorithm for general request rate limiting.
#[derive(Debug, Clone)]
pub struct AuthRateLimitConfig {
    /// Whether rate limiting is enabled for this endpoint
    pub enabled: bool,
    /// Maximum number of requests allowed in the window
    pub max_requests: u32,
    /// Window duration in seconds
    pub window_secs: u64,
}

impl AuthRateLimitConfig {
    /// Internal helper: an enabled preset with the given limit and window.
    const fn preset(max_requests: u32, window_secs: u64) -> Self {
        Self {
            enabled: true,
            max_requests,
            window_secs,
        }
    }

    /// IP-based rate limiting for public endpoints
    /// 100 requests per 60 seconds (typical for auth/start, auth/callback)
    pub const fn per_ip_standard() -> Self {
        Self::preset(100, 60)
    }

    /// Stricter IP-based rate limiting for sensitive endpoints
    /// 50 requests per 60 seconds
    pub const fn per_ip_strict() -> Self {
        Self::preset(50, 60)
    }

    /// User-based rate limiting for authenticated endpoints
    /// 10 requests per 60 seconds
    pub const fn per_user_standard() -> Self {
        Self::preset(10, 60)
    }

    /// Failed login attempt limiting
    /// 5 failed attempts per 3600 seconds (1 hour)
    pub const fn failed_login_attempts() -> Self {
        Self::preset(5, 3600)
    }
}
84
/// Per-key bookkeeping: how many requests have been seen since the current
/// window opened, and when that window opened.
#[derive(Debug, Clone)]
struct RequestRecord {
    /// Requests counted in the current window.
    count: u32,
    /// Unix timestamp (seconds) at which the current window began.
    window_start: u64,
}
93
/// How often (in number of `check()` calls) expired entries are purged from the map.
///
/// Stale entries accumulate when keys stop sending requests.  Every
/// `PURGE_INTERVAL` calls the limiter performs a full sweep and removes entries
/// whose window has elapsed, bounding the HashMap's memory footprint.
/// (The call counter starts at 0 — a multiple of `PURGE_INTERVAL` — so the very
/// first `check()` call also sweeps.)
const PURGE_INTERVAL: u64 = 1_000;

/// Default maximum number of unique keys the limiter will track simultaneously.
///
/// When the cap is reached and a *new* key arrives, the tracked entry with the
/// oldest `window_start` is evicted to make room (logged at debug level) — see
/// `KeyedRateLimiter::check`. This prevents an attacker from exhausting memory
/// by sending requests from millions of unique IP addresses. The cap is
/// conservative: 100k entries × ~100 bytes ≈ 10 MB.
const DEFAULT_MAX_ENTRIES: usize = 100_000;
107
/// Per-key sliding-window rate limiter backed by a `Mutex<HashMap>`.
///
/// Each unique key (IP address, user ID, etc.) gets its own independent counter.
/// The check-and-update sequence is atomic: no TOCTOU race can allow more requests
/// than `max_requests` in any single window, even under high concurrency.
///
/// The map is capped at `DEFAULT_MAX_ENTRIES` keys by default. When a new key
/// arrives at capacity the entry with the oldest `window_start` is evicted to
/// make room, bounding memory growth while still tracking new sources.
///
/// # Deployment note
///
/// This rate limiter is **per-process**. In a multi-replica deployment, each
/// replica enforces the limit independently — the effective limit across *N*
/// replicas is *N × limit*. For true distributed enforcement, configure a
/// Redis-backed rate limiter via the `redis-rate-limiting` Cargo feature (see
/// the fraiseql-observers queue feature for the integration pattern). Call
/// [`warn_if_single_node_rate_limiting`] during server startup to emit a
/// reminder when no distributed backend is detected.
///
/// # Constructors
///
/// - [`KeyedRateLimiter::new`] — use the system wall clock (production).
/// - [`KeyedRateLimiter::with_max_entries`] — wall clock + custom entry cap.
/// - [`KeyedRateLimiter::with_clock`] — inject a custom clock (testing).
/// - [`KeyedRateLimiter::with_clock_and_max_entries`] — custom clock + cap (testing).
pub struct KeyedRateLimiter {
    /// Per-key counters; the single mutex makes check-and-update atomic.
    records:     Arc<Mutex<HashMap<String, RequestRecord>>>,
    /// Limit and window for this endpoint; immutable after construction.
    config:      AuthRateLimitConfig,
    /// Maximum tracked keys (0 = unbounded).
    max_entries: usize,
    /// Monotonically increasing call counter for triggering periodic sweeps.
    check_count: AtomicU64,
    /// Time source — defaults to `SystemTime::now()` via [`system_clock`].
    /// Overridable via [`KeyedRateLimiter::with_clock`] for testing.
    clock:       Box<dyn Fn() -> u64 + Send + Sync>,
}
143
144/// Default clock that reads wall-clock time.
145///
146/// On system time error, returns `0` (fail-closed): a timestamp of `0` is
147/// before any real `window_start`, so existing windows will not expire and
148/// rate limiting continues to be enforced with existing counters. New windows
149/// started while the clock is broken will have `window_start = 0`; when the
150/// clock recovers, those windows will immediately expire (since any real
151/// timestamp ≥ 0 + `window_secs`) and reset naturally.
152fn system_clock() -> u64 {
153    match SystemTime::now().duration_since(UNIX_EPOCH) {
154        Ok(duration) => duration.as_secs(),
155        Err(e) => {
156            tracing::warn!(
157                error = %e,
158                "System time error in rate limiter — brute-force protection \
159                 continues using frozen timestamps. System clock may have moved \
160                 backward or time source is unavailable."
161            );
162            // Return 0 (not u64::MAX): existing windows will not expire,
163            // so rate limiting remains enforced during the clock failure.
164            0
165        },
166    }
167}
168
impl KeyedRateLimiter {
    /// Create a new keyed rate limiter using wall-clock time.
    pub fn new(config: AuthRateLimitConfig) -> Self {
        Self {
            records: Arc::new(Mutex::new(HashMap::new())),
            config,
            max_entries: DEFAULT_MAX_ENTRIES,
            check_count: AtomicU64::new(0),
            clock: Box::new(system_clock),
        }
    }

    /// Create a rate limiter with a custom entry cap.
    ///
    /// Use this when the deployment context calls for a tighter or looser bound
    /// than `DEFAULT_MAX_ENTRIES`.  Setting `max_entries = 0` disables the cap
    /// (unbounded — not recommended in production).
    pub fn with_max_entries(config: AuthRateLimitConfig, max_entries: usize) -> Self {
        Self {
            records: Arc::new(Mutex::new(HashMap::new())),
            config,
            max_entries,
            check_count: AtomicU64::new(0),
            clock: Box::new(system_clock),
        }
    }

    /// Create a rate limiter with an injectable clock (for testing).
    ///
    /// The `clock` function is called on every `check()` to obtain the current Unix timestamp.
    /// Pass `|| 0` to simulate a broken system clock — `0` is the fail-closed fallback
    /// value [`system_clock`] returns on time errors — and verify that rate limiting
    /// remains enforced while the clock is broken.
    pub fn with_clock<F>(config: AuthRateLimitConfig, clock: F) -> Self
    where
        F: Fn() -> u64 + Send + Sync + 'static,
    {
        Self {
            records: Arc::new(Mutex::new(HashMap::new())),
            config,
            max_entries: DEFAULT_MAX_ENTRIES,
            check_count: AtomicU64::new(0),
            clock: Box::new(clock),
        }
    }

    /// Create a rate limiter with both a custom clock and a custom entry cap (for testing).
    ///
    /// Combines the benefits of [`KeyedRateLimiter::with_clock`] and
    /// [`KeyedRateLimiter::with_max_entries`] for deterministic eviction tests.
    pub fn with_clock_and_max_entries<F>(
        config: AuthRateLimitConfig,
        max_entries: usize,
        clock: F,
    ) -> Self
    where
        F: Fn() -> u64 + Send + Sync + 'static,
    {
        Self {
            records: Arc::new(Mutex::new(HashMap::new())),
            config,
            max_entries,
            check_count: AtomicU64::new(0),
            clock: Box::new(clock),
        }
    }

    /// Check if a request should be allowed for the given key
    ///
    /// # Atomicity
    ///
    /// This operation is **atomic** - the entire check-and-update sequence happens atomically:
    /// 1. Acquires exclusive lock on rate limit records
    /// 2. Gets current timestamp
    /// 3. Loads or creates request record for this key
    /// 4. Decides: allow, reset window, or deny
    /// 5. Updates counter/window only if request is allowed
    /// 6. Releases lock
    ///
    /// No concurrent thread can observe a partial state. This prevents classic
    /// time-of-check-time-of-use (TOCTOU) race conditions where multiple threads
    /// simultaneously exceed the rate limit.
    ///
    /// # Returns
    ///
    /// `Ok(())` if the request is allowed and the counter has been incremented.
    ///
    /// # Errors
    ///
    /// Returns [`AuthError::RateLimited`] if the key has exceeded the configured
    /// rate limit within the sliding window.
    ///
    /// # Lock poisoning
    ///
    /// A poisoned mutex does **not** panic this method: the inner map is recovered
    /// via `PoisonError::into_inner` and a warning is logged. The map remains
    /// structurally valid even if another thread panicked mid-update (worst case:
    /// one stale entry).
    pub fn check(&self, key: &str) -> Result<()> {
        // Fast path: rate limiting disabled for this endpoint — always allow.
        // `self.config` is owned by this limiter and never mutated after
        // construction, so reading `enabled` outside the lock is safe.
        if !self.config.enabled {
            return Ok(());
        }

        // CRITICAL: Acquire lock - this ensures all operations below are atomic.
        // On poison, recover the inner data — the HashMap is still valid even if the
        // thread that held the lock panicked mid-update (worst case: a stale entry).
        let mut records = self.records.lock().unwrap_or_else(|poisoned| {
            tracing::warn!("rate limiter mutex was poisoned, recovering");
            poisoned.into_inner()
        });
        let now = (self.clock)();

        // Periodic expiry sweep to bound HashMap growth.
        // Runs every PURGE_INTERVAL calls (including the very first, since the
        // counter starts at 0); overflow wraps silently which is fine.
        let count = self.check_count.fetch_add(1, Ordering::Relaxed);
        if count.is_multiple_of(PURGE_INTERVAL) {
            records.retain(|_, r| now < r.window_start.saturating_add(self.config.window_secs));
        }

        // Enforce max-entries cap to prevent unbounded memory growth under distributed attacks.
        // A cap of 0 disables the limit (opt-in unbounded mode).
        // When at capacity, evict the entry with the oldest window_start (oldest activity)
        // so new sources can always be tracked without permanently blocking new IPs.
        // NOTE(review): the eviction scan is O(n) over the map; it only runs when a
        // *new* key arrives while the map is already at capacity.
        if self.max_entries > 0 && !records.contains_key(key) && records.len() >= self.max_entries {
            if let Some(oldest_key) =
                records.iter().min_by_key(|(_, r)| r.window_start).map(|(k, _)| k.clone())
            {
                records.remove(&oldest_key);
                tracing::debug!(
                    max_entries = self.max_entries,
                    "Rate limiter at capacity — evicted oldest entry to make room for new key"
                );
            }
        }

        // Get or create record for this key (first request from this key)
        let record = records.entry(key.to_string()).or_insert_with(|| RequestRecord {
            count:        0,
            window_start: now,
        });

        // Thread-safe decision: all branches update state atomically while holding the lock
        if now >= record.window_start.saturating_add(self.config.window_secs) {
            // CASE 1: Window has expired - start a new window
            // This request is the first in the new window, so it's allowed
            record.count = 1;
            record.window_start = now;
            Ok(())
        } else if record.count < self.config.max_requests {
            // CASE 2: Window is active and we haven't exceeded the limit
            // This request is allowed - increment the counter atomically
            record.count += 1;
            Ok(())
        } else {
            // CASE 3: Window is active and we've reached the limit
            // This request is NOT allowed - counter is not incremented
            // Subsequent requests will also fail until the window expires
            // NOTE(review): retry_after_secs reports the full window duration, not
            // the remaining time (`window_start + window_secs - now`) — a
            // conservative upper bound on the actual wait.
            Err(AuthError::RateLimited {
                retry_after_secs: self.config.window_secs,
            })
        }
    }

    /// Get the number of active rate limiters (for monitoring).
    ///
    /// Counts every tracked key, including entries whose window has already
    /// elapsed but has not yet been removed by the periodic sweep in `check()`.
    pub fn active_limiters(&self) -> usize {
        let records = self.records.lock().unwrap_or_else(|poisoned| {
            tracing::warn!("rate limiter mutex was poisoned, recovering");
            poisoned.into_inner()
        });
        records.len()
    }

    /// Clear all rate limiters (for testing or reset).
    pub fn clear(&self) {
        let mut records = self.records.lock().unwrap_or_else(|poisoned| {
            tracing::warn!("rate limiter mutex was poisoned, recovering");
            poisoned.into_inner()
        });
        records.clear();
    }

    /// Return a clone of this limiter's configuration (useful for constructing
    /// an independent limiter in tests).
    pub fn clone_config(&self) -> AuthRateLimitConfig {
        self.config.clone()
    }
}
357
358/// Emit a startup warning when no distributed rate-limiting backend is configured.
359///
360/// Call once during server startup. If the `FRAISEQL_RATE_LIMIT_WARN_SINGLE_NODE`
361/// environment variable is set to `true` or `1` (case-insensitive) and the
362/// `FRAISEQL_RATE_LIMIT_BACKEND` variable is unset, a `warn!` is emitted reminding
363/// operators that each replica enforces limits independently — the effective limit
364/// across *N* replicas is *N × limit*.
365///
366/// This is a documentation-only reminder; it does not change runtime behaviour.
367pub fn warn_if_single_node_rate_limiting() {
368    let should_warn = std::env::var("FRAISEQL_RATE_LIMIT_WARN_SINGLE_NODE")
369        .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
370        .unwrap_or(false);
371    let has_backend = std::env::var("FRAISEQL_RATE_LIMIT_BACKEND").is_ok();
372    if should_warn && !has_backend {
373        tracing::warn!(
374            "Rate limiter is per-process; multi-replica deployments are not protected against \
375             distributed brute-force. Configure a Redis-backed rate limiter via the \
376             `redis-rate-limiting` feature for distributed enforcement."
377        );
378    }
379}
380
/// Global rate limiters for different endpoints.
///
/// The per-field rates below describe the defaults installed by
/// [`RateLimiters::new`]; [`RateLimiters::with_configs`] may override them.
pub struct RateLimiters {
    /// auth/start: per-IP, 100 req/min
    pub auth_start:    KeyedRateLimiter,
    /// auth/callback: per-IP, 50 req/min
    pub auth_callback: KeyedRateLimiter,
    /// auth/refresh: per-user, 10 req/min
    pub auth_refresh:  KeyedRateLimiter,
    /// auth/logout: per-user, 10 req/min (same `per_user_standard` preset as refresh)
    pub auth_logout:   KeyedRateLimiter,
    /// Failed login tracking: per-user, 5 attempts/hour
    pub failed_logins: KeyedRateLimiter,
}
394
395impl RateLimiters {
396    /// Create default rate limiters for all endpoints
397    pub fn new() -> Self {
398        Self {
399            auth_start:    KeyedRateLimiter::new(AuthRateLimitConfig::per_ip_standard()),
400            auth_callback: KeyedRateLimiter::new(AuthRateLimitConfig::per_ip_strict()),
401            auth_refresh:  KeyedRateLimiter::new(AuthRateLimitConfig::per_user_standard()),
402            auth_logout:   KeyedRateLimiter::new(AuthRateLimitConfig::per_user_standard()),
403            failed_logins: KeyedRateLimiter::new(AuthRateLimitConfig::failed_login_attempts()),
404        }
405    }
406
407    /// Create with custom configurations
408    pub fn with_configs(
409        start_cfg: AuthRateLimitConfig,
410        callback_cfg: AuthRateLimitConfig,
411        refresh_cfg: AuthRateLimitConfig,
412        logout_cfg: AuthRateLimitConfig,
413        failed_cfg: AuthRateLimitConfig,
414    ) -> Self {
415        Self {
416            auth_start:    KeyedRateLimiter::new(start_cfg),
417            auth_callback: KeyedRateLimiter::new(callback_cfg),
418            auth_refresh:  KeyedRateLimiter::new(refresh_cfg),
419            auth_logout:   KeyedRateLimiter::new(logout_cfg),
420            failed_logins: KeyedRateLimiter::new(failed_cfg),
421        }
422    }
423}
424
impl Default for RateLimiters {
    /// Equivalent to [`RateLimiters::new`] — standard presets for every endpoint.
    fn default() -> Self {
        Self::new()
    }
}
430
431#[allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable
432#[cfg(test)]
433mod tests {
434    #[allow(clippy::wildcard_imports)]
435    // Reason: test module — wildcard keeps test boilerplate minimal
436    use super::*;
437
438    #[test]
439    fn test_rate_limiter_allows_within_limit() {
440        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
441            enabled:      true,
442            max_requests: 3,
443            window_secs:  60,
444        });
445
446        // Should allow up to max_requests
447        for i in 0..3 {
448            let result = limiter.check("key");
449            assert!(result.is_ok(), "Request {} should be allowed", i);
450        }
451    }
452
453    #[test]
454    fn test_rate_limiter_rejects_over_limit() {
455        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
456            enabled:      true,
457            max_requests: 2,
458            window_secs:  60,
459        });
460
461        limiter.check("key").ok();
462        limiter.check("key").ok();
463
464        // Third should fail
465        let result = limiter.check("key");
466        assert!(
467            matches!(result, Err(AuthError::RateLimited { .. })),
468            "expected RateLimited error, got: {result:?}"
469        );
470    }
471
472    #[test]
473    fn test_rate_limiter_per_key() {
474        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
475            enabled:      true,
476            max_requests: 2,
477            window_secs:  60,
478        });
479
480        // Key 1: use allowance
481        limiter.check("key1").ok();
482        limiter.check("key1").ok();
483
484        // Key 2: should have fresh allowance
485        let result = limiter.check("key2");
486        assert!(result.is_ok(), "Different key should have independent limit");
487    }
488
489    #[test]
490    fn test_rate_limiter_error_contains_retry_after() {
491        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
492            enabled:      true,
493            max_requests: 1,
494            window_secs:  60,
495        });
496
497        limiter.check("key").ok();
498        let result = limiter.check("key");
499
500        match result {
501            Err(AuthError::RateLimited { retry_after_secs }) => {
502                assert_eq!(retry_after_secs, 60);
503            },
504            _ => panic!("Expected RateLimited error"),
505        }
506    }
507
508    #[test]
509    fn test_rate_limiter_active_limiters_count() {
510        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
511            enabled:      true,
512            max_requests: 100,
513            window_secs:  60,
514        });
515
516        assert_eq!(limiter.active_limiters(), 0);
517
518        limiter.check("key1").ok();
519        assert_eq!(limiter.active_limiters(), 1);
520
521        limiter.check("key2").ok();
522        assert_eq!(limiter.active_limiters(), 2);
523    }
524
525    #[test]
526    fn test_rate_limiters_default() {
527        let limiters = RateLimiters::new();
528
529        // auth/start should allow requests
530        let result = limiters.auth_start.check("ip_1");
531        assert!(result.is_ok(), "auth/start should allow first request: {result:?}");
532
533        // auth/refresh should track per-user
534        let result = limiters.auth_refresh.check("user_1");
535        assert!(result.is_ok(), "auth/refresh should allow first request: {result:?}");
536    }
537
538    #[test]
539    fn test_rate_limit_config_presets() {
540        let standard_ip = AuthRateLimitConfig::per_ip_standard();
541        assert_eq!(standard_ip.max_requests, 100);
542        assert_eq!(standard_ip.window_secs, 60);
543
544        let strict_ip = AuthRateLimitConfig::per_ip_strict();
545        assert_eq!(strict_ip.max_requests, 50);
546
547        let user_limit = AuthRateLimitConfig::per_user_standard();
548        assert_eq!(user_limit.max_requests, 10);
549
550        let failed = AuthRateLimitConfig::failed_login_attempts();
551        assert_eq!(failed.max_requests, 5);
552        assert_eq!(failed.window_secs, 3600);
553    }
554
555    #[test]
556    fn test_ip_based_rate_limiting() {
557        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig::per_ip_standard());
558
559        let ip = "203.0.113.1";
560
561        // Should allow up to 100 requests
562        for _ in 0..100 {
563            let result = limiter.check(ip);
564            assert!(result.is_ok(), "request within limit should be allowed: {result:?}");
565        }
566
567        // 101st should fail
568        let result = limiter.check(ip);
569        assert!(
570            matches!(result, Err(AuthError::RateLimited { .. })),
571            "expected RateLimited after exceeding IP limit, got: {result:?}"
572        );
573    }
574
575    #[test]
576    fn test_rejected_login_tracking() {
577        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig::failed_login_attempts());
578
579        let user = "alice@example.com";
580
581        // Should allow 5 failed attempts
582        for _ in 0..5 {
583            let result = limiter.check(user);
584            assert!(
585                result.is_ok(),
586                "failed login attempt within limit should be allowed: {result:?}"
587            );
588        }
589
590        // 6th should fail
591        let result = limiter.check(user);
592        assert!(
593            matches!(result, Err(AuthError::RateLimited { .. })),
594            "expected RateLimited after exceeding failed login limit, got: {result:?}"
595        );
596    }
597
598    #[test]
599    fn test_multiple_users_independent() {
600        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig::failed_login_attempts());
601
602        // User 1 uses attempts
603        for _ in 0..5 {
604            limiter.check("user1").ok();
605        }
606
607        // User 1 blocked
608        let result = limiter.check("user1");
609        assert!(
610            matches!(result, Err(AuthError::RateLimited { .. })),
611            "expected RateLimited for user1, got: {result:?}"
612        );
613
614        // User 2 should have fresh attempts
615        let result = limiter.check("user2");
616        assert!(result.is_ok(), "user2 should have independent fresh limit: {result:?}");
617    }
618
619    #[test]
620    fn test_clear_limiters() {
621        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
622            enabled:      true,
623            max_requests: 1,
624            window_secs:  60,
625        });
626
627        limiter.check("key").ok();
628        let result = limiter.check("key");
629        assert!(
630            matches!(result, Err(AuthError::RateLimited { .. })),
631            "expected RateLimited before clear, got: {result:?}"
632        );
633
634        limiter.clear();
635
636        // After clear, should allow again
637        let result = limiter.check("key");
638        assert!(result.is_ok(), "should allow requests after clear: {result:?}");
639    }
640
641    #[test]
642    fn test_thread_safe_rate_limiting() {
643        use std::sync::Arc as StdArc;
644
645        let limiter = StdArc::new(KeyedRateLimiter::new(AuthRateLimitConfig {
646            enabled:      true,
647            max_requests: 100,
648            window_secs:  60,
649        }));
650
651        let mut handles = vec![];
652
653        for _ in 0..10 {
654            let limiter_clone = StdArc::clone(&limiter);
655            let handle = std::thread::spawn(move || {
656                for _ in 0..10 {
657                    let _ = limiter_clone.check("concurrent");
658                }
659            });
660            handles.push(handle);
661        }
662
663        for handle in handles {
664            handle.join().ok();
665        }
666
667        // After 100 concurrent requests, next should fail
668        let result = limiter.check("concurrent");
669        assert!(
670            matches!(result, Err(AuthError::RateLimited { .. })),
671            "expected RateLimited after concurrent exhaustion, got: {result:?}"
672        );
673    }
674
675    #[test]
676    fn test_rate_limiting_many_keys() {
677        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
678            enabled:      true,
679            max_requests: 10,
680            window_secs:  60,
681        });
682
683        // Simulate 1000 different IPs, each with requests
684        for i in 0..1000 {
685            let key = format!("192.168.{}.{}", i / 256, i % 256);
686            let result = limiter.check(&key);
687            assert!(result.is_ok(), "first request for {key} should be allowed: {result:?}");
688        }
689
690        assert_eq!(limiter.active_limiters(), 1000);
691    }
692
693    #[test]
694    fn test_endpoint_combinations() {
695        let limiters = RateLimiters::new();
696
697        let ip = "203.0.113.1";
698        let user = "bob@example.com";
699
700        // Complete flow
701        let result = limiters.auth_start.check(ip);
702        assert!(result.is_ok(), "auth_start should allow: {result:?}");
703
704        let result = limiters.auth_callback.check(ip);
705        assert!(result.is_ok(), "auth_callback should allow: {result:?}");
706
707        let result = limiters.auth_refresh.check(user);
708        assert!(result.is_ok(), "auth_refresh should allow: {result:?}");
709
710        let result = limiters.auth_logout.check(user);
711        assert!(result.is_ok(), "auth_logout should allow: {result:?}");
712
713        let result = limiters.failed_logins.check(user);
714        assert!(result.is_ok(), "failed_logins should allow: {result:?}");
715    }
716
717    #[test]
718    fn test_attack_prevention_scenario() {
719        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
720            enabled:      true,
721            max_requests: 10,
722            window_secs:  60,
723        });
724
725        let target = "admin@example.com";
726
727        // Attacker tries 10 failed attempts
728        for _ in 0..10 {
729            let _ = limiter.check(target);
730        }
731
732        // 11th blocked
733        let result = limiter.check(target);
734        assert!(
735            matches!(result, Err(AuthError::RateLimited { .. })),
736            "expected RateLimited after attack scenario, got: {result:?}"
737        );
738    }
739
740    #[test]
741    fn test_rate_limiter_disabled() {
742        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
743            enabled:      false,
744            max_requests: 1,
745            window_secs:  60,
746        });
747
748        // Even with max_requests = 1, should allow many requests when disabled
749        for i in 0..100 {
750            let result = limiter.check("key");
751            assert!(result.is_ok(), "Request {} should be allowed when rate limiting disabled", i);
752        }
753    }
754
755    // CONCURRENCY AND ATOMICITY TESTS
756    // These tests verify that the rate limiter is thread-safe and atomic
757
758    #[test]
759    fn test_concurrent_requests_from_same_key_respects_limit() {
760        // RACE CONDITION CHECK: Multiple threads simultaneously checking the same key
761        // This verifies that atomic operations prevent exceeding the limit
762        use std::{sync::Arc, thread};
763
764        let limiter = Arc::new(KeyedRateLimiter::new(AuthRateLimitConfig {
765            enabled:      true,
766            max_requests: 50,
767            window_secs:  60,
768        }));
769
770        let key = "shared_key";
771        let allowed_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
772        let rejected_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
773
774        let mut handles = vec![];
775
776        // Spawn 100 concurrent threads, all checking the same key
777        for _ in 0..100 {
778            let limiter = Arc::clone(&limiter);
779            let allowed = Arc::clone(&allowed_count);
780            let rejected = Arc::clone(&rejected_count);
781
782            let handle = thread::spawn(move || {
783                match limiter.check(key) {
784                    Ok(()) => allowed.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
785                    Err(_) => rejected.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
786                };
787            });
788            handles.push(handle);
789        }
790
791        // Wait for all threads to complete
792        for handle in handles {
793            handle.join().unwrap();
794        }
795
796        let allowed = allowed_count.load(std::sync::atomic::Ordering::SeqCst);
797        let rejected = rejected_count.load(std::sync::atomic::Ordering::SeqCst);
798
799        // CRITICAL: Due to atomicity, at most 50 requests should be allowed
800        assert_eq!(allowed, 50, "Atomic operations should limit to max_requests");
801        assert_eq!(rejected, 50, "Remaining requests should be rejected");
802        assert_eq!(allowed + rejected, 100, "All requests should be accounted for");
803    }
804
805    #[test]
806    fn test_concurrent_requests_different_keys_independent() {
807        // RACE CONDITION CHECK: Multiple threads checking different keys
808        // This verifies that per-key isolation works under concurrent access
809        use std::{sync::Arc, thread};
810
811        let limiter = Arc::new(KeyedRateLimiter::new(AuthRateLimitConfig {
812            enabled:      true,
813            max_requests: 10,
814            window_secs:  60,
815        }));
816
817        let mut handles = vec![];
818
819        // Spawn 10 threads, each using a different key and making 15 requests
820        for thread_id in 0..10 {
821            let limiter = Arc::clone(&limiter);
822            let handle = thread::spawn(move || {
823                let key = format!("key_{}", thread_id);
824                let mut allowed = 0;
825                let mut rejected = 0;
826
827                for _ in 0..15 {
828                    match limiter.check(&key) {
829                        Ok(()) => allowed += 1,
830                        Err(_) => rejected += 1,
831                    }
832                }
833
834                (allowed, rejected)
835            });
836            handles.push(handle);
837        }
838
839        // Collect results from all threads
840        let mut total_allowed = 0;
841        let mut total_rejected = 0;
842
843        for handle in handles {
844            let (allowed, rejected) = handle.join().unwrap();
845            total_allowed += allowed;
846            total_rejected += rejected;
847        }
848
849        // CRITICAL: Each key gets independent limit of 10 requests
850        assert_eq!(total_allowed, 100, "Each of 10 keys should allow 10 requests");
851        assert_eq!(total_rejected, 50, "Each of 10 keys should reject 5 requests");
852    }
853
854    #[test]
855    fn test_atomic_check_and_update_not_interleaved() {
856        // This test verifies that the check-and-update sequence is atomic
857        // by ensuring the counter never gets into an inconsistent state
858        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
859            enabled:      true,
860            max_requests: 3,
861            window_secs:  60,
862        });
863
864        let key = "test_key";
865
866        // Make 3 allowed requests
867        let r = limiter.check(key);
868        assert!(r.is_ok(), "request 1 should be allowed: {r:?}");
869        let r = limiter.check(key);
870        assert!(r.is_ok(), "request 2 should be allowed: {r:?}");
871        let r = limiter.check(key);
872        assert!(r.is_ok(), "request 3 should be allowed: {r:?}");
873
874        // Verify counter is at 3 (not less, not more)
875        assert_eq!(limiter.active_limiters(), 1);
876
877        // 4th request should be rejected
878        let r = limiter.check(key);
879        assert!(
880            matches!(r, Err(AuthError::RateLimited { .. })),
881            "request 4 should be rate-limited: {r:?}"
882        );
883
884        // 5th request should also be rejected (counter didn't change)
885        let r = limiter.check(key);
886        assert!(
887            matches!(r, Err(AuthError::RateLimited { .. })),
888            "request 5 should be rate-limited: {r:?}"
889        );
890
891        // Counter should still be at 3 (not decremented on rejection)
892        // This verifies that rejected requests didn't partially update state
893    }
894
895    #[test]
896    fn test_concurrent_window_reset_safety() {
897        // Verify that window reset (when window expires) is atomic
898        // even under concurrent access
899        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
900            enabled:      true,
901            max_requests: 2,
902            window_secs:  3600, // 1 hour - won't expire in test
903        });
904
905        let key = "reset_key";
906
907        // Fill the window
908        limiter.check(key).ok();
909        limiter.check(key).ok();
910
911        // Further requests should fail
912        let r = limiter.check(key);
913        assert!(matches!(r, Err(AuthError::RateLimited { .. })), "should be rate-limited: {r:?}");
914        let r = limiter.check(key);
915        assert!(
916            matches!(r, Err(AuthError::RateLimited { .. })),
917            "should still be rate-limited: {r:?}"
918        );
919
920        // Verify state is consistent by clearing and re-checking
921        limiter.clear();
922        assert_eq!(limiter.active_limiters(), 0);
923
924        // After clear, new requests should be allowed
925        let r = limiter.check(key);
926        assert!(r.is_ok(), "should allow after clear: {r:?}");
927    }
928
929    // ── LRU eviction tests (13-3) ─────────────────────────────────────────────
930
931    #[test]
932    fn test_rate_limiter_evicts_lru_entry_when_at_capacity() {
933        let config = AuthRateLimitConfig {
934            enabled:      true,
935            max_requests: 10,
936            window_secs:  3600,
937        };
938        let limiter = KeyedRateLimiter::with_max_entries(config, 3);
939
940        // Fill to capacity.
941        limiter.check("key_a").unwrap();
942        limiter.check("key_b").unwrap();
943        limiter.check("key_c").unwrap();
944        assert_eq!(limiter.active_limiters(), 3);
945
946        // Adding a 4th key must succeed — the oldest entry is evicted to make room.
947        let result = limiter.check("key_d");
948        assert!(result.is_ok(), "new key must be accepted when limiter evicts LRU entry");
949        assert_eq!(
950            limiter.active_limiters(),
951            3,
952            "entry count must stay at capacity after eviction"
953        );
954    }
955
956    #[test]
957    fn test_rate_limiter_capacity_configurable() {
958        let config = AuthRateLimitConfig {
959            enabled:      true,
960            max_requests: 10,
961            window_secs:  3600,
962        };
963        let limiter = KeyedRateLimiter::with_max_entries(config, 5);
964
965        for i in 0..5 {
966            limiter.check(&format!("key_{i}")).unwrap();
967        }
968        assert_eq!(limiter.active_limiters(), 5, "limiter must track exactly max_entries keys");
969
970        // 6th key triggers eviction; count must stay at 5.
971        limiter.check("key_overflow").unwrap();
972        assert_eq!(limiter.active_limiters(), 5, "capacity must not exceed configured maximum");
973    }
974
975    #[test]
976    fn test_rate_limiter_eviction_does_not_affect_active_ips() {
977        use std::sync::{
978            Arc,
979            atomic::{AtomicU64, Ordering},
980        };
981
982        // Use an injectable clock so window_start values are deterministic.
983        let now = Arc::new(AtomicU64::new(1_000));
984        let clock_ref = Arc::clone(&now);
985        let config = AuthRateLimitConfig {
986            enabled:      true,
987            max_requests: 1,
988            window_secs:  3600,
989        };
990        let limiter = KeyedRateLimiter::with_clock_and_max_entries(config, 2, move || {
991            clock_ref.load(Ordering::Relaxed)
992        });
993
994        // key_a at t=1000 — uses its 1 allowed request.
995        now.store(1_000, Ordering::Relaxed);
996        limiter.check("key_a").unwrap();
997
998        // key_b at t=2000 — uses its 1 allowed request (more recent than key_a).
999        now.store(2_000, Ordering::Relaxed);
1000        limiter.check("key_b").unwrap();
1001
1002        // At capacity (2). key_c at t=3000 — triggers eviction of key_a (oldest at t=1000).
1003        now.store(3_000, Ordering::Relaxed);
1004        limiter.check("key_c").unwrap();
1005
1006        // key_b (window_start=2000) was NOT evicted; its rate limit is still active.
1007        let result = limiter.check("key_b");
1008        assert!(
1009            matches!(result, Err(AuthError::RateLimited { .. })),
1010            "key_b must remain rate-limited after eviction of the older key_a entry, got: {result:?}"
1011        );
1012    }
1013
1014    // ── Distributed RL warning test (13-4) ───────────────────────────────────
1015
    #[test]
    fn test_startup_warn_emitted_when_no_distributed_backend() {
        // Smoke test: verify the warning helper is callable without panicking.
        // Whether the tracing event is actually emitted is covered by the
        // observability integration tests, which capture tracing output.
        warn_if_single_node_rate_limiting();
    }
1022
1023    #[test]
1024    fn test_no_toctou_race_condition() {
1025        // Time-of-Check-Time-of-Use (TOCTOU) race condition test
1026        // Verifies that checking the limit and updating the counter happen atomically
1027        let limiter = KeyedRateLimiter::new(AuthRateLimitConfig {
1028            enabled:      true,
1029            max_requests: 1, // Very strict: only 1 request allowed
1030            window_secs:  60,
1031        });
1032
1033        let key = "single_key";
1034
1035        // First request is allowed
1036        let r = limiter.check(key);
1037        assert!(r.is_ok(), "first request should be allowed: {r:?}");
1038
1039        // Due to atomic check-and-update, the second request must fail
1040        // There's no window where both can check and both succeed
1041        let result = limiter.check(key);
1042        assert!(
1043            result.is_err(),
1044            "Second request must fail - check-and-update is atomic so no TOCTOU race"
1045        );
1046    }
1047}