1use std::collections::{HashMap, HashSet};
4
5use super::time_store::SignalBucket;
6use super::types::{
7 Anomaly, AnomalyMetadata, AnomalySeverity, AnomalyType, Signal, SignalCategory, SignalType,
8};
9
/// Tunable thresholds for [`AnomalyDetector`].
///
/// `PartialEq` is derived so configurations can be compared directly (for example in tests or
/// when checking whether a reloaded configuration actually changed). `Eq` is not derivable
/// because of the `f64` field.
#[derive(Debug, Clone, PartialEq)]
pub struct AnomalyDetectorConfig {
    /// Minimum number of distinct entity ids that must present the same auth token value
    /// before session sharing is reported.
    pub session_sharing_min_ips: usize,
    /// Ratio of the current bucket's signal count to the historical average at or above which
    /// a velocity spike is reported.
    pub velocity_spike_threshold: f64,
    /// Minimum number of distinct values of one signal type from a single entity, within one
    /// batch, before a rotation pattern is reported.
    pub rotation_min_changes: usize,
    /// Window length in milliseconds for the timing check.
    ///
    /// NOTE(review): the detector code in this file does not read this field — confirm whether
    /// the caller applies the window before passing `recent_signals`.
    pub timing_anomaly_window_ms: i64,
    /// Minimum number of prior timing signals from the same entity required before the
    /// timing-regularity check is evaluated.
    pub timing_anomaly_min_requests: usize,
}
24
25impl Default for AnomalyDetectorConfig {
26 fn default() -> Self {
27 Self {
28 session_sharing_min_ips: 3,
29 velocity_spike_threshold: 3.0,
30 rotation_min_changes: 5,
31 timing_anomaly_window_ms: 60_000,
32 timing_anomaly_min_requests: 10,
33 }
34 }
35}
36
37pub struct AnomalyDetector {
39 config: AnomalyDetectorConfig,
40 risk_scores: HashMap<AnomalyType, u32>,
41}
42
43impl AnomalyDetector {
44 pub fn new(risk_scores: HashMap<AnomalyType, u32>) -> Self {
46 Self {
47 config: AnomalyDetectorConfig::default(),
48 risk_scores,
49 }
50 }
51
52 pub fn with_config(
54 config: AnomalyDetectorConfig,
55 risk_scores: HashMap<AnomalyType, u32>,
56 ) -> Self {
57 Self {
58 config,
59 risk_scores,
60 }
61 }
62
63 pub fn check_signal(&self, signal: &Signal, recent_signals: &[Signal]) -> Option<Anomaly> {
65 match signal.category {
66 SignalCategory::AuthToken => self.check_auth_anomaly(signal, recent_signals),
67 SignalCategory::Network => self.check_network_anomaly(signal, recent_signals),
68 SignalCategory::Device => self.check_device_anomaly(signal, recent_signals),
69 SignalCategory::Behavioral => self.check_behavioral_anomaly(signal, recent_signals),
70 }
71 }
72
73 pub fn detect_batch_anomalies(
75 &self,
76 current: &SignalBucket,
77 historical: &[&SignalBucket],
78 ) -> Vec<Anomaly> {
79 let mut anomalies = Vec::new();
80
81 if let Some(anomaly) = self.detect_velocity_spike(current, historical) {
83 anomalies.push(anomaly);
84 }
85
86 anomalies.extend(self.detect_session_sharing(¤t.signals));
88
89 anomalies.extend(self.detect_ja4_clusters(¤t.signals));
91
92 anomalies.extend(self.detect_rotation_patterns(¤t.signals, historical));
94
95 anomalies
96 }
97
98 fn check_auth_anomaly(&self, signal: &Signal, recent: &[Signal]) -> Option<Anomaly> {
100 let same_token_signals: Vec<_> = recent
102 .iter()
103 .filter(|s| s.signal_type == signal.signal_type && s.value == signal.value)
104 .collect();
105
106 if same_token_signals.len() >= 2 {
107 let entities: HashSet<String> = same_token_signals
108 .iter()
109 .map(|s| s.entity_id.clone())
110 .collect();
111 let entity_count = entities.len();
112 if entity_count >= self.config.session_sharing_min_ips {
113 return Some(self.create_anomaly(
114 AnomalyType::SessionSharing,
115 AnomalySeverity::High,
116 format!("Auth token used from {} different IPs", entity_count),
117 signal.category,
118 same_token_signals.into_iter().cloned().collect(),
119 entities.into_iter().collect(),
120 AnomalyMetadata {
121 ip_count: Some(entity_count),
122 ..Default::default()
123 },
124 ));
125 }
126 }
127
128 None
129 }
130
131 fn check_network_anomaly(&self, signal: &Signal, recent: &[Signal]) -> Option<Anomaly> {
133 if signal.signal_type == SignalType::Ja4 {
135 let previous_ja4: Vec<_> = recent
136 .iter()
137 .filter(|s| {
138 s.signal_type == SignalType::Ja4
139 && s.entity_id == signal.entity_id
140 && s.value != signal.value
141 })
142 .collect();
143
144 if !previous_ja4.is_empty() {
145 let prev = previous_ja4[0];
146 return Some(self.create_anomaly(
147 AnomalyType::Ja4hChange,
148 AnomalySeverity::Medium,
149 format!(
150 "JA4 fingerprint changed from {} to {}",
151 &prev.value[..8.min(prev.value.len())],
152 &signal.value[..8.min(signal.value.len())]
153 ),
154 SignalCategory::Network,
155 vec![prev.clone(), signal.clone()],
156 vec![signal.entity_id.clone()],
157 AnomalyMetadata {
158 previous_value: Some(prev.value.clone()),
159 new_value: Some(signal.value.clone()),
160 ..Default::default()
161 },
162 ));
163 }
164 }
165
166 None
167 }
168
169 fn check_device_anomaly(&self, signal: &Signal, recent: &[Signal]) -> Option<Anomaly> {
171 if let Some(ref session_id) = signal.session_id {
173 let session_fingerprints: Vec<_> = recent
174 .iter()
175 .filter(|s| {
176 s.session_id.as_ref() == Some(session_id)
177 && s.signal_type == SignalType::HttpFingerprint
178 })
179 .collect();
180
181 let unique_fps: HashSet<_> = session_fingerprints.iter().map(|s| &s.value).collect();
182
183 if unique_fps.len() >= 2 {
184 return Some(self.create_anomaly(
185 AnomalyType::FingerprintChange,
186 AnomalySeverity::Medium,
187 format!(
188 "HTTP fingerprint changed within session (now {} variants)",
189 unique_fps.len()
190 ),
191 SignalCategory::Device,
192 session_fingerprints.into_iter().cloned().collect(),
193 vec![signal.entity_id.clone()],
194 AnomalyMetadata {
195 change_count: Some(unique_fps.len()),
196 ..Default::default()
197 },
198 ));
199 }
200 }
201
202 None
203 }
204
205 fn check_behavioral_anomaly(&self, signal: &Signal, recent: &[Signal]) -> Option<Anomaly> {
207 if signal.signal_type != SignalType::Timing {
208 return None;
209 }
210
211 let timing_signals: Vec<_> = recent
213 .iter()
214 .filter(|s| s.signal_type == SignalType::Timing && s.entity_id == signal.entity_id)
215 .collect();
216
217 if timing_signals.len() < self.config.timing_anomaly_min_requests {
218 return None;
219 }
220
221 let mut intervals = Vec::new();
224 for i in 1..timing_signals.len() {
225 let delta = timing_signals[i].timestamp - timing_signals[i - 1].timestamp;
226 intervals.push(delta);
227 }
228
229 if intervals.is_empty() {
230 return None;
231 }
232
233 let mean = intervals.iter().sum::<i64>() as f64 / intervals.len() as f64;
234 let variance = intervals
235 .iter()
236 .map(|&i| (i as f64 - mean).powi(2))
237 .sum::<f64>()
238 / intervals.len() as f64;
239
240 if variance < 100.0 && mean < 1000.0 {
242 return Some(self.create_anomaly(
243 AnomalyType::TimingAnomaly,
244 AnomalySeverity::Low,
245 format!(
246 "Suspiciously regular request timing (mean: {:.0}ms, variance: {:.0})",
247 mean, variance
248 ),
249 SignalCategory::Behavioral,
250 timing_signals.into_iter().cloned().collect(),
251 vec![signal.entity_id.clone()],
252 AnomalyMetadata {
253 threshold: Some(100.0),
254 actual: Some(variance),
255 ..Default::default()
256 },
257 ));
258 }
259
260 None
261 }
262
263 fn detect_velocity_spike(
265 &self,
266 current: &SignalBucket,
267 historical: &[&SignalBucket],
268 ) -> Option<Anomaly> {
269 if historical.is_empty() {
270 return None;
271 }
272
273 let current_count = current.summary.total_count;
274 let historical_avg: f64 = historical
275 .iter()
276 .map(|b| b.summary.total_count as f64)
277 .sum::<f64>()
278 / historical.len() as f64;
279
280 if historical_avg == 0.0 {
281 return None;
282 }
283
284 let spike_ratio = current_count as f64 / historical_avg;
285
286 if spike_ratio >= self.config.velocity_spike_threshold {
287 return Some(self.create_anomaly(
288 AnomalyType::VelocitySpike,
289 AnomalySeverity::Medium,
290 format!(
291 "Signal velocity spike: {:.1}x baseline ({} vs avg {:.0})",
292 spike_ratio, current_count, historical_avg
293 ),
294 SignalCategory::Behavioral,
295 Vec::new(),
296 Vec::new(),
297 AnomalyMetadata {
298 threshold: Some(self.config.velocity_spike_threshold),
299 actual: Some(spike_ratio),
300 ..Default::default()
301 },
302 ));
303 }
304
305 None
306 }
307
308 fn detect_session_sharing(&self, signals: &[Signal]) -> Vec<Anomaly> {
310 let mut anomalies = Vec::new();
311
312 let mut token_ips: HashMap<String, HashSet<String>> = HashMap::new();
314 for signal in signals {
315 if signal.category == SignalCategory::AuthToken {
316 token_ips
317 .entry(signal.value.clone())
318 .or_default()
319 .insert(signal.entity_id.clone());
320 }
321 }
322
323 for (token_hash, ips) in token_ips {
324 if ips.len() >= self.config.session_sharing_min_ips {
325 anomalies.push(self.create_anomaly(
326 AnomalyType::SessionSharing,
327 AnomalySeverity::High,
328 format!(
329 "Auth token shared across {} IPs: {}...",
330 ips.len(),
331 &token_hash[..8.min(token_hash.len())]
332 ),
333 SignalCategory::AuthToken,
334 Vec::new(),
335 ips.into_iter().collect(),
336 AnomalyMetadata {
337 token_hash_prefix: Some(token_hash[..16.min(token_hash.len())].to_string()),
338 ..Default::default()
339 },
340 ));
341 }
342 }
343
344 anomalies
345 }
346
347 fn detect_ja4_clusters(&self, signals: &[Signal]) -> Vec<Anomaly> {
349 let mut anomalies = Vec::new();
350
351 let mut ja4_ips: HashMap<String, HashSet<String>> = HashMap::new();
353 for signal in signals {
354 if signal.signal_type == SignalType::Ja4 {
355 ja4_ips
356 .entry(signal.value.clone())
357 .or_default()
358 .insert(signal.entity_id.clone());
359 }
360 }
361
362 for (ja4, ips) in ja4_ips {
363 let ip_count = ips.len();
364 if ip_count >= 10 {
365 anomalies.push(self.create_anomaly(
367 AnomalyType::Ja4IpCluster,
368 AnomalySeverity::Medium,
369 format!(
370 "JA4 fingerprint {} seen from {} IPs (potential bot farm)",
371 &ja4[..12.min(ja4.len())],
372 ip_count
373 ),
374 SignalCategory::Network,
375 Vec::new(),
376 ips.into_iter().collect(),
377 AnomalyMetadata {
378 ip_count: Some(ip_count),
379 ..Default::default()
380 },
381 ));
382 }
383 }
384
385 anomalies
386 }
387
388 fn detect_rotation_patterns(
390 &self,
391 current_signals: &[Signal],
392 _historical: &[&SignalBucket],
393 ) -> Vec<Anomaly> {
394 let mut anomalies = Vec::new();
395
396 let mut entity_values: HashMap<(String, SignalType), HashSet<String>> = HashMap::new();
398
399 for signal in current_signals {
400 if matches!(
401 signal.signal_type,
402 SignalType::Ja4 | SignalType::HttpFingerprint
403 ) {
404 entity_values
405 .entry((signal.entity_id.clone(), signal.signal_type))
406 .or_default()
407 .insert(signal.value.clone());
408 }
409 }
410
411 for ((entity_id, signal_type), values) in entity_values {
412 if values.len() >= self.config.rotation_min_changes {
413 let anomaly_type = match signal_type {
414 SignalType::Ja4 => AnomalyType::Ja4RotationPattern,
415 _ => AnomalyType::RotationPattern,
416 };
417
418 anomalies.push(self.create_anomaly(
419 anomaly_type,
420 AnomalySeverity::High,
421 format!(
422 "Systematic {:?} rotation: {} unique values from {}",
423 signal_type,
424 values.len(),
425 entity_id
426 ),
427 signal_type.category(),
428 Vec::new(),
429 vec![entity_id],
430 AnomalyMetadata {
431 change_count: Some(values.len()),
432 ..Default::default()
433 },
434 ));
435 }
436 }
437
438 anomalies
439 }
440
441 fn create_anomaly(
443 &self,
444 anomaly_type: AnomalyType,
445 severity: AnomalySeverity,
446 description: String,
447 category: SignalCategory,
448 signals: Vec<Signal>,
449 entities: Vec<String>,
450 metadata: AnomalyMetadata,
451 ) -> Anomaly {
452 Anomaly {
453 id: uuid::Uuid::new_v4().to_string(),
454 detected_at: chrono::Utc::now().timestamp_millis(),
455 category,
456 anomaly_type,
457 severity,
458 description,
459 signals,
460 entities,
461 metadata,
462 risk_applied: self.risk_scores.get(&anomaly_type).copied(),
463 }
464 }
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a signal with a random id and the current time; the category is derived from
    /// `signal_type`.
    fn create_test_signal(
        entity_id: &str,
        signal_type: SignalType,
        value: &str,
        session_id: Option<&str>,
    ) -> Signal {
        Signal {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp: chrono::Utc::now().timestamp_millis(),
            category: signal_type.category(),
            signal_type,
            value: value.to_owned(),
            entity_id: entity_id.to_owned(),
            session_id: session_id.map(str::to_owned),
            metadata: super::super::types::SignalMetadata::default(),
        }
    }

    /// One bearer token seen from three entities meets the default sharing threshold.
    #[test]
    fn test_session_sharing_detection() {
        let detector = AnomalyDetector::new(HashMap::new());

        let signals: Vec<Signal> = ["ip-1", "ip-2", "ip-3"]
            .iter()
            .map(|ip| create_test_signal(ip, SignalType::Bearer, "token123", None))
            .collect();

        let found = detector.detect_session_sharing(&signals);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].anomaly_type, AnomalyType::SessionSharing);
    }

    /// Fifteen entities sharing one JA4 fingerprint form a single cluster.
    #[test]
    fn test_ja4_cluster_detection() {
        let detector = AnomalyDetector::new(HashMap::new());

        let signals: Vec<Signal> = (0..15)
            .map(|i| {
                create_test_signal(
                    &format!("ip-{}", i),
                    SignalType::Ja4,
                    "t13d1516h2_same_fingerprint",
                    None,
                )
            })
            .collect();

        let found = detector.detect_ja4_clusters(&signals);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].anomaly_type, AnomalyType::Ja4IpCluster);
    }

    /// Three distinct JA4 values from one entity trip a rotation threshold lowered to three.
    #[test]
    fn test_rotation_pattern_detection() {
        let config = AnomalyDetectorConfig {
            rotation_min_changes: 3,
            ..AnomalyDetectorConfig::default()
        };
        let detector = AnomalyDetector::with_config(config, HashMap::new());

        let signals: Vec<Signal> = ["fingerprint-1", "fingerprint-2", "fingerprint-3"]
            .iter()
            .map(|fp| create_test_signal("ip-1", SignalType::Ja4, fp, None))
            .collect();

        let found = detector.detect_rotation_patterns(&signals, &[]);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].anomaly_type, AnomalyType::Ja4RotationPattern);
    }
}