synapse-waf 0.9.0

High-performance WAF and reverse proxy with embedded intelligence — built on Cloudflare Pingora
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
//! Integration Tests for Multi-Detector Campaign Correlation
//!
//! This module tests the 7-detector campaign correlation system to verify:
//! - Multiple detectors can detect the same campaign
//! - Weighted scoring works correctly across detectors
//! - Detection cycle processes all detectors
//! - Campaign merging when multiple detectors find same actors
//! - Concurrent access safety

#![cfg(test)]

use std::net::IpAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::correlation::detectors::{AttackPayload, TokenFingerprint};
use crate::correlation::{
    Campaign, CampaignManager, CorrelationReason, CorrelationType, ManagerConfig,
};

// ============================================================================
// Helper Functions
// ============================================================================

/// Create a test IP address from last octet
fn ip(last_octet: u8) -> IpAddr {
    format!("192.168.1.{}", last_octet).parse().unwrap()
}

/// Create a manager configured for testing with low thresholds
fn test_manager() -> CampaignManager {
    let config = ManagerConfig {
        // Lower thresholds for testing
        shared_threshold: 2,
        rotation_threshold: 2,
        rotation_window: Duration::from_secs(60),
        scan_interval: Duration::from_millis(100),
        background_scanning: false,
        track_combined: true,
        shared_confidence: 0.85,
        // Attack sequence detector (weight: 50)
        attack_sequence_min_ips: 2,
        attack_sequence_window: Duration::from_secs(300),
        // Auth token detector (weight: 45)
        auth_token_min_ips: 2,
        auth_token_window: Duration::from_secs(600),
        // Behavioral similarity detector (weight: 30)
        behavioral_min_ips: 2,
        behavioral_min_sequence: 3,
        behavioral_window: Duration::from_secs(300),
        // Timing correlation detector (weight: 25)
        timing_min_ips: 2,
        timing_bucket_ms: 100,
        timing_min_bucket_hits: 3,
        timing_window: Duration::from_secs(60),
        // Network proximity detector (weight: 15)
        network_min_ips: 2,
        network_check_subnet: true,
        // Graph correlation detector (weight: 20)
        graph_min_component_size: 3,
        graph_max_depth: 3,
        graph_edge_ttl: Duration::from_secs(3600),
        // Automated Response
        auto_mitigation_enabled: false,
        auto_mitigation_threshold: 0.90,
    };
    CampaignManager::with_config(config)
}

/// Create a mock JWT string
fn mock_jwt() -> String {
    // Simple mock JWT with header.payload.signature structure
    "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwiaXNzIjoiaHR0cHM6Ly9leGFtcGxlLmNvbSJ9.signature".to_string()
}

/// Create a mock attack payload
fn attack_payload(payload_hash: &str, attack_type: &str) -> AttackPayload {
    AttackPayload {
        payload_hash: payload_hash.to_string(),
        attack_type: attack_type.to_string(),
        target_path: "/api/vulnerable".to_string(),
        timestamp: Instant::now(),
    }
}

/// Allow constructing TokenFingerprint if needed in future tests
#[allow(dead_code)]
fn token_fingerprint(issuer: &str) -> TokenFingerprint {
    TokenFingerprint {
        issuer: Some(issuer.to_string()),
        algorithm: "RS256".to_string(),
        header_fields: vec!["alg".to_string(), "typ".to_string()],
        payload_fields: vec!["sub".to_string(), "iss".to_string()],
    }
}

// ============================================================================
// Test: Multi-Detector Campaign Detection
// ============================================================================

/// Verifies that multiple detectors can detect activity from the same set of IPs
/// and that their correlation reasons are properly aggregated.
#[tokio::test]
async fn test_multi_detector_campaign_detection() {
    let manager = test_manager();

    // Setup: 5 IPs exhibiting coordinated behavior across multiple signals
    let ips: Vec<IpAddr> = (1..=5).map(ip).collect();

    // Signal 1: Same TLS fingerprint (TlsFingerprint detector - weight: 35)
    let tls_fingerprint = "t13d1516h2_shared_campaign_fp".to_string();
    for &test_ip in &ips {
        manager.register_ja4(test_ip, tls_fingerprint.clone());
    }

    // Signal 2: Same attack payload (AttackSequence detector - weight: 50)
    let shared_payload = attack_payload("sqli_probe_hash_abc123", "sqli");
    for &test_ip in &ips {
        manager.record_attack(
            test_ip,
            shared_payload.payload_hash.clone(),
            shared_payload.attack_type.clone(),
            shared_payload.target_path.clone(),
        );
    }

    // Signal 3: Same JWT structure (AuthToken detector - weight: 45)
    let jwt = mock_jwt();
    for &test_ip in &ips {
        manager.record_token(test_ip, &jwt);
    }

    // Signal 4: Same behavioral pattern (BehavioralSimilarity detector - weight: 30)
    for &test_ip in &ips {
        manager.record_request(test_ip, "GET", "/");
        manager.record_request(test_ip, "GET", "/api/users");
        manager.record_request(test_ip, "POST", "/api/login");
    }

    // Run detection cycle
    let updates = manager.run_detection_cycle().await.unwrap();

    // Verify: At least one detection was made
    assert!(updates > 0, "Expected at least one campaign update");

    // Verify campaigns were created
    let campaigns = manager.get_campaigns();
    assert!(
        !campaigns.is_empty(),
        "Expected at least one campaign to be created"
    );

    // Verify that we have detections from multiple detector types
    let stats = manager.stats();
    let detections_by_type = &stats.detections_by_type;

    // We should have detections from at least 2 different detector types
    let detector_types_with_detections: Vec<&String> = detections_by_type
        .iter()
        .filter(|(_, &count)| count > 0)
        .map(|(name, _)| name)
        .collect();

    assert!(
        detector_types_with_detections.len() >= 1,
        "Expected detections from multiple detector types, got: {:?}",
        detector_types_with_detections
    );
}

// ============================================================================
// Test: Weighted Scoring Aggregation
// ============================================================================

/// Verifies that campaign scores are calculated using correct weights:
/// - AttackSequence: 50
/// - AuthToken: 45
/// - HttpFingerprint: 40
/// - TlsFingerprint: 35
/// - BehavioralSimilarity: 30
/// - TimingCorrelation: 25
/// - NetworkProximity: 15
#[tokio::test]
async fn test_weighted_scoring_aggregation() {
    // Create a campaign with correlation reasons from multiple detectors
    let mut campaign = Campaign::new(
        "test-weighted-scoring".to_string(),
        vec!["192.168.1.1".to_string(), "192.168.1.2".to_string()],
        0.5,
    );

    // Add correlation reasons from different detectors with varying confidence
    campaign.correlation_reasons.push(CorrelationReason::new(
        CorrelationType::AttackSequence,
        0.95,
        "Same SQLi payload",
        vec!["192.168.1.1".to_string(), "192.168.1.2".to_string()],
    ));

    campaign.correlation_reasons.push(CorrelationReason::new(
        CorrelationType::AuthToken,
        0.90,
        "Same JWT issuer",
        vec!["192.168.1.1".to_string(), "192.168.1.2".to_string()],
    ));

    campaign.correlation_reasons.push(CorrelationReason::new(
        CorrelationType::TlsFingerprint,
        0.85,
        "Same JA4 fingerprint",
        vec!["192.168.1.1".to_string(), "192.168.1.2".to_string()],
    ));

    // Verify weight values are correct
    assert_eq!(CorrelationType::AttackSequence.weight(), 50);
    assert_eq!(CorrelationType::AuthToken.weight(), 45);
    assert_eq!(CorrelationType::HttpFingerprint.weight(), 40);
    assert_eq!(CorrelationType::TlsFingerprint.weight(), 35);
    assert_eq!(CorrelationType::BehavioralSimilarity.weight(), 30);
    assert_eq!(CorrelationType::TimingCorrelation.weight(), 25);
    assert_eq!(CorrelationType::NetworkProximity.weight(), 15);

    // Calculate expected score
    // Score = sum(weight * confidence) / num_reasons
    // = (50 * 0.95 + 45 * 0.90 + 35 * 0.85) / 3
    // = (47.5 + 40.5 + 29.75) / 3
    // = 117.75 / 3
    // = 39.25
    let manager = test_manager();
    let calculated_score = manager.calculate_campaign_score(&campaign);

    let expected_score = (50.0 * 0.95 + 45.0 * 0.90 + 35.0 * 0.85) / 3.0;
    let tolerance = 0.01;

    assert!(
        (calculated_score - expected_score).abs() < tolerance,
        "Score mismatch: calculated={:.4}, expected={:.4}",
        calculated_score,
        expected_score
    );

    // Verify score ordering matches weight ordering
    let attack_only_score = 50.0 * 0.95; // 47.5
    let network_only_score = 15.0 * 0.95; // 14.25
    assert!(
        attack_only_score > network_only_score,
        "AttackSequence (50) should contribute more than NetworkProximity (15)"
    );
}

/// Verifies all correlation types are returned in correct weight order
#[test]
fn test_correlation_type_weight_ordering() {
    let all_types = CorrelationType::all_by_weight();

    assert_eq!(all_types.len(), 7, "Should have 7 correlation types");

    // Verify order is highest to lowest weight
    assert_eq!(all_types[0], CorrelationType::AttackSequence);
    assert_eq!(all_types[1], CorrelationType::AuthToken);
    assert_eq!(all_types[2], CorrelationType::HttpFingerprint);
    assert_eq!(all_types[3], CorrelationType::TlsFingerprint);
    assert_eq!(all_types[4], CorrelationType::BehavioralSimilarity);
    assert_eq!(all_types[5], CorrelationType::TimingCorrelation);
    assert_eq!(all_types[6], CorrelationType::NetworkProximity);

    // Verify weights are in descending order
    for i in 0..all_types.len() - 1 {
        assert!(
            all_types[i].weight() > all_types[i + 1].weight(),
            "Weight ordering violated at index {}: {} ({}) should be > {} ({})",
            i,
            all_types[i],
            all_types[i].weight(),
            all_types[i + 1],
            all_types[i + 1].weight()
        );
    }
}

// ============================================================================
// Test: Detection Cycle Runs All Detectors
// ============================================================================

/// Verifies that a detection cycle runs all 7 detectors and processes their results
#[tokio::test]
async fn test_detection_cycle_runs_all_detectors() {
    let manager = test_manager();

    // Setup data for each detector type
    let test_ips: Vec<IpAddr> = (1..=5).map(ip).collect();

    // Data for TLS Fingerprint detector (weight: 35)
    for &test_ip in &test_ips {
        manager.register_ja4(test_ip, "shared_tls_fp".to_string());
    }

    // Data for Attack Sequence detector (weight: 50)
    for &test_ip in &test_ips {
        manager.record_attack(
            test_ip,
            "attack_hash_xyz".to_string(),
            "sqli".to_string(),
            "/api/data".to_string(),
        );
    }

    // Data for Auth Token detector (weight: 45)
    for &test_ip in &test_ips {
        manager.record_token(test_ip, &mock_jwt());
    }

    // Data for Behavioral detector (weight: 30) - needs min_sequence_length requests
    for &test_ip in &test_ips {
        manager.record_request(test_ip, "GET", "/home");
        manager.record_request(test_ip, "GET", "/products");
        manager.record_request(test_ip, "POST", "/cart");
    }

    // Data for Network Proximity detector (weight: 15)
    // All test_ips are in 192.168.1.0/24 subnet by default

    // Initial state check
    let stats_before = manager.stats();
    assert_eq!(stats_before.detections_run, 0);

    // Run detection cycle
    let updates = manager.run_detection_cycle().await.unwrap();

    // Verify detection cycle was counted
    let stats_after = manager.stats();
    assert_eq!(stats_after.detections_run, 1);
    assert!(stats_after.last_scan.is_some());

    // Verify at least some detections were made
    assert!(updates > 0, "Expected detection updates from cycle");

    // Verify stats show detections from multiple detector types
    let detection_stats = &stats_after.detections_by_type;
    let active_detectors: Vec<&String> = detection_stats
        .iter()
        .filter(|(_, &count)| count > 0)
        .map(|(name, _)| name)
        .collect();

    // Log active detectors for debugging
    println!("Active detectors with detections: {:?}", active_detectors);
    println!("Detection stats: {:?}", detection_stats);

    // We should have at least 1 detector with detections
    assert!(
        !active_detectors.is_empty(),
        "Expected at least one detector to produce detections"
    );
}

// ============================================================================
// Test: Detector Independence
// ============================================================================

/// Verifies that detectors operate independently and disabling one doesn't affect others
#[tokio::test]
async fn test_detector_independence() {
    // Test with network proximity disabled
    let config_no_network = ManagerConfig {
        shared_threshold: 2,
        rotation_threshold: 2,
        rotation_window: Duration::from_secs(60),
        scan_interval: Duration::from_millis(100),
        background_scanning: false,
        network_check_subnet: false, // Disable network proximity
        ..Default::default()
    };
    let manager_no_network = CampaignManager::with_config(config_no_network);

    // Test with network proximity enabled
    let config_with_network = ManagerConfig {
        shared_threshold: 2,
        rotation_threshold: 2,
        rotation_window: Duration::from_secs(60),
        scan_interval: Duration::from_millis(100),
        background_scanning: false,
        network_check_subnet: true, // Enable network proximity
        ..Default::default()
    };
    let manager_with_network = CampaignManager::with_config(config_with_network);

    // Same test data for both
    let test_ips: Vec<IpAddr> = (1..=5).map(ip).collect();

    // Register same data to both managers
    for &test_ip in &test_ips {
        // TLS fingerprint - should work in both
        manager_no_network.register_ja4(test_ip, "detector_independence_test".to_string());
        manager_with_network.register_ja4(test_ip, "detector_independence_test".to_string());

        // Behavioral data - should work in both
        manager_no_network.record_request(test_ip, "GET", "/");
        manager_no_network.record_request(test_ip, "GET", "/api");
        manager_no_network.record_request(test_ip, "POST", "/submit");

        manager_with_network.record_request(test_ip, "GET", "/");
        manager_with_network.record_request(test_ip, "GET", "/api");
        manager_with_network.record_request(test_ip, "POST", "/submit");
    }

    // Run detection cycles
    let updates_no_network = manager_no_network.run_detection_cycle().await.unwrap();
    let updates_with_network = manager_with_network.run_detection_cycle().await.unwrap();

    // Both should have some detections from other detectors
    assert!(
        updates_no_network > 0,
        "Manager without network detector should still have detections"
    );
    assert!(
        updates_with_network > 0,
        "Manager with network detector should have detections"
    );

    // Verify network detector stats differ between the two
    let stats_no_network = manager_no_network.stats();
    let stats_with_network = manager_with_network.stats();

    let network_detections_disabled = stats_no_network
        .detections_by_type
        .get("network")
        .copied()
        .unwrap_or(0);

    // When network is disabled, its detection count should be 0
    assert_eq!(
        network_detections_disabled, 0,
        "Disabled network detector should not contribute detections"
    );

    // When network is enabled, we don't assert a specific count since
    // the IPs may or may not trigger network proximity based on subnet grouping
    let _network_detections_enabled = stats_with_network
        .detections_by_type
        .get("network")
        .copied()
        .unwrap_or(0);
}

/// Verifies that different detector thresholds work independently
#[tokio::test]
async fn test_detector_threshold_independence() {
    // Create manager with different thresholds for different detectors
    let config = ManagerConfig {
        shared_threshold: 3,        // HTTP fingerprint needs 3 IPs
        attack_sequence_min_ips: 2, // Attack sequence needs only 2 IPs
        auth_token_min_ips: 4,      // Auth token needs 4 IPs
        behavioral_min_ips: 2,      // Behavioral needs 2 IPs
        timing_min_ips: 5,          // Timing needs 5 IPs
        network_min_ips: 3,         // Network needs 3 IPs
        rotation_threshold: 3,
        rotation_window: Duration::from_secs(60),
        scan_interval: Duration::from_millis(100),
        background_scanning: false,
        ..Default::default()
    };
    let manager = CampaignManager::with_config(config);

    // Register 2 IPs - should trigger attack_sequence but not others with higher thresholds
    let two_ips: Vec<IpAddr> = (1..=2).map(ip).collect();

    for &test_ip in &two_ips {
        // Attack sequence - threshold 2, should detect
        manager.record_attack(
            test_ip,
            "threshold_test_hash".to_string(),
            "xss".to_string(),
            "/test".to_string(),
        );

        // TLS fingerprint - threshold 3, should NOT detect with 2 IPs
        manager.register_ja4(test_ip, "threshold_test_fp".to_string());

        // Auth token - threshold 4, should NOT detect with 2 IPs
        manager.record_token(test_ip, &mock_jwt());
    }

    // Run detection
    let _updates = manager.run_detection_cycle().await.unwrap();
    let stats = manager.stats();

    // Attack sequence should have detected (threshold 2, we have 2 IPs)
    let attack_detections = stats
        .detections_by_type
        .get("attack_sequence")
        .copied()
        .unwrap_or(0);
    assert!(
        attack_detections > 0,
        "Attack sequence should detect with 2 IPs (threshold: 2)"
    );

    // Auth token should NOT have detected (threshold 4, we have 2 IPs)
    let auth_detections = stats
        .detections_by_type
        .get("auth_token")
        .copied()
        .unwrap_or(0);
    assert_eq!(
        auth_detections, 0,
        "Auth token should not detect with 2 IPs (threshold: 4)"
    );
}

// ============================================================================
// Test: Concurrent Multi-Detector Access
// ============================================================================

/// Verifies that concurrent registration and detection doesn't cause deadlocks or corruption
#[tokio::test]
async fn test_concurrent_multi_detector_access() {
    let manager = Arc::new(test_manager());
    let mut handles = vec![];

    // Spawn multiple tasks registering different data types concurrently
    for task_id in 0..5 {
        let manager = Arc::clone(&manager);
        handles.push(tokio::spawn(async move {
            for i in 0..20 {
                let test_ip: IpAddr = format!("10.{}.{}.{}", task_id, i / 256, i % 256)
                    .parse()
                    .unwrap();

                // Register different types of data
                manager.register_ja4(test_ip, format!("concurrent_fp_{}", task_id));

                manager.record_attack(
                    test_ip,
                    format!("attack_{}", task_id),
                    "sqli".to_string(),
                    "/api".to_string(),
                );

                manager.record_token(test_ip, &mock_jwt());

                manager.record_request(test_ip, "GET", "/");
                manager.record_request(test_ip, "POST", "/api");
                manager.record_request(test_ip, "GET", "/done");

                // Small yield to encourage interleaving
                tokio::task::yield_now().await;
            }
        }));
    }

    // Spawn detection cycle tasks running concurrently with registrations
    for _ in 0..3 {
        let manager = Arc::clone(&manager);
        handles.push(tokio::spawn(async move {
            for _ in 0..5 {
                let _ = manager.run_detection_cycle().await;
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        }));
    }

    // Wait for all tasks to complete (with timeout)
    let timeout_result = tokio::time::timeout(Duration::from_secs(10), async {
        for handle in handles {
            handle.await.expect("Task panicked");
        }
    })
    .await;

    assert!(
        timeout_result.is_ok(),
        "Concurrent access test timed out - possible deadlock"
    );

    // Verify state is consistent after concurrent access
    let stats = manager.stats();
    assert!(
        stats.fingerprints_registered > 0,
        "Should have registered fingerprints"
    );
    assert!(stats.detections_run > 0, "Should have run detection cycles");

    // Verify no panics occurred and data structures are intact
    let campaigns = manager.get_campaigns();
    let all_campaigns = manager.get_all_campaigns();

    println!(
        "After concurrent test: {} active campaigns, {} total campaigns",
        campaigns.len(),
        all_campaigns.len()
    );
}

/// Verifies that concurrent read and write operations are safe
#[tokio::test]
async fn test_concurrent_read_write_operations() {
    let manager = Arc::new(test_manager());
    let num_writers = 4;
    let num_readers = 4;
    let operations_per_task = 50;

    let mut handles = vec![];

    // Writer tasks
    for writer_id in 0..num_writers {
        let manager = Arc::clone(&manager);
        handles.push(tokio::spawn(async move {
            for i in 0..operations_per_task {
                let test_ip: IpAddr = format!("10.{}.0.{}", writer_id, i).parse().unwrap();
                manager.register_ja4(test_ip, format!("writer_{}_fp", writer_id));
                manager.record_request(test_ip, "GET", "/");
                manager.record_request(test_ip, "GET", "/api");
                manager.record_request(test_ip, "POST", "/submit");
                tokio::task::yield_now().await;
            }
        }));
    }

    // Reader tasks
    for _reader_id in 0..num_readers {
        let manager = Arc::clone(&manager);
        handles.push(tokio::spawn(async move {
            for _ in 0..operations_per_task {
                let _ = manager.stats();
                let _ = manager.get_campaigns();
                let _ = manager.get_all_campaigns();
                tokio::task::yield_now().await;
            }
        }));
    }

    // Detection cycle tasks
    for _ in 0..2 {
        let manager = Arc::clone(&manager);
        handles.push(tokio::spawn(async move {
            for _ in 0..10 {
                let _ = manager.run_detection_cycle().await;
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
        }));
    }

    // Wait for completion
    for handle in handles {
        handle.await.expect("Task failed");
    }

    // Final state should be consistent
    let final_stats = manager.stats();
    assert!(
        final_stats.fingerprints_registered > 0,
        "Should have data after concurrent operations"
    );
}

// ============================================================================
// Test: Campaign Merging
// ============================================================================

/// Verifies that when multiple detectors find the same actors, they're merged into one campaign
#[tokio::test]
async fn test_campaign_merging_same_actors() {
    let manager = test_manager();

    // Use the same 3 IPs for all signals
    let shared_ips: Vec<IpAddr> = (1..=3).map(ip).collect();

    // First signal: TLS fingerprint
    for &test_ip in &shared_ips {
        manager.register_ja4(test_ip, "merge_test_tls".to_string());
    }

    // Run first detection
    manager.run_detection_cycle().await.unwrap();
    let campaigns_after_first = manager.get_campaigns();

    // Second signal: Attack sequence
    for &test_ip in &shared_ips {
        manager.record_attack(
            test_ip,
            "merge_test_attack".to_string(),
            "sqli".to_string(),
            "/api".to_string(),
        );
    }

    // Run second detection
    manager.run_detection_cycle().await.unwrap();
    let campaigns_after_second = manager.get_campaigns();

    // Third signal: Behavioral pattern
    for &test_ip in &shared_ips {
        manager.record_request(test_ip, "GET", "/merge");
        manager.record_request(test_ip, "GET", "/test");
        manager.record_request(test_ip, "POST", "/data");
    }

    // Run third detection
    manager.run_detection_cycle().await.unwrap();
    let campaigns_final = manager.get_campaigns();

    // The same IPs should be in the same campaign(s) - not creating duplicates
    // We expect either 1 merged campaign or a small number of related campaigns
    println!(
        "Campaigns after first: {}, second: {}, final: {}",
        campaigns_after_first.len(),
        campaigns_after_second.len(),
        campaigns_final.len()
    );

    // Verify that at least one campaign contains all the IPs
    let _campaigns_with_all_ips = campaigns_final
        .iter()
        .filter(|c| {
            shared_ips
                .iter()
                .all(|ip| c.actors.contains(&ip.to_string()))
        })
        .count();

    // At least one campaign should contain our test IPs
    assert!(
        !campaigns_final.is_empty(),
        "Should have at least one campaign"
    );

    // Log campaign details for debugging
    for (i, campaign) in campaigns_final.iter().enumerate() {
        println!(
            "Campaign {}: {} actors, {} correlation reasons",
            i,
            campaign.actor_count,
            campaign.correlation_reasons.len()
        );
    }
}

// ============================================================================
// Test: Detection Stats Accuracy
// ============================================================================

/// Verifies that detection statistics are accurately tracked per detector
#[tokio::test]
async fn test_detection_stats_accuracy() {
    let manager = test_manager();

    // Initial stats should be zero
    let initial_stats = manager.stats();
    assert_eq!(initial_stats.detections_run, 0);
    assert_eq!(initial_stats.campaigns_created, 0);
    assert!(
        initial_stats.detections_by_type.is_empty()
            || initial_stats.detections_by_type.values().all(|&v| v == 0)
    );

    // Register data to trigger specific detectors
    let test_ips: Vec<IpAddr> = (1..=4).map(ip).collect();

    for &test_ip in &test_ips {
        // This should trigger TLS fingerprint detector
        manager.register_ja4(test_ip, "stats_test_fp".to_string());

        // This should trigger attack sequence detector
        manager.record_attack(
            test_ip,
            "stats_test_payload".to_string(),
            "xss".to_string(),
            "/".to_string(),
        );
    }

    // Run detection cycle
    manager.run_detection_cycle().await.unwrap();

    let stats_after = manager.stats();
    assert_eq!(
        stats_after.detections_run, 1,
        "Should count 1 detection cycle"
    );
    assert!(
        stats_after.campaigns_created > 0,
        "Should have created campaigns"
    );

    // Run another detection cycle
    manager.run_detection_cycle().await.unwrap();

    let stats_final = manager.stats();
    assert_eq!(
        stats_final.detections_run, 2,
        "Should count 2 detection cycles"
    );
}

// ============================================================================
// Test: Edge Cases
// ============================================================================

/// Verifies behavior with empty data
#[tokio::test]
async fn test_empty_data_handling() {
    let manager = test_manager();

    // Run detection with no data
    let updates = manager.run_detection_cycle().await.unwrap();
    assert_eq!(updates, 0, "Should have no updates with no data");

    let campaigns = manager.get_campaigns();
    assert!(
        campaigns.is_empty(),
        "Should have no campaigns with no data"
    );

    let stats = manager.stats();
    assert_eq!(stats.detections_run, 1, "Detection cycle should be counted");
    assert_eq!(stats.campaigns_created, 0, "No campaigns should be created");
}

/// Verifies behavior with single IP (below all thresholds)
#[tokio::test]
async fn test_single_ip_no_detection() {
    let manager = test_manager();
    let single_ip = ip(1);

    // Register all types of data for single IP
    manager.register_ja4(single_ip, "single_ip_fp".to_string());
    manager.record_attack(
        single_ip,
        "single_ip_attack".to_string(),
        "sqli".to_string(),
        "/".to_string(),
    );
    manager.record_token(single_ip, &mock_jwt());
    manager.record_request(single_ip, "GET", "/");
    manager.record_request(single_ip, "GET", "/api");
    manager.record_request(single_ip, "POST", "/submit");

    let updates = manager.run_detection_cycle().await.unwrap();

    // With only 1 IP, no detections should occur (all thresholds are >= 2)
    assert_eq!(updates, 0, "Single IP should not trigger any detections");
}

/// Verifies proper handling of mixed IPv4 and IPv6 addresses
#[tokio::test]
async fn test_mixed_ip_versions() {
    let manager = test_manager();

    let ipv4_1: IpAddr = "192.168.1.1".parse().unwrap();
    let ipv4_2: IpAddr = "192.168.1.2".parse().unwrap();
    let ipv6_1: IpAddr = "2001:db8::1".parse().unwrap();
    let ipv6_2: IpAddr = "2001:db8::2".parse().unwrap();

    let mixed_ips = vec![ipv4_1, ipv4_2, ipv6_1, ipv6_2];

    // Register same fingerprint for all IPs
    for test_ip in &mixed_ips {
        manager.register_ja4(*test_ip, "mixed_ip_fp".to_string());
    }

    let updates = manager.run_detection_cycle().await.unwrap();

    // Should detect across both IPv4 and IPv6
    assert!(updates > 0, "Should detect mixed IP campaign");

    let campaigns = manager.get_campaigns();
    assert!(
        !campaigns.is_empty(),
        "Should create campaign with mixed IPs"
    );

    // Verify campaign contains both address types
    if let Some(campaign) = campaigns.first() {
        let has_ipv4 = campaign.actors.iter().any(|a| !a.contains(':'));
        let has_ipv6 = campaign.actors.iter().any(|a| a.contains(':'));
        assert!(
            has_ipv4 && has_ipv6,
            "Campaign should contain both IPv4 and IPv6 addresses"
        );
    }
}