Skip to main content

synapse_pingora/correlation/
manager.rs

1//! Campaign Correlation Manager
2//!
3//! Orchestrates fingerprint indexing, campaign detection, and state management.
4//! This is the main entry point for the correlation subsystem.
5//!
6//! # Architecture
7//!
8//! The `CampaignManager` coordinates three main components:
9//! - **FingerprintIndex**: O(1) fingerprint→IPs lookup for efficient correlation
10//! - **CampaignStore**: Campaign state storage with thread-safe access
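//! - **Detectors**: eight pattern detectors (attack sequence, auth token, shared HTTP fingerprint, JA4 rotation, behavioral similarity, timing correlation, network proximity, graph correlation)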
12//!
13//! # Usage
14//!
15//! ```rust,ignore
16//! use synapse_pingora::correlation::{CampaignManager, ManagerConfig};
17//! use std::sync::Arc;
18//!
19//! // Create manager with custom configuration
20//! let config = ManagerConfig {
21//!     shared_threshold: 3,
22//!     rotation_threshold: 3,
23//!     background_scanning: true,
24//!     ..Default::default()
25//! };
26//! let manager = Arc::new(CampaignManager::with_config(config));
27//!
28//! // Register fingerprints during request processing (fast path)
29//! let ip = "192.168.1.100".parse().unwrap();
30//! manager.register_ja4(ip, "t13d1516h2_abc123".to_string());
31//!
32//! // Start background worker for periodic detection
33//! let worker = Arc::clone(&manager).start_background_worker();
34//! ```
35//!
36//! # Performance
37//!
38//! - Registration operations are O(1) and non-blocking for hot path
39//! - Detection cycles are run periodically in background, not per-request
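//! - Counters are atomic and mitigation tracking uses a concurrent `DashSet`; a few rarely-written fields (per-detector counts, last scan time, group cache) sit behind async `RwLock`s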
41
42use std::net::IpAddr;
43use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
44use std::sync::Arc;
45use std::time::{Duration, Instant};
46
47use futures::future::join_all;
48use parking_lot::RwLock as ParkingLotRwLock;
49use tokio::sync::{Mutex, RwLock};
50use tokio::time::interval;
51
52use crate::access::AccessListManager;
53use crate::correlation::{
54    Campaign, CampaignStatus, CampaignStore, CampaignStoreStats, CampaignUpdate, FingerprintGroup,
55    FingerprintIndex, IndexStats,
56};
57use crate::telemetry::{TelemetryClient, TelemetryEvent};
58
59use crate::correlation::detectors::{
60    AttackPayload,
61    AttackSequenceConfig,
62    // New detectors
63    AttackSequenceDetector,
64    AuthTokenConfig,
65    AuthTokenDetector,
66    BehavioralConfig,
67    BehavioralSimilarityDetector,
68    Detector,
69    DetectorError,
70    DetectorResult,
71    GraphConfig,
72    GraphDetector,
73    Ja4RotationDetector,
74    NetworkProximityConfig,
75    NetworkProximityDetector,
76    RotationConfig,
77    // Fingerprint detectors
78    SharedFingerprintDetector,
79    TimingConfig,
80    TimingCorrelationDetector,
81};
82
83// ============================================================================
84// Mitigation Rate Limiter (Security: Prevent mass-ban DoS)
85// ============================================================================
86
87/// Rate limiter for auto-mitigation to prevent mass-banning attacks.
88///
89/// Limits the number of IPs that can be banned per time window.
90/// If an attacker generates many apparent campaigns, this prevents
91/// legitimate users from being incorrectly blocked en masse.
92#[derive(Debug)]
93pub struct MitigationRateLimiter {
94    /// Number of bans in current window.
95    bans_in_window: AtomicU64,
96    /// Window start time.
97    window_start: Mutex<Instant>,
98    /// Maximum bans allowed per window.
99    max_bans_per_window: u64,
100    /// Window duration.
101    window_duration: Duration,
102    /// Maximum IPs to ban per campaign.
103    max_ips_per_campaign: usize,
104}
105
106impl MitigationRateLimiter {
107    /// Creates a new rate limiter.
108    pub fn new(
109        max_bans_per_window: u64,
110        window_duration: Duration,
111        max_ips_per_campaign: usize,
112    ) -> Self {
113        Self {
114            bans_in_window: AtomicU64::new(0),
115            window_start: Mutex::new(Instant::now()),
116            max_bans_per_window,
117            window_duration,
118            max_ips_per_campaign,
119        }
120    }
121
122    /// Attempts to acquire a ban permit.
123    ///
124    /// Returns Ok(()) if the ban is allowed, Err with reason if rate limited.
125    pub async fn try_ban(&self) -> Result<(), String> {
126        self.maybe_reset_window().await;
127
128        let current = self.bans_in_window.fetch_add(1, Ordering::SeqCst);
129        if current >= self.max_bans_per_window {
130            self.bans_in_window.fetch_sub(1, Ordering::SeqCst);
131            return Err(format!(
132                "Rate limit exceeded: {} bans in {:?} window",
133                self.max_bans_per_window, self.window_duration
134            ));
135        }
136        Ok(())
137    }
138
139    /// Resets the window if it has expired.
140    async fn maybe_reset_window(&self) {
141        let mut start = self.window_start.lock().await;
142
143        // Double-check expiration under the lock to prevent multiple resets
144        if start.elapsed() >= self.window_duration {
145            *start = Instant::now();
146            self.bans_in_window.store(0, Ordering::SeqCst);
147        }
148    }
149
150    /// Returns the maximum IPs that can be banned per campaign.
151    pub fn max_ips_per_campaign(&self) -> usize {
152        self.max_ips_per_campaign
153    }
154
155    /// Returns current ban count in window.
156    pub fn current_count(&self) -> u64 {
157        self.bans_in_window.load(Ordering::SeqCst)
158    }
159}
160
161impl Default for MitigationRateLimiter {
162    fn default() -> Self {
163        Self::new(
164            50,                      // Max 50 bans per window
165            Duration::from_secs(60), // 1 minute window
166            10,                      // Max 10 IPs per campaign
167        )
168    }
169}
170
171// ============================================================================
172// Configuration
173// ============================================================================
174
/// Configuration for the campaign manager.
///
/// Controls detector thresholds, timing windows, and background scanning behavior.
/// Call [`ManagerConfig::validate`] before use; it enforces the constraints noted
/// on individual fields below.
#[derive(Debug, Clone)]
pub struct ManagerConfig {
    /// Minimum IPs sharing fingerprint to form campaign (shared FP detector).
    ///
    /// Must be at least 2. Default: 3
    pub shared_threshold: usize,

    /// Time window for rotation detection.
    ///
    /// Must be non-zero. Default: 60 seconds
    pub rotation_window: Duration,

    /// Minimum fingerprints for rotation detection.
    ///
    /// Must be at least 2. Default: 3
    pub rotation_threshold: usize,

    /// How often to run full detector scans.
    ///
    /// Must be non-zero. Default: 5 seconds
    pub scan_interval: Duration,

    /// Enable background scanning.
    ///
    /// When enabled, a background worker periodically runs detection cycles.
    /// Default: true
    pub background_scanning: bool,

    /// Track combined fingerprints (JA4+JA4H) in rotation detector.
    ///
    /// Default: true
    pub track_combined: bool,

    /// Base confidence for shared fingerprint detections.
    ///
    /// Must lie within [0.0, 1.0] (the builder clamps; `validate` rejects
    /// out-of-range or NaN values set directly). Default: 0.85
    pub shared_confidence: f64,

    // ========================================================================
    // Attack Sequence Detector Configuration (weight: 50)
    // ========================================================================
    /// Minimum IPs sharing same payload to trigger detection.
    ///
    /// Default: 2
    pub attack_sequence_min_ips: usize,

    /// Time window for attack sequence correlation.
    ///
    /// Default: 300 seconds (5 minutes)
    pub attack_sequence_window: Duration,

    // ========================================================================
    // Auth Token Detector Configuration (weight: 45)
    // ========================================================================
    /// Minimum IPs sharing token structure to trigger detection.
    ///
    /// Default: 2
    pub auth_token_min_ips: usize,

    /// Time window for auth token correlation.
    ///
    /// Default: 600 seconds (10 minutes)
    pub auth_token_window: Duration,

    // ========================================================================
    // Behavioral Similarity Detector Configuration (weight: 30)
    // ========================================================================
    /// Minimum IPs with same behavior pattern.
    ///
    /// Default: 2
    pub behavioral_min_ips: usize,

    /// Minimum sequence length to consider for behavioral analysis.
    ///
    /// Default: 3
    pub behavioral_min_sequence: usize,

    /// Time window for behavioral pattern observation.
    ///
    /// Default: 300 seconds (5 minutes)
    pub behavioral_window: Duration,

    // ========================================================================
    // Timing Correlation Detector Configuration (weight: 25)
    // ========================================================================
    /// Minimum IPs with synchronized timing.
    ///
    /// Default: 3
    pub timing_min_ips: usize,

    /// Time bucket size for synchronization detection in milliseconds.
    ///
    /// Default: 100ms
    pub timing_bucket_ms: u64,

    /// Minimum requests in same bucket to consider correlated.
    ///
    /// Default: 5
    pub timing_min_bucket_hits: usize,

    /// Time window for timing analysis.
    ///
    /// Default: 60 seconds
    pub timing_window: Duration,

    // ========================================================================
    // Network Proximity Detector Configuration (weight: 15)
    // ========================================================================
    /// Minimum IPs in same network segment.
    ///
    /// Default: 3
    pub network_min_ips: usize,

    /// Enable subnet-based correlation (/24 for IPv4).
    ///
    /// Default: true
    pub network_check_subnet: bool,

    // ========================================================================
    // Graph Correlation Detector Configuration (weight: 20)
    // ========================================================================
    /// Minimum connected component size.
    ///
    /// Must be at least 2. Default: 3
    pub graph_min_component_size: usize,

    /// Maximum traversal depth.
    ///
    /// Default: 3
    pub graph_max_depth: usize,

    /// Edge TTL.
    ///
    /// Default: 3600 seconds
    pub graph_edge_ttl: Duration,

    // ========================================================================
    // Automated Response Configuration
    // ========================================================================
    /// Enable automated mitigation (blocking) of high-confidence campaigns.
    ///
    /// Default: false
    pub auto_mitigation_enabled: bool,

    /// Confidence threshold for automated mitigation (0.0 - 1.0).
    ///
    /// Must be >= 0.7 and not NaN when auto mitigation is enabled.
    /// Default: 0.90
    pub auto_mitigation_threshold: f64,
}

impl Default for ManagerConfig {
    fn default() -> Self {
        Self {
            shared_threshold: 3,
            rotation_window: Duration::from_secs(60),
            rotation_threshold: 3,
            scan_interval: Duration::from_secs(5),
            background_scanning: true,
            track_combined: true,
            shared_confidence: 0.85,
            // Attack sequence detector (weight: 50)
            attack_sequence_min_ips: 2,
            attack_sequence_window: Duration::from_secs(300),
            // Auth token detector (weight: 45)
            auth_token_min_ips: 2,
            auth_token_window: Duration::from_secs(600),
            // Behavioral similarity detector (weight: 30)
            behavioral_min_ips: 2,
            behavioral_min_sequence: 3,
            behavioral_window: Duration::from_secs(300),
            // Timing correlation detector (weight: 25)
            timing_min_ips: 3,
            timing_bucket_ms: 100,
            timing_min_bucket_hits: 5,
            timing_window: Duration::from_secs(60),
            // Network proximity detector (weight: 15)
            network_min_ips: 3,
            network_check_subnet: true,
            // Graph correlation detector (weight: 20)
            graph_min_component_size: 3,
            graph_max_depth: 3,
            graph_edge_ttl: Duration::from_secs(3600),
            // Automated Response
            auto_mitigation_enabled: false,
            auto_mitigation_threshold: 0.90,
        }
    }
}

impl ManagerConfig {
    /// Create a new configuration with default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder method to set shared threshold.
    pub fn with_shared_threshold(mut self, threshold: usize) -> Self {
        self.shared_threshold = threshold;
        self
    }

    /// Builder method to set rotation window.
    pub fn with_rotation_window(mut self, window: Duration) -> Self {
        self.rotation_window = window;
        self
    }

    /// Builder method to set rotation threshold.
    pub fn with_rotation_threshold(mut self, threshold: usize) -> Self {
        self.rotation_threshold = threshold;
        self
    }

    /// Builder method to set scan interval.
    pub fn with_scan_interval(mut self, interval: Duration) -> Self {
        self.scan_interval = interval;
        self
    }

    /// Builder method to enable/disable background scanning.
    pub fn with_background_scanning(mut self, enabled: bool) -> Self {
        self.background_scanning = enabled;
        self
    }

    /// Builder method to enable/disable combined fingerprint tracking.
    pub fn with_track_combined(mut self, enabled: bool) -> Self {
        self.track_combined = enabled;
        self
    }

    /// Builder method to set shared confidence (clamped to [0.0, 1.0]).
    ///
    /// A NaN input stays NaN (as `f64::clamp` does) and is rejected by `validate`.
    pub fn with_shared_confidence(mut self, confidence: f64) -> Self {
        self.shared_confidence = confidence.clamp(0.0, 1.0);
        self
    }

    /// Builder method to enable/disable automated mitigation.
    pub fn with_auto_mitigation(mut self, enabled: bool) -> Self {
        self.auto_mitigation_enabled = enabled;
        self
    }

    /// Builder method to set automated mitigation threshold (clamped to [0.0, 1.0]).
    ///
    /// A NaN input stays NaN (as `f64::clamp` does) and is rejected by `validate`
    /// when auto mitigation is enabled.
    pub fn with_auto_mitigation_threshold(mut self, threshold: f64) -> Self {
        self.auto_mitigation_threshold = threshold.clamp(0.0, 1.0);
        self
    }

    /// Validate the configuration.
    ///
    /// # Errors
    /// Returns a message describing the first violated constraint.
    pub fn validate(&self) -> Result<(), String> {
        if self.shared_threshold < 2 {
            return Err("shared_threshold must be at least 2".to_string());
        }
        if self.rotation_threshold < 2 {
            return Err("rotation_threshold must be at least 2".to_string());
        }
        if self.rotation_window.is_zero() {
            return Err("rotation_window must be positive".to_string());
        }
        if self.scan_interval.is_zero() {
            return Err("scan_interval must be positive".to_string());
        }
        // The builder clamps shared_confidence, but direct field assignment bypasses
        // it. `RangeInclusive::contains` is false for NaN, so NaN is rejected too.
        if !(0.0..=1.0).contains(&self.shared_confidence) {
            return Err("shared_confidence must be within [0.0, 1.0]".to_string());
        }
        // Security: Auto-mitigation threshold must be high to prevent false positives.
        // `NaN < 0.7` is false, so NaN must be rejected explicitly or it would slip
        // past this guard.
        if self.auto_mitigation_enabled
            && (self.auto_mitigation_threshold.is_nan() || self.auto_mitigation_threshold < 0.7)
        {
            return Err(
                "auto_mitigation_threshold must be >= 0.7 when auto_mitigation is enabled to prevent false positives"
                    .to_string(),
            );
        }
        // Security: Graph bounds must be reasonable
        if self.graph_min_component_size < 2 {
            return Err("graph_min_component_size must be at least 2".to_string());
        }
        Ok(())
    }
}
457
458// ============================================================================
459// Statistics
460// ============================================================================
461
462/// Statistics for the campaign manager.
463///
464/// Provides observability into manager operations including registration counts,
465/// detection cycles, and campaign creation.
466#[derive(Debug, Clone, Default)]
467pub struct ManagerStats {
468    /// Total fingerprints registered since start.
469    pub fingerprints_registered: u64,
470
471    /// Total detection cycles run.
472    pub detections_run: u64,
473
474    /// Total campaigns created.
475    pub campaigns_created: u64,
476
477    /// Last successful scan timestamp.
478    pub last_scan: Option<Instant>,
479
480    /// Statistics from the fingerprint index.
481    pub index_stats: IndexStats,
482
483    /// Statistics from the campaign store.
484    pub campaign_stats: CampaignStoreStats,
485
486    /// Detections count by detector type.
487    pub detections_by_type: std::collections::HashMap<String, u64>,
488}
489
490// ============================================================================
491// Fingerprint Group Cache
492// ============================================================================
493
494/// TTL for cached fingerprint groups (100ms).
495const GROUP_CACHE_TTL: Duration = Duration::from_millis(100);
496
497/// Cache for fingerprint groups to avoid repeated expensive scans.
498///
499/// The `get_groups_above_threshold()` method on FingerprintIndex is O(n) and
500/// can be called by multiple detectors during a single detection cycle. This
501/// cache provides a short-lived (100ms) cached result to amortize the cost.
502struct GroupCache {
503    /// Cached fingerprint groups.
504    groups: Vec<FingerprintGroup>,
505    /// Timestamp when the cache was populated.
506    cached_at: Instant,
507    /// The threshold that was used to generate this cache.
508    threshold: usize,
509}
510
511impl GroupCache {
512    /// Create a new cache entry.
513    fn new(groups: Vec<FingerprintGroup>, threshold: usize) -> Self {
514        Self {
515            groups,
516            cached_at: Instant::now(),
517            threshold,
518        }
519    }
520
521    /// Check if the cache is still valid.
522    fn is_valid(&self, threshold: usize) -> bool {
523        self.threshold == threshold && self.cached_at.elapsed() < GROUP_CACHE_TTL
524    }
525}
526
527// ============================================================================
528// Campaign Manager
529// ============================================================================
530
531/// Main orchestrator for campaign correlation.
532///
533/// Coordinates fingerprint indexing, campaign detection, and state management.
534/// This is the entry point for the correlation subsystem.
535///
536/// # Thread Safety
537///
538/// All methods are thread-safe and can be called concurrently. The manager uses
539/// lock-free data structures (DashMap) and atomic counters for high-performance
540/// concurrent access.
541///
542/// # Registration vs Detection
543///
544/// - **Registration** (`register_*` methods): Called per-request, must be FAST.
545///   Only updates indexes, no detection logic.
546/// - **Detection** (`run_detection_cycle`): Called periodically by background
547///   worker or on-demand. Processes all detectors and applies campaign updates.
548///
549/// # Detectors (ordered by weight)
550///
551/// 1. Attack Sequence (50) - Same attack payloads across actors
552/// 2. Auth Token (45) - Same JWT structure/issuer across IPs
553/// 3. HTTP Fingerprint (40) - Identical browser fingerprint (JA4H)
554/// 4. TLS Fingerprint (35) - Same TLS signature (JA4)
555/// 5. Behavioral Similarity (30) - Identical navigation/timing patterns
556/// 6. Timing Correlation (25) - Coordinated request timing (botnets)
557/// 7. Network Proximity (15) - Same ASN or /24 subnet
558pub struct CampaignManager {
559    /// Manager configuration.
560    config: ManagerConfig,
561
562    /// Fingerprint index for O(1) lookups.
563    index: Arc<FingerprintIndex>,
564
565    /// Campaign state storage.
566    store: Arc<CampaignStore>,
567
568    /// Access List Manager for automated mitigation (optional).
569    access_list_manager: Option<Arc<ParkingLotRwLock<AccessListManager>>>,
570
571    /// Telemetry client for cross-tenant correlation (optional).
572    telemetry_client: Option<Arc<TelemetryClient>>,
573
574    // ========================================================================
575    // All 7 Detectors (ordered by weight)
576    // ========================================================================
577    /// Attack sequence detector (weight: 50 - highest signal).
578    attack_sequence_detector: AttackSequenceDetector,
579
580    /// Auth token detector (weight: 45).
581    auth_token_detector: AuthTokenDetector,
582
583    /// HTTP fingerprint detector (weight: 40).
584    http_fingerprint_detector: SharedFingerprintDetector,
585
586    /// TLS fingerprint / JA4 rotation detector (weight: 35).
587    tls_fingerprint_detector: Ja4RotationDetector,
588
589    /// Behavioral similarity detector (weight: 30).
590    behavioral_detector: BehavioralSimilarityDetector,
591
592    /// Timing correlation detector (weight: 25).
593    timing_detector: TimingCorrelationDetector,
594
595    /// Network proximity detector (weight: 15 - lowest signal).
596    network_detector: NetworkProximityDetector,
597
598    /// Graph correlation detector (weight: 20).
599    graph_detector: GraphDetector,
600
601    /// Internal statistics (atomic counters for thread safety).
602    stats_fingerprints_registered: AtomicU64,
603    stats_detections_run: AtomicU64,
604    stats_campaigns_created: AtomicU64,
605
606    /// Per-detector detection counts.
607    stats_detections_by_type: RwLock<std::collections::HashMap<String, u64>>,
608
609    /// Last scan timestamp (protected by RwLock for safe concurrent access).
610    last_scan: RwLock<Option<Instant>>,
611
612    /// Flag to signal background worker shutdown.
613    shutdown: AtomicBool,
614
615    /// Cache for fingerprint groups (100ms TTL).
616    /// Reduces repeated expensive scans during detection cycles.
617    group_cache: RwLock<Option<GroupCache>>,
618
619    /// Rate limiter for auto-mitigation to prevent mass-banning.
620    mitigation_rate_limiter: MitigationRateLimiter,
621
622    /// Track mitigated campaigns to prevent re-mitigation.
623    mitigated_campaigns: dashmap::DashSet<String>,
624}
625
626impl CampaignManager {
627    /// Create a new campaign manager with default configuration.
628    pub fn new() -> Self {
629        Self::with_config(ManagerConfig::default())
630    }
631
632    /// Create a new campaign manager with custom configuration.
633    pub fn with_config(config: ManagerConfig) -> Self {
634        // ====================================================================
635        // Initialize all 7 detectors from config
636        // ====================================================================
637
638        // 1. Attack Sequence Detector (weight: 50)
639        let attack_sequence_config = AttackSequenceConfig {
640            min_ips: config.attack_sequence_min_ips,
641            window: config.attack_sequence_window,
642            similarity_threshold: 0.95, // Default similarity threshold
643            ..Default::default()
644        };
645        let attack_sequence_detector = AttackSequenceDetector::new(attack_sequence_config);
646
647        // 2. Auth Token Detector (weight: 45)
648        let auth_token_config = AuthTokenConfig {
649            min_ips: config.auth_token_min_ips,
650            window: config.auth_token_window,
651            ..Default::default()
652        };
653        let auth_token_detector = AuthTokenDetector::new(auth_token_config);
654
655        // 3. HTTP Fingerprint Detector (weight: 40)
656        let http_fingerprint_detector = SharedFingerprintDetector::with_config(
657            config.shared_threshold,
658            config.shared_confidence,
659            config.scan_interval.as_millis() as u64,
660        );
661
662        // 4. TLS Fingerprint / JA4 Rotation Detector (weight: 35)
663        let rotation_config = RotationConfig {
664            min_fingerprints: config.rotation_threshold,
665            window: config.rotation_window,
666            track_combined: config.track_combined,
667            ..Default::default()
668        };
669        let tls_fingerprint_detector = Ja4RotationDetector::new(rotation_config);
670
671        // 5. Behavioral Similarity Detector (weight: 30)
672        let behavioral_config = BehavioralConfig {
673            min_ips: config.behavioral_min_ips,
674            min_sequence_length: config.behavioral_min_sequence,
675            window: config.behavioral_window,
676            ..Default::default()
677        };
678        let behavioral_detector = BehavioralSimilarityDetector::new(behavioral_config);
679
680        // 6. Timing Correlation Detector (weight: 25)
681        let timing_config = TimingConfig {
682            min_ips: config.timing_min_ips,
683            bucket_size: Duration::from_millis(config.timing_bucket_ms),
684            min_bucket_hits: config.timing_min_bucket_hits,
685            window: config.timing_window,
686            ..Default::default()
687        };
688        let timing_detector = TimingCorrelationDetector::new(timing_config);
689
690        // 7. Network Proximity Detector (weight: 15)
691        let network_config = NetworkProximityConfig {
692            min_ips: config.network_min_ips,
693            check_subnet: config.network_check_subnet,
694            check_asn: false, // ASN lookup requires external data
695            ..Default::default()
696        };
697        let network_detector = NetworkProximityDetector::new(network_config);
698
699        // 8. Graph Correlation Detector (weight: 20)
700        let graph_config = GraphConfig {
701            min_component_size: config.graph_min_component_size,
702            max_traversal_depth: config.graph_max_depth,
703            edge_ttl: config.graph_edge_ttl,
704            ..Default::default()
705        };
706        let graph_detector = GraphDetector::new(graph_config);
707
708        Self {
709            config,
710            index: Arc::new(FingerprintIndex::new()),
711            store: Arc::new(CampaignStore::new()),
712            access_list_manager: None,
713            telemetry_client: None,
714            // All 7 detectors
715            attack_sequence_detector,
716            auth_token_detector,
717            http_fingerprint_detector,
718            tls_fingerprint_detector,
719            behavioral_detector,
720            timing_detector,
721            network_detector,
722            graph_detector,
723            // Statistics
724            stats_fingerprints_registered: AtomicU64::new(0),
725            stats_detections_run: AtomicU64::new(0),
726            stats_campaigns_created: AtomicU64::new(0),
727            stats_detections_by_type: RwLock::new(std::collections::HashMap::new()),
728            last_scan: RwLock::new(None),
729            shutdown: AtomicBool::new(false),
730            // Cache for fingerprint groups (starts empty)
731            group_cache: RwLock::new(None),
732            // Mitigation rate limiter and tracking
733            mitigation_rate_limiter: MitigationRateLimiter::default(),
734            mitigated_campaigns: dashmap::DashSet::new(),
735        }
736    }
737
738    /// Set the AccessListManager for automated mitigation.
739    pub fn set_access_list_manager(&mut self, manager: Arc<ParkingLotRwLock<AccessListManager>>) {
740        self.access_list_manager = Some(manager);
741    }
742
743    /// Set the TelemetryClient for cross-tenant correlation.
744    pub fn set_telemetry_client(&mut self, client: Arc<TelemetryClient>) {
745        self.telemetry_client = Some(client);
746    }
747
748    /// Register a JA4 fingerprint for an IP address.
749    ///
750    /// Called during request processing - must be fast.
751    /// Only updates indexes, no detection logic is run.
752    ///
753    /// # Arguments
754    /// * `ip` - The IP address of the client
755    /// * `fingerprint` - The JA4 TLS fingerprint
756    pub fn register_ja4(&self, ip: IpAddr, fingerprint: String) {
757        if fingerprint.is_empty() {
758            return;
759        }
760
761        let ip_str = ip.to_string();
762
763        // Update fingerprint index
764        self.index.update_entity(&ip_str, Some(&fingerprint), None);
765
766        // Record in rotation detector
767        self.tls_fingerprint_detector
768            .record_fingerprint(ip, fingerprint);
769
770        // Increment stats
771        self.stats_fingerprints_registered
772            .fetch_add(1, Ordering::Relaxed);
773    }
774
775    /// Register a JA4 fingerprint using Arc<str> to reduce allocations.
776    ///
777    /// Optimized version for callers who already have an Arc<str> fingerprint.
778    /// This avoids cloning the fingerprint string when it's already reference-counted.
779    ///
780    /// # Arguments
781    /// * `ip` - The IP address of the client
782    /// * `fingerprint` - The JA4 TLS fingerprint as Arc<str>
783    pub fn register_ja4_arc(&self, ip: IpAddr, fingerprint: Arc<str>) {
784        if fingerprint.is_empty() {
785            return;
786        }
787
788        let ip_str = ip.to_string();
789
790        // Update fingerprint index (uses &str reference, no allocation needed)
791        self.index.update_entity(&ip_str, Some(&fingerprint), None);
792
793        // Record in rotation detector (requires String, but Arc<str> → String is cheap clone)
794        self.tls_fingerprint_detector
795            .record_fingerprint(ip, fingerprint.to_string());
796
797        // Increment stats
798        self.stats_fingerprints_registered
799            .fetch_add(1, Ordering::Relaxed);
800    }
801
802    /// Register a combined (JA4+JA4H) fingerprint for an IP address.
803    ///
804    /// Combined fingerprints provide higher confidence correlation due to
805    /// increased specificity.
806    ///
807    /// # Arguments
808    /// * `ip` - The IP address of the client
809    /// * `fingerprint` - The combined fingerprint hash
810    pub fn register_combined(&self, ip: IpAddr, fingerprint: String) {
811        if fingerprint.is_empty() {
812            return;
813        }
814
815        let ip_str = ip.to_string();
816
817        // Update fingerprint index (combined only)
818        self.index.update_entity(&ip_str, None, Some(&fingerprint));
819
820        // Record in rotation detector if tracking combined
821        if self.config.track_combined {
822            self.tls_fingerprint_detector
823                .record_fingerprint(ip, fingerprint);
824        }
825
826        // Increment stats
827        self.stats_fingerprints_registered
828            .fetch_add(1, Ordering::Relaxed);
829    }
830
831    /// Register a combined fingerprint using Arc<str> to reduce allocations.
832    ///
833    /// Optimized version for callers who already have an Arc<str> fingerprint.
834    /// This avoids cloning the fingerprint string when it's already reference-counted.
835    ///
836    /// # Arguments
837    /// * `ip` - The IP address of the client
838    /// * `fingerprint` - The combined fingerprint hash as Arc<str>
839    pub fn register_combined_arc(&self, ip: IpAddr, fingerprint: Arc<str>) {
840        if fingerprint.is_empty() {
841            return;
842        }
843
844        let ip_str = ip.to_string();
845
846        // Update fingerprint index (combined only, uses &str reference)
847        self.index.update_entity(&ip_str, None, Some(&fingerprint));
848
849        // Record in rotation detector if tracking combined
850        if self.config.track_combined {
851            self.tls_fingerprint_detector
852                .record_fingerprint(ip, fingerprint.to_string());
853        }
854
855        // Increment stats
856        self.stats_fingerprints_registered
857            .fetch_add(1, Ordering::Relaxed);
858    }
859
860    /// Register both JA4 and JA4H fingerprints.
861    ///
862    /// Convenience method for registering both fingerprint types in one call.
863    ///
864    /// # Arguments
865    /// * `ip` - The IP address of the client
866    /// * `ja4` - Optional JA4 TLS fingerprint
867    /// * `ja4h` - Optional JA4H HTTP fingerprint (used in combined hash)
868    pub fn register_fingerprints(&self, ip: IpAddr, ja4: Option<String>, ja4h: Option<String>) {
869        let ip_str = ip.to_string();
870        let mut registered = false;
871
872        // Update fingerprint index
873        let ja4_ref = ja4.as_deref();
874        let combined = ja4h.as_ref().map(|h| {
875            // Create combined hash from JA4+JA4H
876            format!("{}_{}", ja4.as_deref().unwrap_or(""), h)
877        });
878        let combined_ref = combined.as_deref();
879
880        self.index.update_entity(&ip_str, ja4_ref, combined_ref);
881
882        // Record JA4 in rotation detector
883        if let Some(ref fp) = ja4 {
884            if !fp.is_empty() {
885                self.tls_fingerprint_detector
886                    .record_fingerprint(ip, fp.clone());
887                registered = true;
888            }
889        }
890
891        // Record combined in rotation detector if tracking
892        if self.config.track_combined {
893            if let Some(ref fp) = combined {
894                if !fp.is_empty() {
895                    self.tls_fingerprint_detector
896                        .record_fingerprint(ip, fp.clone());
897                    registered = true;
898                }
899            }
900        }
901
902        if registered {
903            self.stats_fingerprints_registered
904                .fetch_add(1, Ordering::Relaxed);
905        }
906    }
907
908    // ========================================================================
909    // New Detector Registration Methods
910    // ========================================================================
911
912    /// Record an attack payload observation for campaign correlation.
913    ///
914    /// Called when an attack is detected (SQLi, XSS, etc.) to correlate
915    /// identical payloads across different IPs. Weight: 50 (highest signal).
916    ///
917    /// # Arguments
918    /// * `ip` - The IP address of the attacker
919    /// * `payload_hash` - Hash of the normalized attack payload
920    /// * `attack_type` - Classification (sqli, xss, path_traversal, etc.)
921    /// * `path` - Target path of the attack
922    pub fn record_attack(
923        &self,
924        ip: IpAddr,
925        payload_hash: String,
926        attack_type: String,
927        path: String,
928    ) {
929        self.attack_sequence_detector.record_attack(
930            ip,
931            AttackPayload {
932                payload_hash,
933                attack_type,
934                target_path: path,
935                timestamp: std::time::Instant::now(),
936            },
937        );
938    }
939
940    /// Record a JWT token observation for campaign correlation.
941    ///
942    /// Called when a JWT is seen in request headers. Correlates IPs
943    /// using tokens with identical structure or issuer. Weight: 45.
944    ///
945    /// # Arguments
946    /// * `ip` - The IP address of the client
947    /// * `jwt` - The raw JWT string (header.payload.signature)
948    pub fn record_token(&self, ip: IpAddr, jwt: &str) {
949        self.auth_token_detector.record_jwt(ip, jwt);
950    }
951
952    /// Record a request for behavioral and timing analysis.
953    ///
954    /// Should be called for every request to build behavioral patterns
955    /// and detect timing correlations. Updates multiple detectors:
956    /// - Behavioral detector (weight: 30) - navigation patterns
957    /// - Timing detector (weight: 25) - request synchronization
958    /// - Network detector (weight: 15) - subnet correlation
959    ///
960    /// # Arguments
961    /// * `ip` - The IP address of the client
962    /// * `method` - HTTP method (GET, POST, etc.)
963    /// * `path` - Request path
964    pub fn record_request(&self, ip: IpAddr, method: &str, path: &str) {
965        self.behavioral_detector.record_request(ip, method, path);
966        self.timing_detector.record_request(ip);
967        self.network_detector.register_ip(ip);
968    }
969
970    /// Record a request with full context for all applicable detectors.
971    ///
972    /// Convenience method that records data to multiple detectors at once.
973    /// Call this during request processing to capture all correlation signals.
974    ///
975    /// # Arguments
976    /// * `ip` - The IP address of the client
977    /// * `method` - HTTP method
978    /// * `path` - Request path
979    /// * `ja4` - Optional JA4 TLS fingerprint
980    /// * `jwt` - Optional JWT from Authorization header
981    pub fn record_request_full(
982        &self,
983        ip: IpAddr,
984        method: &str,
985        path: &str,
986        ja4: Option<&str>,
987        jwt: Option<&str>,
988    ) {
989        // Record for behavioral/timing/network
990        self.record_request(ip, method, path);
991
992        let ip_id = GraphDetector::ip_id(&ip.to_string());
993
994        // Record JA4 fingerprint
995        if let Some(fp) = ja4 {
996            if !fp.is_empty() {
997                self.register_ja4(ip, fp.to_string());
998                // Graph correlation: Link IP to Fingerprint
999                self.record_relation(&ip_id, &GraphDetector::fp_id(fp));
1000            }
1001        }
1002
1003        // Record JWT token
1004        if let Some(token) = jwt {
1005            if !token.is_empty() {
1006                self.record_token(ip, token);
1007                // Graph correlation: Link IP to Token (use hash or prefix for ID)
1008                // Using first 16 chars of token as ID to avoid sensitive data in graph keys
1009                let token_id = if token.len() > 16 {
1010                    &token[..16]
1011                } else {
1012                    token
1013                };
1014                self.record_relation(&ip_id, &GraphDetector::token_id(token_id));
1015            }
1016        }
1017    }
1018
1019    /// Record a relationship for graph correlation.
1020    ///
1021    /// Records a connection between two entities (e.g., IP and Fingerprint)
1022    /// to build the correlation graph.
1023    ///
1024    /// # Arguments
1025    /// * `entity_a` - First entity ID (e.g., "ip:1.2.3.4")
1026    /// * `entity_b` - Second entity ID (e.g., "fp:abc12345")
1027    pub fn record_relation(&self, entity_a: &str, entity_b: &str) {
1028        self.graph_detector.record_relation(entity_a, entity_b);
1029    }
1030
1031    // ========================================================================
1032    // Campaign Scoring
1033    // ========================================================================
1034
1035    /// Calculate weighted campaign score from all correlation reasons.
1036    ///
1037    /// The score is computed as the weighted average of all correlation
1038    /// reasons, where each reason's contribution is:
1039    /// `weight * confidence / total_reasons`
1040    ///
1041    /// # Arguments
1042    /// * `campaign` - The campaign to score
1043    ///
1044    /// # Returns
1045    /// A score between 0.0 and 50.0 (max weight * max confidence)
1046    pub fn calculate_campaign_score(&self, campaign: &Campaign) -> f64 {
1047        if campaign.correlation_reasons.is_empty() {
1048            return 0.0;
1049        }
1050
1051        let total_weighted: f64 = campaign
1052            .correlation_reasons
1053            .iter()
1054            .map(|r| r.correlation_type.weight() as f64 * r.confidence)
1055            .sum();
1056
1057        total_weighted / campaign.correlation_reasons.len() as f64
1058    }
1059
1060    /// Run all 7 detectors in parallel and process updates with weighted scoring.
1061    ///
1062    /// Called periodically by background worker or on-demand.
1063    /// Detectors run concurrently for improved performance (~70ms savings at scale):
1064    /// 1. Attack Sequence (50) - Same attack payloads
1065    /// 2. Auth Token (45) - Same JWT structure/issuer
1066    /// 3. HTTP Fingerprint (40) - Identical JA4H
1067    /// 4. TLS Fingerprint (35) - Same JA4
1068    /// 5. Behavioral Similarity (30) - Navigation patterns
1069    /// 6. Timing Correlation (25) - Synchronized requests
1070    /// 7. Network Proximity (15) - Same ASN/subnet
1071    ///
1072    /// # Returns
1073    /// Number of campaign updates processed.
1074    ///
1075    /// # Errors
1076    /// Returns an error if any detector fails critically.
1077    pub async fn run_detection_cycle(&self) -> DetectorResult<usize> {
1078        // Create futures for each detector using trait objects for dynamic dispatch
1079        // This allows heterogeneous detectors to be run in parallel via join_all
1080        let detectors: Vec<(&dyn Detector, &'static str)> = vec![
1081            (
1082                &self.attack_sequence_detector as &dyn Detector,
1083                "attack_sequence",
1084            ),
1085            (&self.auth_token_detector as &dyn Detector, "auth_token"),
1086            (
1087                &self.http_fingerprint_detector as &dyn Detector,
1088                "http_fingerprint",
1089            ),
1090            (
1091                &self.tls_fingerprint_detector as &dyn Detector,
1092                "tls_fingerprint",
1093            ),
1094            (&self.behavioral_detector as &dyn Detector, "behavioral"),
1095            (&self.timing_detector as &dyn Detector, "timing"),
1096            (&self.network_detector as &dyn Detector, "network"),
1097            (&self.graph_detector as &dyn Detector, "graph"),
1098        ];
1099
1100        // Run all detectors in parallel using join_all
1101        // Each future wraps the synchronous analyze() call
1102        let detector_futures: Vec<_> = detectors
1103            .into_iter()
1104            .map(|(detector, name)| {
1105                let index = &self.index;
1106                // Wrap each detector in an async block
1107                async move {
1108                    let result = detector.analyze(index);
1109                    (name, result)
1110                }
1111            })
1112            .collect();
1113
1114        let results = join_all(detector_futures).await;
1115
1116        // Process all results and collect updates
1117        let mut total_updates = 0;
1118        let mut stats_updates: std::collections::HashMap<String, u64> =
1119            std::collections::HashMap::new();
1120
1121        for (name, result) in results {
1122            match result {
1123                Ok(updates) => {
1124                    let update_count = updates.len();
1125                    for update in updates {
1126                        self.process_campaign_update(update).await;
1127                        total_updates += 1;
1128                    }
1129                    // Collect per-detector stats
1130                    if update_count > 0 {
1131                        *stats_updates.entry(name.to_string()).or_insert(0) += update_count as u64;
1132                    }
1133                }
1134                Err(e) => {
1135                    tracing::warn!("Detector {} failed: {}", name, e);
1136                }
1137            }
1138        }
1139
1140        // Batch update stats (single lock acquisition)
1141        if !stats_updates.is_empty() {
1142            let mut stats = self.stats_detections_by_type.write().await;
1143            for (name, count) in stats_updates {
1144                *stats.entry(name).or_insert(0) += count;
1145            }
1146        }
1147
1148        // Update global stats
1149        self.stats_detections_run.fetch_add(1, Ordering::Relaxed);
1150        {
1151            let mut last_scan = self.last_scan.write().await;
1152            *last_scan = Some(Instant::now());
1153        }
1154
1155        Ok(total_updates)
1156    }
1157
1158    /// Get fingerprint groups above threshold with caching.
1159    ///
1160    /// This method caches the results of `get_groups_above_threshold()` for 100ms
1161    /// to avoid repeated expensive O(n) scans during a single detection cycle.
1162    /// Multiple detectors can use the same cached result within the TTL window.
1163    ///
1164    /// # Arguments
1165    /// * `threshold` - Minimum number of IPs required for a group
1166    ///
1167    /// # Returns
1168    /// Vector of fingerprint groups above the threshold.
1169    pub async fn get_cached_groups(&self, threshold: usize) -> Vec<FingerprintGroup> {
1170        // Check cache first
1171        {
1172            let cache_guard = self.group_cache.read().await;
1173            if let Some(ref cache) = *cache_guard {
1174                if cache.is_valid(threshold) {
1175                    return cache.groups.clone();
1176                }
1177            }
1178        }
1179
1180        // Cache miss or expired - compute fresh groups
1181        let groups = self.index.get_groups_above_threshold(threshold);
1182
1183        // Update cache
1184        {
1185            let mut cache_guard = self.group_cache.write().await;
1186            *cache_guard = Some(GroupCache::new(groups.clone(), threshold));
1187        }
1188
1189        groups
1190    }
1191
1192    /// Invalidate the fingerprint groups cache.
1193    ///
1194    /// Called when significant changes occur that would affect group composition.
1195    pub async fn invalidate_group_cache(&self) {
1196        let mut cache_guard = self.group_cache.write().await;
1197        *cache_guard = None;
1198    }
1199
1200    /// Process a campaign update from a detector.
1201    ///
1202    /// If the update contains a correlation reason with IPs, we try to:
1203    /// 1. Find existing campaign for any of those IPs
1204    /// 2. If found, update the existing campaign
1205    /// 3. If not found, create a new campaign
1206    async fn process_campaign_update(&self, update: CampaignUpdate) {
1207        // Extract IPs from correlation reason if present
1208        let ips: Vec<String> = update
1209            .add_correlation_reason
1210            .as_ref()
1211            .map(|reason| reason.evidence.clone())
1212            .unwrap_or_default();
1213
1214        if ips.is_empty() {
1215            return;
1216        }
1217
1218        // Check if any IP is already in a campaign
1219        let existing_campaign_id = ips.iter().find_map(|ip| self.store.get_campaign_for_ip(ip));
1220
1221        // Use a variable to track if we need to check for mitigation
1222        let mut check_mitigation = false;
1223        let mut target_campaign_id = String::new();
1224
1225        match existing_campaign_id {
1226            Some(campaign_id) => {
1227                // Update existing campaign
1228                let _ = self.store.update_campaign(&campaign_id, update);
1229
1230                // Add any new IPs to the campaign
1231                for ip in &ips {
1232                    let _ = self.store.add_actor_to_campaign(&campaign_id, ip);
1233                }
1234
1235                check_mitigation = true;
1236                target_campaign_id = campaign_id;
1237            }
1238            None => {
1239                // Create new campaign
1240                let confidence = update.confidence.unwrap_or(0.5);
1241
1242                // Generate a unique ID, retrying if collision occurs (rare edge case)
1243                // ID collisions can happen if two campaigns are created in the same millisecond
1244                let mut campaign_id = Campaign::generate_id();
1245                let mut retry_count = 0;
1246                while self.store.get_campaign(&campaign_id).is_some() && retry_count < 10 {
1247                    // Add random suffix to handle collision
1248                    campaign_id = format!("{}-{:x}", Campaign::generate_id(), fastrand::u32(..));
1249                    retry_count += 1;
1250                }
1251
1252                let mut campaign = Campaign::new(campaign_id.clone(), ips, confidence);
1253
1254                // Apply update fields to new campaign
1255                if let Some(status) = update.status {
1256                    campaign.status = status;
1257                }
1258                if let Some(ref attack_types) = update.attack_types {
1259                    campaign.attack_types = attack_types.clone();
1260                }
1261                if let Some(reason) = update.add_correlation_reason {
1262                    campaign.correlation_reasons.push(reason);
1263                }
1264                if let Some(risk_score) = update.risk_score {
1265                    campaign.risk_score = risk_score;
1266                }
1267
1268                // Store the campaign
1269                if self.store.create_campaign(campaign).is_ok() {
1270                    self.stats_campaigns_created.fetch_add(1, Ordering::Relaxed);
1271                    check_mitigation = true;
1272                    target_campaign_id = campaign_id;
1273                }
1274            }
1275        }
1276
1277        // Check for automated mitigation if enabled
1278        if check_mitigation {
1279            if let Some(campaign) = self.store.get_campaign(&target_campaign_id) {
1280                // Auto-mitigation (Block)
1281                if self.config.auto_mitigation_enabled
1282                    && campaign.confidence >= self.config.auto_mitigation_threshold
1283                    && campaign.status != CampaignStatus::Resolved
1284                {
1285                    self.mitigate_campaign(&campaign).await;
1286                }
1287
1288                // Cross-Tenant Reporting (Fleet Intelligence)
1289                // Report high-confidence campaigns (>= 0.8) to Signal Horizon
1290                if campaign.confidence >= 0.8 {
1291                    self.report_campaign(&campaign);
1292                }
1293            }
1294        }
1295    }
1296
1297    /// Report a high-confidence campaign to Signal Horizon telemetry.
1298    fn report_campaign(&self, campaign: &Campaign) {
1299        if let Some(ref client) = self.telemetry_client {
1300            // Only report if client is enabled
1301            if !client.is_enabled() {
1302                return;
1303            }
1304
1305            let event = TelemetryEvent::CampaignReport {
1306                campaign_id: campaign.id.clone(),
1307                confidence: campaign.confidence,
1308                attack_types: campaign
1309                    .attack_types
1310                    .iter()
1311                    .map(|at| format!("{:?}", at))
1312                    .collect(),
1313                actor_count: campaign.actor_count,
1314                correlation_reasons: campaign
1315                    .correlation_reasons
1316                    .iter()
1317                    .map(|r| r.description.clone())
1318                    .collect(),
1319                timestamp_ms: std::time::SystemTime::now()
1320                    .duration_since(std::time::UNIX_EPOCH)
1321                    .unwrap_or_default()
1322                    .as_millis() as u64,
1323            };
1324
1325            // Fire and forget - runs in background
1326            let client = Arc::clone(client);
1327            tokio::spawn(async move {
1328                if let Err(e) = client.report(event).await {
1329                    tracing::debug!("Failed to report campaign telemetry: {}", e);
1330                }
1331            });
1332        }
1333    }
1334
1335    /// Apply automated mitigation to a high-confidence campaign.
1336    ///
1337    /// Adds campaign actors to the deny list via AccessListManager.
1338    /// Implements rate limiting and batch blocking for safety.
1339    async fn mitigate_campaign(&self, campaign: &Campaign) {
1340        // Check if already mitigated
1341        if self.mitigated_campaigns.contains(&campaign.id) {
1342            tracing::debug!(campaign_id = %campaign.id, "Campaign already mitigated, skipping");
1343            return;
1344        }
1345
1346        let access_list = match &self.access_list_manager {
1347            Some(al) => al,
1348            None => {
1349                tracing::debug!("No AccessListManager configured, skipping mitigation");
1350                return;
1351            }
1352        };
1353
1354        // Collect IPs to block (limit per campaign)
1355        let max_ips = self.mitigation_rate_limiter.max_ips_per_campaign();
1356        let ips_to_block: Vec<IpAddr> = campaign
1357            .actors
1358            .iter()
1359            .filter_map(|ip_str| ip_str.parse::<IpAddr>().ok())
1360            .take(max_ips)
1361            .collect();
1362
1363        if ips_to_block.is_empty() {
1364            tracing::debug!(campaign_id = %campaign.id, "No valid IPs to block");
1365            return;
1366        }
1367
1368        // Rate limit check - acquire permits for all IPs
1369        let mut blocked_count = 0;
1370        let mut rate_limited = false;
1371
1372        for ip in &ips_to_block {
1373            if let Err(reason) = self.mitigation_rate_limiter.try_ban().await {
1374                tracing::warn!(
1375                    campaign_id = %campaign.id,
1376                    reason = %reason,
1377                    blocked = blocked_count,
1378                    remaining = ips_to_block.len() - blocked_count,
1379                    "Mitigation rate limited"
1380                );
1381                rate_limited = true;
1382                break;
1383            }
1384
1385            // Add deny rule
1386            let comment = format!(
1387                "Campaign {} (confidence: {:.2})",
1388                campaign.id, campaign.confidence
1389            );
1390            {
1391                let mut al = access_list.write();
1392                if let Err(e) = al.add_deny_ip(ip, Some(&comment)) {
1393                    tracing::error!(ip = %ip, error = %e, "Failed to add deny rule");
1394                    continue;
1395                }
1396            }
1397            blocked_count += 1;
1398        }
1399
1400        // Log audit event
1401        let attack_types: Vec<String> = campaign
1402            .attack_types
1403            .iter()
1404            .map(|at| format!("{:?}", at))
1405            .collect();
1406        tracing::info!(
1407            campaign_id = %campaign.id,
1408            confidence = campaign.confidence,
1409            total_actors = campaign.actors.len(),
1410            blocked = blocked_count,
1411            rate_limited = rate_limited,
1412            attack_types = ?attack_types,
1413            "Auto-mitigation applied"
1414        );
1415
1416        // Mark as mitigated
1417        self.mitigated_campaigns.insert(campaign.id.clone());
1418
1419        // Report mitigation event to telemetry
1420        if let Some(ref client) = self.telemetry_client {
1421            if client.is_enabled() {
1422                let event = TelemetryEvent::CampaignReport {
1423                    campaign_id: format!("mitigation:{}", campaign.id),
1424                    confidence: campaign.confidence,
1425                    attack_types,
1426                    actor_count: blocked_count,
1427                    correlation_reasons: vec![format!(
1428                        "Auto-mitigation applied: {} IPs blocked",
1429                        blocked_count
1430                    )],
1431                    timestamp_ms: std::time::SystemTime::now()
1432                        .duration_since(std::time::UNIX_EPOCH)
1433                        .unwrap_or_default()
1434                        .as_millis() as u64,
1435                };
1436
1437                let client = Arc::clone(client);
1438                tokio::spawn(async move {
1439                    if let Err(e) = client.report(event).await {
1440                        tracing::debug!("Failed to report mitigation telemetry: {}", e);
1441                    }
1442                });
1443            }
1444        }
1445    }
1446
1447    /// Check if an IP should trigger immediate detection.
1448    ///
1449    /// Used for event-driven detection on new requests. Checks all 7 detectors
1450    /// to see if any threshold has been reached that warrants immediate analysis.
1451    ///
1452    /// # Arguments
1453    /// * `ip` - The IP address to check
1454    ///
1455    /// # Returns
1456    /// `true` if immediate detection should be triggered.
1457    pub fn should_trigger_detection(&self, ip: &IpAddr) -> bool {
1458        // Check detectors in order of weight (short-circuit on first match)
1459        self.attack_sequence_detector
1460            .should_trigger(ip, &self.index)
1461            || self.auth_token_detector.should_trigger(ip, &self.index)
1462            || self
1463                .http_fingerprint_detector
1464                .should_trigger(ip, &self.index)
1465            || self
1466                .tls_fingerprint_detector
1467                .should_trigger(ip, &self.index)
1468            || self.behavioral_detector.should_trigger(ip, &self.index)
1469            || self.timing_detector.should_trigger(ip, &self.index)
1470            || self.network_detector.should_trigger(ip, &self.index)
1471            || self.graph_detector.should_trigger(ip, &self.index)
1472    }
1473
1474    /// Get all active campaigns for API response.
1475    ///
1476    /// Returns campaigns with Detected or Active status.
1477    pub fn get_campaigns(&self) -> Vec<Campaign> {
1478        self.store.list_active_campaigns()
1479    }
1480
1481    /// Get all campaigns (including resolved/dormant).
1482    pub fn get_all_campaigns(&self) -> Vec<Campaign> {
1483        self.store.list_campaigns(None)
1484    }
1485
1486    /// Create a snapshot of all campaigns for persistence.
1487    ///
1488    /// Returns all campaigns regardless of status.
1489    pub fn snapshot(&self) -> Vec<Campaign> {
1490        self.store.list_campaigns(None)
1491    }
1492
1493    /// Restore campaigns from a snapshot.
1494    ///
1495    /// Clears existing state and loads the provided campaigns.
1496    pub fn restore(&self, campaigns: Vec<Campaign>) {
1497        // Clear existing state
1498        self.store.clear();
1499        self.index.clear();
1500
1501        // Restore campaigns
1502        for campaign in campaigns {
1503            // Re-add IP mappings
1504            for ip_str in &campaign.actors {
1505                // Update fingerprint index with a placeholder to re-establish the IP entry
1506                self.index.update_entity(ip_str, None, None);
1507            }
1508
1509            // Create the campaign in the store
1510            let _ = self.store.create_campaign(campaign);
1511        }
1512    }
1513
1514    /// Get a specific campaign by ID.
1515    ///
1516    /// # Arguments
1517    /// * `id` - The campaign ID to retrieve
1518    ///
1519    /// # Returns
1520    /// The campaign if found, None otherwise.
1521    pub fn get_campaign(&self, id: &str) -> Option<Campaign> {
1522        self.store.get_campaign(id)
1523    }
1524
1525    /// Get IPs that are members of a campaign.
1526    ///
1527    /// # Arguments
1528    /// * `campaign_id` - The campaign ID to query
1529    ///
1530    /// # Returns
1531    /// Vector of IP addresses in the campaign.
1532    pub fn get_campaign_actors(&self, campaign_id: &str) -> Vec<IpAddr> {
1533        self.store
1534            .get_campaign(campaign_id)
1535            .map(|campaign| {
1536                campaign
1537                    .actors
1538                    .iter()
1539                    .filter_map(|ip_str| ip_str.parse().ok())
1540                    .collect()
1541            })
1542            .unwrap_or_default()
1543    }
1544
1545    /// Get the correlation graph for a campaign.
1546    pub fn get_campaign_graph(&self, campaign_id: &str) -> serde_json::Value {
1547        let ips = self.get_campaign_actors(campaign_id);
1548        let ips_str: Vec<String> = ips.into_iter().map(|ip| ip.to_string()).collect();
1549
1550        self.graph_detector.get_cytoscape_data(&ips_str)
1551    }
1552
1553    /// Get the correlation graph for a campaign with pagination and identifier hashing.
1554    /// P1 fix: Supports pagination to prevent memory exhaustion and hashes identifiers
1555    /// to prevent information disclosure.
1556    pub fn get_campaign_graph_paginated(
1557        &self,
1558        campaign_id: &str,
1559        limit: Option<usize>,
1560        offset: Option<usize>,
1561        hash_identifiers: bool,
1562    ) -> crate::correlation::detectors::graph::PaginatedGraph {
1563        use crate::correlation::detectors::graph::GraphExportOptions;
1564
1565        let ips = self.get_campaign_actors(campaign_id);
1566        let ips_str: Vec<String> = ips.into_iter().map(|ip| ip.to_string()).collect();
1567
1568        let options = GraphExportOptions {
1569            limit,
1570            offset,
1571            hash_identifiers,
1572        };
1573
1574        self.graph_detector
1575            .get_cytoscape_data_paginated(&ips_str, options)
1576    }
1577
1578    /// Get current statistics.
1579    ///
1580    /// Returns a snapshot of manager statistics including index and store stats.
1581    pub fn stats(&self) -> ManagerStats {
1582        let last_scan = {
1583            // Use try_read to avoid blocking; if locked, use None
1584            self.last_scan
1585                .try_read()
1586                .map(|guard| *guard)
1587                .unwrap_or(None)
1588        };
1589
1590        let detections_by_type = self
1591            .stats_detections_by_type
1592            .try_read()
1593            .map(|guard| guard.clone())
1594            .unwrap_or_default();
1595
1596        ManagerStats {
1597            fingerprints_registered: self.stats_fingerprints_registered.load(Ordering::Relaxed),
1598            detections_run: self.stats_detections_run.load(Ordering::Relaxed),
1599            campaigns_created: self.stats_campaigns_created.load(Ordering::Relaxed),
1600            last_scan,
1601            index_stats: self.index.stats(),
1602            campaign_stats: self.store.stats(),
1603            detections_by_type,
1604        }
1605    }
1606
1607    /// Start background detection worker.
1608    ///
1609    /// Returns a handle that can be used to await worker completion.
1610    /// The worker runs detection cycles at the configured interval until
1611    /// the manager is dropped or shutdown is signaled.
1612    ///
1613    /// # Returns
1614    /// JoinHandle for the background task.
1615    pub fn start_background_worker(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
1616        let manager = self;
1617        let scan_interval = manager.config.scan_interval;
1618
1619        tokio::spawn(async move {
1620            let mut ticker = interval(scan_interval);
1621
1622            loop {
1623                ticker.tick().await;
1624
1625                // Check for shutdown signal
1626                if manager.shutdown.load(Ordering::Relaxed) {
1627                    log::info!("Campaign manager background worker shutting down");
1628                    break;
1629                }
1630
1631                // Run detection cycle
1632                match manager.run_detection_cycle().await {
1633                    Ok(updates) => {
1634                        if updates > 0 {
1635                            log::debug!("Detection cycle processed {} updates", updates);
1636                        }
1637                    }
1638                    Err(e) => {
1639                        log::warn!("Detection cycle error: {}", e);
1640                    }
1641                }
1642            }
1643        })
1644    }
1645
1646    /// Signal the background worker to shut down.
1647    pub fn shutdown(&self) {
1648        self.shutdown.store(true, Ordering::Relaxed);
1649    }
1650
1651    /// Check if shutdown has been signaled.
1652    pub fn is_shutdown(&self) -> bool {
1653        self.shutdown.load(Ordering::Relaxed)
1654    }
1655
1656    /// Remove an IP from tracking (called when entity is evicted).
1657    ///
1658    /// Cleans up the IP from:
1659    /// - Fingerprint index
1660    /// - Any associated campaigns
1661    ///
1662    /// # Arguments
1663    /// * `ip` - The IP address to remove
1664    pub fn remove_ip(&self, ip: &IpAddr) {
1665        let ip_str = ip.to_string();
1666
1667        // Remove from fingerprint index
1668        self.index.remove_entity(&ip_str);
1669
1670        // Remove from any campaign
1671        if let Some(campaign_id) = self.store.get_campaign_for_ip(&ip_str) {
1672            let _ = self.store.remove_actor_from_campaign(&campaign_id, &ip_str);
1673        }
1674    }
1675
1676    /// Get the fingerprint index (for integration with EntityManager).
1677    ///
1678    /// Allows direct access to the index for advanced use cases.
1679    pub fn index(&self) -> &Arc<FingerprintIndex> {
1680        &self.index
1681    }
1682
1683    /// Get the campaign store (for integration).
1684    ///
1685    /// Allows direct access to the store for advanced use cases.
1686    pub fn store(&self) -> &Arc<CampaignStore> {
1687        &self.store
1688    }
1689
1690    /// Get the current configuration.
1691    pub fn config(&self) -> &ManagerConfig {
1692        &self.config
1693    }
1694
1695    /// Resolve a campaign.
1696    ///
1697    /// # Arguments
1698    /// * `campaign_id` - The campaign ID to resolve
1699    /// * `reason` - The reason for resolution
1700    ///
1701    /// # Returns
1702    /// Ok(()) if successful, Err if campaign not found or already resolved.
1703    pub fn resolve_campaign(&self, campaign_id: &str, reason: &str) -> Result<(), DetectorError> {
1704        self.store
1705            .resolve_campaign(campaign_id, reason)
1706            .map_err(|e| DetectorError::DetectionFailed(e.to_string()))
1707    }
1708
1709    /// Clear all state (primarily for testing).
1710    ///
1711    /// Clears fingerprint index, campaign store, and detector state.
1712    pub fn clear(&self) {
1713        self.index.clear();
1714        self.store.clear();
1715        self.http_fingerprint_detector.clear_processed();
1716        self.tls_fingerprint_detector.cleanup_old_observations();
1717    }
1718}
1719
1720impl Default for CampaignManager {
1721    fn default() -> Self {
1722        Self::new()
1723    }
1724}
1725
1726// ============================================================================
1727// Tests
1728// ============================================================================
1729
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    // ========================================================================
    // Helper Functions
    // ========================================================================

    /// Build a manager with small thresholds and background scanning disabled,
    /// so tests drive detection explicitly via `run_detection_cycle`.
    fn create_test_manager() -> CampaignManager {
        let config = ManagerConfig {
            shared_threshold: 3,
            rotation_threshold: 3,
            rotation_window: Duration::from_secs(60),
            scan_interval: Duration::from_millis(100),
            background_scanning: false,
            ..Default::default()
        };
        CampaignManager::with_config(config)
    }

    /// Build the IPv4 address `192.168.1.<last_octet>`.
    fn create_test_ip(last_octet: u8) -> IpAddr {
        IpAddr::from([192, 168, 1, last_octet])
    }

    // ========================================================================
    // Configuration Tests
    // ========================================================================

    #[test]
    fn test_config_default() {
        let config = ManagerConfig::default();

        assert_eq!(config.shared_threshold, 3);
        assert_eq!(config.rotation_threshold, 3);
        assert_eq!(config.rotation_window, Duration::from_secs(60));
        assert_eq!(config.scan_interval, Duration::from_secs(5));
        assert!(config.background_scanning);
        assert!(config.track_combined);
        assert!((config.shared_confidence - 0.85).abs() < 0.001);
    }

    #[test]
    fn test_config_builder() {
        let config = ManagerConfig::new()
            .with_shared_threshold(5)
            .with_rotation_threshold(4)
            .with_rotation_window(Duration::from_secs(120))
            .with_scan_interval(Duration::from_secs(10))
            .with_background_scanning(false)
            .with_track_combined(false)
            .with_shared_confidence(0.9);

        assert_eq!(config.shared_threshold, 5);
        assert_eq!(config.rotation_threshold, 4);
        assert_eq!(config.rotation_window, Duration::from_secs(120));
        assert_eq!(config.scan_interval, Duration::from_secs(10));
        assert!(!config.background_scanning);
        assert!(!config.track_combined);
        assert!((config.shared_confidence - 0.9).abs() < 0.001);
    }

    #[test]
    fn test_config_validation() {
        // Valid config
        let config = ManagerConfig::default();
        assert!(config.validate().is_ok());

        // Invalid shared_threshold
        let config = ManagerConfig::new().with_shared_threshold(1);
        assert!(config.validate().is_err());

        // Invalid rotation_threshold
        let config = ManagerConfig::new().with_rotation_threshold(1);
        assert!(config.validate().is_err());

        // Invalid rotation_window
        let config = ManagerConfig {
            rotation_window: Duration::ZERO,
            ..Default::default()
        };
        assert!(config.validate().is_err());

        // Invalid scan_interval
        let config = ManagerConfig {
            scan_interval: Duration::ZERO,
            ..Default::default()
        };
        assert!(config.validate().is_err());

        // Auto-mitigation with threshold too low (security risk)
        let config = ManagerConfig {
            auto_mitigation_enabled: true,
            auto_mitigation_threshold: 0.5, // Below 0.7 minimum
            ..Default::default()
        };
        assert!(config.validate().is_err());

        // Auto-mitigation with valid threshold
        let config = ManagerConfig {
            auto_mitigation_enabled: true,
            auto_mitigation_threshold: 0.9,
            ..Default::default()
        };
        assert!(config.validate().is_ok());

        // Auto-mitigation disabled ignores threshold
        let config = ManagerConfig {
            auto_mitigation_enabled: false,
            auto_mitigation_threshold: 0.5, // Would be invalid if enabled
            ..Default::default()
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_config_confidence_clamping() {
        let config = ManagerConfig::new().with_shared_confidence(1.5);
        assert!((config.shared_confidence - 1.0).abs() < 0.001);

        let config = ManagerConfig::new().with_shared_confidence(-0.5);
        assert!(config.shared_confidence >= 0.0);
    }

    // ========================================================================
    // Mitigation Rate Limiter Tests
    // ========================================================================

    #[tokio::test]
    async fn test_mitigation_rate_limiter_limits() {
        let limiter = MitigationRateLimiter::new(2, Duration::from_secs(60), 10);

        assert!(limiter.try_ban().await.is_ok());
        assert!(limiter.try_ban().await.is_ok());
        assert!(limiter.try_ban().await.is_err());
    }

    // ========================================================================
    // Registration Flow Tests
    // ========================================================================

    #[test]
    fn test_register_ja4() {
        let manager = create_test_manager();
        let ip = create_test_ip(1);

        manager.register_ja4(ip, "t13d1516h2_abc123".to_string());

        let stats = manager.stats();
        assert_eq!(stats.fingerprints_registered, 1);
        assert_eq!(stats.index_stats.total_ips, 1);
        assert_eq!(stats.index_stats.ja4_fingerprints, 1);
    }

    #[test]
    fn test_register_ja4_empty_skipped() {
        let manager = create_test_manager();
        let ip = create_test_ip(1);

        manager.register_ja4(ip, "".to_string());

        let stats = manager.stats();
        assert_eq!(stats.fingerprints_registered, 0);
        assert_eq!(stats.index_stats.total_ips, 0);
    }

    #[test]
    fn test_register_combined() {
        let manager = create_test_manager();
        let ip = create_test_ip(1);

        manager.register_combined(ip, "combined_hash_xyz".to_string());

        let stats = manager.stats();
        assert_eq!(stats.fingerprints_registered, 1);
        assert_eq!(stats.index_stats.total_ips, 1);
        assert_eq!(stats.index_stats.combined_fingerprints, 1);
    }

    #[test]
    fn test_register_fingerprints_both() {
        let manager = create_test_manager();
        let ip = create_test_ip(1);

        manager.register_fingerprints(
            ip,
            Some("ja4_test".to_string()),
            Some("ja4h_test".to_string()),
        );

        let stats = manager.stats();
        assert_eq!(stats.fingerprints_registered, 1);
        assert_eq!(stats.index_stats.ja4_fingerprints, 1);
        assert_eq!(stats.index_stats.combined_fingerprints, 1);
    }

    #[test]
    fn test_register_fingerprints_ja4_only() {
        let manager = create_test_manager();
        let ip = create_test_ip(1);

        manager.register_fingerprints(ip, Some("ja4_only".to_string()), None);

        let stats = manager.stats();
        assert_eq!(stats.fingerprints_registered, 1);
        assert_eq!(stats.index_stats.ja4_fingerprints, 1);
        assert_eq!(stats.index_stats.combined_fingerprints, 0);
    }

    // ========================================================================
    // Detection Cycle Tests
    // ========================================================================

    #[tokio::test]
    async fn test_detection_cycle_empty() {
        let manager = create_test_manager();

        let updates = manager.run_detection_cycle().await.unwrap();

        assert_eq!(updates, 0);
        assert_eq!(manager.stats().detections_run, 1);
    }

    #[tokio::test]
    async fn test_detection_cycle_creates_campaign() {
        let manager = create_test_manager();

        // Register 3 IPs with same fingerprint (threshold)
        for i in 1..=3 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "shared_fingerprint".to_string());
        }

        let updates = manager.run_detection_cycle().await.unwrap();

        assert!(updates >= 1);
        assert_eq!(manager.stats().campaigns_created, 1);

        let campaigns = manager.get_campaigns();
        assert_eq!(campaigns.len(), 1);
    }

    #[tokio::test]
    async fn test_detection_cycle_no_duplicate_campaigns() {
        let manager = create_test_manager();

        // Register 3 IPs with same fingerprint
        for i in 1..=3 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "shared_fp".to_string());
        }

        // First detection cycle
        manager.run_detection_cycle().await.unwrap();
        let first_count = manager.stats().campaigns_created;

        // Second detection cycle - should not create duplicate
        manager.run_detection_cycle().await.unwrap();
        let second_count = manager.stats().campaigns_created;

        assert_eq!(first_count, second_count);
    }

    // ========================================================================
    // Campaign Retrieval Tests
    // ========================================================================

    #[tokio::test]
    async fn test_get_campaigns() {
        let manager = create_test_manager();

        // Create a campaign
        for i in 1..=3 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "test_fp".to_string());
        }
        manager.run_detection_cycle().await.unwrap();

        let campaigns = manager.get_campaigns();
        assert!(!campaigns.is_empty());

        // Verify campaign has the expected actors
        let campaign = &campaigns[0];
        assert_eq!(campaign.actor_count, 3);
    }

    #[tokio::test]
    async fn test_get_campaign_by_id() {
        let manager = create_test_manager();

        // Create a campaign
        for i in 1..=3 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "get_by_id_fp".to_string());
        }
        manager.run_detection_cycle().await.unwrap();

        let campaigns = manager.get_campaigns();
        let campaign_id = &campaigns[0].id;

        let retrieved = manager.get_campaign(campaign_id);
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, *campaign_id);

        // Non-existent ID
        let not_found = manager.get_campaign("nonexistent");
        assert!(not_found.is_none());
    }

    #[tokio::test]
    async fn test_get_campaign_actors() {
        let manager = create_test_manager();

        // Create a campaign
        for i in 1..=3 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "actors_fp".to_string());
        }
        manager.run_detection_cycle().await.unwrap();

        let campaigns = manager.get_campaigns();
        let campaign_id = &campaigns[0].id;

        let actors = manager.get_campaign_actors(campaign_id);
        assert_eq!(actors.len(), 3);

        // Non-existent campaign
        let no_actors = manager.get_campaign_actors("nonexistent");
        assert!(no_actors.is_empty());
    }

    // ========================================================================
    // Stats Tracking Tests
    // ========================================================================

    #[tokio::test]
    async fn test_stats_tracking() {
        let manager = create_test_manager();

        // Initial stats
        let initial = manager.stats();
        assert_eq!(initial.fingerprints_registered, 0);
        assert_eq!(initial.detections_run, 0);
        assert_eq!(initial.campaigns_created, 0);
        assert!(initial.last_scan.is_none());

        // Register some fingerprints
        for i in 1..=5 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "stats_test_fp".to_string());
        }

        let after_register = manager.stats();
        assert_eq!(after_register.fingerprints_registered, 5);
        assert_eq!(after_register.index_stats.total_ips, 5);

        // Run detection
        manager.run_detection_cycle().await.unwrap();

        let after_detect = manager.stats();
        assert_eq!(after_detect.detections_run, 1);
        assert!(after_detect.last_scan.is_some());
        assert!(after_detect.campaigns_created >= 1);
    }

    // ========================================================================
    // Remove IP Cleanup Tests
    // ========================================================================

    #[tokio::test]
    async fn test_remove_ip_cleanup() {
        let manager = create_test_manager();

        // Create a campaign
        for i in 1..=3 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "remove_test_fp".to_string());
        }
        manager.run_detection_cycle().await.unwrap();

        // Verify campaign exists
        let campaigns = manager.get_campaigns();
        assert_eq!(campaigns[0].actor_count, 3);

        // Remove one IP
        let ip_to_remove = create_test_ip(1);
        manager.remove_ip(&ip_to_remove);

        // Verify IP was removed from index
        assert_eq!(manager.index.len(), 2);

        // Verify IP was removed from campaign
        let updated_campaigns = manager.get_campaigns();
        assert_eq!(updated_campaigns[0].actor_count, 2);
    }

    // ========================================================================
    // Concurrent Registration Tests
    // ========================================================================

    #[test]
    fn test_concurrent_registration() {
        let manager = Arc::new(create_test_manager());
        let mut handles = vec![];

        // Spawn multiple threads registering fingerprints; every (thread, i)
        // pair maps to a distinct address 10.<thread_id>.0.<i>.
        for thread_id in 0u8..10 {
            let manager = Arc::clone(&manager);
            handles.push(thread::spawn(move || {
                for i in 0u8..100 {
                    let ip = IpAddr::from([10, thread_id, 0, i]);
                    manager.register_ja4(ip, format!("fp_t{}_{}", thread_id, i % 5));
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // Verify no panics and reasonable state
        let stats = manager.stats();
        assert_eq!(stats.fingerprints_registered, 1000);
        assert!(stats.index_stats.total_ips > 0);
    }

    // ========================================================================
    // Trigger Detection Logic Tests
    // ========================================================================

    #[test]
    fn test_should_trigger_detection_below_threshold() {
        let manager = create_test_manager();

        // Register only 2 IPs (below threshold of 3)
        for i in 1..=2 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "trigger_test_fp".to_string());
        }

        let ip = create_test_ip(1);
        assert!(!manager.should_trigger_detection(&ip));
    }

    #[test]
    fn test_should_trigger_detection_at_threshold() {
        let manager = create_test_manager();

        // Register 3 IPs (at threshold)
        for i in 1..=3 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "trigger_threshold_fp".to_string());
        }

        let ip = create_test_ip(1);
        assert!(manager.should_trigger_detection(&ip));
    }

    // ========================================================================
    // Background Worker Lifecycle Tests
    // ========================================================================

    #[tokio::test]
    async fn test_background_worker_lifecycle() {
        let config = ManagerConfig {
            scan_interval: Duration::from_millis(50),
            background_scanning: true,
            shared_threshold: 3,
            ..Default::default()
        };
        let manager = Arc::new(CampaignManager::with_config(config));

        // Register some fingerprints
        for i in 1..=3 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "worker_test_fp".to_string());
        }

        // Start worker
        let worker = Arc::clone(&manager).start_background_worker();

        // Wait for a few cycles
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify detection ran
        let stats = manager.stats();
        assert!(stats.detections_run >= 1);

        // Signal shutdown
        manager.shutdown();

        // Worker should complete promptly, and the task itself must not have panicked.
        let joined = tokio::time::timeout(Duration::from_millis(500), worker)
            .await
            .expect("Worker should shut down gracefully");
        assert!(joined.is_ok(), "Worker task should not panic");
    }

    // Purely synchronous: no runtime is needed to toggle the flag.
    #[test]
    fn test_shutdown_flag() {
        let manager = CampaignManager::new();

        assert!(!manager.is_shutdown());

        manager.shutdown();

        assert!(manager.is_shutdown());
    }

    // ========================================================================
    // Integration Tests
    // ========================================================================

    #[tokio::test]
    async fn test_full_flow() {
        let manager = create_test_manager();

        // Phase 1: Register fingerprints from multiple IPs
        let fingerprint = "t13d1516h2_full_flow_test";
        for i in 1..=5 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, fingerprint.to_string());
        }

        // Phase 2: Run detection
        let updates = manager.run_detection_cycle().await.unwrap();
        assert!(updates >= 1);

        // Phase 3: Verify campaign was created
        let campaigns = manager.get_campaigns();
        assert_eq!(campaigns.len(), 1);

        let campaign = &campaigns[0];
        assert_eq!(campaign.actor_count, 5);
        assert!(campaign.confidence >= 0.8);
        assert!(!campaign.correlation_reasons.is_empty());

        // Phase 4: Get campaign by ID
        let retrieved = manager.get_campaign(&campaign.id).unwrap();
        assert_eq!(retrieved.actors.len(), 5);

        // Phase 5: Get actors
        let actors = manager.get_campaign_actors(&campaign.id);
        assert_eq!(actors.len(), 5);

        // Phase 6: Remove an IP
        manager.remove_ip(&create_test_ip(1));
        let updated = manager.get_campaign(&campaign.id).unwrap();
        assert_eq!(updated.actors.len(), 4);

        // Phase 7: Verify stats
        let stats = manager.stats();
        assert_eq!(stats.fingerprints_registered, 5);
        assert_eq!(stats.campaigns_created, 1);
        assert_eq!(stats.campaign_stats.total_campaigns, 1);
    }

    #[test]
    fn test_clear() {
        let manager = create_test_manager();

        // Add some data
        for i in 1..=5 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "clear_test_fp".to_string());
        }

        assert_eq!(manager.index.len(), 5);

        // Clear
        manager.clear();

        assert_eq!(manager.index.len(), 0);
        assert!(manager.store.is_empty());
    }

    #[tokio::test]
    async fn test_resolve_campaign() {
        let manager = create_test_manager();

        // Create a campaign
        for i in 1..=3 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "resolve_test_fp".to_string());
        }
        manager.run_detection_cycle().await.unwrap();

        let campaigns = manager.get_campaigns();
        let campaign_id = campaigns[0].id.clone();

        // Resolve the campaign
        let result = manager.resolve_campaign(&campaign_id, "Threat mitigated");
        assert!(result.is_ok());

        // Verify campaign is resolved
        let resolved = manager.get_campaign(&campaign_id).unwrap();
        assert_eq!(resolved.status, CampaignStatus::Resolved);

        // Active campaigns should now be empty
        let active = manager.get_campaigns();
        assert!(active.is_empty());
    }

    #[test]
    fn test_index_and_store_access() {
        let manager = create_test_manager();

        // Verify we can access internal components
        let _index = manager.index();
        let _store = manager.store();
        let _config = manager.config();

        // These should not panic
        assert!(manager.index().is_empty());
        assert!(manager.store().is_empty());
    }

    // ========================================================================
    // Edge Cases
    // ========================================================================

    #[test]
    fn test_ipv6_addresses() {
        let manager = create_test_manager();

        let ipv6_1: IpAddr = "2001:db8::1".parse().unwrap();
        let ipv6_2: IpAddr = "2001:db8::2".parse().unwrap();
        let ipv6_3: IpAddr = "2001:db8::3".parse().unwrap();

        manager.register_ja4(ipv6_1, "ipv6_fp".to_string());
        manager.register_ja4(ipv6_2, "ipv6_fp".to_string());
        manager.register_ja4(ipv6_3, "ipv6_fp".to_string());

        let stats = manager.stats();
        assert_eq!(stats.fingerprints_registered, 3);
        assert_eq!(stats.index_stats.total_ips, 3);
    }

    #[test]
    fn test_default_trait() {
        let manager = CampaignManager::default();

        assert!(manager.index.is_empty());
        assert!(manager.store.is_empty());
        assert!(!manager.is_shutdown());
    }

    #[tokio::test]
    async fn test_multiple_fingerprint_groups() {
        let manager = create_test_manager();

        // Group 1: 3 IPs with fingerprint A
        for i in 1..=3 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "group_a_fp".to_string());
        }

        // Group 2: 4 IPs with fingerprint B
        for i in 10..=13 {
            let ip = create_test_ip(i);
            manager.register_ja4(ip, "group_b_fp".to_string());
        }

        manager.run_detection_cycle().await.unwrap();

        let campaigns = manager.get_campaigns();
        assert_eq!(campaigns.len(), 2);

        // Verify both groups created campaigns
        let actor_counts: Vec<usize> = campaigns.iter().map(|c| c.actor_count).collect();
        assert!(actor_counts.contains(&3));
        assert!(actor_counts.contains(&4));
    }
}