1use crate::detection::{
13 AuthAttempt, AuthMetrics, AuthResult, DistributedAttack, EntityEndpointKey,
14 GlobalVelocityTracker, StuffingConfig, StuffingEvent, StuffingSeverity, StuffingVerdict,
15 TakeoverAlert, UsernameTargetedAttack,
16};
17use crossbeam_channel::{bounded, Receiver, Sender};
18use dashmap::DashMap;
19use parking_lot::RwLock;
20use regex::Regex;
21use serde::{Deserialize, Serialize};
22use std::collections::VecDeque;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::Arc;
25use std::thread::{self, JoinHandle};
26use std::time::{SystemTime, UNIX_EPOCH};
27use tracing::warn;
28
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the elapsed duration
/// falls back to its default (zero) instead of panicking.
#[inline]
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
37
/// Tracks authentication attempts and results and flags credential-stuffing patterns.
///
/// Construction spawns a background cleanup thread; `stop` signals it and `Drop` joins it.
pub struct CredentialStuffingDetector {
    /// Authentication metrics per (entity, endpoint) pair.
    entity_auth: DashMap<EntityEndpointKey, AuthMetrics>,
    /// Distributed-attack trackers, keyed by `"{fingerprint}:{endpoint}"`.
    distributed: DashMap<String, DistributedAttack>,
    /// Username-targeted trackers, keyed by `"{username}:{endpoint}"`.
    username_targeted: DashMap<String, UsernameTargetedAttack>,
    /// Tracker consulted for the failure rate across all entities.
    global_velocity: RwLock<GlobalVelocityTracker>,
    /// Stored takeover alerts, oldest first; bounded by `config.max_takeover_alerts`.
    takeovers: RwLock<VecDeque<TakeoverAlert>>,
    /// Buffered events, oldest first; capped at 1000 entries by `emit_event`.
    events: RwLock<VecDeque<StuffingEvent>>,
    /// Regexes compiled from `config.auth_path_patterns` (invalid patterns are skipped).
    auth_patterns: Vec<Regex>,
    /// Configuration after `StuffingConfig::validated`.
    config: StuffingConfig,
    /// Shutdown flag set by `stop` and polled by the cleanup thread.
    shutdown: Arc<AtomicBool>,
    /// Channel used to wake the cleanup thread for shutdown.
    shutdown_tx: Sender<()>,
    /// Join handle of the cleanup thread; taken and joined in `Drop`.
    cleanup_handle: Option<JoinHandle<()>>,
}
69
70impl CredentialStuffingDetector {
71 pub fn new(config: StuffingConfig) -> Self {
75 let config = config.validated();
77
78 let auth_patterns: Vec<Regex> = config
81 .auth_path_patterns
82 .iter()
83 .filter_map(|p| match Regex::new(p) {
84 Ok(re) => Some(re),
85 Err(e) => {
86 warn!(
87 pattern = %p,
88 error = %e,
89 "Invalid auth_path_pattern in StuffingConfig - pattern will be skipped"
90 );
91 None
92 }
93 })
94 .collect();
95
96 let (shutdown_tx, shutdown_rx) = bounded(1);
97 let shutdown = Arc::new(AtomicBool::new(false));
98
99 let entity_auth = DashMap::with_capacity(config.max_entities.min(10_000));
100 let distributed = DashMap::with_capacity(config.max_distributed_attacks.min(1_000));
101 let username_targeted = DashMap::with_capacity(config.max_distributed_attacks.min(1_000));
102
103 let entity_auth_clone = entity_auth.clone();
105 let distributed_clone = distributed.clone();
106 let username_targeted_clone = username_targeted.clone();
107 let shutdown_flag = shutdown.clone();
108 let cleanup_interval = config.cleanup_interval_ms;
109 let failure_window = config.failure_window_ms;
110 let distributed_window = config.distributed_window_ms;
111 let username_window = config.username_targeted_window_ms;
112
113 let handle = thread::spawn(move || {
114 Self::cleanup_loop(
115 entity_auth_clone,
116 distributed_clone,
117 username_targeted_clone,
118 shutdown_rx,
119 shutdown_flag,
120 cleanup_interval,
121 failure_window,
122 distributed_window,
123 username_window,
124 );
125 });
126
127 let global_velocity = GlobalVelocityTracker::new(
129 config.global_velocity_max_track,
130 config.global_velocity_window_ms,
131 );
132
133 Self {
134 entity_auth,
135 distributed,
136 username_targeted,
137 global_velocity: RwLock::new(global_velocity),
138 takeovers: RwLock::new(VecDeque::with_capacity(config.max_takeover_alerts)),
139 events: RwLock::new(VecDeque::with_capacity(1000)),
140 auth_patterns,
141 config,
142 shutdown,
143 shutdown_tx,
144 cleanup_handle: Some(handle),
145 }
146 }
147
148 pub fn with_defaults() -> Self {
150 Self::new(StuffingConfig::default())
151 }
152
153 fn cleanup_loop(
155 entity_auth: DashMap<EntityEndpointKey, AuthMetrics>,
156 distributed: DashMap<String, DistributedAttack>,
157 username_targeted: DashMap<String, UsernameTargetedAttack>,
158 shutdown_rx: Receiver<()>,
159 shutdown: Arc<AtomicBool>,
160 cleanup_interval_ms: u64,
161 failure_window_ms: u64,
162 distributed_window_ms: u64,
163 username_window_ms: u64,
164 ) {
165 let cleanup_interval = std::time::Duration::from_millis(cleanup_interval_ms);
166
167 loop {
168 match shutdown_rx.recv_timeout(cleanup_interval) {
169 Ok(()) | Err(crossbeam_channel::RecvTimeoutError::Disconnected) => break,
170 Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
171 if shutdown.load(Ordering::Relaxed) {
172 break;
173 }
174
175 let now = now_ms();
176
177 let entity_threshold = now.saturating_sub(failure_window_ms * 2);
179 entity_auth.retain(|_, metrics| metrics.last_attempt >= entity_threshold);
180
181 let distributed_threshold = now.saturating_sub(distributed_window_ms);
183 distributed.retain(|_, attack| attack.last_activity >= distributed_threshold);
184
185 let username_threshold = now.saturating_sub(username_window_ms);
187 username_targeted
188 .retain(|_, attack| attack.last_activity >= username_threshold);
189 }
190 }
191 }
192 }
193
194 pub fn is_auth_endpoint(&self, path: &str) -> bool {
196 self.auth_patterns.iter().any(|re| re.is_match(path))
197 }
198
199 pub fn record_attempt(&self, attempt: &AuthAttempt) -> StuffingVerdict {
204 let now = attempt.timestamp;
205 let key = EntityEndpointKey::new(&attempt.entity_id, &attempt.endpoint);
206
207 let mut metrics = self.entity_auth.entry(key.clone()).or_insert_with(|| {
209 AuthMetrics::new(attempt.entity_id.clone(), attempt.endpoint.clone(), now)
210 });
211
212 if now.saturating_sub(metrics.window_start) > self.config.failure_window_ms {
214 metrics.reset_window(now);
215 }
216
217 metrics.last_attempt = now;
218
219 let failures = metrics.failures;
221 drop(metrics); if let Some(ref fingerprint) = attempt.fingerprint {
225 self.track_distributed_attempt(fingerprint, &attempt.endpoint, &attempt.entity_id, now);
226 }
227
228 if let Some(ref username) = attempt.username {
230 self.track_username_targeted_attempt(
231 username,
232 &attempt.endpoint,
233 &attempt.entity_id,
234 now,
235 );
236 }
237
238 if failures >= self.config.failure_threshold_block {
240 StuffingVerdict::block(format!(
241 "Credential stuffing: {} failures in window",
242 failures
243 ))
244 } else if failures >= self.config.failure_threshold_high {
245 let event = StuffingEvent::SuspiciousFailureRate {
246 entity_id: attempt.entity_id.clone(),
247 endpoint: attempt.endpoint.clone(),
248 failures,
249 window_ms: self.config.failure_window_ms,
250 severity: StuffingSeverity::High,
251 };
252 self.emit_event(event);
253
254 StuffingVerdict::suspicious(
255 format!("High failure rate: {} failures", failures),
256 StuffingSeverity::High,
257 )
258 } else if failures >= self.config.failure_threshold_suspicious {
259 let event = StuffingEvent::SuspiciousFailureRate {
260 entity_id: attempt.entity_id.clone(),
261 endpoint: attempt.endpoint.clone(),
262 failures,
263 window_ms: self.config.failure_window_ms,
264 severity: StuffingSeverity::Medium,
265 };
266 self.emit_event(event);
267
268 StuffingVerdict::suspicious(
269 format!("Suspicious failure rate: {} failures", failures),
270 StuffingSeverity::Medium,
271 )
272 } else {
273 if let Some(ref fingerprint) = attempt.fingerprint {
275 if let Some(verdict) = self.check_distributed_attack(fingerprint, &attempt.endpoint)
276 {
277 return verdict;
278 }
279 }
280
281 if let Some(ref username) = attempt.username {
283 if let Some(verdict) =
284 self.check_username_targeted_attack(username, &attempt.endpoint)
285 {
286 return verdict;
287 }
288 }
289
290 if let Some(verdict) = self.check_global_velocity(now) {
292 return verdict;
293 }
294
295 StuffingVerdict::Allow
296 }
297 }
298
299 pub fn record_result(&self, result: &AuthResult) -> Option<TakeoverAlert> {
304 let now = result.timestamp;
305 let key = EntityEndpointKey::new(&result.entity_id, &result.endpoint);
306
307 let mut entry = self.entity_auth.entry(key.clone()).or_insert_with(|| {
309 AuthMetrics::new(result.entity_id.clone(), result.endpoint.clone(), now)
310 });
311
312 let metrics = entry.value_mut();
313
314 if result.success {
315 let prior_failures = metrics.failures;
317 let failure_window = now.saturating_sub(metrics.window_start);
318
319 metrics.record_success(now);
320
321 if prior_failures >= self.config.takeover_min_failures
323 && failure_window <= self.config.takeover_window_ms
324 {
325 let alert = TakeoverAlert::new(
326 result.entity_id.clone(),
327 result.endpoint.clone(),
328 prior_failures,
329 failure_window,
330 now,
331 );
332
333 let event = StuffingEvent::AccountTakeover {
335 entity_id: result.entity_id.clone(),
336 endpoint: result.endpoint.clone(),
337 prior_failures,
338 severity: StuffingSeverity::Critical,
339 };
340 self.emit_event(event);
341
342 self.store_takeover_alert(alert.clone());
344
345 metrics.reset_window(now);
347
348 return Some(alert);
349 }
350
351 metrics.reset_window(now);
353 } else {
354 metrics.record_failure(now);
356
357 drop(entry); self.record_global_velocity_failure(now);
360
361 if let Some(ref username) = result.username {
363 self.record_username_targeted_failure(username, &result.endpoint, now);
364 }
365
366 let entry = self.entity_auth.get(&key);
368 if let Some(metrics) = entry {
369 if metrics.detect_low_and_slow(
371 self.config.low_slow_min_hours,
372 self.config.low_slow_min_per_hour,
373 ) {
374 let event = StuffingEvent::LowAndSlow {
375 entity_id: result.entity_id.clone(),
376 endpoint: result.endpoint.clone(),
377 hours_active: self.config.low_slow_min_hours,
378 total_failures: metrics.total_failures,
379 severity: StuffingSeverity::Medium,
380 };
381 self.emit_event(event);
382 }
383 }
384 }
385
386 None
387 }
388
389 fn track_distributed_attempt(
391 &self,
392 fingerprint: &str,
393 endpoint: &str,
394 entity_id: &str,
395 now: u64,
396 ) {
397 let key = format!("{}:{}", fingerprint, endpoint);
398
399 let mut entry = self.distributed.entry(key).or_insert_with(|| {
400 DistributedAttack::new(
401 fingerprint.to_string(),
402 endpoint.to_string(),
403 entity_id.to_string(),
404 now,
405 )
406 });
407
408 let attack = entry.value_mut();
409 attack.add_entity(entity_id.to_string(), now);
410 }
411
412 fn check_distributed_attack(
414 &self,
415 fingerprint: &str,
416 endpoint: &str,
417 ) -> Option<StuffingVerdict> {
418 let key = format!("{}:{}", fingerprint, endpoint);
419
420 if let Some(attack) = self.distributed.get(&key) {
421 if attack.entity_count() >= self.config.distributed_min_ips {
422 let event = StuffingEvent::DistributedAttackDetected {
423 fingerprint: fingerprint.to_string(),
424 endpoint: endpoint.to_string(),
425 ip_count: attack.entity_count(),
426 total_failures: attack.total_failures,
427 severity: StuffingSeverity::High,
428 };
429 self.emit_event(event);
430
431 return Some(StuffingVerdict::suspicious_with_risk(
432 format!(
433 "Distributed attack: {} IPs with same fingerprint",
434 attack.entity_count()
435 ),
436 StuffingSeverity::High,
437 30, ));
439 }
440 }
441
442 None
443 }
444
445 pub fn record_distributed_failure(&self, fingerprint: &str, endpoint: &str, now: u64) {
447 let key = format!("{}:{}", fingerprint, endpoint);
448
449 if let Some(mut entry) = self.distributed.get_mut(&key) {
450 entry.value_mut().record_failure(now);
451 }
452 }
453
454 fn track_username_targeted_attempt(
459 &self,
460 username: &str,
461 endpoint: &str,
462 entity_id: &str,
463 now: u64,
464 ) {
465 let key = format!("{}:{}", username, endpoint);
466
467 let mut entry = self.username_targeted.entry(key).or_insert_with(|| {
468 UsernameTargetedAttack::new(
469 username.to_string(),
470 endpoint.to_string(),
471 entity_id.to_string(),
472 now,
473 )
474 });
475
476 let attack = entry.value_mut();
477 attack.add_ip(entity_id.to_string(), now);
478 }
479
480 fn record_username_targeted_failure(&self, username: &str, endpoint: &str, now: u64) {
482 let key = format!("{}:{}", username, endpoint);
483
484 if let Some(mut entry) = self.username_targeted.get_mut(&key) {
485 entry.value_mut().record_failure(now);
486 }
487 }
488
489 fn check_username_targeted_attack(
494 &self,
495 username: &str,
496 endpoint: &str,
497 ) -> Option<StuffingVerdict> {
498 let key = format!("{}:{}", username, endpoint);
499
500 if let Some(attack) = self.username_targeted.get(&key) {
501 let ip_count = attack.ip_count();
502 let failures = attack.total_failures;
503
504 if ip_count >= self.config.username_targeted_min_ips
506 && failures >= self.config.username_targeted_min_failures
507 {
508 let event = StuffingEvent::UsernameTargetedAttack {
509 username: username.to_string(),
510 endpoint: endpoint.to_string(),
511 ip_count,
512 total_failures: failures,
513 severity: StuffingSeverity::High,
514 };
515 self.emit_event(event);
516
517 return Some(StuffingVerdict::suspicious_with_risk(
518 format!(
519 "Username-targeted attack: {} IPs targeting '{}'",
520 ip_count, username
521 ),
522 StuffingSeverity::High,
523 35, ));
525 }
526 }
527
528 None
529 }
530
531 fn record_global_velocity_failure(&self, now: u64) {
533 let mut tracker = self.global_velocity.write();
534 tracker.record_failure(now);
535 }
536
537 fn check_global_velocity(&self, now: u64) -> Option<StuffingVerdict> {
542 let tracker = self.global_velocity.read();
543 let rate = tracker.failure_rate(now);
544
545 if rate >= self.config.global_velocity_threshold_rate {
546 let count = tracker.failure_count(now);
547 drop(tracker); let event = StuffingEvent::GlobalVelocitySpike {
550 failure_rate: rate,
551 failure_count: count,
552 threshold_rate: self.config.global_velocity_threshold_rate,
553 severity: StuffingSeverity::High,
554 };
555 self.emit_event(event);
556
557 return Some(StuffingVerdict::suspicious_with_risk(
558 format!(
559 "Global velocity spike: {:.1} failures/sec (threshold: {:.1})",
560 rate, self.config.global_velocity_threshold_rate
561 ),
562 StuffingSeverity::High,
563 20, ));
565 }
566
567 None
568 }
569
570 fn emit_event(&self, event: StuffingEvent) {
572 let mut events = self.events.write();
573 if events.len() >= 1000 {
574 events.pop_front();
575 }
576 events.push_back(event);
577 }
578
579 fn store_takeover_alert(&self, alert: TakeoverAlert) {
581 let mut takeovers = self.takeovers.write();
582 if takeovers.len() >= self.config.max_takeover_alerts {
583 takeovers.pop_front();
584 }
585 takeovers.push_back(alert);
586 }
587
588 pub fn get_entity_metrics(&self, entity_id: &str, endpoint: &str) -> Option<AuthMetrics> {
592 let key = EntityEndpointKey::new(entity_id, endpoint);
593 self.entity_auth.get(&key).map(|e| e.clone())
594 }
595
596 pub fn get_all_entity_metrics(&self) -> Vec<AuthMetrics> {
598 self.entity_auth.iter().map(|e| e.value().clone()).collect()
599 }
600
601 pub fn get_distributed_attacks(&self) -> Vec<DistributedAttack> {
603 self.distributed.iter().map(|e| e.value().clone()).collect()
604 }
605
606 pub fn get_takeover_alerts(&self, since: u64) -> Vec<TakeoverAlert> {
608 let takeovers = self.takeovers.read();
609 takeovers
610 .iter()
611 .filter(|a| a.success_at >= since)
612 .cloned()
613 .collect()
614 }
615
616 pub fn get_all_takeover_alerts(&self) -> Vec<TakeoverAlert> {
618 let takeovers = self.takeovers.read();
619 takeovers.iter().cloned().collect()
620 }
621
622 pub fn get_events(&self, since: u64) -> Vec<StuffingEvent> {
624 let events = self.events.read();
625 let _ = since; events.iter().cloned().collect()
629 }
630
631 pub fn drain_events(&self) -> Vec<StuffingEvent> {
633 let mut events = self.events.write();
634 events.drain(..).collect()
635 }
636
637 pub fn get_stats(&self) -> StuffingStats {
639 let entity_count = self.entity_auth.len();
640 let distributed_count = self.distributed.len();
641 let takeover_count = self.takeovers.read().len();
642 let event_count = self.events.read().len();
643
644 let mut total_failures: u64 = 0;
646 let mut total_successes: u64 = 0;
647 let mut suspicious_entities: usize = 0;
648
649 for entry in self.entity_auth.iter() {
650 let metrics = entry.value();
651 total_failures += metrics.total_failures;
652 total_successes += metrics.total_successes;
653 if metrics.failures >= self.config.failure_threshold_suspicious {
654 suspicious_entities += 1;
655 }
656 }
657
658 StuffingStats {
659 entity_count,
660 distributed_attack_count: distributed_count,
661 takeover_alert_count: takeover_count,
662 event_count,
663 total_failures,
664 total_successes,
665 suspicious_entities,
666 }
667 }
668
669 pub fn clear(&self) {
671 self.entity_auth.clear();
672 self.distributed.clear();
673 self.takeovers.write().clear();
674 self.events.write().clear();
675 }
676
677 pub fn len(&self) -> usize {
679 self.entity_auth.len()
680 }
681
682 pub fn is_empty(&self) -> bool {
684 self.entity_auth.is_empty()
685 }
686
687 pub fn stop(&self) {
689 self.shutdown.store(true, Ordering::Relaxed);
690 let _ = self.shutdown_tx.send(());
691 }
692
693 pub fn export(&self) -> StuffingState {
695 StuffingState {
696 entity_metrics: self.get_all_entity_metrics(),
697 distributed_attacks: self.get_distributed_attacks(),
698 takeover_alerts: self.get_all_takeover_alerts(),
699 }
700 }
701
702 pub fn import(&self, state: StuffingState) {
704 for metrics in state.entity_metrics {
705 let key = EntityEndpointKey::new(&metrics.entity_id, &metrics.endpoint);
706 self.entity_auth.insert(key, metrics);
707 }
708
709 for attack in state.distributed_attacks {
710 let key = format!("{}:{}", attack.fingerprint, attack.endpoint);
711 self.distributed.insert(key, attack);
712 }
713
714 let mut takeovers = self.takeovers.write();
715 for alert in state.takeover_alerts {
716 if takeovers.len() < self.config.max_takeover_alerts {
717 takeovers.push_back(alert);
718 }
719 }
720 }
721}
722
723impl Drop for CredentialStuffingDetector {
724 fn drop(&mut self) {
725 self.stop();
726 if let Some(handle) = self.cleanup_handle.take() {
727 let _ = handle.join();
728 }
729 }
730}
731
/// Point-in-time counters produced by `CredentialStuffingDetector::get_stats`.
#[derive(Debug, Clone, Default)]
pub struct StuffingStats {
    /// Number of tracked (entity, endpoint) pairs.
    pub entity_count: usize,
    /// Number of distributed-attack trackers currently held.
    pub distributed_attack_count: usize,
    /// Number of stored takeover alerts.
    pub takeover_alert_count: usize,
    /// Number of buffered events.
    pub event_count: usize,
    /// Sum of `total_failures` over all tracked entity metrics.
    pub total_failures: u64,
    /// Sum of `total_successes` over all tracked entity metrics.
    pub total_successes: u64,
    /// Number of entities whose windowed failure count is at or above
    /// `failure_threshold_suspicious`.
    pub suspicious_entities: usize,
}
750
/// Serializable snapshot of detector state, produced by `export` and consumed by `import`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StuffingState {
    /// Metrics of every tracked (entity, endpoint) pair.
    pub entity_metrics: Vec<AuthMetrics>,
    /// Distributed-attack trackers.
    pub distributed_attacks: Vec<DistributedAttack>,
    /// Stored takeover alerts.
    pub takeover_alerts: Vec<TakeoverAlert>,
}
758
#[cfg(test)]
mod tests {
    use super::*;

    /// Configuration with small thresholds so tests can trip them with a handful of calls.
    fn test_config() -> StuffingConfig {
        StuffingConfig {
            failure_window_ms: 60_000,
            failure_threshold_suspicious: 3,
            failure_threshold_high: 5,
            failure_threshold_block: 10,
            distributed_min_ips: 3,
            distributed_window_ms: 60_000,
            takeover_window_ms: 60_000,
            takeover_min_failures: 3,
            low_slow_min_hours: 2,
            low_slow_min_per_hour: 1,
            cleanup_interval_ms: 60_000,
            ..Default::default()
        }
    }

    #[test]
    fn test_is_auth_endpoint() {
        let detector = CredentialStuffingDetector::with_defaults();

        assert!(detector.is_auth_endpoint("/api/login"));
        assert!(detector.is_auth_endpoint("/api/auth/token"));
        assert!(detector.is_auth_endpoint("/v1/signin"));
        assert!(detector.is_auth_endpoint("/oauth/authorize"));

        assert!(!detector.is_auth_endpoint("/api/users"));
        assert!(!detector.is_auth_endpoint("/api/products"));
    }

    #[test]
    fn test_invalid_patterns_are_skipped_gracefully() {
        // Two valid patterns interleaved with two that do not compile.
        let config = StuffingConfig {
            auth_path_patterns: vec![
                r"(?i)/valid-login".to_string(),
                r"[invalid(regex".to_string(),
                r"(?i)/another-valid".to_string(),
                r"*invalid*".to_string(),
            ],
            ..Default::default()
        };

        // Construction must succeed despite the invalid entries.
        let detector = CredentialStuffingDetector::new(config);

        // The valid patterns still match.
        assert!(detector.is_auth_endpoint("/valid-login"));
        assert!(detector.is_auth_endpoint("/another-valid"));

        assert!(!detector.is_auth_endpoint("/something-else"));
    }

    #[test]
    fn test_record_attempt_allow() {
        let detector = CredentialStuffingDetector::new(test_config());
        let now = now_ms();

        let attempt = AuthAttempt::new("1.2.3.4", "/login", now);
        let verdict = detector.record_attempt(&attempt);

        assert!(verdict.is_allow());
    }

    #[test]
    fn test_record_attempt_suspicious() {
        let detector = CredentialStuffingDetector::new(test_config());
        let now = now_ms();

        // Reach the suspicious threshold (3 failures).
        for i in 0..3 {
            let result = AuthResult::new("1.2.3.4", "/login", false, now + i);
            detector.record_result(&result);
        }

        let attempt = AuthAttempt::new("1.2.3.4", "/login", now + 100);
        let verdict = detector.record_attempt(&attempt);

        assert!(!verdict.is_allow());
        assert!(!verdict.is_block());
        assert!(verdict.risk_delta() > 0);
    }

    #[test]
    fn test_record_attempt_block() {
        let detector = CredentialStuffingDetector::new(test_config());
        let now = now_ms();

        // Reach the block threshold (10 failures).
        for i in 0..10 {
            let result = AuthResult::new("1.2.3.4", "/login", false, now + i);
            detector.record_result(&result);
        }

        let attempt = AuthAttempt::new("1.2.3.4", "/login", now + 100);
        let verdict = detector.record_attempt(&attempt);

        assert!(verdict.is_block());
    }

    #[test]
    fn test_takeover_detection() {
        let detector = CredentialStuffingDetector::new(test_config());
        let now = now_ms();

        // Five failures inside the takeover window...
        for i in 0..5 {
            let result = AuthResult::new("1.2.3.4", "/login", false, now + i * 1000);
            detector.record_result(&result);
        }

        // ...followed by a success should raise a takeover alert.
        let result = AuthResult::new("1.2.3.4", "/login", true, now + 10000);
        let alert = detector.record_result(&result);

        assert!(alert.is_some());
        let alert = alert.unwrap();
        assert_eq!(alert.entity_id, "1.2.3.4");
        assert_eq!(alert.prior_failures, 5);
        assert_eq!(alert.severity, StuffingSeverity::Critical);

        // The alert must also have been stored.
        let alerts = detector.get_takeover_alerts(now);
        assert_eq!(alerts.len(), 1);
    }

    #[test]
    fn test_distributed_attack_detection() {
        let detector = CredentialStuffingDetector::new(test_config());
        let now = now_ms();

        // Three distinct IPs sharing one fingerprint.
        let ips = ["1.1.1.1", "2.2.2.2", "3.3.3.3"];
        for ip in &ips {
            let attempt = AuthAttempt::new(*ip, "/login", now).with_fingerprint("same-fingerprint");
            detector.record_attempt(&attempt);
        }

        // A further IP with the same fingerprint is flagged.
        let attempt =
            AuthAttempt::new("4.4.4.4", "/login", now).with_fingerprint("same-fingerprint");
        let verdict = detector.record_attempt(&attempt);

        assert!(!verdict.is_allow());
        assert_eq!(verdict.risk_delta(), 30);
    }

    #[test]
    fn test_window_reset() {
        let mut config = test_config();
        config.failure_window_ms = 100;
        let detector = CredentialStuffingDetector::new(config);
        let now = now_ms();

        for i in 0..5 {
            let result = AuthResult::new("1.2.3.4", "/login", false, now + i);
            detector.record_result(&result);
        }

        // The detector compares caller-supplied timestamps, so stepping the timestamp past
        // the 100 ms window is enough; no real sleep is needed.
        let later = now + 150;

        let attempt = AuthAttempt::new("1.2.3.4", "/login", later);
        let verdict = detector.record_attempt(&attempt);

        // The expired window is reset, so the earlier failures no longer count.
        assert!(verdict.is_allow());
    }

    #[test]
    fn test_success_resets_window() {
        let detector = CredentialStuffingDetector::new(test_config());
        let now = now_ms();

        for i in 0..4 {
            let result = AuthResult::new("1.2.3.4", "/login", false, now + i * 1000);
            detector.record_result(&result);
        }

        // Four failures then a success: raises a takeover alert and resets the window.
        let result = AuthResult::new("1.2.3.4", "/login", true, now + 5000);
        let alert = detector.record_result(&result);

        assert!(alert.is_some());

        // With the window reset, the next attempt is allowed.
        let attempt = AuthAttempt::new("1.2.3.4", "/login", now + 6000);
        let verdict = detector.record_attempt(&attempt);
        assert!(verdict.is_allow());
    }

    #[test]
    fn test_stats() {
        let detector = CredentialStuffingDetector::new(test_config());
        let now = now_ms();

        for i in 0..5 {
            let result = AuthResult::new("1.2.3.4", "/login", false, now + i * 1000);
            detector.record_result(&result);
        }

        let result = AuthResult::new("5.6.7.8", "/login", true, now + 10000);
        detector.record_result(&result);

        let stats = detector.get_stats();
        assert_eq!(stats.entity_count, 2);
        assert_eq!(stats.total_failures, 5);
        assert_eq!(stats.total_successes, 1);
        assert!(stats.suspicious_entities >= 1);
    }

    #[test]
    fn test_clear() {
        let detector = CredentialStuffingDetector::new(test_config());
        let now = now_ms();

        let result = AuthResult::new("1.2.3.4", "/login", false, now);
        detector.record_result(&result);

        assert!(!detector.is_empty());

        detector.clear();

        assert!(detector.is_empty());
        assert_eq!(detector.get_stats().entity_count, 0);
    }

    #[test]
    fn test_export_import() {
        let detector1 = CredentialStuffingDetector::new(test_config());
        let now = now_ms();

        for i in 0..3 {
            let result = AuthResult::new("1.2.3.4", "/login", false, now + i * 1000);
            detector1.record_result(&result);
        }

        let state = detector1.export();
        assert!(!state.entity_metrics.is_empty());

        // Importing into a fresh detector should reproduce the tracked entities.
        let detector2 = CredentialStuffingDetector::new(test_config());
        detector2.import(state);

        assert_eq!(detector1.len(), detector2.len());
    }

    #[test]
    fn test_events_emitted() {
        let detector = CredentialStuffingDetector::new(test_config());
        let now = now_ms();

        for i in 0..5 {
            let result = AuthResult::new("1.2.3.4", "/login", false, now + i * 1000);
            detector.record_result(&result);
        }

        // An attempt above the High threshold emits a SuspiciousFailureRate event.
        let attempt = AuthAttempt::new("1.2.3.4", "/login", now + 10000);
        detector.record_attempt(&attempt);

        let events = detector.drain_events();
        assert!(!events.is_empty());
    }

    #[test]
    fn test_username_targeted_attack_detection() {
        let mut config = test_config();
        config.username_targeted_min_ips = 3;
        config.username_targeted_min_failures = 5;
        config.username_targeted_window_ms = 60_000;
        let detector = CredentialStuffingDetector::new(config);
        let now = now_ms();

        // Five IPs each fail once against the same username.
        let ips = ["1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4", "5.5.5.5"];
        for (i, ip) in ips.iter().enumerate() {
            let attempt =
                AuthAttempt::new(*ip, "/login", now + i as u64 * 100).with_username("admin");
            detector.record_attempt(&attempt);

            let result = AuthResult::new(*ip, "/login", false, now + i as u64 * 100 + 50)
                .with_username("admin");
            detector.record_result(&result);
        }

        // A sixth IP targeting the same username is flagged.
        let attempt = AuthAttempt::new("6.6.6.6", "/login", now + 1000).with_username("admin");
        let verdict = detector.record_attempt(&attempt);

        assert!(!verdict.is_allow());
        assert_eq!(verdict.risk_delta(), 35);

        let events = detector.drain_events();
        let has_username_targeted = events.iter().any(|e| {
            matches!(e, StuffingEvent::UsernameTargetedAttack { username, .. } if username == "admin")
        });
        assert!(
            has_username_targeted,
            "Expected UsernameTargetedAttack event"
        );
    }

    #[test]
    fn test_username_targeted_different_usernames_isolated() {
        let mut config = test_config();
        config.username_targeted_min_ips = 3;
        config.username_targeted_min_failures = 5;
        let detector = CredentialStuffingDetector::new(config);
        let now = now_ms();

        // Two IPs fail against "admin".
        for (i, ip) in ["1.1.1.1", "2.2.2.2"].iter().enumerate() {
            let attempt =
                AuthAttempt::new(*ip, "/login", now + i as u64 * 100).with_username("admin");
            detector.record_attempt(&attempt);
            let result = AuthResult::new(*ip, "/login", false, now + i as u64 * 100 + 50)
                .with_username("admin");
            detector.record_result(&result);
        }

        // Two other IPs fail against "user"; these must not count towards "admin".
        for (i, ip) in ["3.3.3.3", "4.4.4.4"].iter().enumerate() {
            let attempt =
                AuthAttempt::new(*ip, "/login", now + i as u64 * 100).with_username("user");
            detector.record_attempt(&attempt);
            let result = AuthResult::new(*ip, "/login", false, now + i as u64 * 100 + 50)
                .with_username("user");
            detector.record_result(&result);
        }

        // A third IP brings "admin" to the IP minimum, but it still has only 2 failures.
        let attempt = AuthAttempt::new("5.5.5.5", "/login", now + 1000).with_username("admin");
        let verdict = detector.record_attempt(&attempt);
        assert!(
            verdict.is_allow(),
            "Should not detect attack: 'admin' has only 2 recorded failures (needs 5)"
        );
    }

    #[test]
    fn test_global_velocity_spike_detection() {
        let mut config = test_config();
        config.global_velocity_threshold_rate = 5.0;
        config.global_velocity_window_ms = 1000;
        config.global_velocity_max_track = 100;
        let detector = CredentialStuffingDetector::new(config);
        let now = now_ms();

        // Ten failures from ten entities within 500 ms.
        for i in 0..10 {
            let result = AuthResult::new(
                format!("10.0.0.{}", i),
                "/login",
                false,
                now + i as u64 * 50,
            );
            detector.record_result(&result);
        }

        // A fresh entity is flagged purely because of the global failure rate.
        let attempt = AuthAttempt::new("11.11.11.11", "/login", now + 600);
        let verdict = detector.record_attempt(&attempt);

        assert!(!verdict.is_allow());
        assert_eq!(verdict.risk_delta(), 20);

        let events = detector.drain_events();
        let has_velocity_spike = events
            .iter()
            .any(|e| matches!(e, StuffingEvent::GlobalVelocitySpike { .. }));
        assert!(has_velocity_spike, "Expected GlobalVelocitySpike event");
    }

    #[test]
    fn test_global_velocity_below_threshold() {
        let mut config = test_config();
        config.global_velocity_threshold_rate = 100.0;
        config.global_velocity_window_ms = 1000;
        let detector = CredentialStuffingDetector::new(config);
        let now = now_ms();

        // Three failures is far below the configured threshold rate.
        for i in 0..3 {
            let result = AuthResult::new(
                format!("10.0.0.{}", i),
                "/login",
                false,
                now + i as u64 * 100,
            );
            detector.record_result(&result);
        }

        let attempt = AuthAttempt::new("11.11.11.11", "/login", now + 500);
        let verdict = detector.record_attempt(&attempt);

        assert!(verdict.is_allow());
    }

    #[test]
    fn test_distributed_vs_username_targeted_priority() {
        let mut config = test_config();
        // Configure both detectors so that either one could fire.
        config.distributed_min_ips = 3;
        config.username_targeted_min_ips = 3;
        config.username_targeted_min_failures = 3;
        let detector = CredentialStuffingDetector::new(config);
        let now = now_ms();

        // Four IPs share a fingerprint and all fail against the same username.
        let ips = ["1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4"];
        for (i, ip) in ips.iter().enumerate() {
            let attempt = AuthAttempt::new(*ip, "/login", now + i as u64 * 100)
                .with_fingerprint("shared-fp")
                .with_username("admin");
            detector.record_attempt(&attempt);
            let result = AuthResult::new(*ip, "/login", false, now + i as u64 * 100 + 50)
                .with_username("admin");
            detector.record_result(&result);
        }

        let attempt = AuthAttempt::new("5.5.5.5", "/login", now + 1000)
            .with_fingerprint("shared-fp")
            .with_username("admin");
        let verdict = detector.record_attempt(&attempt);

        assert!(!verdict.is_allow());
        // The fingerprint check runs first, so its risk value (30) wins over 35.
        assert_eq!(
            verdict.risk_delta(),
            30,
            "Fingerprint-based detection should take priority"
        );
    }

    #[test]
    fn test_username_tracking_across_results() {
        let mut config = test_config();
        config.username_targeted_min_ips = 2;
        config.username_targeted_min_failures = 3;
        let detector = CredentialStuffingDetector::new(config);
        let now = now_ms();

        // First IP: attempt then failure.
        let attempt1 = AuthAttempt::new("1.1.1.1", "/login", now).with_username("victim");
        detector.record_attempt(&attempt1);
        let result1 = AuthResult::new("1.1.1.1", "/login", false, now + 10).with_username("victim");
        detector.record_result(&result1);

        // Second IP: attempt then failure.
        let attempt2 = AuthAttempt::new("2.2.2.2", "/login", now + 100).with_username("victim");
        detector.record_attempt(&attempt2);
        let result2 =
            AuthResult::new("2.2.2.2", "/login", false, now + 110).with_username("victim");
        detector.record_result(&result2);

        // Third IP attempts while only two failures are on record.
        let attempt3 = AuthAttempt::new("3.3.3.3", "/login", now + 200).with_username("victim");
        let verdict = detector.record_attempt(&attempt3);
        assert!(verdict.is_allow(), "2 failures should not trigger (need 3)");

        let result3 =
            AuthResult::new("3.3.3.3", "/login", false, now + 210).with_username("victim");
        detector.record_result(&result3);

        // With the third failure recorded, the next attempt is flagged.
        let attempt4 = AuthAttempt::new("4.4.4.4", "/login", now + 300).with_username("victim");
        let verdict = detector.record_attempt(&attempt4);
        assert!(!verdict.is_allow(), "3 IPs and 3 failures should trigger");
    }
}