Skip to main content

synapse_pingora/trends/
anomaly_detector.rs

1//! Anomaly detection for signal patterns.
2
3use std::collections::{HashMap, HashSet};
4
5use super::time_store::SignalBucket;
6use super::types::{
7    Anomaly, AnomalyMetadata, AnomalySeverity, AnomalyType, Signal, SignalCategory, SignalType,
8};
9
/// Tunable thresholds consulted by [`AnomalyDetector`].
#[derive(Clone, Debug)]
pub struct AnomalyDetectorConfig {
    /// Minimum number of distinct entities (IPs) presenting the same auth
    /// token before it is reported as session sharing.
    pub session_sharing_min_ips: usize,
    /// Ratio of the current bucket's signal count to the historical average
    /// at or above which a velocity spike is reported.
    pub velocity_spike_threshold: f64,
    /// Minimum number of distinct fingerprint values seen from one entity
    /// before it is reported as a rotation pattern.
    pub rotation_min_changes: usize,
    /// Time window for timing anomaly analysis, in milliseconds.
    pub timing_anomaly_window_ms: i64,
    /// Minimum number of timing signals required before interval regularity
    /// is evaluated.
    pub timing_anomaly_min_requests: usize,
}
24
25impl Default for AnomalyDetectorConfig {
26    fn default() -> Self {
27        Self {
28            session_sharing_min_ips: 3,
29            velocity_spike_threshold: 3.0,
30            rotation_min_changes: 5,
31            timing_anomaly_window_ms: 60_000,
32            timing_anomaly_min_requests: 10,
33        }
34    }
35}
36
37/// Anomaly detector for signal patterns.
38pub struct AnomalyDetector {
39    config: AnomalyDetectorConfig,
40    risk_scores: HashMap<AnomalyType, u32>,
41}
42
43impl AnomalyDetector {
44    /// Create a new anomaly detector.
45    pub fn new(risk_scores: HashMap<AnomalyType, u32>) -> Self {
46        Self {
47            config: AnomalyDetectorConfig::default(),
48            risk_scores,
49        }
50    }
51
52    /// Create with custom configuration.
53    pub fn with_config(
54        config: AnomalyDetectorConfig,
55        risk_scores: HashMap<AnomalyType, u32>,
56    ) -> Self {
57        Self {
58            config,
59            risk_scores,
60        }
61    }
62
63    /// Check a single signal for anomalies against recent history.
64    pub fn check_signal(&self, signal: &Signal, recent_signals: &[Signal]) -> Option<Anomaly> {
65        match signal.category {
66            SignalCategory::AuthToken => self.check_auth_anomaly(signal, recent_signals),
67            SignalCategory::Network => self.check_network_anomaly(signal, recent_signals),
68            SignalCategory::Device => self.check_device_anomaly(signal, recent_signals),
69            SignalCategory::Behavioral => self.check_behavioral_anomaly(signal, recent_signals),
70        }
71    }
72
73    /// Detect batch anomalies across buckets.
74    pub fn detect_batch_anomalies(
75        &self,
76        current: &SignalBucket,
77        historical: &[&SignalBucket],
78    ) -> Vec<Anomaly> {
79        let mut anomalies = Vec::new();
80
81        // Velocity spike detection
82        if let Some(anomaly) = self.detect_velocity_spike(current, historical) {
83            anomalies.push(anomaly);
84        }
85
86        // Session sharing detection
87        anomalies.extend(self.detect_session_sharing(&current.signals));
88
89        // JA4 cluster detection
90        anomalies.extend(self.detect_ja4_clusters(&current.signals));
91
92        // Rotation pattern detection
93        anomalies.extend(self.detect_rotation_patterns(&current.signals, historical));
94
95        anomalies
96    }
97
98    /// Check for auth token anomalies.
99    fn check_auth_anomaly(&self, signal: &Signal, recent: &[Signal]) -> Option<Anomaly> {
100        // Check for token reuse with different fingerprints
101        let same_token_signals: Vec<_> = recent
102            .iter()
103            .filter(|s| s.signal_type == signal.signal_type && s.value == signal.value)
104            .collect();
105
106        if same_token_signals.len() >= 2 {
107            let entities: HashSet<String> = same_token_signals
108                .iter()
109                .map(|s| s.entity_id.clone())
110                .collect();
111            let entity_count = entities.len();
112            if entity_count >= self.config.session_sharing_min_ips {
113                return Some(self.create_anomaly(
114                    AnomalyType::SessionSharing,
115                    AnomalySeverity::High,
116                    format!("Auth token used from {} different IPs", entity_count),
117                    signal.category,
118                    same_token_signals.into_iter().cloned().collect(),
119                    entities.into_iter().collect(),
120                    AnomalyMetadata {
121                        ip_count: Some(entity_count),
122                        ..Default::default()
123                    },
124                ));
125            }
126        }
127
128        None
129    }
130
131    /// Check for network anomalies.
132    fn check_network_anomaly(&self, signal: &Signal, recent: &[Signal]) -> Option<Anomaly> {
133        // Check for JA4 fingerprint change
134        if signal.signal_type == SignalType::Ja4 {
135            let previous_ja4: Vec<_> = recent
136                .iter()
137                .filter(|s| {
138                    s.signal_type == SignalType::Ja4
139                        && s.entity_id == signal.entity_id
140                        && s.value != signal.value
141                })
142                .collect();
143
144            if !previous_ja4.is_empty() {
145                let prev = previous_ja4[0];
146                return Some(self.create_anomaly(
147                    AnomalyType::Ja4hChange,
148                    AnomalySeverity::Medium,
149                    format!(
150                        "JA4 fingerprint changed from {} to {}",
151                        &prev.value[..8.min(prev.value.len())],
152                        &signal.value[..8.min(signal.value.len())]
153                    ),
154                    SignalCategory::Network,
155                    vec![prev.clone(), signal.clone()],
156                    vec![signal.entity_id.clone()],
157                    AnomalyMetadata {
158                        previous_value: Some(prev.value.clone()),
159                        new_value: Some(signal.value.clone()),
160                        ..Default::default()
161                    },
162                ));
163            }
164        }
165
166        None
167    }
168
169    /// Check for device anomalies.
170    fn check_device_anomaly(&self, signal: &Signal, recent: &[Signal]) -> Option<Anomaly> {
171        // Check for fingerprint change within session
172        if let Some(ref session_id) = signal.session_id {
173            let session_fingerprints: Vec<_> = recent
174                .iter()
175                .filter(|s| {
176                    s.session_id.as_ref() == Some(session_id)
177                        && s.signal_type == SignalType::HttpFingerprint
178                })
179                .collect();
180
181            let unique_fps: HashSet<_> = session_fingerprints.iter().map(|s| &s.value).collect();
182
183            if unique_fps.len() >= 2 {
184                return Some(self.create_anomaly(
185                    AnomalyType::FingerprintChange,
186                    AnomalySeverity::Medium,
187                    format!(
188                        "HTTP fingerprint changed within session (now {} variants)",
189                        unique_fps.len()
190                    ),
191                    SignalCategory::Device,
192                    session_fingerprints.into_iter().cloned().collect(),
193                    vec![signal.entity_id.clone()],
194                    AnomalyMetadata {
195                        change_count: Some(unique_fps.len()),
196                        ..Default::default()
197                    },
198                ));
199            }
200        }
201
202        None
203    }
204
205    /// Check for behavioral anomalies.
206    fn check_behavioral_anomaly(&self, signal: &Signal, recent: &[Signal]) -> Option<Anomaly> {
207        if signal.signal_type != SignalType::Timing {
208            return None;
209        }
210
211        // Check for timing anomalies (suspiciously regular intervals)
212        let timing_signals: Vec<_> = recent
213            .iter()
214            .filter(|s| s.signal_type == SignalType::Timing && s.entity_id == signal.entity_id)
215            .collect();
216
217        if timing_signals.len() < self.config.timing_anomaly_min_requests {
218            return None;
219        }
220
221        // Calculate variance in timing
222        // (simplified check - real implementation would be more sophisticated)
223        let mut intervals = Vec::new();
224        for i in 1..timing_signals.len() {
225            let delta = timing_signals[i].timestamp - timing_signals[i - 1].timestamp;
226            intervals.push(delta);
227        }
228
229        if intervals.is_empty() {
230            return None;
231        }
232
233        let mean = intervals.iter().sum::<i64>() as f64 / intervals.len() as f64;
234        let variance = intervals
235            .iter()
236            .map(|&i| (i as f64 - mean).powi(2))
237            .sum::<f64>()
238            / intervals.len() as f64;
239
240        // Very low variance indicates bot-like regular intervals
241        if variance < 100.0 && mean < 1000.0 {
242            return Some(self.create_anomaly(
243                AnomalyType::TimingAnomaly,
244                AnomalySeverity::Low,
245                format!(
246                    "Suspiciously regular request timing (mean: {:.0}ms, variance: {:.0})",
247                    mean, variance
248                ),
249                SignalCategory::Behavioral,
250                timing_signals.into_iter().cloned().collect(),
251                vec![signal.entity_id.clone()],
252                AnomalyMetadata {
253                    threshold: Some(100.0),
254                    actual: Some(variance),
255                    ..Default::default()
256                },
257            ));
258        }
259
260        None
261    }
262
263    /// Detect velocity spikes.
264    fn detect_velocity_spike(
265        &self,
266        current: &SignalBucket,
267        historical: &[&SignalBucket],
268    ) -> Option<Anomaly> {
269        if historical.is_empty() {
270            return None;
271        }
272
273        let current_count = current.summary.total_count;
274        let historical_avg: f64 = historical
275            .iter()
276            .map(|b| b.summary.total_count as f64)
277            .sum::<f64>()
278            / historical.len() as f64;
279
280        if historical_avg == 0.0 {
281            return None;
282        }
283
284        let spike_ratio = current_count as f64 / historical_avg;
285
286        if spike_ratio >= self.config.velocity_spike_threshold {
287            return Some(self.create_anomaly(
288                AnomalyType::VelocitySpike,
289                AnomalySeverity::Medium,
290                format!(
291                    "Signal velocity spike: {:.1}x baseline ({} vs avg {:.0})",
292                    spike_ratio, current_count, historical_avg
293                ),
294                SignalCategory::Behavioral,
295                Vec::new(),
296                Vec::new(),
297                AnomalyMetadata {
298                    threshold: Some(self.config.velocity_spike_threshold),
299                    actual: Some(spike_ratio),
300                    ..Default::default()
301                },
302            ));
303        }
304
305        None
306    }
307
308    /// Detect session sharing patterns.
309    fn detect_session_sharing(&self, signals: &[Signal]) -> Vec<Anomaly> {
310        let mut anomalies = Vec::new();
311
312        // Group auth tokens by value
313        let mut token_ips: HashMap<String, HashSet<String>> = HashMap::new();
314        for signal in signals {
315            if signal.category == SignalCategory::AuthToken {
316                token_ips
317                    .entry(signal.value.clone())
318                    .or_default()
319                    .insert(signal.entity_id.clone());
320            }
321        }
322
323        for (token_hash, ips) in token_ips {
324            if ips.len() >= self.config.session_sharing_min_ips {
325                anomalies.push(self.create_anomaly(
326                    AnomalyType::SessionSharing,
327                    AnomalySeverity::High,
328                    format!(
329                        "Auth token shared across {} IPs: {}...",
330                        ips.len(),
331                        &token_hash[..8.min(token_hash.len())]
332                    ),
333                    SignalCategory::AuthToken,
334                    Vec::new(),
335                    ips.into_iter().collect(),
336                    AnomalyMetadata {
337                        token_hash_prefix: Some(token_hash[..16.min(token_hash.len())].to_string()),
338                        ..Default::default()
339                    },
340                ));
341            }
342        }
343
344        anomalies
345    }
346
347    /// Detect JA4 fingerprint clusters.
348    fn detect_ja4_clusters(&self, signals: &[Signal]) -> Vec<Anomaly> {
349        let mut anomalies = Vec::new();
350
351        // Group JA4 fingerprints by value
352        let mut ja4_ips: HashMap<String, HashSet<String>> = HashMap::new();
353        for signal in signals {
354            if signal.signal_type == SignalType::Ja4 {
355                ja4_ips
356                    .entry(signal.value.clone())
357                    .or_default()
358                    .insert(signal.entity_id.clone());
359            }
360        }
361
362        for (ja4, ips) in ja4_ips {
363            let ip_count = ips.len();
364            if ip_count >= 10 {
365                // Large cluster threshold
366                anomalies.push(self.create_anomaly(
367                    AnomalyType::Ja4IpCluster,
368                    AnomalySeverity::Medium,
369                    format!(
370                        "JA4 fingerprint {} seen from {} IPs (potential bot farm)",
371                        &ja4[..12.min(ja4.len())],
372                        ip_count
373                    ),
374                    SignalCategory::Network,
375                    Vec::new(),
376                    ips.into_iter().collect(),
377                    AnomalyMetadata {
378                        ip_count: Some(ip_count),
379                        ..Default::default()
380                    },
381                ));
382            }
383        }
384
385        anomalies
386    }
387
388    /// Detect rotation patterns.
389    fn detect_rotation_patterns(
390        &self,
391        current_signals: &[Signal],
392        _historical: &[&SignalBucket],
393    ) -> Vec<Anomaly> {
394        let mut anomalies = Vec::new();
395
396        // Group by entity and signal type
397        let mut entity_values: HashMap<(String, SignalType), HashSet<String>> = HashMap::new();
398
399        for signal in current_signals {
400            if matches!(
401                signal.signal_type,
402                SignalType::Ja4 | SignalType::HttpFingerprint
403            ) {
404                entity_values
405                    .entry((signal.entity_id.clone(), signal.signal_type))
406                    .or_default()
407                    .insert(signal.value.clone());
408            }
409        }
410
411        for ((entity_id, signal_type), values) in entity_values {
412            if values.len() >= self.config.rotation_min_changes {
413                let anomaly_type = match signal_type {
414                    SignalType::Ja4 => AnomalyType::Ja4RotationPattern,
415                    _ => AnomalyType::RotationPattern,
416                };
417
418                anomalies.push(self.create_anomaly(
419                    anomaly_type,
420                    AnomalySeverity::High,
421                    format!(
422                        "Systematic {:?} rotation: {} unique values from {}",
423                        signal_type,
424                        values.len(),
425                        entity_id
426                    ),
427                    signal_type.category(),
428                    Vec::new(),
429                    vec![entity_id],
430                    AnomalyMetadata {
431                        change_count: Some(values.len()),
432                        ..Default::default()
433                    },
434                ));
435            }
436        }
437
438        anomalies
439    }
440
441    /// Create an anomaly with standard fields.
442    fn create_anomaly(
443        &self,
444        anomaly_type: AnomalyType,
445        severity: AnomalySeverity,
446        description: String,
447        category: SignalCategory,
448        signals: Vec<Signal>,
449        entities: Vec<String>,
450        metadata: AnomalyMetadata,
451    ) -> Anomaly {
452        Anomaly {
453            id: uuid::Uuid::new_v4().to_string(),
454            detected_at: chrono::Utc::now().timestamp_millis(),
455            category,
456            anomaly_type,
457            severity,
458            description,
459            signals,
460            entities,
461            metadata,
462            risk_applied: self.risk_scores.get(&anomaly_type).copied(),
463        }
464    }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a signal stamped "now", with a fresh id and the category implied
    /// by its signal type.
    fn create_test_signal(
        entity_id: &str,
        signal_type: SignalType,
        value: &str,
        session_id: Option<&str>,
    ) -> Signal {
        let category = signal_type.category();
        Signal {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp: chrono::Utc::now().timestamp_millis(),
            category,
            signal_type,
            value: value.to_owned(),
            entity_id: entity_id.to_owned(),
            session_id: session_id.map(str::to_owned),
            metadata: super::super::types::SignalMetadata::default(),
        }
    }

    #[test]
    fn test_session_sharing_detection() {
        let detector = AnomalyDetector::new(HashMap::new());
        let signals: Vec<Signal> = ["ip-1", "ip-2", "ip-3"]
            .iter()
            .map(|ip| create_test_signal(ip, SignalType::Bearer, "token123", None))
            .collect();

        let found = detector.detect_session_sharing(&signals);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].anomaly_type, AnomalyType::SessionSharing);
    }

    #[test]
    fn test_ja4_cluster_detection() {
        let detector = AnomalyDetector::new(HashMap::new());
        let signals: Vec<Signal> = (0..15)
            .map(|n| {
                create_test_signal(
                    &format!("ip-{}", n),
                    SignalType::Ja4,
                    "t13d1516h2_same_fingerprint",
                    None,
                )
            })
            .collect();

        let found = detector.detect_ja4_clusters(&signals);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].anomaly_type, AnomalyType::Ja4IpCluster);
    }

    #[test]
    fn test_rotation_pattern_detection() {
        let config = AnomalyDetectorConfig {
            rotation_min_changes: 3,
            ..AnomalyDetectorConfig::default()
        };
        let detector = AnomalyDetector::with_config(config, HashMap::new());

        let signals: Vec<Signal> = (1..=3)
            .map(|n| {
                create_test_signal(
                    "ip-1",
                    SignalType::Ja4,
                    &format!("fingerprint-{}", n),
                    None,
                )
            })
            .collect();

        let found = detector.detect_rotation_patterns(&signals, &[]);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].anomaly_type, AnomalyType::Ja4RotationPattern);
    }
}