1use std::net::IpAddr;
43use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
44use std::sync::Arc;
45use std::time::{Duration, Instant};
46
47use futures::future::join_all;
48use parking_lot::RwLock as ParkingLotRwLock;
49use tokio::sync::{Mutex, RwLock};
50use tokio::time::interval;
51
52use crate::access::AccessListManager;
53use crate::correlation::{
54 Campaign, CampaignStatus, CampaignStore, CampaignStoreStats, CampaignUpdate, FingerprintGroup,
55 FingerprintIndex, IndexStats,
56};
57use crate::telemetry::{TelemetryClient, TelemetryEvent};
58
59use crate::correlation::detectors::{
60 AttackPayload,
61 AttackSequenceConfig,
62 AttackSequenceDetector,
64 AuthTokenConfig,
65 AuthTokenDetector,
66 BehavioralConfig,
67 BehavioralSimilarityDetector,
68 Detector,
69 DetectorError,
70 DetectorResult,
71 GraphConfig,
72 GraphDetector,
73 Ja4RotationDetector,
74 NetworkProximityConfig,
75 NetworkProximityDetector,
76 RotationConfig,
77 SharedFingerprintDetector,
79 TimingConfig,
80 TimingCorrelationDetector,
81};
82
83#[derive(Debug)]
93pub struct MitigationRateLimiter {
94 bans_in_window: AtomicU64,
96 window_start: Mutex<Instant>,
98 max_bans_per_window: u64,
100 window_duration: Duration,
102 max_ips_per_campaign: usize,
104}
105
106impl MitigationRateLimiter {
107 pub fn new(
109 max_bans_per_window: u64,
110 window_duration: Duration,
111 max_ips_per_campaign: usize,
112 ) -> Self {
113 Self {
114 bans_in_window: AtomicU64::new(0),
115 window_start: Mutex::new(Instant::now()),
116 max_bans_per_window,
117 window_duration,
118 max_ips_per_campaign,
119 }
120 }
121
122 pub async fn try_ban(&self) -> Result<(), String> {
126 self.maybe_reset_window().await;
127
128 let current = self.bans_in_window.fetch_add(1, Ordering::SeqCst);
129 if current >= self.max_bans_per_window {
130 self.bans_in_window.fetch_sub(1, Ordering::SeqCst);
131 return Err(format!(
132 "Rate limit exceeded: {} bans in {:?} window",
133 self.max_bans_per_window, self.window_duration
134 ));
135 }
136 Ok(())
137 }
138
139 async fn maybe_reset_window(&self) {
141 let mut start = self.window_start.lock().await;
142
143 if start.elapsed() >= self.window_duration {
145 *start = Instant::now();
146 self.bans_in_window.store(0, Ordering::SeqCst);
147 }
148 }
149
150 pub fn max_ips_per_campaign(&self) -> usize {
152 self.max_ips_per_campaign
153 }
154
155 pub fn current_count(&self) -> u64 {
157 self.bans_in_window.load(Ordering::SeqCst)
158 }
159}
160
161impl Default for MitigationRateLimiter {
162 fn default() -> Self {
163 Self::new(
164 50, Duration::from_secs(60), 10, )
168 }
169}
170
171#[derive(Debug, Clone)]
179pub struct ManagerConfig {
180 pub shared_threshold: usize,
184
185 pub rotation_window: Duration,
189
190 pub rotation_threshold: usize,
194
195 pub scan_interval: Duration,
199
200 pub background_scanning: bool,
205
206 pub track_combined: bool,
210
211 pub shared_confidence: f64,
215
216 pub attack_sequence_min_ips: usize,
223
224 pub attack_sequence_window: Duration,
228
229 pub auth_token_min_ips: usize,
236
237 pub auth_token_window: Duration,
241
242 pub behavioral_min_ips: usize,
249
250 pub behavioral_min_sequence: usize,
254
255 pub behavioral_window: Duration,
259
260 pub timing_min_ips: usize,
267
268 pub timing_bucket_ms: u64,
272
273 pub timing_min_bucket_hits: usize,
277
278 pub timing_window: Duration,
282
283 pub network_min_ips: usize,
290
291 pub network_check_subnet: bool,
295
296 pub graph_min_component_size: usize,
303
304 pub graph_max_depth: usize,
308
309 pub graph_edge_ttl: Duration,
313
314 pub auto_mitigation_enabled: bool,
321
322 pub auto_mitigation_threshold: f64,
326}
327
328impl Default for ManagerConfig {
329 fn default() -> Self {
330 Self {
331 shared_threshold: 3,
332 rotation_window: Duration::from_secs(60),
333 rotation_threshold: 3,
334 scan_interval: Duration::from_secs(5),
335 background_scanning: true,
336 track_combined: true,
337 shared_confidence: 0.85,
338 attack_sequence_min_ips: 2,
340 attack_sequence_window: Duration::from_secs(300),
341 auth_token_min_ips: 2,
343 auth_token_window: Duration::from_secs(600),
344 behavioral_min_ips: 2,
346 behavioral_min_sequence: 3,
347 behavioral_window: Duration::from_secs(300),
348 timing_min_ips: 3,
350 timing_bucket_ms: 100,
351 timing_min_bucket_hits: 5,
352 timing_window: Duration::from_secs(60),
353 network_min_ips: 3,
355 network_check_subnet: true,
356 graph_min_component_size: 3,
358 graph_max_depth: 3,
359 graph_edge_ttl: Duration::from_secs(3600),
360 auto_mitigation_enabled: false,
362 auto_mitigation_threshold: 0.90,
363 }
364 }
365}
366
367impl ManagerConfig {
368 pub fn new() -> Self {
370 Self::default()
371 }
372
373 pub fn with_shared_threshold(mut self, threshold: usize) -> Self {
375 self.shared_threshold = threshold;
376 self
377 }
378
379 pub fn with_rotation_window(mut self, window: Duration) -> Self {
381 self.rotation_window = window;
382 self
383 }
384
385 pub fn with_rotation_threshold(mut self, threshold: usize) -> Self {
387 self.rotation_threshold = threshold;
388 self
389 }
390
391 pub fn with_scan_interval(mut self, interval: Duration) -> Self {
393 self.scan_interval = interval;
394 self
395 }
396
397 pub fn with_background_scanning(mut self, enabled: bool) -> Self {
399 self.background_scanning = enabled;
400 self
401 }
402
403 pub fn with_track_combined(mut self, enabled: bool) -> Self {
405 self.track_combined = enabled;
406 self
407 }
408
409 pub fn with_shared_confidence(mut self, confidence: f64) -> Self {
411 self.shared_confidence = confidence.clamp(0.0, 1.0);
412 self
413 }
414
415 pub fn with_auto_mitigation(mut self, enabled: bool) -> Self {
417 self.auto_mitigation_enabled = enabled;
418 self
419 }
420
421 pub fn with_auto_mitigation_threshold(mut self, threshold: f64) -> Self {
423 self.auto_mitigation_threshold = threshold.clamp(0.0, 1.0);
424 self
425 }
426
427 pub fn validate(&self) -> Result<(), String> {
431 if self.shared_threshold < 2 {
432 return Err("shared_threshold must be at least 2".to_string());
433 }
434 if self.rotation_threshold < 2 {
435 return Err("rotation_threshold must be at least 2".to_string());
436 }
437 if self.rotation_window.is_zero() {
438 return Err("rotation_window must be positive".to_string());
439 }
440 if self.scan_interval.is_zero() {
441 return Err("scan_interval must be positive".to_string());
442 }
443 if self.auto_mitigation_enabled && self.auto_mitigation_threshold < 0.7 {
445 return Err(
446 "auto_mitigation_threshold must be >= 0.7 when auto_mitigation is enabled to prevent false positives"
447 .to_string(),
448 );
449 }
450 if self.graph_min_component_size < 2 {
452 return Err("graph_min_component_size must be at least 2".to_string());
453 }
454 Ok(())
455 }
456}
457
458#[derive(Debug, Clone, Default)]
467pub struct ManagerStats {
468 pub fingerprints_registered: u64,
470
471 pub detections_run: u64,
473
474 pub campaigns_created: u64,
476
477 pub last_scan: Option<Instant>,
479
480 pub index_stats: IndexStats,
482
483 pub campaign_stats: CampaignStoreStats,
485
486 pub detections_by_type: std::collections::HashMap<String, u64>,
488}
489
490const GROUP_CACHE_TTL: Duration = Duration::from_millis(100);
496
497struct GroupCache {
503 groups: Vec<FingerprintGroup>,
505 cached_at: Instant,
507 threshold: usize,
509}
510
511impl GroupCache {
512 fn new(groups: Vec<FingerprintGroup>, threshold: usize) -> Self {
514 Self {
515 groups,
516 cached_at: Instant::now(),
517 threshold,
518 }
519 }
520
521 fn is_valid(&self, threshold: usize) -> bool {
523 self.threshold == threshold && self.cached_at.elapsed() < GROUP_CACHE_TTL
524 }
525}
526
527pub struct CampaignManager {
559 config: ManagerConfig,
561
562 index: Arc<FingerprintIndex>,
564
565 store: Arc<CampaignStore>,
567
568 access_list_manager: Option<Arc<ParkingLotRwLock<AccessListManager>>>,
570
571 telemetry_client: Option<Arc<TelemetryClient>>,
573
574 attack_sequence_detector: AttackSequenceDetector,
579
580 auth_token_detector: AuthTokenDetector,
582
583 http_fingerprint_detector: SharedFingerprintDetector,
585
586 tls_fingerprint_detector: Ja4RotationDetector,
588
589 behavioral_detector: BehavioralSimilarityDetector,
591
592 timing_detector: TimingCorrelationDetector,
594
595 network_detector: NetworkProximityDetector,
597
598 graph_detector: GraphDetector,
600
601 stats_fingerprints_registered: AtomicU64,
603 stats_detections_run: AtomicU64,
604 stats_campaigns_created: AtomicU64,
605
606 stats_detections_by_type: RwLock<std::collections::HashMap<String, u64>>,
608
609 last_scan: RwLock<Option<Instant>>,
611
612 shutdown: AtomicBool,
614
615 group_cache: RwLock<Option<GroupCache>>,
618
619 mitigation_rate_limiter: MitigationRateLimiter,
621
622 mitigated_campaigns: dashmap::DashSet<String>,
624}
625
626impl CampaignManager {
627 pub fn new() -> Self {
629 Self::with_config(ManagerConfig::default())
630 }
631
632 pub fn with_config(config: ManagerConfig) -> Self {
634 let attack_sequence_config = AttackSequenceConfig {
640 min_ips: config.attack_sequence_min_ips,
641 window: config.attack_sequence_window,
642 similarity_threshold: 0.95, ..Default::default()
644 };
645 let attack_sequence_detector = AttackSequenceDetector::new(attack_sequence_config);
646
647 let auth_token_config = AuthTokenConfig {
649 min_ips: config.auth_token_min_ips,
650 window: config.auth_token_window,
651 ..Default::default()
652 };
653 let auth_token_detector = AuthTokenDetector::new(auth_token_config);
654
655 let http_fingerprint_detector = SharedFingerprintDetector::with_config(
657 config.shared_threshold,
658 config.shared_confidence,
659 config.scan_interval.as_millis() as u64,
660 );
661
662 let rotation_config = RotationConfig {
664 min_fingerprints: config.rotation_threshold,
665 window: config.rotation_window,
666 track_combined: config.track_combined,
667 ..Default::default()
668 };
669 let tls_fingerprint_detector = Ja4RotationDetector::new(rotation_config);
670
671 let behavioral_config = BehavioralConfig {
673 min_ips: config.behavioral_min_ips,
674 min_sequence_length: config.behavioral_min_sequence,
675 window: config.behavioral_window,
676 ..Default::default()
677 };
678 let behavioral_detector = BehavioralSimilarityDetector::new(behavioral_config);
679
680 let timing_config = TimingConfig {
682 min_ips: config.timing_min_ips,
683 bucket_size: Duration::from_millis(config.timing_bucket_ms),
684 min_bucket_hits: config.timing_min_bucket_hits,
685 window: config.timing_window,
686 ..Default::default()
687 };
688 let timing_detector = TimingCorrelationDetector::new(timing_config);
689
690 let network_config = NetworkProximityConfig {
692 min_ips: config.network_min_ips,
693 check_subnet: config.network_check_subnet,
694 check_asn: false, ..Default::default()
696 };
697 let network_detector = NetworkProximityDetector::new(network_config);
698
699 let graph_config = GraphConfig {
701 min_component_size: config.graph_min_component_size,
702 max_traversal_depth: config.graph_max_depth,
703 edge_ttl: config.graph_edge_ttl,
704 ..Default::default()
705 };
706 let graph_detector = GraphDetector::new(graph_config);
707
708 Self {
709 config,
710 index: Arc::new(FingerprintIndex::new()),
711 store: Arc::new(CampaignStore::new()),
712 access_list_manager: None,
713 telemetry_client: None,
714 attack_sequence_detector,
716 auth_token_detector,
717 http_fingerprint_detector,
718 tls_fingerprint_detector,
719 behavioral_detector,
720 timing_detector,
721 network_detector,
722 graph_detector,
723 stats_fingerprints_registered: AtomicU64::new(0),
725 stats_detections_run: AtomicU64::new(0),
726 stats_campaigns_created: AtomicU64::new(0),
727 stats_detections_by_type: RwLock::new(std::collections::HashMap::new()),
728 last_scan: RwLock::new(None),
729 shutdown: AtomicBool::new(false),
730 group_cache: RwLock::new(None),
732 mitigation_rate_limiter: MitigationRateLimiter::default(),
734 mitigated_campaigns: dashmap::DashSet::new(),
735 }
736 }
737
738 pub fn set_access_list_manager(&mut self, manager: Arc<ParkingLotRwLock<AccessListManager>>) {
740 self.access_list_manager = Some(manager);
741 }
742
743 pub fn set_telemetry_client(&mut self, client: Arc<TelemetryClient>) {
745 self.telemetry_client = Some(client);
746 }
747
748 pub fn register_ja4(&self, ip: IpAddr, fingerprint: String) {
757 if fingerprint.is_empty() {
758 return;
759 }
760
761 let ip_str = ip.to_string();
762
763 self.index.update_entity(&ip_str, Some(&fingerprint), None);
765
766 self.tls_fingerprint_detector
768 .record_fingerprint(ip, fingerprint);
769
770 self.stats_fingerprints_registered
772 .fetch_add(1, Ordering::Relaxed);
773 }
774
775 pub fn register_ja4_arc(&self, ip: IpAddr, fingerprint: Arc<str>) {
784 if fingerprint.is_empty() {
785 return;
786 }
787
788 let ip_str = ip.to_string();
789
790 self.index.update_entity(&ip_str, Some(&fingerprint), None);
792
793 self.tls_fingerprint_detector
795 .record_fingerprint(ip, fingerprint.to_string());
796
797 self.stats_fingerprints_registered
799 .fetch_add(1, Ordering::Relaxed);
800 }
801
802 pub fn register_combined(&self, ip: IpAddr, fingerprint: String) {
811 if fingerprint.is_empty() {
812 return;
813 }
814
815 let ip_str = ip.to_string();
816
817 self.index.update_entity(&ip_str, None, Some(&fingerprint));
819
820 if self.config.track_combined {
822 self.tls_fingerprint_detector
823 .record_fingerprint(ip, fingerprint);
824 }
825
826 self.stats_fingerprints_registered
828 .fetch_add(1, Ordering::Relaxed);
829 }
830
831 pub fn register_combined_arc(&self, ip: IpAddr, fingerprint: Arc<str>) {
840 if fingerprint.is_empty() {
841 return;
842 }
843
844 let ip_str = ip.to_string();
845
846 self.index.update_entity(&ip_str, None, Some(&fingerprint));
848
849 if self.config.track_combined {
851 self.tls_fingerprint_detector
852 .record_fingerprint(ip, fingerprint.to_string());
853 }
854
855 self.stats_fingerprints_registered
857 .fetch_add(1, Ordering::Relaxed);
858 }
859
860 pub fn register_fingerprints(&self, ip: IpAddr, ja4: Option<String>, ja4h: Option<String>) {
869 let ip_str = ip.to_string();
870 let mut registered = false;
871
872 let ja4_ref = ja4.as_deref();
874 let combined = ja4h.as_ref().map(|h| {
875 format!("{}_{}", ja4.as_deref().unwrap_or(""), h)
877 });
878 let combined_ref = combined.as_deref();
879
880 self.index.update_entity(&ip_str, ja4_ref, combined_ref);
881
882 if let Some(ref fp) = ja4 {
884 if !fp.is_empty() {
885 self.tls_fingerprint_detector
886 .record_fingerprint(ip, fp.clone());
887 registered = true;
888 }
889 }
890
891 if self.config.track_combined {
893 if let Some(ref fp) = combined {
894 if !fp.is_empty() {
895 self.tls_fingerprint_detector
896 .record_fingerprint(ip, fp.clone());
897 registered = true;
898 }
899 }
900 }
901
902 if registered {
903 self.stats_fingerprints_registered
904 .fetch_add(1, Ordering::Relaxed);
905 }
906 }
907
908 pub fn record_attack(
923 &self,
924 ip: IpAddr,
925 payload_hash: String,
926 attack_type: String,
927 path: String,
928 ) {
929 self.attack_sequence_detector.record_attack(
930 ip,
931 AttackPayload {
932 payload_hash,
933 attack_type,
934 target_path: path,
935 timestamp: std::time::Instant::now(),
936 },
937 );
938 }
939
940 pub fn record_token(&self, ip: IpAddr, jwt: &str) {
949 self.auth_token_detector.record_jwt(ip, jwt);
950 }
951
952 pub fn record_request(&self, ip: IpAddr, method: &str, path: &str) {
965 self.behavioral_detector.record_request(ip, method, path);
966 self.timing_detector.record_request(ip);
967 self.network_detector.register_ip(ip);
968 }
969
970 pub fn record_request_full(
982 &self,
983 ip: IpAddr,
984 method: &str,
985 path: &str,
986 ja4: Option<&str>,
987 jwt: Option<&str>,
988 ) {
989 self.record_request(ip, method, path);
991
992 let ip_id = GraphDetector::ip_id(&ip.to_string());
993
994 if let Some(fp) = ja4 {
996 if !fp.is_empty() {
997 self.register_ja4(ip, fp.to_string());
998 self.record_relation(&ip_id, &GraphDetector::fp_id(fp));
1000 }
1001 }
1002
1003 if let Some(token) = jwt {
1005 if !token.is_empty() {
1006 self.record_token(ip, token);
1007 let token_id = if token.len() > 16 {
1010 &token[..16]
1011 } else {
1012 token
1013 };
1014 self.record_relation(&ip_id, &GraphDetector::token_id(token_id));
1015 }
1016 }
1017 }
1018
1019 pub fn record_relation(&self, entity_a: &str, entity_b: &str) {
1028 self.graph_detector.record_relation(entity_a, entity_b);
1029 }
1030
1031 pub fn calculate_campaign_score(&self, campaign: &Campaign) -> f64 {
1047 if campaign.correlation_reasons.is_empty() {
1048 return 0.0;
1049 }
1050
1051 let total_weighted: f64 = campaign
1052 .correlation_reasons
1053 .iter()
1054 .map(|r| r.correlation_type.weight() as f64 * r.confidence)
1055 .sum();
1056
1057 total_weighted / campaign.correlation_reasons.len() as f64
1058 }
1059
1060 pub async fn run_detection_cycle(&self) -> DetectorResult<usize> {
1078 let detectors: Vec<(&dyn Detector, &'static str)> = vec![
1081 (
1082 &self.attack_sequence_detector as &dyn Detector,
1083 "attack_sequence",
1084 ),
1085 (&self.auth_token_detector as &dyn Detector, "auth_token"),
1086 (
1087 &self.http_fingerprint_detector as &dyn Detector,
1088 "http_fingerprint",
1089 ),
1090 (
1091 &self.tls_fingerprint_detector as &dyn Detector,
1092 "tls_fingerprint",
1093 ),
1094 (&self.behavioral_detector as &dyn Detector, "behavioral"),
1095 (&self.timing_detector as &dyn Detector, "timing"),
1096 (&self.network_detector as &dyn Detector, "network"),
1097 (&self.graph_detector as &dyn Detector, "graph"),
1098 ];
1099
1100 let detector_futures: Vec<_> = detectors
1103 .into_iter()
1104 .map(|(detector, name)| {
1105 let index = &self.index;
1106 async move {
1108 let result = detector.analyze(index);
1109 (name, result)
1110 }
1111 })
1112 .collect();
1113
1114 let results = join_all(detector_futures).await;
1115
1116 let mut total_updates = 0;
1118 let mut stats_updates: std::collections::HashMap<String, u64> =
1119 std::collections::HashMap::new();
1120
1121 for (name, result) in results {
1122 match result {
1123 Ok(updates) => {
1124 let update_count = updates.len();
1125 for update in updates {
1126 self.process_campaign_update(update).await;
1127 total_updates += 1;
1128 }
1129 if update_count > 0 {
1131 *stats_updates.entry(name.to_string()).or_insert(0) += update_count as u64;
1132 }
1133 }
1134 Err(e) => {
1135 tracing::warn!("Detector {} failed: {}", name, e);
1136 }
1137 }
1138 }
1139
1140 if !stats_updates.is_empty() {
1142 let mut stats = self.stats_detections_by_type.write().await;
1143 for (name, count) in stats_updates {
1144 *stats.entry(name).or_insert(0) += count;
1145 }
1146 }
1147
1148 self.stats_detections_run.fetch_add(1, Ordering::Relaxed);
1150 {
1151 let mut last_scan = self.last_scan.write().await;
1152 *last_scan = Some(Instant::now());
1153 }
1154
1155 Ok(total_updates)
1156 }
1157
1158 pub async fn get_cached_groups(&self, threshold: usize) -> Vec<FingerprintGroup> {
1170 {
1172 let cache_guard = self.group_cache.read().await;
1173 if let Some(ref cache) = *cache_guard {
1174 if cache.is_valid(threshold) {
1175 return cache.groups.clone();
1176 }
1177 }
1178 }
1179
1180 let groups = self.index.get_groups_above_threshold(threshold);
1182
1183 {
1185 let mut cache_guard = self.group_cache.write().await;
1186 *cache_guard = Some(GroupCache::new(groups.clone(), threshold));
1187 }
1188
1189 groups
1190 }
1191
1192 pub async fn invalidate_group_cache(&self) {
1196 let mut cache_guard = self.group_cache.write().await;
1197 *cache_guard = None;
1198 }
1199
1200 async fn process_campaign_update(&self, update: CampaignUpdate) {
1207 let ips: Vec<String> = update
1209 .add_correlation_reason
1210 .as_ref()
1211 .map(|reason| reason.evidence.clone())
1212 .unwrap_or_default();
1213
1214 if ips.is_empty() {
1215 return;
1216 }
1217
1218 let existing_campaign_id = ips.iter().find_map(|ip| self.store.get_campaign_for_ip(ip));
1220
1221 let mut check_mitigation = false;
1223 let mut target_campaign_id = String::new();
1224
1225 match existing_campaign_id {
1226 Some(campaign_id) => {
1227 let _ = self.store.update_campaign(&campaign_id, update);
1229
1230 for ip in &ips {
1232 let _ = self.store.add_actor_to_campaign(&campaign_id, ip);
1233 }
1234
1235 check_mitigation = true;
1236 target_campaign_id = campaign_id;
1237 }
1238 None => {
1239 let confidence = update.confidence.unwrap_or(0.5);
1241
1242 let mut campaign_id = Campaign::generate_id();
1245 let mut retry_count = 0;
1246 while self.store.get_campaign(&campaign_id).is_some() && retry_count < 10 {
1247 campaign_id = format!("{}-{:x}", Campaign::generate_id(), fastrand::u32(..));
1249 retry_count += 1;
1250 }
1251
1252 let mut campaign = Campaign::new(campaign_id.clone(), ips, confidence);
1253
1254 if let Some(status) = update.status {
1256 campaign.status = status;
1257 }
1258 if let Some(ref attack_types) = update.attack_types {
1259 campaign.attack_types = attack_types.clone();
1260 }
1261 if let Some(reason) = update.add_correlation_reason {
1262 campaign.correlation_reasons.push(reason);
1263 }
1264 if let Some(risk_score) = update.risk_score {
1265 campaign.risk_score = risk_score;
1266 }
1267
1268 if self.store.create_campaign(campaign).is_ok() {
1270 self.stats_campaigns_created.fetch_add(1, Ordering::Relaxed);
1271 check_mitigation = true;
1272 target_campaign_id = campaign_id;
1273 }
1274 }
1275 }
1276
1277 if check_mitigation {
1279 if let Some(campaign) = self.store.get_campaign(&target_campaign_id) {
1280 if self.config.auto_mitigation_enabled
1282 && campaign.confidence >= self.config.auto_mitigation_threshold
1283 && campaign.status != CampaignStatus::Resolved
1284 {
1285 self.mitigate_campaign(&campaign).await;
1286 }
1287
1288 if campaign.confidence >= 0.8 {
1291 self.report_campaign(&campaign);
1292 }
1293 }
1294 }
1295 }
1296
1297 fn report_campaign(&self, campaign: &Campaign) {
1299 if let Some(ref client) = self.telemetry_client {
1300 if !client.is_enabled() {
1302 return;
1303 }
1304
1305 let event = TelemetryEvent::CampaignReport {
1306 campaign_id: campaign.id.clone(),
1307 confidence: campaign.confidence,
1308 attack_types: campaign
1309 .attack_types
1310 .iter()
1311 .map(|at| format!("{:?}", at))
1312 .collect(),
1313 actor_count: campaign.actor_count,
1314 correlation_reasons: campaign
1315 .correlation_reasons
1316 .iter()
1317 .map(|r| r.description.clone())
1318 .collect(),
1319 timestamp_ms: std::time::SystemTime::now()
1320 .duration_since(std::time::UNIX_EPOCH)
1321 .unwrap_or_default()
1322 .as_millis() as u64,
1323 };
1324
1325 let client = Arc::clone(client);
1327 tokio::spawn(async move {
1328 if let Err(e) = client.report(event).await {
1329 tracing::debug!("Failed to report campaign telemetry: {}", e);
1330 }
1331 });
1332 }
1333 }
1334
1335 async fn mitigate_campaign(&self, campaign: &Campaign) {
1340 if self.mitigated_campaigns.contains(&campaign.id) {
1342 tracing::debug!(campaign_id = %campaign.id, "Campaign already mitigated, skipping");
1343 return;
1344 }
1345
1346 let access_list = match &self.access_list_manager {
1347 Some(al) => al,
1348 None => {
1349 tracing::debug!("No AccessListManager configured, skipping mitigation");
1350 return;
1351 }
1352 };
1353
1354 let max_ips = self.mitigation_rate_limiter.max_ips_per_campaign();
1356 let ips_to_block: Vec<IpAddr> = campaign
1357 .actors
1358 .iter()
1359 .filter_map(|ip_str| ip_str.parse::<IpAddr>().ok())
1360 .take(max_ips)
1361 .collect();
1362
1363 if ips_to_block.is_empty() {
1364 tracing::debug!(campaign_id = %campaign.id, "No valid IPs to block");
1365 return;
1366 }
1367
1368 let mut blocked_count = 0;
1370 let mut rate_limited = false;
1371
1372 for ip in &ips_to_block {
1373 if let Err(reason) = self.mitigation_rate_limiter.try_ban().await {
1374 tracing::warn!(
1375 campaign_id = %campaign.id,
1376 reason = %reason,
1377 blocked = blocked_count,
1378 remaining = ips_to_block.len() - blocked_count,
1379 "Mitigation rate limited"
1380 );
1381 rate_limited = true;
1382 break;
1383 }
1384
1385 let comment = format!(
1387 "Campaign {} (confidence: {:.2})",
1388 campaign.id, campaign.confidence
1389 );
1390 {
1391 let mut al = access_list.write();
1392 if let Err(e) = al.add_deny_ip(ip, Some(&comment)) {
1393 tracing::error!(ip = %ip, error = %e, "Failed to add deny rule");
1394 continue;
1395 }
1396 }
1397 blocked_count += 1;
1398 }
1399
1400 let attack_types: Vec<String> = campaign
1402 .attack_types
1403 .iter()
1404 .map(|at| format!("{:?}", at))
1405 .collect();
1406 tracing::info!(
1407 campaign_id = %campaign.id,
1408 confidence = campaign.confidence,
1409 total_actors = campaign.actors.len(),
1410 blocked = blocked_count,
1411 rate_limited = rate_limited,
1412 attack_types = ?attack_types,
1413 "Auto-mitigation applied"
1414 );
1415
1416 self.mitigated_campaigns.insert(campaign.id.clone());
1418
1419 if let Some(ref client) = self.telemetry_client {
1421 if client.is_enabled() {
1422 let event = TelemetryEvent::CampaignReport {
1423 campaign_id: format!("mitigation:{}", campaign.id),
1424 confidence: campaign.confidence,
1425 attack_types,
1426 actor_count: blocked_count,
1427 correlation_reasons: vec![format!(
1428 "Auto-mitigation applied: {} IPs blocked",
1429 blocked_count
1430 )],
1431 timestamp_ms: std::time::SystemTime::now()
1432 .duration_since(std::time::UNIX_EPOCH)
1433 .unwrap_or_default()
1434 .as_millis() as u64,
1435 };
1436
1437 let client = Arc::clone(client);
1438 tokio::spawn(async move {
1439 if let Err(e) = client.report(event).await {
1440 tracing::debug!("Failed to report mitigation telemetry: {}", e);
1441 }
1442 });
1443 }
1444 }
1445 }
1446
1447 pub fn should_trigger_detection(&self, ip: &IpAddr) -> bool {
1458 self.attack_sequence_detector
1460 .should_trigger(ip, &self.index)
1461 || self.auth_token_detector.should_trigger(ip, &self.index)
1462 || self
1463 .http_fingerprint_detector
1464 .should_trigger(ip, &self.index)
1465 || self
1466 .tls_fingerprint_detector
1467 .should_trigger(ip, &self.index)
1468 || self.behavioral_detector.should_trigger(ip, &self.index)
1469 || self.timing_detector.should_trigger(ip, &self.index)
1470 || self.network_detector.should_trigger(ip, &self.index)
1471 || self.graph_detector.should_trigger(ip, &self.index)
1472 }
1473
1474 pub fn get_campaigns(&self) -> Vec<Campaign> {
1478 self.store.list_active_campaigns()
1479 }
1480
1481 pub fn get_all_campaigns(&self) -> Vec<Campaign> {
1483 self.store.list_campaigns(None)
1484 }
1485
1486 pub fn snapshot(&self) -> Vec<Campaign> {
1490 self.store.list_campaigns(None)
1491 }
1492
1493 pub fn restore(&self, campaigns: Vec<Campaign>) {
1497 self.store.clear();
1499 self.index.clear();
1500
1501 for campaign in campaigns {
1503 for ip_str in &campaign.actors {
1505 self.index.update_entity(ip_str, None, None);
1507 }
1508
1509 let _ = self.store.create_campaign(campaign);
1511 }
1512 }
1513
1514 pub fn get_campaign(&self, id: &str) -> Option<Campaign> {
1522 self.store.get_campaign(id)
1523 }
1524
1525 pub fn get_campaign_actors(&self, campaign_id: &str) -> Vec<IpAddr> {
1533 self.store
1534 .get_campaign(campaign_id)
1535 .map(|campaign| {
1536 campaign
1537 .actors
1538 .iter()
1539 .filter_map(|ip_str| ip_str.parse().ok())
1540 .collect()
1541 })
1542 .unwrap_or_default()
1543 }
1544
1545 pub fn get_campaign_graph(&self, campaign_id: &str) -> serde_json::Value {
1547 let ips = self.get_campaign_actors(campaign_id);
1548 let ips_str: Vec<String> = ips.into_iter().map(|ip| ip.to_string()).collect();
1549
1550 self.graph_detector.get_cytoscape_data(&ips_str)
1551 }
1552
1553 pub fn get_campaign_graph_paginated(
1557 &self,
1558 campaign_id: &str,
1559 limit: Option<usize>,
1560 offset: Option<usize>,
1561 hash_identifiers: bool,
1562 ) -> crate::correlation::detectors::graph::PaginatedGraph {
1563 use crate::correlation::detectors::graph::GraphExportOptions;
1564
1565 let ips = self.get_campaign_actors(campaign_id);
1566 let ips_str: Vec<String> = ips.into_iter().map(|ip| ip.to_string()).collect();
1567
1568 let options = GraphExportOptions {
1569 limit,
1570 offset,
1571 hash_identifiers,
1572 };
1573
1574 self.graph_detector
1575 .get_cytoscape_data_paginated(&ips_str, options)
1576 }
1577
1578 pub fn stats(&self) -> ManagerStats {
1582 let last_scan = {
1583 self.last_scan
1585 .try_read()
1586 .map(|guard| *guard)
1587 .unwrap_or(None)
1588 };
1589
1590 let detections_by_type = self
1591 .stats_detections_by_type
1592 .try_read()
1593 .map(|guard| guard.clone())
1594 .unwrap_or_default();
1595
1596 ManagerStats {
1597 fingerprints_registered: self.stats_fingerprints_registered.load(Ordering::Relaxed),
1598 detections_run: self.stats_detections_run.load(Ordering::Relaxed),
1599 campaigns_created: self.stats_campaigns_created.load(Ordering::Relaxed),
1600 last_scan,
1601 index_stats: self.index.stats(),
1602 campaign_stats: self.store.stats(),
1603 detections_by_type,
1604 }
1605 }
1606
1607 pub fn start_background_worker(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
1616 let manager = self;
1617 let scan_interval = manager.config.scan_interval;
1618
1619 tokio::spawn(async move {
1620 let mut ticker = interval(scan_interval);
1621
1622 loop {
1623 ticker.tick().await;
1624
1625 if manager.shutdown.load(Ordering::Relaxed) {
1627 log::info!("Campaign manager background worker shutting down");
1628 break;
1629 }
1630
1631 match manager.run_detection_cycle().await {
1633 Ok(updates) => {
1634 if updates > 0 {
1635 log::debug!("Detection cycle processed {} updates", updates);
1636 }
1637 }
1638 Err(e) => {
1639 log::warn!("Detection cycle error: {}", e);
1640 }
1641 }
1642 }
1643 })
1644 }
1645
1646 pub fn shutdown(&self) {
1648 self.shutdown.store(true, Ordering::Relaxed);
1649 }
1650
1651 pub fn is_shutdown(&self) -> bool {
1653 self.shutdown.load(Ordering::Relaxed)
1654 }
1655
1656 pub fn remove_ip(&self, ip: &IpAddr) {
1665 let ip_str = ip.to_string();
1666
1667 self.index.remove_entity(&ip_str);
1669
1670 if let Some(campaign_id) = self.store.get_campaign_for_ip(&ip_str) {
1672 let _ = self.store.remove_actor_from_campaign(&campaign_id, &ip_str);
1673 }
1674 }
1675
1676 pub fn index(&self) -> &Arc<FingerprintIndex> {
1680 &self.index
1681 }
1682
1683 pub fn store(&self) -> &Arc<CampaignStore> {
1687 &self.store
1688 }
1689
1690 pub fn config(&self) -> &ManagerConfig {
1692 &self.config
1693 }
1694
1695 pub fn resolve_campaign(&self, campaign_id: &str, reason: &str) -> Result<(), DetectorError> {
1704 self.store
1705 .resolve_campaign(campaign_id, reason)
1706 .map_err(|e| DetectorError::DetectionFailed(e.to_string()))
1707 }
1708
1709 pub fn clear(&self) {
1713 self.index.clear();
1714 self.store.clear();
1715 self.http_fingerprint_detector.clear_processed();
1716 self.tls_fingerprint_detector.cleanup_old_observations();
1717 }
1718}
1719
1720impl Default for CampaignManager {
1721 fn default() -> Self {
1722 Self::new()
1723 }
1724}
1725
1726#[cfg(test)]
1731mod tests {
1732 use super::*;
1733 use std::thread;
1734
1735 fn create_test_manager() -> CampaignManager {
1740 let config = ManagerConfig {
1741 shared_threshold: 3,
1742 rotation_threshold: 3,
1743 rotation_window: Duration::from_secs(60),
1744 scan_interval: Duration::from_millis(100),
1745 background_scanning: false,
1746 ..Default::default()
1747 };
1748 CampaignManager::with_config(config)
1749 }
1750
1751 fn create_test_ip(last_octet: u8) -> IpAddr {
1752 format!("192.168.1.{}", last_octet).parse().unwrap()
1753 }
1754
1755 #[test]
1760 fn test_config_default() {
1761 let config = ManagerConfig::default();
1762
1763 assert_eq!(config.shared_threshold, 3);
1764 assert_eq!(config.rotation_threshold, 3);
1765 assert_eq!(config.rotation_window, Duration::from_secs(60));
1766 assert_eq!(config.scan_interval, Duration::from_secs(5));
1767 assert!(config.background_scanning);
1768 assert!(config.track_combined);
1769 assert!((config.shared_confidence - 0.85).abs() < 0.001);
1770 }
1771
1772 #[test]
1773 fn test_config_builder() {
1774 let config = ManagerConfig::new()
1775 .with_shared_threshold(5)
1776 .with_rotation_threshold(4)
1777 .with_rotation_window(Duration::from_secs(120))
1778 .with_scan_interval(Duration::from_secs(10))
1779 .with_background_scanning(false)
1780 .with_track_combined(false)
1781 .with_shared_confidence(0.9);
1782
1783 assert_eq!(config.shared_threshold, 5);
1784 assert_eq!(config.rotation_threshold, 4);
1785 assert_eq!(config.rotation_window, Duration::from_secs(120));
1786 assert_eq!(config.scan_interval, Duration::from_secs(10));
1787 assert!(!config.background_scanning);
1788 assert!(!config.track_combined);
1789 assert!((config.shared_confidence - 0.9).abs() < 0.001);
1790 }
1791
1792 #[tokio::test]
1793 async fn test_mitigation_rate_limiter_limits() {
1794 let limiter = MitigationRateLimiter::new(2, Duration::from_secs(60), 10);
1795
1796 assert!(limiter.try_ban().await.is_ok());
1797 assert!(limiter.try_ban().await.is_ok());
1798 assert!(limiter.try_ban().await.is_err());
1799 }
1800
1801 #[test]
1802 fn test_config_validation() {
1803 let config = ManagerConfig::default();
1805 assert!(config.validate().is_ok());
1806
1807 let config = ManagerConfig::new().with_shared_threshold(1);
1809 assert!(config.validate().is_err());
1810
1811 let config = ManagerConfig::new().with_rotation_threshold(1);
1813 assert!(config.validate().is_err());
1814
1815 let config = ManagerConfig {
1817 rotation_window: Duration::ZERO,
1818 ..Default::default()
1819 };
1820 assert!(config.validate().is_err());
1821
1822 let config = ManagerConfig {
1824 scan_interval: Duration::ZERO,
1825 ..Default::default()
1826 };
1827 assert!(config.validate().is_err());
1828
1829 let config = ManagerConfig {
1831 auto_mitigation_enabled: true,
1832 auto_mitigation_threshold: 0.5, ..Default::default()
1834 };
1835 assert!(config.validate().is_err());
1836
1837 let config = ManagerConfig {
1839 auto_mitigation_enabled: true,
1840 auto_mitigation_threshold: 0.9,
1841 ..Default::default()
1842 };
1843 assert!(config.validate().is_ok());
1844
1845 let config = ManagerConfig {
1847 auto_mitigation_enabled: false,
1848 auto_mitigation_threshold: 0.5, ..Default::default()
1850 };
1851 assert!(config.validate().is_ok());
1852 }
1853
1854 #[test]
1855 fn test_config_confidence_clamping() {
1856 let config = ManagerConfig::new().with_shared_confidence(1.5);
1857 assert!((config.shared_confidence - 1.0).abs() < 0.001);
1858
1859 let config = ManagerConfig::new().with_shared_confidence(-0.5);
1860 assert!(config.shared_confidence >= 0.0);
1861 }
1862
1863 #[test]
1868 fn test_register_ja4() {
1869 let manager = create_test_manager();
1870 let ip = create_test_ip(1);
1871
1872 manager.register_ja4(ip, "t13d1516h2_abc123".to_string());
1873
1874 let stats = manager.stats();
1875 assert_eq!(stats.fingerprints_registered, 1);
1876 assert_eq!(stats.index_stats.total_ips, 1);
1877 assert_eq!(stats.index_stats.ja4_fingerprints, 1);
1878 }
1879
1880 #[test]
1881 fn test_register_ja4_empty_skipped() {
1882 let manager = create_test_manager();
1883 let ip = create_test_ip(1);
1884
1885 manager.register_ja4(ip, "".to_string());
1886
1887 let stats = manager.stats();
1888 assert_eq!(stats.fingerprints_registered, 0);
1889 assert_eq!(stats.index_stats.total_ips, 0);
1890 }
1891
1892 #[test]
1893 fn test_register_combined() {
1894 let manager = create_test_manager();
1895 let ip = create_test_ip(1);
1896
1897 manager.register_combined(ip, "combined_hash_xyz".to_string());
1898
1899 let stats = manager.stats();
1900 assert_eq!(stats.fingerprints_registered, 1);
1901 assert_eq!(stats.index_stats.total_ips, 1);
1902 assert_eq!(stats.index_stats.combined_fingerprints, 1);
1903 }
1904
1905 #[test]
1906 fn test_register_fingerprints_both() {
1907 let manager = create_test_manager();
1908 let ip = create_test_ip(1);
1909
1910 manager.register_fingerprints(
1911 ip,
1912 Some("ja4_test".to_string()),
1913 Some("ja4h_test".to_string()),
1914 );
1915
1916 let stats = manager.stats();
1917 assert_eq!(stats.fingerprints_registered, 1);
1918 assert_eq!(stats.index_stats.ja4_fingerprints, 1);
1919 assert_eq!(stats.index_stats.combined_fingerprints, 1);
1920 }
1921
1922 #[test]
1923 fn test_register_fingerprints_ja4_only() {
1924 let manager = create_test_manager();
1925 let ip = create_test_ip(1);
1926
1927 manager.register_fingerprints(ip, Some("ja4_only".to_string()), None);
1928
1929 let stats = manager.stats();
1930 assert_eq!(stats.fingerprints_registered, 1);
1931 assert_eq!(stats.index_stats.ja4_fingerprints, 1);
1932 assert_eq!(stats.index_stats.combined_fingerprints, 0);
1933 }
1934
1935 #[tokio::test]
1940 async fn test_detection_cycle_empty() {
1941 let manager = create_test_manager();
1942
1943 let updates = manager.run_detection_cycle().await.unwrap();
1944
1945 assert_eq!(updates, 0);
1946 assert_eq!(manager.stats().detections_run, 1);
1947 }
1948
1949 #[tokio::test]
1950 async fn test_detection_cycle_creates_campaign() {
1951 let manager = create_test_manager();
1952
1953 for i in 1..=3 {
1955 let ip = create_test_ip(i);
1956 manager.register_ja4(ip, "shared_fingerprint".to_string());
1957 }
1958
1959 let updates = manager.run_detection_cycle().await.unwrap();
1960
1961 assert!(updates >= 1);
1962 assert_eq!(manager.stats().campaigns_created, 1);
1963
1964 let campaigns = manager.get_campaigns();
1965 assert_eq!(campaigns.len(), 1);
1966 }
1967
1968 #[tokio::test]
1969 async fn test_detection_cycle_no_duplicate_campaigns() {
1970 let manager = create_test_manager();
1971
1972 for i in 1..=3 {
1974 let ip = create_test_ip(i);
1975 manager.register_ja4(ip, "shared_fp".to_string());
1976 }
1977
1978 manager.run_detection_cycle().await.unwrap();
1980 let first_count = manager.stats().campaigns_created;
1981
1982 manager.run_detection_cycle().await.unwrap();
1984 let second_count = manager.stats().campaigns_created;
1985
1986 assert_eq!(first_count, second_count);
1987 }
1988
1989 #[tokio::test]
1994 async fn test_get_campaigns() {
1995 let manager = create_test_manager();
1996
1997 for i in 1..=3 {
1999 let ip = create_test_ip(i);
2000 manager.register_ja4(ip, "test_fp".to_string());
2001 }
2002 manager.run_detection_cycle().await.unwrap();
2003
2004 let campaigns = manager.get_campaigns();
2005 assert!(!campaigns.is_empty());
2006
2007 let campaign = &campaigns[0];
2009 assert_eq!(campaign.actor_count, 3);
2010 }
2011
2012 #[tokio::test]
2013 async fn test_get_campaign_by_id() {
2014 let manager = create_test_manager();
2015
2016 for i in 1..=3 {
2018 let ip = create_test_ip(i);
2019 manager.register_ja4(ip, "get_by_id_fp".to_string());
2020 }
2021 manager.run_detection_cycle().await.unwrap();
2022
2023 let campaigns = manager.get_campaigns();
2024 let campaign_id = &campaigns[0].id;
2025
2026 let retrieved = manager.get_campaign(campaign_id);
2027 assert!(retrieved.is_some());
2028 assert_eq!(retrieved.unwrap().id, *campaign_id);
2029
2030 let not_found = manager.get_campaign("nonexistent");
2032 assert!(not_found.is_none());
2033 }
2034
2035 #[tokio::test]
2036 async fn test_get_campaign_actors() {
2037 let manager = create_test_manager();
2038
2039 for i in 1..=3 {
2041 let ip = create_test_ip(i);
2042 manager.register_ja4(ip, "actors_fp".to_string());
2043 }
2044 manager.run_detection_cycle().await.unwrap();
2045
2046 let campaigns = manager.get_campaigns();
2047 let campaign_id = &campaigns[0].id;
2048
2049 let actors = manager.get_campaign_actors(campaign_id);
2050 assert_eq!(actors.len(), 3);
2051
2052 let no_actors = manager.get_campaign_actors("nonexistent");
2054 assert!(no_actors.is_empty());
2055 }
2056
2057 #[tokio::test]
2062 async fn test_stats_tracking() {
2063 let manager = create_test_manager();
2064
2065 let initial = manager.stats();
2067 assert_eq!(initial.fingerprints_registered, 0);
2068 assert_eq!(initial.detections_run, 0);
2069 assert_eq!(initial.campaigns_created, 0);
2070 assert!(initial.last_scan.is_none());
2071
2072 for i in 1..=5 {
2074 let ip = create_test_ip(i);
2075 manager.register_ja4(ip, "stats_test_fp".to_string());
2076 }
2077
2078 let after_register = manager.stats();
2079 assert_eq!(after_register.fingerprints_registered, 5);
2080 assert_eq!(after_register.index_stats.total_ips, 5);
2081
2082 manager.run_detection_cycle().await.unwrap();
2084
2085 let after_detect = manager.stats();
2086 assert_eq!(after_detect.detections_run, 1);
2087 assert!(after_detect.last_scan.is_some());
2088 assert!(after_detect.campaigns_created >= 1);
2089 }
2090
2091 #[tokio::test]
2096 async fn test_remove_ip_cleanup() {
2097 let manager = create_test_manager();
2098
2099 for i in 1..=3 {
2101 let ip = create_test_ip(i);
2102 manager.register_ja4(ip, "remove_test_fp".to_string());
2103 }
2104 manager.run_detection_cycle().await.unwrap();
2105
2106 let campaigns = manager.get_campaigns();
2108 assert_eq!(campaigns[0].actor_count, 3);
2109
2110 let ip_to_remove = create_test_ip(1);
2112 manager.remove_ip(&ip_to_remove);
2113
2114 assert_eq!(manager.index.len(), 2);
2116
2117 let updated_campaigns = manager.get_campaigns();
2119 assert_eq!(updated_campaigns[0].actor_count, 2);
2120 }
2121
2122 #[test]
2127 fn test_concurrent_registration() {
2128 let manager = Arc::new(create_test_manager());
2129 let mut handles = vec![];
2130
2131 for thread_id in 0..10 {
2133 let manager = Arc::clone(&manager);
2134 handles.push(thread::spawn(move || {
2135 for i in 0..100 {
2136 let ip: IpAddr = format!("10.{}.0.{}", thread_id, i % 256).parse().unwrap();
2137 manager.register_ja4(ip, format!("fp_t{}_{}", thread_id, i % 5));
2138 }
2139 }));
2140 }
2141
2142 for handle in handles {
2143 handle.join().unwrap();
2144 }
2145
2146 let stats = manager.stats();
2148 assert_eq!(stats.fingerprints_registered, 1000);
2149 assert!(stats.index_stats.total_ips > 0);
2150 }
2151
2152 #[test]
2157 fn test_should_trigger_detection_below_threshold() {
2158 let manager = create_test_manager();
2159
2160 for i in 1..=2 {
2162 let ip = create_test_ip(i);
2163 manager.register_ja4(ip, "trigger_test_fp".to_string());
2164 }
2165
2166 let ip = create_test_ip(1);
2167 assert!(!manager.should_trigger_detection(&ip));
2168 }
2169
2170 #[test]
2171 fn test_should_trigger_detection_at_threshold() {
2172 let manager = create_test_manager();
2173
2174 for i in 1..=3 {
2176 let ip = create_test_ip(i);
2177 manager.register_ja4(ip, "trigger_threshold_fp".to_string());
2178 }
2179
2180 let ip = create_test_ip(1);
2181 assert!(manager.should_trigger_detection(&ip));
2182 }
2183
2184 #[tokio::test]
2189 async fn test_background_worker_lifecycle() {
2190 let config = ManagerConfig {
2191 scan_interval: Duration::from_millis(50),
2192 background_scanning: true,
2193 shared_threshold: 3,
2194 ..Default::default()
2195 };
2196 let manager = Arc::new(CampaignManager::with_config(config));
2197
2198 for i in 1..=3 {
2200 let ip = create_test_ip(i);
2201 manager.register_ja4(ip, "worker_test_fp".to_string());
2202 }
2203
2204 let worker = Arc::clone(&manager).start_background_worker();
2206
2207 tokio::time::sleep(Duration::from_millis(200)).await;
2209
2210 let stats = manager.stats();
2212 assert!(stats.detections_run >= 1);
2213
2214 manager.shutdown();
2216
2217 let timeout = tokio::time::timeout(Duration::from_millis(500), worker).await;
2219 assert!(timeout.is_ok(), "Worker should shut down gracefully");
2220 }
2221
2222 #[tokio::test]
2223 async fn test_shutdown_flag() {
2224 let manager = CampaignManager::new();
2225
2226 assert!(!manager.is_shutdown());
2227
2228 manager.shutdown();
2229
2230 assert!(manager.is_shutdown());
2231 }
2232
2233 #[tokio::test]
2238 async fn test_full_flow() {
2239 let manager = create_test_manager();
2240
2241 let fingerprint = "t13d1516h2_full_flow_test";
2243 for i in 1..=5 {
2244 let ip = create_test_ip(i);
2245 manager.register_ja4(ip, fingerprint.to_string());
2246 }
2247
2248 let updates = manager.run_detection_cycle().await.unwrap();
2250 assert!(updates >= 1);
2251
2252 let campaigns = manager.get_campaigns();
2254 assert_eq!(campaigns.len(), 1);
2255
2256 let campaign = &campaigns[0];
2257 assert_eq!(campaign.actor_count, 5);
2258 assert!(campaign.confidence >= 0.8);
2259 assert!(!campaign.correlation_reasons.is_empty());
2260
2261 let retrieved = manager.get_campaign(&campaign.id).unwrap();
2263 assert_eq!(retrieved.actors.len(), 5);
2264
2265 let actors = manager.get_campaign_actors(&campaign.id);
2267 assert_eq!(actors.len(), 5);
2268
2269 manager.remove_ip(&create_test_ip(1));
2271 let updated = manager.get_campaign(&campaign.id).unwrap();
2272 assert_eq!(updated.actors.len(), 4);
2273
2274 let stats = manager.stats();
2276 assert_eq!(stats.fingerprints_registered, 5);
2277 assert_eq!(stats.campaigns_created, 1);
2278 assert_eq!(stats.campaign_stats.total_campaigns, 1);
2279 }
2280
2281 #[test]
2282 fn test_clear() {
2283 let manager = create_test_manager();
2284
2285 for i in 1..=5 {
2287 let ip = create_test_ip(i);
2288 manager.register_ja4(ip, "clear_test_fp".to_string());
2289 }
2290
2291 assert_eq!(manager.index.len(), 5);
2292
2293 manager.clear();
2295
2296 assert_eq!(manager.index.len(), 0);
2297 assert!(manager.store.is_empty());
2298 }
2299
2300 #[tokio::test]
2301 async fn test_resolve_campaign() {
2302 let manager = create_test_manager();
2303
2304 for i in 1..=3 {
2306 let ip = create_test_ip(i);
2307 manager.register_ja4(ip, "resolve_test_fp".to_string());
2308 }
2309 manager.run_detection_cycle().await.unwrap();
2310
2311 let campaigns = manager.get_campaigns();
2312 let campaign_id = campaigns[0].id.clone();
2313
2314 let result = manager.resolve_campaign(&campaign_id, "Threat mitigated");
2316 assert!(result.is_ok());
2317
2318 let resolved = manager.get_campaign(&campaign_id).unwrap();
2320 assert_eq!(resolved.status, CampaignStatus::Resolved);
2321
2322 let active = manager.get_campaigns();
2324 assert!(active.is_empty());
2325 }
2326
2327 #[test]
2328 fn test_index_and_store_access() {
2329 let manager = create_test_manager();
2330
2331 let _index = manager.index();
2333 let _store = manager.store();
2334 let _config = manager.config();
2335
2336 assert!(manager.index().is_empty());
2338 assert!(manager.store().is_empty());
2339 }
2340
2341 #[test]
2346 fn test_ipv6_addresses() {
2347 let manager = create_test_manager();
2348
2349 let ipv6_1: IpAddr = "2001:db8::1".parse().unwrap();
2350 let ipv6_2: IpAddr = "2001:db8::2".parse().unwrap();
2351 let ipv6_3: IpAddr = "2001:db8::3".parse().unwrap();
2352
2353 manager.register_ja4(ipv6_1, "ipv6_fp".to_string());
2354 manager.register_ja4(ipv6_2, "ipv6_fp".to_string());
2355 manager.register_ja4(ipv6_3, "ipv6_fp".to_string());
2356
2357 let stats = manager.stats();
2358 assert_eq!(stats.fingerprints_registered, 3);
2359 assert_eq!(stats.index_stats.total_ips, 3);
2360 }
2361
2362 #[test]
2363 fn test_default_trait() {
2364 let manager = CampaignManager::default();
2365
2366 assert!(manager.index.is_empty());
2367 assert!(manager.store.is_empty());
2368 assert!(!manager.is_shutdown());
2369 }
2370
2371 #[tokio::test]
2372 async fn test_multiple_fingerprint_groups() {
2373 let manager = create_test_manager();
2374
2375 for i in 1..=3 {
2377 let ip = create_test_ip(i);
2378 manager.register_ja4(ip, "group_a_fp".to_string());
2379 }
2380
2381 for i in 10..=13 {
2383 let ip = create_test_ip(i);
2384 manager.register_ja4(ip, "group_b_fp".to_string());
2385 }
2386
2387 manager.run_detection_cycle().await.unwrap();
2388
2389 let campaigns = manager.get_campaigns();
2390 assert_eq!(campaigns.len(), 2);
2391
2392 let actor_counts: Vec<usize> = campaigns.iter().map(|c| c.actor_count).collect();
2394 assert!(actor_counts.contains(&3));
2395 assert!(actor_counts.contains(&4));
2396 }
2397}