Skip to main content

synapse_pingora/detection/
credential_stuffing.rs

1//! Credential stuffing detection engine.
2//!
3//! Detects credential stuffing attacks with:
4//! - Per-entity auth failure tracking with sliding windows
5//! - Distributed attack correlation via fingerprint
6//! - Account takeover detection (success after failures)
7//! - Low-and-slow pattern detection
8//!
9//! Thread-safe with DashMap for concurrent access.
10//! Performance target: <5μs per record_attempt/record_result.
11
12use crate::detection::{
13    AuthAttempt, AuthMetrics, AuthResult, DistributedAttack, EntityEndpointKey,
14    GlobalVelocityTracker, StuffingConfig, StuffingEvent, StuffingSeverity, StuffingVerdict,
15    TakeoverAlert, UsernameTargetedAttack,
16};
17use crossbeam_channel::{bounded, Receiver, Sender};
18use dashmap::DashMap;
19use parking_lot::RwLock;
20use regex::Regex;
21use serde::{Deserialize, Serialize};
22use std::collections::VecDeque;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::Arc;
25use std::thread::{self, JoinHandle};
26use std::time::{SystemTime, UNIX_EPOCH};
27use tracing::warn;
28
29/// Get current time in milliseconds since Unix epoch.
30#[inline]
31fn now_ms() -> u64 {
32    SystemTime::now()
33        .duration_since(UNIX_EPOCH)
34        .unwrap_or_default()
35        .as_millis() as u64
36}
37
38/// Credential stuffing detector with concurrent access.
39///
40/// SECURITY: Implements multi-dimensional detection to catch distributed attacks:
41/// - Per-entity (IP) failure tracking
42/// - Fingerprint-based distributed attack correlation
43/// - Username-targeted attack detection (multiple IPs targeting same username)
44/// - Global velocity monitoring for coordinated attacks
45pub struct CredentialStuffingDetector {
46    /// Per-entity auth metrics (entity+endpoint -> metrics)
47    entity_auth: DashMap<EntityEndpointKey, AuthMetrics>,
48    /// Distributed attack tracking (fingerprint+endpoint -> attack)
49    distributed: DashMap<String, DistributedAttack>,
50    /// Username-targeted attack tracking (username+endpoint -> attack)
51    username_targeted: DashMap<String, UsernameTargetedAttack>,
52    /// Global velocity tracker for overall failure rate
53    global_velocity: RwLock<GlobalVelocityTracker>,
54    /// Recent takeover alerts (bounded queue)
55    takeovers: RwLock<VecDeque<TakeoverAlert>>,
56    /// Recent events for alerting (bounded queue)
57    events: RwLock<VecDeque<StuffingEvent>>,
58    /// Compiled auth path regexes
59    auth_patterns: Vec<Regex>,
60    /// Configuration
61    config: StuffingConfig,
62    /// Shutdown signal
63    shutdown: Arc<AtomicBool>,
64    /// Shutdown channel sender
65    shutdown_tx: Sender<()>,
66    /// Cleanup thread handle
67    cleanup_handle: Option<JoinHandle<()>>,
68}
69
70impl CredentialStuffingDetector {
71    /// Create a new detector with the given configuration.
72    ///
73    /// Configuration is validated and sanitized before use.
74    pub fn new(config: StuffingConfig) -> Self {
75        // Validate and sanitize config
76        let config = config.validated();
77
78        // Compile auth patterns with validation logging
79        // SECURITY: Invalid patterns are logged and skipped rather than causing panics
80        let auth_patterns: Vec<Regex> = config
81            .auth_path_patterns
82            .iter()
83            .filter_map(|p| match Regex::new(p) {
84                Ok(re) => Some(re),
85                Err(e) => {
86                    warn!(
87                        pattern = %p,
88                        error = %e,
89                        "Invalid auth_path_pattern in StuffingConfig - pattern will be skipped"
90                    );
91                    None
92                }
93            })
94            .collect();
95
96        let (shutdown_tx, shutdown_rx) = bounded(1);
97        let shutdown = Arc::new(AtomicBool::new(false));
98
99        let entity_auth = DashMap::with_capacity(config.max_entities.min(10_000));
100        let distributed = DashMap::with_capacity(config.max_distributed_attacks.min(1_000));
101        let username_targeted = DashMap::with_capacity(config.max_distributed_attacks.min(1_000));
102
103        // Clone for cleanup thread
104        let entity_auth_clone = entity_auth.clone();
105        let distributed_clone = distributed.clone();
106        let username_targeted_clone = username_targeted.clone();
107        let shutdown_flag = shutdown.clone();
108        let cleanup_interval = config.cleanup_interval_ms;
109        let failure_window = config.failure_window_ms;
110        let distributed_window = config.distributed_window_ms;
111        let username_window = config.username_targeted_window_ms;
112
113        let handle = thread::spawn(move || {
114            Self::cleanup_loop(
115                entity_auth_clone,
116                distributed_clone,
117                username_targeted_clone,
118                shutdown_rx,
119                shutdown_flag,
120                cleanup_interval,
121                failure_window,
122                distributed_window,
123                username_window,
124            );
125        });
126
127        // Initialize global velocity tracker
128        let global_velocity = GlobalVelocityTracker::new(
129            config.global_velocity_max_track,
130            config.global_velocity_window_ms,
131        );
132
133        Self {
134            entity_auth,
135            distributed,
136            username_targeted,
137            global_velocity: RwLock::new(global_velocity),
138            takeovers: RwLock::new(VecDeque::with_capacity(config.max_takeover_alerts)),
139            events: RwLock::new(VecDeque::with_capacity(1000)),
140            auth_patterns,
141            config,
142            shutdown,
143            shutdown_tx,
144            cleanup_handle: Some(handle),
145        }
146    }
147
148    /// Create with default configuration.
149    pub fn with_defaults() -> Self {
150        Self::new(StuffingConfig::default())
151    }
152
153    /// Background cleanup loop.
154    fn cleanup_loop(
155        entity_auth: DashMap<EntityEndpointKey, AuthMetrics>,
156        distributed: DashMap<String, DistributedAttack>,
157        username_targeted: DashMap<String, UsernameTargetedAttack>,
158        shutdown_rx: Receiver<()>,
159        shutdown: Arc<AtomicBool>,
160        cleanup_interval_ms: u64,
161        failure_window_ms: u64,
162        distributed_window_ms: u64,
163        username_window_ms: u64,
164    ) {
165        let cleanup_interval = std::time::Duration::from_millis(cleanup_interval_ms);
166
167        loop {
168            match shutdown_rx.recv_timeout(cleanup_interval) {
169                Ok(()) | Err(crossbeam_channel::RecvTimeoutError::Disconnected) => break,
170                Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
171                    if shutdown.load(Ordering::Relaxed) {
172                        break;
173                    }
174
175                    let now = now_ms();
176
177                    // Clean expired entity auth records
178                    let entity_threshold = now.saturating_sub(failure_window_ms * 2);
179                    entity_auth.retain(|_, metrics| metrics.last_attempt >= entity_threshold);
180
181                    // Clean expired distributed attacks
182                    let distributed_threshold = now.saturating_sub(distributed_window_ms);
183                    distributed.retain(|_, attack| attack.last_activity >= distributed_threshold);
184
185                    // Clean expired username-targeted attacks
186                    let username_threshold = now.saturating_sub(username_window_ms);
187                    username_targeted
188                        .retain(|_, attack| attack.last_activity >= username_threshold);
189                }
190            }
191        }
192    }
193
194    /// Check if an endpoint is an auth endpoint.
195    pub fn is_auth_endpoint(&self, path: &str) -> bool {
196        self.auth_patterns.iter().any(|re| re.is_match(path))
197    }
198
199    /// Record an auth attempt (request phase).
200    ///
201    /// Call this when a request hits an auth endpoint.
202    /// Returns a verdict that may adjust risk or block.
203    pub fn record_attempt(&self, attempt: &AuthAttempt) -> StuffingVerdict {
204        let now = attempt.timestamp;
205        let key = EntityEndpointKey::new(&attempt.entity_id, &attempt.endpoint);
206
207        // Get or create metrics
208        let mut metrics = self.entity_auth.entry(key.clone()).or_insert_with(|| {
209            AuthMetrics::new(attempt.entity_id.clone(), attempt.endpoint.clone(), now)
210        });
211
212        // Check if window has expired and reset
213        if now.saturating_sub(metrics.window_start) > self.config.failure_window_ms {
214            metrics.reset_window(now);
215        }
216
217        metrics.last_attempt = now;
218
219        // Check thresholds based on current failures (before this attempt's result)
220        let failures = metrics.failures;
221        drop(metrics); // Release lock before returning
222
223        // Track distributed attack if fingerprint provided
224        if let Some(ref fingerprint) = attempt.fingerprint {
225            self.track_distributed_attempt(fingerprint, &attempt.endpoint, &attempt.entity_id, now);
226        }
227
228        // Track username-targeted attack if username provided
229        if let Some(ref username) = attempt.username {
230            self.track_username_targeted_attempt(
231                username,
232                &attempt.endpoint,
233                &attempt.entity_id,
234                now,
235            );
236        }
237
238        // Evaluate verdict based on failure history
239        if failures >= self.config.failure_threshold_block {
240            StuffingVerdict::block(format!(
241                "Credential stuffing: {} failures in window",
242                failures
243            ))
244        } else if failures >= self.config.failure_threshold_high {
245            let event = StuffingEvent::SuspiciousFailureRate {
246                entity_id: attempt.entity_id.clone(),
247                endpoint: attempt.endpoint.clone(),
248                failures,
249                window_ms: self.config.failure_window_ms,
250                severity: StuffingSeverity::High,
251            };
252            self.emit_event(event);
253
254            StuffingVerdict::suspicious(
255                format!("High failure rate: {} failures", failures),
256                StuffingSeverity::High,
257            )
258        } else if failures >= self.config.failure_threshold_suspicious {
259            let event = StuffingEvent::SuspiciousFailureRate {
260                entity_id: attempt.entity_id.clone(),
261                endpoint: attempt.endpoint.clone(),
262                failures,
263                window_ms: self.config.failure_window_ms,
264                severity: StuffingSeverity::Medium,
265            };
266            self.emit_event(event);
267
268            StuffingVerdict::suspicious(
269                format!("Suspicious failure rate: {} failures", failures),
270                StuffingSeverity::Medium,
271            )
272        } else {
273            // Check for distributed attack (fingerprint-based)
274            if let Some(ref fingerprint) = attempt.fingerprint {
275                if let Some(verdict) = self.check_distributed_attack(fingerprint, &attempt.endpoint)
276                {
277                    return verdict;
278                }
279            }
280
281            // Check for username-targeted attack
282            if let Some(ref username) = attempt.username {
283                if let Some(verdict) =
284                    self.check_username_targeted_attack(username, &attempt.endpoint)
285                {
286                    return verdict;
287                }
288            }
289
290            // Check for global velocity spike
291            if let Some(verdict) = self.check_global_velocity(now) {
292                return verdict;
293            }
294
295            StuffingVerdict::Allow
296        }
297    }
298
299    /// Record an auth result (response phase).
300    ///
301    /// Call this when auth response is known (success/failure).
302    /// Checks for account takeover pattern.
303    pub fn record_result(&self, result: &AuthResult) -> Option<TakeoverAlert> {
304        let now = result.timestamp;
305        let key = EntityEndpointKey::new(&result.entity_id, &result.endpoint);
306
307        // Get or create metrics
308        let mut entry = self.entity_auth.entry(key.clone()).or_insert_with(|| {
309            AuthMetrics::new(result.entity_id.clone(), result.endpoint.clone(), now)
310        });
311
312        let metrics = entry.value_mut();
313
314        if result.success {
315            // Check for takeover pattern BEFORE updating metrics
316            let prior_failures = metrics.failures;
317            let failure_window = now.saturating_sub(metrics.window_start);
318
319            metrics.record_success(now);
320
321            // Takeover detection: success after many failures
322            if prior_failures >= self.config.takeover_min_failures
323                && failure_window <= self.config.takeover_window_ms
324            {
325                let alert = TakeoverAlert::new(
326                    result.entity_id.clone(),
327                    result.endpoint.clone(),
328                    prior_failures,
329                    failure_window,
330                    now,
331                );
332
333                // Emit event
334                let event = StuffingEvent::AccountTakeover {
335                    entity_id: result.entity_id.clone(),
336                    endpoint: result.endpoint.clone(),
337                    prior_failures,
338                    severity: StuffingSeverity::Critical,
339                };
340                self.emit_event(event);
341
342                // Store alert
343                self.store_takeover_alert(alert.clone());
344
345                // Reset window after takeover (start fresh monitoring)
346                metrics.reset_window(now);
347
348                return Some(alert);
349            }
350
351            // Reset window on success (normal behavior)
352            metrics.reset_window(now);
353        } else {
354            // Record failure
355            metrics.record_failure(now);
356
357            // Track global velocity
358            drop(entry); // Release entry lock before acquiring global lock
359            self.record_global_velocity_failure(now);
360
361            // Track username-targeted failure if username provided
362            if let Some(ref username) = result.username {
363                self.record_username_targeted_failure(username, &result.endpoint, now);
364            }
365
366            // Re-acquire entry for low-and-slow check
367            let entry = self.entity_auth.get(&key);
368            if let Some(metrics) = entry {
369                // Check for low-and-slow pattern
370                if metrics.detect_low_and_slow(
371                    self.config.low_slow_min_hours,
372                    self.config.low_slow_min_per_hour,
373                ) {
374                    let event = StuffingEvent::LowAndSlow {
375                        entity_id: result.entity_id.clone(),
376                        endpoint: result.endpoint.clone(),
377                        hours_active: self.config.low_slow_min_hours,
378                        total_failures: metrics.total_failures,
379                        severity: StuffingSeverity::Medium,
380                    };
381                    self.emit_event(event);
382                }
383            }
384        }
385
386        None
387    }
388
389    /// Track a distributed attack attempt.
390    fn track_distributed_attempt(
391        &self,
392        fingerprint: &str,
393        endpoint: &str,
394        entity_id: &str,
395        now: u64,
396    ) {
397        let key = format!("{}:{}", fingerprint, endpoint);
398
399        let mut entry = self.distributed.entry(key).or_insert_with(|| {
400            DistributedAttack::new(
401                fingerprint.to_string(),
402                endpoint.to_string(),
403                entity_id.to_string(),
404                now,
405            )
406        });
407
408        let attack = entry.value_mut();
409        attack.add_entity(entity_id.to_string(), now);
410    }
411
412    /// Check for distributed attack and return verdict if detected.
413    fn check_distributed_attack(
414        &self,
415        fingerprint: &str,
416        endpoint: &str,
417    ) -> Option<StuffingVerdict> {
418        let key = format!("{}:{}", fingerprint, endpoint);
419
420        if let Some(attack) = self.distributed.get(&key) {
421            if attack.entity_count() >= self.config.distributed_min_ips {
422                let event = StuffingEvent::DistributedAttackDetected {
423                    fingerprint: fingerprint.to_string(),
424                    endpoint: endpoint.to_string(),
425                    ip_count: attack.entity_count(),
426                    total_failures: attack.total_failures,
427                    severity: StuffingSeverity::High,
428                };
429                self.emit_event(event);
430
431                return Some(StuffingVerdict::suspicious_with_risk(
432                    format!(
433                        "Distributed attack: {} IPs with same fingerprint",
434                        attack.entity_count()
435                    ),
436                    StuffingSeverity::High,
437                    30, // +30 risk as per spec
438                ));
439            }
440        }
441
442        None
443    }
444
445    /// Record a failure in distributed attack tracking.
446    pub fn record_distributed_failure(&self, fingerprint: &str, endpoint: &str, now: u64) {
447        let key = format!("{}:{}", fingerprint, endpoint);
448
449        if let Some(mut entry) = self.distributed.get_mut(&key) {
450            entry.value_mut().record_failure(now);
451        }
452    }
453
454    /// Track a username-targeted attack attempt.
455    ///
456    /// SECURITY: Tracks multiple IPs attempting the same username to detect
457    /// coordinated credential stuffing that evades per-IP rate limiting.
458    fn track_username_targeted_attempt(
459        &self,
460        username: &str,
461        endpoint: &str,
462        entity_id: &str,
463        now: u64,
464    ) {
465        let key = format!("{}:{}", username, endpoint);
466
467        let mut entry = self.username_targeted.entry(key).or_insert_with(|| {
468            UsernameTargetedAttack::new(
469                username.to_string(),
470                endpoint.to_string(),
471                entity_id.to_string(),
472                now,
473            )
474        });
475
476        let attack = entry.value_mut();
477        attack.add_ip(entity_id.to_string(), now);
478    }
479
480    /// Record a failure for username-targeted attack tracking.
481    fn record_username_targeted_failure(&self, username: &str, endpoint: &str, now: u64) {
482        let key = format!("{}:{}", username, endpoint);
483
484        if let Some(mut entry) = self.username_targeted.get_mut(&key) {
485            entry.value_mut().record_failure(now);
486        }
487    }
488
489    /// Check for username-targeted attack and return verdict if detected.
490    ///
491    /// SECURITY: Detects when many IPs target the same username, indicating
492    /// a botnet attack on a specific account (possibly from breach list).
493    fn check_username_targeted_attack(
494        &self,
495        username: &str,
496        endpoint: &str,
497    ) -> Option<StuffingVerdict> {
498        let key = format!("{}:{}", username, endpoint);
499
500        if let Some(attack) = self.username_targeted.get(&key) {
501            let ip_count = attack.ip_count();
502            let failures = attack.total_failures;
503
504            // Check if we exceed thresholds
505            if ip_count >= self.config.username_targeted_min_ips
506                && failures >= self.config.username_targeted_min_failures
507            {
508                let event = StuffingEvent::UsernameTargetedAttack {
509                    username: username.to_string(),
510                    endpoint: endpoint.to_string(),
511                    ip_count,
512                    total_failures: failures,
513                    severity: StuffingSeverity::High,
514                };
515                self.emit_event(event);
516
517                return Some(StuffingVerdict::suspicious_with_risk(
518                    format!(
519                        "Username-targeted attack: {} IPs targeting '{}'",
520                        ip_count, username
521                    ),
522                    StuffingSeverity::High,
523                    35, // Higher risk than distributed (+35 vs +30) since targeted
524                ));
525            }
526        }
527
528        None
529    }
530
531    /// Record a failure in global velocity tracking.
532    fn record_global_velocity_failure(&self, now: u64) {
533        let mut tracker = self.global_velocity.write();
534        tracker.record_failure(now);
535    }
536
537    /// Check for global velocity spike and return verdict if detected.
538    ///
539    /// SECURITY: Detects sudden spikes in overall auth failure rate that
540    /// may indicate a coordinated attack across many IPs/usernames.
541    fn check_global_velocity(&self, now: u64) -> Option<StuffingVerdict> {
542        let tracker = self.global_velocity.read();
543        let rate = tracker.failure_rate(now);
544
545        if rate >= self.config.global_velocity_threshold_rate {
546            let count = tracker.failure_count(now);
547            drop(tracker); // Release lock before emitting event
548
549            let event = StuffingEvent::GlobalVelocitySpike {
550                failure_rate: rate,
551                failure_count: count,
552                threshold_rate: self.config.global_velocity_threshold_rate,
553                severity: StuffingSeverity::High,
554            };
555            self.emit_event(event);
556
557            return Some(StuffingVerdict::suspicious_with_risk(
558                format!(
559                    "Global velocity spike: {:.1} failures/sec (threshold: {:.1})",
560                    rate, self.config.global_velocity_threshold_rate
561                ),
562                StuffingSeverity::High,
563                20, // Lower than targeted attacks since it affects everyone
564            ));
565        }
566
567        None
568    }
569
570    /// Emit an event for alerting.
571    fn emit_event(&self, event: StuffingEvent) {
572        let mut events = self.events.write();
573        if events.len() >= 1000 {
574            events.pop_front();
575        }
576        events.push_back(event);
577    }
578
579    /// Store a takeover alert.
580    fn store_takeover_alert(&self, alert: TakeoverAlert) {
581        let mut takeovers = self.takeovers.write();
582        if takeovers.len() >= self.config.max_takeover_alerts {
583            takeovers.pop_front();
584        }
585        takeovers.push_back(alert);
586    }
587
588    // --- Query APIs ---
589
590    /// Get entity's auth metrics.
591    pub fn get_entity_metrics(&self, entity_id: &str, endpoint: &str) -> Option<AuthMetrics> {
592        let key = EntityEndpointKey::new(entity_id, endpoint);
593        self.entity_auth.get(&key).map(|e| e.clone())
594    }
595
596    /// Get all entity metrics.
597    pub fn get_all_entity_metrics(&self) -> Vec<AuthMetrics> {
598        self.entity_auth.iter().map(|e| e.value().clone()).collect()
599    }
600
601    /// Get active distributed attacks.
602    pub fn get_distributed_attacks(&self) -> Vec<DistributedAttack> {
603        self.distributed.iter().map(|e| e.value().clone()).collect()
604    }
605
606    /// Get recent takeover alerts.
607    pub fn get_takeover_alerts(&self, since: u64) -> Vec<TakeoverAlert> {
608        let takeovers = self.takeovers.read();
609        takeovers
610            .iter()
611            .filter(|a| a.success_at >= since)
612            .cloned()
613            .collect()
614    }
615
616    /// Get all takeover alerts.
617    pub fn get_all_takeover_alerts(&self) -> Vec<TakeoverAlert> {
618        let takeovers = self.takeovers.read();
619        takeovers.iter().cloned().collect()
620    }
621
622    /// Get recent events since timestamp.
623    pub fn get_events(&self, since: u64) -> Vec<StuffingEvent> {
624        let events = self.events.read();
625        // Note: Events don't currently have timestamps - would need to add for proper filtering
626        // For now, return all events (caller should drain_events() after processing)
627        let _ = since; // Acknowledge param for future use
628        events.iter().cloned().collect()
629    }
630
631    /// Get all events.
632    pub fn drain_events(&self) -> Vec<StuffingEvent> {
633        let mut events = self.events.write();
634        events.drain(..).collect()
635    }
636
637    /// Get statistics.
638    pub fn get_stats(&self) -> StuffingStats {
639        let entity_count = self.entity_auth.len();
640        let distributed_count = self.distributed.len();
641        let takeover_count = self.takeovers.read().len();
642        let event_count = self.events.read().len();
643
644        // Calculate totals
645        let mut total_failures: u64 = 0;
646        let mut total_successes: u64 = 0;
647        let mut suspicious_entities: usize = 0;
648
649        for entry in self.entity_auth.iter() {
650            let metrics = entry.value();
651            total_failures += metrics.total_failures;
652            total_successes += metrics.total_successes;
653            if metrics.failures >= self.config.failure_threshold_suspicious {
654                suspicious_entities += 1;
655            }
656        }
657
658        StuffingStats {
659            entity_count,
660            distributed_attack_count: distributed_count,
661            takeover_alert_count: takeover_count,
662            event_count,
663            total_failures,
664            total_successes,
665            suspicious_entities,
666        }
667    }
668
669    /// Clear all data.
670    pub fn clear(&self) {
671        self.entity_auth.clear();
672        self.distributed.clear();
673        self.takeovers.write().clear();
674        self.events.write().clear();
675    }
676
677    /// Get entity count.
678    pub fn len(&self) -> usize {
679        self.entity_auth.len()
680    }
681
682    /// Check if empty.
683    pub fn is_empty(&self) -> bool {
684        self.entity_auth.is_empty()
685    }
686
687    /// Stop the detector (cleanup thread).
688    pub fn stop(&self) {
689        self.shutdown.store(true, Ordering::Relaxed);
690        let _ = self.shutdown_tx.send(());
691    }
692
693    /// Export state for persistence.
694    pub fn export(&self) -> StuffingState {
695        StuffingState {
696            entity_metrics: self.get_all_entity_metrics(),
697            distributed_attacks: self.get_distributed_attacks(),
698            takeover_alerts: self.get_all_takeover_alerts(),
699        }
700    }
701
702    /// Import state from persistence.
703    pub fn import(&self, state: StuffingState) {
704        for metrics in state.entity_metrics {
705            let key = EntityEndpointKey::new(&metrics.entity_id, &metrics.endpoint);
706            self.entity_auth.insert(key, metrics);
707        }
708
709        for attack in state.distributed_attacks {
710            let key = format!("{}:{}", attack.fingerprint, attack.endpoint);
711            self.distributed.insert(key, attack);
712        }
713
714        let mut takeovers = self.takeovers.write();
715        for alert in state.takeover_alerts {
716            if takeovers.len() < self.config.max_takeover_alerts {
717                takeovers.push_back(alert);
718            }
719        }
720    }
721}
722
723impl Drop for CredentialStuffingDetector {
724    fn drop(&mut self) {
725        self.stop();
726        if let Some(handle) = self.cleanup_handle.take() {
727            let _ = handle.join();
728        }
729    }
730}
731
732/// Detector statistics.
733#[derive(Debug, Clone, Default)]
734pub struct StuffingStats {
735    /// Number of tracked entities
736    pub entity_count: usize,
737    /// Number of active distributed attacks
738    pub distributed_attack_count: usize,
739    /// Number of takeover alerts
740    pub takeover_alert_count: usize,
741    /// Number of events in queue
742    pub event_count: usize,
743    /// Total failures recorded
744    pub total_failures: u64,
745    /// Total successes recorded
746    pub total_successes: u64,
747    /// Entities above suspicious threshold
748    pub suspicious_entities: usize,
749}
750
751/// Exportable state for persistence/HA sync.
752#[derive(Debug, Clone, Default, Serialize, Deserialize)]
753pub struct StuffingState {
754    pub entity_metrics: Vec<AuthMetrics>,
755    pub distributed_attacks: Vec<DistributedAttack>,
756    pub takeover_alerts: Vec<TakeoverAlert>,
757}
758
759#[cfg(test)]
760mod tests {
761    use super::*;
762
763    fn test_config() -> StuffingConfig {
764        StuffingConfig {
765            failure_window_ms: 60_000, // 1 minute for tests
766            failure_threshold_suspicious: 3,
767            failure_threshold_high: 5,
768            failure_threshold_block: 10,
769            distributed_min_ips: 3,
770            distributed_window_ms: 60_000,
771            takeover_window_ms: 60_000,
772            takeover_min_failures: 3,
773            low_slow_min_hours: 2,
774            low_slow_min_per_hour: 1,
775            cleanup_interval_ms: 60_000,
776            ..Default::default()
777        }
778    }
779
780    #[test]
781    fn test_is_auth_endpoint() {
782        let detector = CredentialStuffingDetector::with_defaults();
783
784        assert!(detector.is_auth_endpoint("/api/login"));
785        assert!(detector.is_auth_endpoint("/api/auth/token"));
786        assert!(detector.is_auth_endpoint("/v1/signin"));
787        assert!(detector.is_auth_endpoint("/oauth/authorize"));
788
789        assert!(!detector.is_auth_endpoint("/api/users"));
790        assert!(!detector.is_auth_endpoint("/api/products"));
791    }
792
793    #[test]
794    fn test_invalid_patterns_are_skipped_gracefully() {
795        // SECURITY: Invalid regex patterns should NOT cause panics
796        // They should be logged and skipped
797        let config = StuffingConfig {
798            auth_path_patterns: vec![
799                r"(?i)/valid-login".to_string(),
800                r"[invalid(regex".to_string(), // Invalid: unclosed bracket
801                r"(?i)/another-valid".to_string(),
802                r"*invalid*".to_string(), // Invalid: nothing to repeat
803            ],
804            ..Default::default()
805        };
806
807        // This should NOT panic even with invalid patterns
808        let detector = CredentialStuffingDetector::new(config);
809
810        // Valid patterns should still work
811        assert!(detector.is_auth_endpoint("/valid-login"));
812        assert!(detector.is_auth_endpoint("/another-valid"));
813
814        // Invalid patterns are skipped, not matched
815        assert!(!detector.is_auth_endpoint("/something-else"));
816    }
817
818    #[test]
819    fn test_record_attempt_allow() {
820        let detector = CredentialStuffingDetector::new(test_config());
821        let now = now_ms();
822
823        let attempt = AuthAttempt::new("1.2.3.4", "/login", now);
824        let verdict = detector.record_attempt(&attempt);
825
826        assert!(verdict.is_allow());
827    }
828
829    #[test]
830    fn test_record_attempt_suspicious() {
831        let detector = CredentialStuffingDetector::new(test_config());
832        let now = now_ms();
833
834        // Record 3 failures first
835        for i in 0..3 {
836            let result = AuthResult::new("1.2.3.4", "/login", false, now + i);
837            detector.record_result(&result);
838        }
839
840        // Now attempt should be suspicious
841        let attempt = AuthAttempt::new("1.2.3.4", "/login", now + 100);
842        let verdict = detector.record_attempt(&attempt);
843
844        assert!(!verdict.is_allow());
845        assert!(!verdict.is_block());
846        assert!(verdict.risk_delta() > 0);
847    }
848
849    #[test]
850    fn test_record_attempt_block() {
851        let detector = CredentialStuffingDetector::new(test_config());
852        let now = now_ms();
853
854        // Record 10 failures
855        for i in 0..10 {
856            let result = AuthResult::new("1.2.3.4", "/login", false, now + i);
857            detector.record_result(&result);
858        }
859
860        // Now attempt should be blocked
861        let attempt = AuthAttempt::new("1.2.3.4", "/login", now + 100);
862        let verdict = detector.record_attempt(&attempt);
863
864        assert!(verdict.is_block());
865    }
866
867    #[test]
868    fn test_takeover_detection() {
869        let detector = CredentialStuffingDetector::new(test_config());
870        let now = now_ms();
871
872        // Record 5 failures
873        for i in 0..5 {
874            let result = AuthResult::new("1.2.3.4", "/login", false, now + i * 1000);
875            detector.record_result(&result);
876        }
877
878        // Then success - should trigger takeover alert
879        let result = AuthResult::new("1.2.3.4", "/login", true, now + 10000);
880        let alert = detector.record_result(&result);
881
882        assert!(alert.is_some());
883        let alert = alert.unwrap();
884        assert_eq!(alert.entity_id, "1.2.3.4");
885        assert_eq!(alert.prior_failures, 5);
886        assert_eq!(alert.severity, StuffingSeverity::Critical);
887
888        // Check alert stored
889        let alerts = detector.get_takeover_alerts(now);
890        assert_eq!(alerts.len(), 1);
891    }
892
893    #[test]
894    fn test_distributed_attack_detection() {
895        let detector = CredentialStuffingDetector::new(test_config());
896        let now = now_ms();
897
898        // 3 different IPs with same fingerprint
899        let ips = ["1.1.1.1", "2.2.2.2", "3.3.3.3"];
900        for ip in &ips {
901            let attempt = AuthAttempt::new(*ip, "/login", now).with_fingerprint("same-fingerprint");
902            detector.record_attempt(&attempt);
903        }
904
905        // Fourth IP should trigger distributed attack detection
906        let attempt =
907            AuthAttempt::new("4.4.4.4", "/login", now).with_fingerprint("same-fingerprint");
908        let verdict = detector.record_attempt(&attempt);
909
910        // Should be suspicious due to distributed attack
911        assert!(!verdict.is_allow());
912        assert_eq!(verdict.risk_delta(), 30);
913    }
914
915    #[test]
916    fn test_window_reset() {
917        let mut config = test_config();
918        config.failure_window_ms = 100; // 100ms window for test
919        let detector = CredentialStuffingDetector::new(config);
920        let now = now_ms();
921
922        // Record failures
923        for i in 0..5 {
924            let result = AuthResult::new("1.2.3.4", "/login", false, now + i);
925            detector.record_result(&result);
926        }
927
928        // Wait for window to expire
929        std::thread::sleep(std::time::Duration::from_millis(150));
930        let later = now_ms();
931
932        // New attempt should reset window
933        let attempt = AuthAttempt::new("1.2.3.4", "/login", later);
934        let verdict = detector.record_attempt(&attempt);
935
936        // Should be allowed since window reset
937        assert!(verdict.is_allow());
938    }
939
940    #[test]
941    fn test_success_resets_window() {
942        let detector = CredentialStuffingDetector::new(test_config());
943        let now = now_ms();
944
945        // Record 4 failures (just below block threshold)
946        for i in 0..4 {
947            let result = AuthResult::new("1.2.3.4", "/login", false, now + i * 1000);
948            detector.record_result(&result);
949        }
950
951        // Successful login (without triggering takeover - need 3+ failures)
952        let result = AuthResult::new("1.2.3.4", "/login", true, now + 5000);
953        let alert = detector.record_result(&result);
954
955        // Takeover detected (4 > 3 min failures)
956        assert!(alert.is_some());
957
958        // New attempt should be allowed (window reset after takeover)
959        let attempt = AuthAttempt::new("1.2.3.4", "/login", now + 6000);
960        let verdict = detector.record_attempt(&attempt);
961        assert!(verdict.is_allow());
962    }
963
964    #[test]
965    fn test_stats() {
966        let detector = CredentialStuffingDetector::new(test_config());
967        let now = now_ms();
968
969        // Record some activity
970        for i in 0..5 {
971            let result = AuthResult::new("1.2.3.4", "/login", false, now + i * 1000);
972            detector.record_result(&result);
973        }
974
975        let result = AuthResult::new("5.6.7.8", "/login", true, now + 10000);
976        detector.record_result(&result);
977
978        let stats = detector.get_stats();
979        assert_eq!(stats.entity_count, 2);
980        assert_eq!(stats.total_failures, 5);
981        assert_eq!(stats.total_successes, 1);
982        assert!(stats.suspicious_entities >= 1);
983    }
984
985    #[test]
986    fn test_clear() {
987        let detector = CredentialStuffingDetector::new(test_config());
988        let now = now_ms();
989
990        // Add some data
991        let result = AuthResult::new("1.2.3.4", "/login", false, now);
992        detector.record_result(&result);
993
994        assert!(!detector.is_empty());
995
996        detector.clear();
997
998        assert!(detector.is_empty());
999        assert_eq!(detector.get_stats().entity_count, 0);
1000    }
1001
1002    #[test]
1003    fn test_export_import() {
1004        let detector1 = CredentialStuffingDetector::new(test_config());
1005        let now = now_ms();
1006
1007        // Record activity
1008        for i in 0..3 {
1009            let result = AuthResult::new("1.2.3.4", "/login", false, now + i * 1000);
1010            detector1.record_result(&result);
1011        }
1012
1013        // Export
1014        let state = detector1.export();
1015        assert!(!state.entity_metrics.is_empty());
1016
1017        // Import into new detector
1018        let detector2 = CredentialStuffingDetector::new(test_config());
1019        detector2.import(state);
1020
1021        assert_eq!(detector1.len(), detector2.len());
1022    }
1023
1024    #[test]
1025    fn test_events_emitted() {
1026        let detector = CredentialStuffingDetector::new(test_config());
1027        let now = now_ms();
1028
1029        // Record enough failures to trigger suspicious
1030        for i in 0..5 {
1031            let result = AuthResult::new("1.2.3.4", "/login", false, now + i * 1000);
1032            detector.record_result(&result);
1033        }
1034
1035        // Attempt should emit event
1036        let attempt = AuthAttempt::new("1.2.3.4", "/login", now + 10000);
1037        detector.record_attempt(&attempt);
1038
1039        let events = detector.drain_events();
1040        assert!(!events.is_empty());
1041    }
1042
1043    #[test]
1044    fn test_username_targeted_attack_detection() {
1045        let mut config = test_config();
1046        config.username_targeted_min_ips = 3;
1047        config.username_targeted_min_failures = 5;
1048        config.username_targeted_window_ms = 60_000;
1049        let detector = CredentialStuffingDetector::new(config);
1050        let now = now_ms();
1051
1052        // 5 different IPs targeting same username "admin"
1053        let ips = ["1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4", "5.5.5.5"];
1054        for (i, ip) in ips.iter().enumerate() {
1055            // Record attempt to track the IP
1056            let attempt =
1057                AuthAttempt::new(*ip, "/login", now + i as u64 * 100).with_username("admin");
1058            detector.record_attempt(&attempt);
1059
1060            // Record failure to increment failure count
1061            let result = AuthResult::new(*ip, "/login", false, now + i as u64 * 100 + 50)
1062                .with_username("admin");
1063            detector.record_result(&result);
1064        }
1065
1066        // Next attempt from a 6th IP should detect username-targeted attack
1067        let attempt = AuthAttempt::new("6.6.6.6", "/login", now + 1000).with_username("admin");
1068        let verdict = detector.record_attempt(&attempt);
1069
1070        // Should be suspicious due to username-targeted attack
1071        assert!(!verdict.is_allow());
1072        assert_eq!(verdict.risk_delta(), 35); // Higher risk for targeted attacks
1073
1074        // Check event was emitted
1075        let events = detector.drain_events();
1076        let has_username_targeted = events.iter().any(|e| {
1077            matches!(e, StuffingEvent::UsernameTargetedAttack { username, .. } if username == "admin")
1078        });
1079        assert!(
1080            has_username_targeted,
1081            "Expected UsernameTargetedAttack event"
1082        );
1083    }
1084
1085    #[test]
1086    fn test_username_targeted_different_usernames_isolated() {
1087        let mut config = test_config();
1088        config.username_targeted_min_ips = 3;
1089        config.username_targeted_min_failures = 5;
1090        let detector = CredentialStuffingDetector::new(config);
1091        let now = now_ms();
1092
1093        // 2 IPs targeting "admin" - not enough
1094        for (i, ip) in ["1.1.1.1", "2.2.2.2"].iter().enumerate() {
1095            let attempt =
1096                AuthAttempt::new(*ip, "/login", now + i as u64 * 100).with_username("admin");
1097            detector.record_attempt(&attempt);
1098            let result = AuthResult::new(*ip, "/login", false, now + i as u64 * 100 + 50)
1099                .with_username("admin");
1100            detector.record_result(&result);
1101        }
1102
1103        // 2 IPs targeting "user" - not enough
1104        for (i, ip) in ["3.3.3.3", "4.4.4.4"].iter().enumerate() {
1105            let attempt =
1106                AuthAttempt::new(*ip, "/login", now + i as u64 * 100).with_username("user");
1107            detector.record_attempt(&attempt);
1108            let result = AuthResult::new(*ip, "/login", false, now + i as u64 * 100 + 50)
1109                .with_username("user");
1110            detector.record_result(&result);
1111        }
1112
1113        // Neither username should trigger detection
1114        let attempt = AuthAttempt::new("5.5.5.5", "/login", now + 1000).with_username("admin");
1115        let verdict = detector.record_attempt(&attempt);
1116        assert!(
1117            verdict.is_allow(),
1118            "Should not detect attack with only 3 IPs"
1119        );
1120    }
1121
1122    #[test]
1123    fn test_global_velocity_spike_detection() {
1124        let mut config = test_config();
1125        config.global_velocity_threshold_rate = 5.0; // 5 failures/sec
1126        config.global_velocity_window_ms = 1000; // 1 second window
1127        config.global_velocity_max_track = 100;
1128        let detector = CredentialStuffingDetector::new(config);
1129        let now = now_ms();
1130
1131        // Record 10 failures in rapid succession (within 1 second = 10/sec rate)
1132        for i in 0..10 {
1133            let result = AuthResult::new(
1134                format!("10.0.0.{}", i),
1135                "/login",
1136                false,
1137                now + i as u64 * 50, // 50ms apart
1138            );
1139            detector.record_result(&result);
1140        }
1141
1142        // New attempt should detect global velocity spike
1143        let attempt = AuthAttempt::new("11.11.11.11", "/login", now + 600);
1144        let verdict = detector.record_attempt(&attempt);
1145
1146        // Should be suspicious due to global velocity spike
1147        assert!(!verdict.is_allow());
1148        assert_eq!(verdict.risk_delta(), 20); // Global velocity risk
1149
1150        // Check event was emitted
1151        let events = detector.drain_events();
1152        let has_velocity_spike = events
1153            .iter()
1154            .any(|e| matches!(e, StuffingEvent::GlobalVelocitySpike { .. }));
1155        assert!(has_velocity_spike, "Expected GlobalVelocitySpike event");
1156    }
1157
1158    #[test]
1159    fn test_global_velocity_below_threshold() {
1160        let mut config = test_config();
1161        config.global_velocity_threshold_rate = 100.0; // Very high threshold
1162        config.global_velocity_window_ms = 1000;
1163        let detector = CredentialStuffingDetector::new(config);
1164        let now = now_ms();
1165
1166        // Record a few failures
1167        for i in 0..3 {
1168            let result = AuthResult::new(
1169                format!("10.0.0.{}", i),
1170                "/login",
1171                false,
1172                now + i as u64 * 100,
1173            );
1174            detector.record_result(&result);
1175        }
1176
1177        // Should be allowed - not enough for velocity spike
1178        let attempt = AuthAttempt::new("11.11.11.11", "/login", now + 500);
1179        let verdict = detector.record_attempt(&attempt);
1180
1181        assert!(verdict.is_allow());
1182    }
1183
1184    #[test]
1185    fn test_distributed_vs_username_targeted_priority() {
1186        // Tests that fingerprint-based distributed detection takes priority
1187        let mut config = test_config();
1188        config.distributed_min_ips = 3;
1189        config.username_targeted_min_ips = 3;
1190        config.username_targeted_min_failures = 3;
1191        let detector = CredentialStuffingDetector::new(config);
1192        let now = now_ms();
1193
1194        // Set up both conditions: same fingerprint AND same username
1195        let ips = ["1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4"];
1196        for (i, ip) in ips.iter().enumerate() {
1197            let attempt = AuthAttempt::new(*ip, "/login", now + i as u64 * 100)
1198                .with_fingerprint("shared-fp")
1199                .with_username("admin");
1200            detector.record_attempt(&attempt);
1201            let result = AuthResult::new(*ip, "/login", false, now + i as u64 * 100 + 50)
1202                .with_username("admin");
1203            detector.record_result(&result);
1204        }
1205
1206        // Next attempt should detect distributed attack first (fingerprint has priority)
1207        let attempt = AuthAttempt::new("5.5.5.5", "/login", now + 1000)
1208            .with_fingerprint("shared-fp")
1209            .with_username("admin");
1210        let verdict = detector.record_attempt(&attempt);
1211
1212        // Should be detected - either distributed or username-targeted
1213        assert!(!verdict.is_allow());
1214        // Distributed gives +30, username-targeted gives +35
1215        // If distributed is checked first, we get +30
1216        assert_eq!(
1217            verdict.risk_delta(),
1218            30,
1219            "Fingerprint-based detection should take priority"
1220        );
1221    }
1222
1223    #[test]
1224    fn test_username_tracking_across_results() {
1225        let mut config = test_config();
1226        config.username_targeted_min_ips = 2;
1227        config.username_targeted_min_failures = 3;
1228        let detector = CredentialStuffingDetector::new(config);
1229        let now = now_ms();
1230
1231        // Two IPs, but need 3 failures
1232        let attempt1 = AuthAttempt::new("1.1.1.1", "/login", now).with_username("victim");
1233        detector.record_attempt(&attempt1);
1234        let result1 = AuthResult::new("1.1.1.1", "/login", false, now + 10).with_username("victim");
1235        detector.record_result(&result1);
1236
1237        let attempt2 = AuthAttempt::new("2.2.2.2", "/login", now + 100).with_username("victim");
1238        detector.record_attempt(&attempt2);
1239        let result2 =
1240            AuthResult::new("2.2.2.2", "/login", false, now + 110).with_username("victim");
1241        detector.record_result(&result2);
1242
1243        // 2 IPs, 2 failures - not enough failures yet
1244        let attempt3 = AuthAttempt::new("3.3.3.3", "/login", now + 200).with_username("victim");
1245        let verdict = detector.record_attempt(&attempt3);
1246        assert!(verdict.is_allow(), "2 failures should not trigger (need 3)");
1247
1248        // Third failure pushes over threshold
1249        let result3 =
1250            AuthResult::new("3.3.3.3", "/login", false, now + 210).with_username("victim");
1251        detector.record_result(&result3);
1252
1253        // Now should detect
1254        let attempt4 = AuthAttempt::new("4.4.4.4", "/login", now + 300).with_username("victim");
1255        let verdict = detector.record_attempt(&attempt4);
1256        assert!(!verdict.is_allow(), "3 IPs and 3 failures should trigger");
1257    }
1258}