Skip to main content

synapse_pingora/shadow/
mod.rs

1//! Shadow Mirroring Module
2//!
3//! Provides zero-latency traffic mirroring for suspicious actors to honeypot endpoints.
4//!
5//! # Architecture
6//!
7//! ```text
8//! Request → Detection → Risk Score → Decision Point
9//!                          │
10//!                          ├── risk < min: Pass through (no mirror)
11//!                          ├── min ≤ risk < max: SHADOW MIRROR + Pass
12//!                          └── risk ≥ max: Block (no mirror needed)
13//!                                  │
14//!                                  ▼
15//!                          tokio::spawn() ──► Async HTTP POST to Honeypot
16//!                          (fire & forget)
17//! ```
18//!
19//! # Key Features
20//!
21//! - **Zero production latency**: Fire-and-forget async mirroring
22//! - **Per-IP rate limiting**: Prevents honeypot flooding
23//! - **Configurable thresholds**: Risk score window for mirroring
24//! - **HMAC signing**: Optional payload authentication
25//! - **Sampling**: Configurable percentage of eligible traffic
26
27mod client;
28mod config;
29mod protocol;
30mod rate_limiter;
31
32pub use client::{ShadowClientStats, ShadowMirrorClient, ShadowMirrorError};
33pub use config::{ShadowConfigError, ShadowMirrorConfig};
34pub use protocol::{is_sensitive_header, MirrorPayload};
35pub use rate_limiter::{RateLimiter, RateLimiterStats};
36
37use std::collections::HashMap;
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::sync::Arc;
40use tokio::sync::Semaphore;
41use tracing::{debug, info, warn};
42
/// Default maximum number of mirror deliveries allowed in flight at once.
///
/// Used by `ShadowMirrorManager::new`; callers can pick a different limit with
/// `ShadowMirrorManager::with_max_concurrent`.
const DEFAULT_MAX_CONCURRENT_MIRRORS: usize = 100;

/// Manager for shadow mirroring operations.
///
/// Coordinates mirror decisions, rate limiting, and async delivery to honeypots.
/// A semaphore caps the number of in-flight deliveries so slow honeypots cannot
/// cause unbounded task/memory growth. When every permit is taken, new mirrors are
/// dropped (counted in `dropped_queue_full`) rather than queued.
pub struct ShadowMirrorManager {
    /// Shadow mirroring configuration
    config: ShadowMirrorConfig,
    /// Per-IP rate limiter consulted by `should_mirror`
    rate_limiter: Arc<RateLimiter>,
    /// HTTP client for honeypot delivery; an `Arc` clone is moved into each delivery task
    client: Arc<ShadowMirrorClient>,
    /// Sensor ID stamped on every payload for attribution
    sensor_id: String,
    /// Bounds concurrent deliveries: each spawned delivery task holds one permit
    mirror_semaphore: Arc<Semaphore>,
    /// Maximum concurrent mirror operations (the semaphore's initial permit count)
    max_concurrent: usize,
    /// Number of `should_mirror` calls made while mirroring was enabled
    attempts: AtomicU64,
    /// Mirrors skipped because the risk score fell outside the configured window
    skipped_risk: AtomicU64,
    /// Mirrors skipped because the request was not selected by sampling
    skipped_sampling: AtomicU64,
    /// Mirrors skipped because the client IP exceeded its rate limit
    skipped_rate_limit: AtomicU64,
    /// Mirrors dropped because no semaphore permit was free (backpressure)
    dropped_queue_full: AtomicU64,
    /// Mirrors handed to a background delivery task (not a delivery confirmation)
    sent: AtomicU64,
}
76
77impl ShadowMirrorManager {
78    /// Creates a new shadow mirror manager with default concurrency limit.
79    ///
80    /// # Errors
81    /// Returns `ShadowMirrorError::ClientCreation` if the HTTP client cannot be created.
82    pub fn new(config: ShadowMirrorConfig, sensor_id: String) -> Result<Self, ShadowMirrorError> {
83        Self::with_max_concurrent(config, sensor_id, DEFAULT_MAX_CONCURRENT_MIRRORS)
84    }
85
86    /// Creates a new shadow mirror manager with a custom concurrency limit.
87    ///
88    /// # Arguments
89    /// * `config` - Shadow mirror configuration
90    /// * `sensor_id` - Sensor ID for payload attribution
91    /// * `max_concurrent` - Maximum concurrent mirror operations (prevents memory exhaustion)
92    ///
93    /// # Errors
94    /// Returns `ShadowMirrorError::ClientCreation` if the HTTP client cannot be created.
95    pub fn with_max_concurrent(
96        config: ShadowMirrorConfig,
97        sensor_id: String,
98        max_concurrent: usize,
99    ) -> Result<Self, ShadowMirrorError> {
100        let rate_limiter = Arc::new(RateLimiter::new(config.per_ip_rate_limit));
101        let client = Arc::new(ShadowMirrorClient::new(
102            config.hmac_secret.clone(),
103            config.timeout(),
104        )?);
105        let mirror_semaphore = Arc::new(Semaphore::new(max_concurrent));
106
107        info!(
108            enabled = config.enabled,
109            min_risk = config.min_risk_score,
110            max_risk = config.max_risk_score,
111            sampling = config.sampling_rate,
112            per_ip_limit = config.per_ip_rate_limit,
113            honeypots = config.honeypot_urls.len(),
114            max_concurrent = max_concurrent,
115            "Shadow mirror manager initialized with bounded queue"
116        );
117
118        Ok(Self {
119            config,
120            rate_limiter,
121            client,
122            sensor_id,
123            mirror_semaphore,
124            max_concurrent,
125            attempts: AtomicU64::new(0),
126            skipped_risk: AtomicU64::new(0),
127            skipped_sampling: AtomicU64::new(0),
128            skipped_rate_limit: AtomicU64::new(0),
129            dropped_queue_full: AtomicU64::new(0),
130            sent: AtomicU64::new(0),
131        })
132    }
133
134    /// Determines if a request should be mirrored based on detection result.
135    ///
136    /// # Arguments
137    /// * `risk_score` - Risk score from detection (0-100)
138    /// * `client_ip` - Source IP address
139    ///
140    /// # Returns
141    /// `true` if the request should be mirrored, `false` otherwise.
142    pub fn should_mirror(&self, risk_score: f32, client_ip: &str) -> bool {
143        if !self.config.enabled {
144            return false;
145        }
146
147        self.attempts.fetch_add(1, Ordering::Relaxed);
148
149        // Check risk score window
150        if risk_score < self.config.min_risk_score {
151            self.skipped_risk.fetch_add(1, Ordering::Relaxed);
152            debug!(
153                risk = risk_score,
154                min = self.config.min_risk_score,
155                "Skipping mirror: risk below threshold"
156            );
157            return false;
158        }
159
160        if risk_score >= self.config.max_risk_score {
161            self.skipped_risk.fetch_add(1, Ordering::Relaxed);
162            debug!(
163                risk = risk_score,
164                max = self.config.max_risk_score,
165                "Skipping mirror: risk above threshold (will be blocked)"
166            );
167            return false;
168        }
169
170        // Check sampling rate
171        if self.config.sampling_rate < 1.0 && fastrand::f32() > self.config.sampling_rate {
172            self.skipped_sampling.fetch_add(1, Ordering::Relaxed);
173            debug!(
174                sampling = self.config.sampling_rate,
175                "Skipping mirror: not selected by sampling"
176            );
177            return false;
178        }
179
180        // Check per-IP rate limit
181        if !self.rate_limiter.check_and_increment(client_ip) {
182            self.skipped_rate_limit.fetch_add(1, Ordering::Relaxed);
183            debug!(
184                ip = client_ip,
185                limit = self.config.per_ip_rate_limit,
186                "Skipping mirror: per-IP rate limit exceeded"
187            );
188            return false;
189        }
190
191        true
192    }
193
194    /// Sends a mirror payload asynchronously (fire-and-forget) with bounded concurrency.
195    ///
196    /// Returns immediately without waiting for delivery to complete.
197    /// Uses a semaphore to limit concurrent operations and prevent memory exhaustion
198    /// when honeypots are slow or unresponsive. If the queue is full, the request
199    /// is dropped (backpressure) rather than blocking or causing unbounded growth.
200    ///
201    /// # Returns
202    /// `true` if the payload was queued for delivery, `false` if dropped due to backpressure.
203    pub fn mirror_async(&self, payload: MirrorPayload) -> bool {
204        // Try to acquire semaphore permit without blocking
205        let permit = match self.mirror_semaphore.clone().try_acquire_owned() {
206            Ok(permit) => permit,
207            Err(_) => {
208                // Queue is full - apply backpressure by dropping this request
209                self.dropped_queue_full.fetch_add(1, Ordering::Relaxed);
210                debug!(
211                    request_id = %payload.request_id,
212                    max_concurrent = self.max_concurrent,
213                    "Shadow mirror dropped: queue full (backpressure)"
214                );
215                return false;
216            }
217        };
218
219        let client = Arc::clone(&self.client);
220        let urls = self.config.honeypot_urls.clone();
221        let timeout = self.config.timeout();
222        let request_id = payload.request_id.clone();
223
224        self.sent.fetch_add(1, Ordering::Relaxed);
225
226        // Fire and forget - permit is released when task completes
227        tokio::spawn(async move {
228            // Permit is held for the duration of this async block
229            let _permit = permit;
230
231            if let Err(e) = client.send_to_honeypot(&urls, payload, timeout).await {
232                // Log but don't fail - this is best-effort
233                warn!(
234                    request_id = %request_id,
235                    error = %e,
236                    "Shadow mirror delivery failed"
237                );
238            }
239            // Permit is dropped here, releasing the semaphore slot
240        });
241
242        true
243    }
244
245    /// Creates a mirror payload from request context.
246    ///
247    /// # Arguments
248    /// * `request_id` - Unique request identifier
249    /// * `source_ip` - Client IP address
250    /// * `method` - HTTP method
251    /// * `uri` - Request URI
252    /// * `site_name` - Site/vhost name
253    /// * `risk_score` - Calculated risk score
254    /// * `matched_rules` - IDs of rules that matched
255    /// * `ja4` - Optional JA4 TLS fingerprint
256    /// * `ja4h` - Optional JA4H HTTP fingerprint
257    /// * `campaign_id` - Optional campaign correlation ID
258    /// * `headers` - Request headers to include
259    /// * `body` - Optional request body
260    #[allow(clippy::too_many_arguments)]
261    pub fn create_payload(
262        &self,
263        request_id: String,
264        source_ip: String,
265        method: String,
266        uri: String,
267        site_name: String,
268        risk_score: f32,
269        matched_rules: Vec<String>,
270        ja4: Option<String>,
271        ja4h: Option<String>,
272        campaign_id: Option<String>,
273        headers: HashMap<String, String>,
274        body: Option<String>,
275    ) -> MirrorPayload {
276        // Filter headers based on config
277        let filtered_headers: HashMap<String, String> = headers
278            .into_iter()
279            .filter(|(k, _)| {
280                self.config
281                    .include_headers
282                    .iter()
283                    .any(|h| h.eq_ignore_ascii_case(k))
284            })
285            .collect();
286
287        // Truncate body if too large
288        let body = if self.config.include_body {
289            body.map(|b| {
290                if b.len() > self.config.max_body_size {
291                    b[..self.config.max_body_size].to_string()
292                } else {
293                    b
294                }
295            })
296        } else {
297            None
298        };
299
300        MirrorPayload::new(
301            request_id,
302            source_ip,
303            risk_score,
304            method,
305            uri,
306            site_name,
307            self.sensor_id.clone(),
308        )
309        .with_ja4(ja4)
310        .with_ja4h(ja4h)
311        .with_rules(matched_rules)
312        .with_campaign(campaign_id)
313        .with_headers(filtered_headers)
314        .with_body(body)
315    }
316
317    /// Runs periodic cleanup of the rate limiter.
318    ///
319    /// Call this from a background task at regular intervals (e.g., every 60s).
320    pub fn cleanup(&self) {
321        self.rate_limiter.cleanup();
322    }
323
324    /// Returns statistics about shadow mirroring.
325    pub fn stats(&self) -> ShadowMirrorStats {
326        let client_stats = self.client.stats();
327        let rate_limiter_stats = self.rate_limiter.stats();
328
329        ShadowMirrorStats {
330            enabled: self.config.enabled,
331            attempts: self.attempts.load(Ordering::Relaxed),
332            skipped_risk: self.skipped_risk.load(Ordering::Relaxed),
333            skipped_sampling: self.skipped_sampling.load(Ordering::Relaxed),
334            skipped_rate_limit: self.skipped_rate_limit.load(Ordering::Relaxed),
335            dropped_queue_full: self.dropped_queue_full.load(Ordering::Relaxed),
336            sent: self.sent.load(Ordering::Relaxed),
337            delivery_successes: client_stats.successes,
338            delivery_failures: client_stats.failures,
339            bytes_sent: client_stats.bytes_sent,
340            tracked_ips: rate_limiter_stats.tracked_ips,
341            max_concurrent: self.max_concurrent,
342            queue_available: self.mirror_semaphore.available_permits(),
343            min_risk_score: self.config.min_risk_score,
344            max_risk_score: self.config.max_risk_score,
345            sampling_rate: self.config.sampling_rate,
346            per_ip_rate_limit: self.config.per_ip_rate_limit,
347            honeypot_count: self.config.honeypot_urls.len(),
348        }
349    }
350
351    /// Resets all statistics.
352    pub fn reset_stats(&self) {
353        self.attempts.store(0, Ordering::Relaxed);
354        self.skipped_risk.store(0, Ordering::Relaxed);
355        self.skipped_sampling.store(0, Ordering::Relaxed);
356        self.skipped_rate_limit.store(0, Ordering::Relaxed);
357        self.dropped_queue_full.store(0, Ordering::Relaxed);
358        self.sent.store(0, Ordering::Relaxed);
359        self.client.reset_stats();
360        self.rate_limiter.reset();
361    }
362
363    /// Returns whether shadow mirroring is enabled.
364    pub fn is_enabled(&self) -> bool {
365        self.config.enabled
366    }
367
368    /// Returns the configuration.
369    pub fn config(&self) -> &ShadowMirrorConfig {
370        &self.config
371    }
372}
373
/// Point-in-time snapshot of shadow mirroring statistics.
///
/// Field order is also the serialized field order, so keep it stable for consumers
/// of the serialized form.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ShadowMirrorStats {
    /// Whether shadow mirroring is enabled
    pub enabled: bool,
    /// Number of `should_mirror` calls made while mirroring was enabled
    pub attempts: u64,
    /// Skipped because the risk score fell outside the `[min_risk_score, max_risk_score)` window
    pub skipped_risk: u64,
    /// Skipped because the request was not selected by sampling
    pub skipped_sampling: u64,
    /// Skipped due to per-IP rate limiting
    pub skipped_rate_limit: u64,
    /// Dropped because all concurrency slots were busy (backpressure)
    pub dropped_queue_full: u64,
    /// Handed off to a background delivery task (not a delivery confirmation)
    pub sent: u64,
    /// Successful honeypot deliveries, as reported by the delivery client
    pub delivery_successes: u64,
    /// Failed honeypot deliveries, as reported by the delivery client
    pub delivery_failures: u64,
    /// Total bytes sent to honeypots, as reported by the delivery client
    pub bytes_sent: u64,
    /// Number of IPs currently tracked by the rate limiter
    pub tracked_ips: usize,
    /// Maximum concurrent mirror operations allowed
    pub max_concurrent: usize,
    /// Concurrency slots currently free (available semaphore permits)
    pub queue_available: usize,
    /// Configured minimum risk score (inclusive)
    pub min_risk_score: f32,
    /// Configured maximum risk score (exclusive)
    pub max_risk_score: f32,
    /// Configured sampling rate
    pub sampling_rate: f32,
    /// Configured per-IP rate limit (the time window is defined by `RateLimiter` — TODO confirm)
    pub per_ip_rate_limit: u32,
    /// Number of configured honeypot URLs
    pub honeypot_count: usize,
}
414
#[cfg(test)]
mod tests {
    use super::*;

    /// Sensor ID used by every manager built in these tests.
    const SENSOR: &str = "sensor-01";

    /// Baseline config: mirror window [40, 70), full sampling, 10 mirrors per IP,
    /// only `User-Agent` forwarded, bodies capped at 1024 bytes.
    fn create_test_config() -> ShadowMirrorConfig {
        ShadowMirrorConfig {
            enabled: true,
            honeypot_urls: vec!["http://localhost:8888/mirror".to_string()],
            min_risk_score: 40.0,
            max_risk_score: 70.0,
            sampling_rate: 1.0,
            per_ip_rate_limit: 10,
            timeout_secs: 5,
            hmac_secret: None,
            include_headers: vec!["User-Agent".to_string()],
            include_body: true,
            max_body_size: 1024,
        }
    }

    /// Manager over the untouched baseline config.
    fn create_test_manager() -> ShadowMirrorManager {
        ShadowMirrorManager::new(create_test_config(), SENSOR.to_string())
            .expect("test manager creation should succeed")
    }

    /// Manager over the baseline config after `tweak` has adjusted it.
    fn manager_with(tweak: impl FnOnce(&mut ShadowMirrorConfig)) -> ShadowMirrorManager {
        let mut config = create_test_config();
        tweak(&mut config);
        ShadowMirrorManager::new(config, SENSOR.to_string())
            .expect("manager creation should succeed")
    }

    /// Manager over the baseline config with an explicit concurrency limit.
    fn concurrency_manager(max_concurrent: usize) -> ShadowMirrorManager {
        ShadowMirrorManager::with_max_concurrent(
            create_test_config(),
            SENSOR.to_string(),
            max_concurrent,
        )
        .expect("manager creation should succeed")
    }

    /// Minimal GET payload (score 50, no fingerprints, headers, or body).
    fn bare_payload(manager: &ShadowMirrorManager, request_id: &str) -> MirrorPayload {
        manager.create_payload(
            request_id.to_string(),
            "10.0.0.1".to_string(),
            "GET".to_string(),
            "/test".to_string(),
            "site".to_string(),
            50.0,
            Vec::new(),
            None,
            None,
            None,
            HashMap::new(),
            None,
        )
    }

    #[test]
    fn test_should_mirror_in_risk_window() {
        let manager = create_test_manager();

        // Scores inside [40, 70) are eligible.
        for (risk, ip) in [(45.0, "192.168.1.1"), (50.0, "192.168.1.2"), (69.9, "192.168.1.3")] {
            assert!(manager.should_mirror(risk, ip));
        }
    }

    #[test]
    fn test_should_not_mirror_below_min() {
        let manager = create_test_manager();

        for (risk, ip) in [(10.0, "192.168.1.1"), (39.9, "192.168.1.2")] {
            assert!(!manager.should_mirror(risk, ip));
        }
    }

    #[test]
    fn test_should_not_mirror_above_max() {
        let manager = create_test_manager();

        // The upper bound is exclusive: 70.0 itself goes to blocking, not mirroring.
        for (risk, ip) in [(70.0, "192.168.1.1"), (85.0, "192.168.1.2"), (100.0, "192.168.1.3")] {
            assert!(!manager.should_mirror(risk, ip));
        }
    }

    #[test]
    fn test_should_not_mirror_when_disabled() {
        let manager = manager_with(|config| config.enabled = false);

        assert!(!manager.should_mirror(50.0, "192.168.1.1"));
    }

    #[test]
    fn test_rate_limiting() {
        let manager = manager_with(|config| config.per_ip_rate_limit = 3);
        let ip = "10.0.0.1";

        for _ in 0..3 {
            assert!(manager.should_mirror(50.0, ip));
        }
        // The fourth request exceeds the per-IP budget.
        assert!(!manager.should_mirror(50.0, ip));
    }

    #[test]
    fn test_different_ips_independent() {
        let manager = manager_with(|config| config.per_ip_rate_limit = 2);

        assert!(manager.should_mirror(50.0, "ip1"));
        assert!(manager.should_mirror(50.0, "ip1"));
        assert!(!manager.should_mirror(50.0, "ip1"));

        // Exhausting ip1's budget leaves ip2 untouched.
        assert!(manager.should_mirror(50.0, "ip2"));
        assert!(manager.should_mirror(50.0, "ip2"));
    }

    #[test]
    fn test_sampling_rate() {
        // 0% sampling: nothing is ever selected.
        let manager = manager_with(|config| config.sampling_rate = 0.0);

        for n in 0..100 {
            let ip = format!("ip{}", n);
            assert!(!manager.should_mirror(50.0, &ip));
        }
    }

    #[test]
    fn test_create_payload() {
        let manager = create_test_manager();
        let headers = HashMap::from([
            ("User-Agent".to_string(), "test-agent".to_string()),
            ("X-Custom".to_string(), "should-be-filtered".to_string()),
        ]);

        let payload = manager.create_payload(
            "req-123".to_string(),
            "10.0.0.1".to_string(),
            "POST".to_string(),
            "/api/login".to_string(),
            "example.com".to_string(),
            55.0,
            vec!["sqli-001".to_string()],
            Some("ja4-fingerprint".to_string()),
            None,
            Some("campaign-123".to_string()),
            headers,
            Some("request body".to_string()),
        );

        assert_eq!(payload.request_id, "req-123");
        assert_eq!(payload.source_ip, "10.0.0.1");
        assert_eq!(payload.risk_score, 55.0);
        assert_eq!(payload.sensor_id, SENSOR);
        // Only allow-listed headers survive filtering.
        assert!(payload.headers.contains_key("User-Agent"));
        assert!(!payload.headers.contains_key("X-Custom"));
    }

    #[test]
    fn test_body_truncation() {
        let manager = manager_with(|config| config.max_body_size = 10);
        let long_body = "this is a very long body that should be truncated".to_string();

        let payload = manager.create_payload(
            "req-123".to_string(),
            "10.0.0.1".to_string(),
            "POST".to_string(),
            "/api".to_string(),
            "site".to_string(),
            50.0,
            Vec::new(),
            None,
            None,
            None,
            HashMap::new(),
            Some(long_body),
        );

        assert_eq!(payload.body.unwrap().len(), 10);
    }

    #[test]
    fn test_stats() {
        let manager = create_test_manager();

        // Two eligible requests and one below the risk threshold.
        for (risk, ip) in [(50.0, "ip1"), (50.0, "ip2"), (10.0, "ip3")] {
            manager.should_mirror(risk, ip);
        }

        let stats = manager.stats();
        assert!(stats.enabled);
        assert_eq!(stats.attempts, 3);
        assert_eq!(stats.skipped_risk, 1);
        assert_eq!(stats.min_risk_score, 40.0);
        assert_eq!(stats.max_risk_score, 70.0);
    }

    #[test]
    fn test_reset_stats() {
        let manager = create_test_manager();
        for ip in ["ip1", "ip2"] {
            manager.should_mirror(50.0, ip);
        }

        manager.reset_stats();

        let stats = manager.stats();
        assert_eq!(stats.attempts, 0);
        assert_eq!(stats.sent, 0);
    }

    #[test]
    fn test_bounded_queue_default_concurrency() {
        let stats = create_test_manager().stats();

        assert_eq!(stats.max_concurrent, DEFAULT_MAX_CONCURRENT_MIRRORS);
        assert_eq!(stats.queue_available, DEFAULT_MAX_CONCURRENT_MIRRORS);
    }

    #[test]
    fn test_bounded_queue_custom_concurrency() {
        let stats = concurrency_manager(50).stats();

        assert_eq!(stats.max_concurrent, 50);
        assert_eq!(stats.queue_available, 50);
    }

    #[tokio::test]
    async fn test_bounded_queue_backpressure() {
        // Two slots: the first two mirrors are admitted, the third is shed.
        let manager = concurrency_manager(2);

        assert!(manager.mirror_async(bare_payload(&manager, "req-1")));
        assert!(manager.mirror_async(bare_payload(&manager, "req-2")));
        assert!(!manager.mirror_async(bare_payload(&manager, "req-3")));

        let stats = manager.stats();
        assert_eq!(stats.sent, 2);
        assert_eq!(stats.dropped_queue_full, 1);
    }

    #[test]
    fn test_stats_includes_queue_metrics() {
        let stats = concurrency_manager(25).stats();

        assert_eq!(stats.max_concurrent, 25);
        assert_eq!(stats.queue_available, 25);
        assert_eq!(stats.dropped_queue_full, 0);
    }
}