1mod client;
28mod config;
29mod protocol;
30mod rate_limiter;
31
32pub use client::{ShadowClientStats, ShadowMirrorClient, ShadowMirrorError};
33pub use config::{ShadowConfigError, ShadowMirrorConfig};
34pub use protocol::{is_sensitive_header, MirrorPayload};
35pub use rate_limiter::{RateLimiter, RateLimiterStats};
36
37use std::collections::HashMap;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::sync::Arc;
40use tokio::sync::Semaphore;
41use tracing::{debug, info, warn};
42
/// Concurrency cap that [`ShadowMirrorManager::new`] passes to
/// [`ShadowMirrorManager::with_max_concurrent`].
const DEFAULT_MAX_CONCURRENT_MIRRORS: usize = 100;
45
/// Decides which requests are mirrored to honeypots and dispatches the
/// selected ones as background tasks.
///
/// Selection is done by `should_mirror`, payload construction by
/// `create_payload`, and dispatch by `mirror_async`. The counters below are
/// exposed through `stats`.
pub struct ShadowMirrorManager {
    /// Mirroring configuration (risk window, sampling, honeypot URLs, payload filters).
    config: ShadowMirrorConfig,
    /// Per-IP limiter consulted as the last step of `should_mirror`.
    rate_limiter: Arc<RateLimiter>,
    /// Client used by spawned tasks to deliver payloads to the honeypots.
    client: Arc<ShadowMirrorClient>,
    /// Sensor identifier copied into every payload built by `create_payload`.
    sensor_id: String,
    /// Permits bounding how many mirror tasks may be in flight at once.
    mirror_semaphore: Arc<Semaphore>,
    /// Number of permits the semaphore was created with.
    max_concurrent: usize,
    /// Calls to `should_mirror` made while mirroring was enabled.
    attempts: AtomicU64,
    /// Requests rejected because their risk score was outside the configured window.
    skipped_risk: AtomicU64,
    /// Requests rejected by random sampling.
    skipped_sampling: AtomicU64,
    /// Requests rejected by the per-IP rate limiter.
    skipped_rate_limit: AtomicU64,
    /// Payloads dropped by `mirror_async` because no permit was available.
    dropped_queue_full: AtomicU64,
    /// Payloads for which `mirror_async` spawned a delivery task.
    sent: AtomicU64,
}
76
77impl ShadowMirrorManager {
    /// Creates a manager limited to `DEFAULT_MAX_CONCURRENT_MIRRORS` in-flight mirrors.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Self::with_max_concurrent`].
    pub fn new(config: ShadowMirrorConfig, sensor_id: String) -> Result<Self, ShadowMirrorError> {
        Self::with_max_concurrent(config, sensor_id, DEFAULT_MAX_CONCURRENT_MIRRORS)
    }
85
86 pub fn with_max_concurrent(
96 config: ShadowMirrorConfig,
97 sensor_id: String,
98 max_concurrent: usize,
99 ) -> Result<Self, ShadowMirrorError> {
100 let rate_limiter = Arc::new(RateLimiter::new(config.per_ip_rate_limit));
101 let client = Arc::new(ShadowMirrorClient::new(
102 config.hmac_secret.clone(),
103 config.timeout(),
104 )?);
105 let mirror_semaphore = Arc::new(Semaphore::new(max_concurrent));
106
107 info!(
108 enabled = config.enabled,
109 min_risk = config.min_risk_score,
110 max_risk = config.max_risk_score,
111 sampling = config.sampling_rate,
112 per_ip_limit = config.per_ip_rate_limit,
113 honeypots = config.honeypot_urls.len(),
114 max_concurrent = max_concurrent,
115 "Shadow mirror manager initialized with bounded queue"
116 );
117
118 Ok(Self {
119 config,
120 rate_limiter,
121 client,
122 sensor_id,
123 mirror_semaphore,
124 max_concurrent,
125 attempts: AtomicU64::new(0),
126 skipped_risk: AtomicU64::new(0),
127 skipped_sampling: AtomicU64::new(0),
128 skipped_rate_limit: AtomicU64::new(0),
129 dropped_queue_full: AtomicU64::new(0),
130 sent: AtomicU64::new(0),
131 })
132 }
133
134 pub fn should_mirror(&self, risk_score: f32, client_ip: &str) -> bool {
143 if !self.config.enabled {
144 return false;
145 }
146
147 self.attempts.fetch_add(1, Ordering::Relaxed);
148
149 if risk_score < self.config.min_risk_score {
151 self.skipped_risk.fetch_add(1, Ordering::Relaxed);
152 debug!(
153 risk = risk_score,
154 min = self.config.min_risk_score,
155 "Skipping mirror: risk below threshold"
156 );
157 return false;
158 }
159
160 if risk_score >= self.config.max_risk_score {
161 self.skipped_risk.fetch_add(1, Ordering::Relaxed);
162 debug!(
163 risk = risk_score,
164 max = self.config.max_risk_score,
165 "Skipping mirror: risk above threshold (will be blocked)"
166 );
167 return false;
168 }
169
170 if self.config.sampling_rate < 1.0 && fastrand::f32() > self.config.sampling_rate {
172 self.skipped_sampling.fetch_add(1, Ordering::Relaxed);
173 debug!(
174 sampling = self.config.sampling_rate,
175 "Skipping mirror: not selected by sampling"
176 );
177 return false;
178 }
179
180 if !self.rate_limiter.check_and_increment(client_ip) {
182 self.skipped_rate_limit.fetch_add(1, Ordering::Relaxed);
183 debug!(
184 ip = client_ip,
185 limit = self.config.per_ip_rate_limit,
186 "Skipping mirror: per-IP rate limit exceeded"
187 );
188 return false;
189 }
190
191 true
192 }
193
194 pub fn mirror_async(&self, payload: MirrorPayload) -> bool {
204 let permit = match self.mirror_semaphore.clone().try_acquire_owned() {
206 Ok(permit) => permit,
207 Err(_) => {
208 self.dropped_queue_full.fetch_add(1, Ordering::Relaxed);
210 debug!(
211 request_id = %payload.request_id,
212 max_concurrent = self.max_concurrent,
213 "Shadow mirror dropped: queue full (backpressure)"
214 );
215 return false;
216 }
217 };
218
219 let client = Arc::clone(&self.client);
220 let urls = self.config.honeypot_urls.clone();
221 let timeout = self.config.timeout();
222 let request_id = payload.request_id.clone();
223
224 self.sent.fetch_add(1, Ordering::Relaxed);
225
226 tokio::spawn(async move {
228 let _permit = permit;
230
231 if let Err(e) = client.send_to_honeypot(&urls, payload, timeout).await {
232 warn!(
234 request_id = %request_id,
235 error = %e,
236 "Shadow mirror delivery failed"
237 );
238 }
239 });
241
242 true
243 }
244
245 #[allow(clippy::too_many_arguments)]
261 pub fn create_payload(
262 &self,
263 request_id: String,
264 source_ip: String,
265 method: String,
266 uri: String,
267 site_name: String,
268 risk_score: f32,
269 matched_rules: Vec<String>,
270 ja4: Option<String>,
271 ja4h: Option<String>,
272 campaign_id: Option<String>,
273 headers: HashMap<String, String>,
274 body: Option<String>,
275 ) -> MirrorPayload {
276 let filtered_headers: HashMap<String, String> = headers
278 .into_iter()
279 .filter(|(k, _)| {
280 self.config
281 .include_headers
282 .iter()
283 .any(|h| h.eq_ignore_ascii_case(k))
284 })
285 .collect();
286
287 let body = if self.config.include_body {
289 body.map(|b| {
290 if b.len() > self.config.max_body_size {
291 b[..self.config.max_body_size].to_string()
292 } else {
293 b
294 }
295 })
296 } else {
297 None
298 };
299
300 MirrorPayload::new(
301 request_id,
302 source_ip,
303 risk_score,
304 method,
305 uri,
306 site_name,
307 self.sensor_id.clone(),
308 )
309 .with_ja4(ja4)
310 .with_ja4h(ja4h)
311 .with_rules(matched_rules)
312 .with_campaign(campaign_id)
313 .with_headers(filtered_headers)
314 .with_body(body)
315 }
316
    /// Runs the per-IP rate limiter's own `cleanup`.
    pub fn cleanup(&self) {
        self.rate_limiter.cleanup();
    }
323
324 pub fn stats(&self) -> ShadowMirrorStats {
326 let client_stats = self.client.stats();
327 let rate_limiter_stats = self.rate_limiter.stats();
328
329 ShadowMirrorStats {
330 enabled: self.config.enabled,
331 attempts: self.attempts.load(Ordering::Relaxed),
332 skipped_risk: self.skipped_risk.load(Ordering::Relaxed),
333 skipped_sampling: self.skipped_sampling.load(Ordering::Relaxed),
334 skipped_rate_limit: self.skipped_rate_limit.load(Ordering::Relaxed),
335 dropped_queue_full: self.dropped_queue_full.load(Ordering::Relaxed),
336 sent: self.sent.load(Ordering::Relaxed),
337 delivery_successes: client_stats.successes,
338 delivery_failures: client_stats.failures,
339 bytes_sent: client_stats.bytes_sent,
340 tracked_ips: rate_limiter_stats.tracked_ips,
341 max_concurrent: self.max_concurrent,
342 queue_available: self.mirror_semaphore.available_permits(),
343 min_risk_score: self.config.min_risk_score,
344 max_risk_score: self.config.max_risk_score,
345 sampling_rate: self.config.sampling_rate,
346 per_ip_rate_limit: self.config.per_ip_rate_limit,
347 honeypot_count: self.config.honeypot_urls.len(),
348 }
349 }
350
351 pub fn reset_stats(&self) {
353 self.attempts.store(0, Ordering::Relaxed);
354 self.skipped_risk.store(0, Ordering::Relaxed);
355 self.skipped_sampling.store(0, Ordering::Relaxed);
356 self.skipped_rate_limit.store(0, Ordering::Relaxed);
357 self.dropped_queue_full.store(0, Ordering::Relaxed);
358 self.sent.store(0, Ordering::Relaxed);
359 self.client.reset_stats();
360 self.rate_limiter.reset();
361 }
362
    /// Returns the `enabled` flag from the configuration.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }
367
    /// Returns a reference to the configuration this manager was built with.
    pub fn config(&self) -> &ShadowMirrorConfig {
        &self.config
    }
372}
373
/// Snapshot of counters and settings returned by `ShadowMirrorManager::stats`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ShadowMirrorStats {
    /// Whether mirroring is enabled in the configuration.
    pub enabled: bool,
    /// Calls to `should_mirror` made while mirroring was enabled.
    pub attempts: u64,
    /// Requests rejected because their risk score was outside the configured window.
    pub skipped_risk: u64,
    /// Requests rejected by random sampling.
    pub skipped_sampling: u64,
    /// Requests rejected by the per-IP rate limiter.
    pub skipped_rate_limit: u64,
    /// Payloads dropped because no concurrency permit was available.
    pub dropped_queue_full: u64,
    /// Payloads for which a delivery task was spawned.
    pub sent: u64,
    /// Success count reported by the delivery client.
    pub delivery_successes: u64,
    /// Failure count reported by the delivery client.
    pub delivery_failures: u64,
    /// Byte count reported by the delivery client.
    pub bytes_sent: u64,
    /// Tracked-IP count reported by the rate limiter.
    pub tracked_ips: usize,
    /// Concurrency cap the manager was created with.
    pub max_concurrent: usize,
    /// Semaphore permits available when the snapshot was taken.
    pub queue_available: usize,
    /// Configured lower bound of the risk window (inclusive).
    pub min_risk_score: f32,
    /// Configured upper bound of the risk window (exclusive).
    pub max_risk_score: f32,
    /// Configured sampling rate.
    pub sampling_rate: f32,
    /// Configured per-IP rate limit.
    pub per_ip_rate_limit: u32,
    /// Number of configured honeypot URLs.
    pub honeypot_count: usize,
}
414
415#[cfg(test)]
416mod tests {
417 use super::*;
418
419 fn create_test_config() -> ShadowMirrorConfig {
420 ShadowMirrorConfig {
421 enabled: true,
422 min_risk_score: 40.0,
423 max_risk_score: 70.0,
424 honeypot_urls: vec!["http://localhost:8888/mirror".to_string()],
425 sampling_rate: 1.0,
426 per_ip_rate_limit: 10,
427 timeout_secs: 5,
428 hmac_secret: None,
429 include_body: true,
430 max_body_size: 1024,
431 include_headers: vec!["User-Agent".to_string()],
432 }
433 }
434
435 fn create_test_manager() -> ShadowMirrorManager {
436 ShadowMirrorManager::new(create_test_config(), "sensor-01".to_string())
437 .expect("test manager creation should succeed")
438 }
439
440 #[test]
441 fn test_should_mirror_in_risk_window() {
442 let manager = create_test_manager();
443
444 assert!(manager.should_mirror(45.0, "192.168.1.1"));
446 assert!(manager.should_mirror(50.0, "192.168.1.2"));
447 assert!(manager.should_mirror(69.9, "192.168.1.3"));
448 }
449
450 #[test]
451 fn test_should_not_mirror_below_min() {
452 let manager = create_test_manager();
453
454 assert!(!manager.should_mirror(10.0, "192.168.1.1"));
455 assert!(!manager.should_mirror(39.9, "192.168.1.2"));
456 }
457
458 #[test]
459 fn test_should_not_mirror_above_max() {
460 let manager = create_test_manager();
461
462 assert!(!manager.should_mirror(70.0, "192.168.1.1"));
463 assert!(!manager.should_mirror(85.0, "192.168.1.2"));
464 assert!(!manager.should_mirror(100.0, "192.168.1.3"));
465 }
466
467 #[test]
468 fn test_should_not_mirror_when_disabled() {
469 let mut config = create_test_config();
470 config.enabled = false;
471 let manager = ShadowMirrorManager::new(config, "sensor-01".to_string())
472 .expect("manager creation should succeed");
473
474 assert!(!manager.should_mirror(50.0, "192.168.1.1"));
475 }
476
477 #[test]
478 fn test_rate_limiting() {
479 let mut config = create_test_config();
480 config.per_ip_rate_limit = 3;
481 let manager = ShadowMirrorManager::new(config, "sensor-01".to_string())
482 .expect("manager creation should succeed");
483
484 let ip = "10.0.0.1";
485 assert!(manager.should_mirror(50.0, ip));
486 assert!(manager.should_mirror(50.0, ip));
487 assert!(manager.should_mirror(50.0, ip));
488 assert!(!manager.should_mirror(50.0, ip));
490 }
491
492 #[test]
493 fn test_different_ips_independent() {
494 let mut config = create_test_config();
495 config.per_ip_rate_limit = 2;
496 let manager = ShadowMirrorManager::new(config, "sensor-01".to_string())
497 .expect("manager creation should succeed");
498
499 assert!(manager.should_mirror(50.0, "ip1"));
500 assert!(manager.should_mirror(50.0, "ip1"));
501 assert!(!manager.should_mirror(50.0, "ip1")); assert!(manager.should_mirror(50.0, "ip2"));
505 assert!(manager.should_mirror(50.0, "ip2"));
506 }
507
508 #[test]
509 fn test_sampling_rate() {
510 let mut config = create_test_config();
511 config.sampling_rate = 0.0; let manager = ShadowMirrorManager::new(config, "sensor-01".to_string())
513 .expect("manager creation should succeed");
514
515 for i in 0..100 {
517 assert!(!manager.should_mirror(50.0, &format!("ip{}", i)));
518 }
519 }
520
521 #[test]
522 fn test_create_payload() {
523 let manager = create_test_manager();
524
525 let mut headers = HashMap::new();
526 headers.insert("User-Agent".to_string(), "test-agent".to_string());
527 headers.insert("X-Custom".to_string(), "should-be-filtered".to_string());
528
529 let payload = manager.create_payload(
530 "req-123".to_string(),
531 "10.0.0.1".to_string(),
532 "POST".to_string(),
533 "/api/login".to_string(),
534 "example.com".to_string(),
535 55.0,
536 vec!["sqli-001".to_string()],
537 Some("ja4-fingerprint".to_string()),
538 None,
539 Some("campaign-123".to_string()),
540 headers,
541 Some("request body".to_string()),
542 );
543
544 assert_eq!(payload.request_id, "req-123");
545 assert_eq!(payload.source_ip, "10.0.0.1");
546 assert_eq!(payload.risk_score, 55.0);
547 assert_eq!(payload.sensor_id, "sensor-01");
548 assert!(payload.headers.contains_key("User-Agent"));
549 assert!(!payload.headers.contains_key("X-Custom")); }
551
552 #[test]
553 fn test_body_truncation() {
554 let mut config = create_test_config();
555 config.max_body_size = 10;
556 let manager = ShadowMirrorManager::new(config, "sensor-01".to_string())
557 .expect("manager creation should succeed");
558
559 let payload = manager.create_payload(
560 "req-123".to_string(),
561 "10.0.0.1".to_string(),
562 "POST".to_string(),
563 "/api".to_string(),
564 "site".to_string(),
565 50.0,
566 vec![],
567 None,
568 None,
569 None,
570 HashMap::new(),
571 Some("this is a very long body that should be truncated".to_string()),
572 );
573
574 assert_eq!(payload.body.unwrap().len(), 10);
575 }
576
577 #[test]
578 fn test_stats() {
579 let manager = create_test_manager();
580
581 manager.should_mirror(50.0, "ip1");
582 manager.should_mirror(50.0, "ip2");
583 manager.should_mirror(10.0, "ip3"); let stats = manager.stats();
586 assert!(stats.enabled);
587 assert_eq!(stats.attempts, 3);
588 assert_eq!(stats.skipped_risk, 1);
589 assert_eq!(stats.min_risk_score, 40.0);
590 assert_eq!(stats.max_risk_score, 70.0);
591 }
592
593 #[test]
594 fn test_reset_stats() {
595 let manager = create_test_manager();
596
597 manager.should_mirror(50.0, "ip1");
598 manager.should_mirror(50.0, "ip2");
599
600 manager.reset_stats();
601
602 let stats = manager.stats();
603 assert_eq!(stats.attempts, 0);
604 assert_eq!(stats.sent, 0);
605 }
606
607 #[test]
608 fn test_bounded_queue_default_concurrency() {
609 let manager = create_test_manager();
610
611 let stats = manager.stats();
612 assert_eq!(stats.max_concurrent, DEFAULT_MAX_CONCURRENT_MIRRORS);
613 assert_eq!(stats.queue_available, DEFAULT_MAX_CONCURRENT_MIRRORS);
614 }
615
616 #[test]
617 fn test_bounded_queue_custom_concurrency() {
618 let config = create_test_config();
619 let manager = ShadowMirrorManager::with_max_concurrent(config, "sensor-01".to_string(), 50)
620 .expect("manager creation should succeed");
621
622 let stats = manager.stats();
623 assert_eq!(stats.max_concurrent, 50);
624 assert_eq!(stats.queue_available, 50);
625 }
626
627 #[tokio::test]
628 async fn test_bounded_queue_backpressure() {
629 let config = create_test_config();
630 let manager = ShadowMirrorManager::with_max_concurrent(config, "sensor-01".to_string(), 2)
632 .expect("manager creation should succeed");
633
634 let payload1 = manager.create_payload(
636 "req-1".to_string(),
637 "10.0.0.1".to_string(),
638 "GET".to_string(),
639 "/test".to_string(),
640 "site".to_string(),
641 50.0,
642 vec![],
643 None,
644 None,
645 None,
646 HashMap::new(),
647 None,
648 );
649 let payload2 = manager.create_payload(
650 "req-2".to_string(),
651 "10.0.0.1".to_string(),
652 "GET".to_string(),
653 "/test".to_string(),
654 "site".to_string(),
655 50.0,
656 vec![],
657 None,
658 None,
659 None,
660 HashMap::new(),
661 None,
662 );
663 let payload3 = manager.create_payload(
664 "req-3".to_string(),
665 "10.0.0.1".to_string(),
666 "GET".to_string(),
667 "/test".to_string(),
668 "site".to_string(),
669 50.0,
670 vec![],
671 None,
672 None,
673 None,
674 HashMap::new(),
675 None,
676 );
677
678 assert!(manager.mirror_async(payload1));
680 assert!(manager.mirror_async(payload2));
681
682 assert!(!manager.mirror_async(payload3));
684
685 let stats = manager.stats();
686 assert_eq!(stats.sent, 2);
687 assert_eq!(stats.dropped_queue_full, 1);
688 }
689
690 #[test]
691 fn test_stats_includes_queue_metrics() {
692 let config = create_test_config();
693 let manager = ShadowMirrorManager::with_max_concurrent(config, "sensor-01".to_string(), 25)
694 .expect("manager creation should succeed");
695
696 let stats = manager.stats();
697 assert_eq!(stats.max_concurrent, 25);
698 assert_eq!(stats.queue_available, 25);
699 assert_eq!(stats.dropped_queue_full, 0);
700 }
701}