1use dashmap::DashMap;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use std::time::Duration;
8
9use super::anomaly_detector::AnomalyDetector;
10use super::config::TrendsConfig;
11use super::correlation::{Correlation, CorrelationEngine, CorrelationQueryOptions};
12use super::signal_extractor::SignalExtractor;
13use super::time_store::{TimeStore, TimeStoreStats};
14use super::types::{
15 Anomaly, AnomalyMetadata, AnomalyQueryOptions, AnomalySeverity, AnomalyType, Signal,
16 SignalCategory, SignalTrend, TrendQueryOptions, TrendsSummary,
17};
18use crate::geo::{GeoLocation, ImpossibleTravelDetector, LoginEvent, TravelConfig};
19
/// Callback used to push risk onto an entity when an anomaly is handled.
///
/// `TrendsManager::handle_anomaly` invokes it as
/// `(entity_id, risk, reason)`, where `reason` is the text `"Anomaly: <type>"`.
/// It must be `Send + Sync` because it is stored inside the manager.
type RiskCallback = Box<dyn Fn(&str, u32, &str) + Send + Sync>;
22
/// Optional external hooks handed to a [`TrendsManager`].
///
/// The `Default` value installs no hooks.
#[derive(Default)]
pub struct TrendsManagerDependencies {
    /// Called once per affected entity when an anomaly with a positive risk
    /// value is handled; `None` disables risk propagation.
    pub apply_risk: Option<RiskCallback>,
}
29
/// Coordinates signal recording, anomaly detection, correlation queries and
/// impossible-travel checks behind a `&self` API.
pub struct TrendsManager {
    /// Configuration captured at construction time.
    config: TrendsConfig,
    /// Time-bucketed signal storage, guarded by a read/write lock.
    store: RwLock<TimeStore>,
    /// Checks each recorded signal against the entity's recent signals.
    anomaly_detector: AnomalyDetector,
    /// Computes correlations over signals fetched from the store.
    correlation_engine: CorrelationEngine,
    /// Retained anomalies, keyed by anomaly id.
    anomalies: DashMap<String, Anomaly>,
    /// Most recent signals per entity, keyed by entity id.
    recent_signals: DashMap<String, Vec<Signal>>,
    /// Optional external hooks (risk callback).
    dependencies: TrendsManagerDependencies,
    /// Set by `destroy`; polled by the background detection task.
    shutdown: Arc<std::sync::atomic::AtomicBool>,
    /// Impossible-travel detector fed by `record_login`.
    impossible_travel: RwLock<ImpossibleTravelDetector>,
}
43
44impl TrendsManager {
45 pub fn new(config: TrendsConfig) -> Self {
47 let store = TimeStore::new(&config);
48 let anomaly_detector = AnomalyDetector::new(config.anomaly_risk.clone());
49 let correlation_engine = CorrelationEngine::new();
50 let impossible_travel = ImpossibleTravelDetector::new(TravelConfig::default());
51
52 Self {
53 config,
54 store: RwLock::new(store),
55 anomaly_detector,
56 correlation_engine,
57 anomalies: DashMap::new(),
58 recent_signals: DashMap::new(),
59 dependencies: TrendsManagerDependencies::default(),
60 shutdown: Arc::new(std::sync::atomic::AtomicBool::new(false)),
61 impossible_travel: RwLock::new(impossible_travel),
62 }
63 }
64
65 pub fn with_dependencies(
67 config: TrendsConfig,
68 dependencies: TrendsManagerDependencies,
69 ) -> Self {
70 let mut manager = Self::new(config);
71 manager.dependencies = dependencies;
72 manager
73 }
74
75 pub fn start_background_detection(&self) -> tokio::task::JoinHandle<()> {
77 let shutdown = Arc::clone(&self.shutdown);
78 let interval_ms = self.config.anomaly_check_interval_ms;
79
80 tokio::spawn(async move {
83 let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
84
85 loop {
86 interval.tick().await;
87
88 if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
89 break;
90 }
91
92 }
94 })
95 }
96
97 pub fn record_request(
103 &self,
104 entity_id: &str,
105 session_id: Option<&str>,
106 user_agent: Option<&str>,
107 authorization: Option<&str>,
108 client_ip: Option<&str>,
109 ja4: Option<&str>,
110 ja4h: Option<&str>,
111 last_request_time: Option<i64>,
112 ) -> Vec<Signal> {
113 if !self.config.enabled {
114 return Vec::new();
115 }
116
117 let signals = SignalExtractor::extract(
118 entity_id,
119 session_id,
120 user_agent,
121 authorization,
122 client_ip,
123 ja4,
124 ja4h,
125 last_request_time,
126 );
127
128 for signal in &signals {
129 self.record_signal(signal.clone());
130 }
131
132 signals
133 }
134
135 pub fn record_signal(&self, signal: Signal) {
137 if !self.config.enabled {
138 return;
139 }
140
141 let entity_id = signal.entity_id.clone();
142
143 {
145 let mut store = self.store.write();
146 store.record(signal.clone());
147 }
148
149 self.track_recent_signal(&entity_id, signal.clone());
151
152 let recent = self.get_recent_signals(&entity_id);
154 if let Some(anomaly) = self.anomaly_detector.check_signal(&signal, &recent) {
155 self.handle_anomaly(anomaly);
156 }
157 }
158
159 pub fn record_payload_anomaly(
161 &self,
162 id: String,
163 anomaly_type: AnomalyType,
164 severity: AnomalySeverity,
165 detected_at: i64,
166 template: String,
167 entity_id: String,
168 description: String,
169 metadata: super::types::AnomalyMetadata,
170 ) {
171 if !self.config.enabled {
172 return;
173 }
174
175 let mut full_metadata = metadata;
176 full_metadata.template = Some(template);
177 full_metadata.source = Some("payload_profiler".to_string());
178
179 let anomaly = Anomaly {
180 id,
181 detected_at,
182 category: super::types::SignalCategory::Behavioral,
183 anomaly_type,
184 severity,
185 description,
186 signals: Vec::new(),
187 entities: vec![entity_id],
188 metadata: full_metadata,
189 risk_applied: self.config.anomaly_risk.get(&anomaly_type).copied(),
190 };
191
192 self.handle_anomaly(anomaly);
193 }
194
195 #[allow(clippy::too_many_arguments)]
221 pub fn record_login(
222 &self,
223 user_id: &str,
224 timestamp_ms: u64,
225 ip: &str,
226 latitude: f64,
227 longitude: f64,
228 country: &str,
229 country_code: &str,
230 city: Option<&str>,
231 accuracy_km: u32,
232 device_fingerprint: Option<&str>,
233 ) -> bool {
234 if !self.config.enabled {
235 return false;
236 }
237
238 let location = GeoLocation {
239 ip: ip.to_string(),
240 latitude,
241 longitude,
242 city: city.map(String::from),
243 country: country.to_string(),
244 country_code: country_code.to_string(),
245 accuracy_radius_km: accuracy_km,
246 };
247
248 let mut event = LoginEvent::new(user_id, timestamp_ms, location);
249 if let Some(fp) = device_fingerprint {
250 event = event.with_fingerprint(fp);
251 }
252
253 let alert = {
254 let mut detector = self.impossible_travel.write();
255 detector.check_login(&event)
256 };
257
258 if let Some(alert) = alert {
259 let severity = match alert.severity {
260 crate::geo::Severity::Low => AnomalySeverity::Low,
261 crate::geo::Severity::Medium => AnomalySeverity::Medium,
262 crate::geo::Severity::High => AnomalySeverity::High,
263 crate::geo::Severity::Critical => AnomalySeverity::Critical,
264 };
265
266 let anomaly = Anomaly {
267 id: uuid::Uuid::new_v4().to_string(),
268 detected_at: chrono::Utc::now().timestamp_millis(),
269 category: SignalCategory::Network, anomaly_type: AnomalyType::ImpossibleTravel,
271 severity,
272 description: format!(
273 "Impossible travel detected for {}: {} km in {:.2} hours ({:.0} km/h required)",
274 alert.user_id,
275 alert.distance_km as u64,
276 alert.time_diff_hours,
277 if alert.required_speed_kmh < 0.0 {
278 f64::INFINITY
279 } else {
280 alert.required_speed_kmh
281 }
282 ),
283 signals: Vec::new(),
284 entities: vec![ip.to_string()],
285 metadata: AnomalyMetadata {
286 previous_value: Some(alert.from_location.ip.clone()),
287 new_value: Some(alert.to_location.ip.clone()),
288 time_delta_ms: Some((alert.to_time - alert.from_time) as i64),
289 time_delta_minutes: Some(alert.time_diff_hours * 60.0),
290 threshold: Some(1000.0), actual: Some(alert.required_speed_kmh),
292 detection_method: Some("impossible_travel".to_string()),
293 ..Default::default()
294 },
295 risk_applied: self
296 .config
297 .anomaly_risk
298 .get(&AnomalyType::ImpossibleTravel)
299 .copied(),
300 };
301
302 self.handle_anomaly(anomaly);
303 return true;
304 }
305
306 false
307 }
308
309 pub fn whitelist_travel_route(&self, user_id: &str, country1: &str, country2: &str) {
314 let mut detector = self.impossible_travel.write();
315 detector.add_whitelist_route(user_id, country1, country2);
316 }
317
318 pub fn remove_travel_whitelist(&self, user_id: &str, country1: &str, country2: &str) {
320 let mut detector = self.impossible_travel.write();
321 detector.remove_whitelist_route(user_id, country1, country2);
322 }
323
324 pub fn travel_stats(&self) -> crate::geo::TravelStats {
326 let detector = self.impossible_travel.read();
327 detector.stats()
328 }
329
330 pub fn clear_travel_history(&self, user_id: &str) {
332 let mut detector = self.impossible_travel.write();
333 detector.clear_user(user_id);
334 }
335
336 pub fn get_summary(&self, options: TrendQueryOptions) -> TrendsSummary {
342 let store = self.store.read();
343 let mut summary = store.get_summary(&options);
344 summary.anomaly_count = self.anomalies.len();
345 summary
346 }
347
348 pub fn get_trends(&self, options: TrendQueryOptions) -> Vec<SignalTrend> {
350 let store = self.store.read();
351 store.get_trends(&options)
352 }
353
354 pub fn get_signals_for_entity(
356 &self,
357 entity_id: &str,
358 options: TrendQueryOptions,
359 ) -> Vec<Signal> {
360 let store = self.store.read();
361 store.get_signals_for_entity(entity_id, &options)
362 }
363
364 pub fn get_signals(&self, options: TrendQueryOptions) -> Vec<Signal> {
366 let store = self.store.read();
367 store.get_signals(&options)
368 }
369
370 pub fn get_anomalies(&self, options: AnomalyQueryOptions) -> Vec<Anomaly> {
376 let mut anomalies: Vec<Anomaly> = self
377 .anomalies
378 .iter()
379 .map(|r| r.value().clone())
380 .filter(|a| {
381 if let Some(severity) = options.severity {
382 if a.severity != severity {
383 return false;
384 }
385 }
386 if let Some(ref anomaly_type) = options.anomaly_type {
387 if &a.anomaly_type != anomaly_type {
388 return false;
389 }
390 }
391 if let Some(ref category) = options.category {
392 if &a.category != category {
393 return false;
394 }
395 }
396 if let Some(ref entity_id) = options.entity_id {
397 if !a.entities.contains(entity_id) {
398 return false;
399 }
400 }
401 if let Some(from) = options.from {
402 if a.detected_at < from {
403 return false;
404 }
405 }
406 if let Some(to) = options.to {
407 if a.detected_at > to {
408 return false;
409 }
410 }
411 true
412 })
413 .collect();
414
415 anomalies.sort_by(|a, b| b.detected_at.cmp(&a.detected_at));
417
418 if let Some(limit) = options.limit {
420 anomalies.truncate(limit);
421 }
422
423 anomalies
424 }
425
426 pub fn get_anomaly(&self, id: &str) -> Option<Anomaly> {
428 self.anomalies.get(id).map(|r| r.value().clone())
429 }
430
    /// Returns the number of anomalies currently retained by the manager.
    pub fn active_anomaly_count(&self) -> usize {
        self.anomalies.len()
    }
435
436 pub fn get_correlations(&self, options: CorrelationQueryOptions) -> Vec<Correlation> {
442 let signals = self.get_signals(TrendQueryOptions {
443 from: options.from,
444 to: options.to,
445 entity_id: options.entity_id.clone(),
446 signal_type: options.signal_type,
447 ..Default::default()
448 });
449
450 self.correlation_engine
451 .find_correlations(&signals, &options)
452 }
453
454 pub fn get_entity_correlations(
456 &self,
457 entity_id: &str,
458 options: CorrelationQueryOptions,
459 ) -> Vec<Correlation> {
460 let mut opts = options;
461 opts.entity_id = Some(entity_id.to_string());
462 self.get_correlations(opts)
463 }
464
465 pub fn stats(&self) -> TrendsManagerStats {
471 let store = self.store.read();
472 let store_stats = store.get_stats();
473
474 TrendsManagerStats {
475 enabled: self.config.enabled,
476 store: store_stats,
477 anomaly_count: self.anomalies.len(),
478 recent_signals_cached: self.recent_signals.len(),
479 bucket_size_ms: self.config.bucket_size_ms,
480 retention_hours: self.config.retention_hours,
481 }
482 }
483
484 pub fn stats_snapshot(&self) -> TrendsStats {
486 let stats = self.stats();
487 TrendsStats {
488 enabled: stats.enabled,
489 total_signals: stats.store.total_signals,
490 bucket_count: stats.store.bucket_count,
491 entity_count: stats.store.entity_count,
492 anomaly_count: stats.anomaly_count,
493 }
494 }
495
496 pub fn clear(&self) {
502 let mut store = self.store.write();
503 store.clear();
504 self.anomalies.clear();
505 self.recent_signals.clear();
506 }
507
508 pub fn cleanup(&self) {
510 {
511 let mut store = self.store.write();
512 store.cleanup();
513 }
514 self.cleanup_old_anomalies();
515 self.cleanup_recent_signals();
516 }
517
518 pub fn destroy(&self) {
520 self.shutdown
521 .store(true, std::sync::atomic::Ordering::Relaxed);
522 let mut store = self.store.write();
523 store.destroy();
524 }
525
    /// Returns the `enabled` flag from the manager's configuration.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }
530
531 fn track_recent_signal(&self, entity_id: &str, signal: Signal) {
536 let mut entry = self
537 .recent_signals
538 .entry(entity_id.to_string())
539 .or_insert_with(Vec::new);
540 entry.push(signal);
541
542 if entry.len() > self.config.max_recent_signals {
544 entry.remove(0);
545 }
546 }
547
548 fn get_recent_signals(&self, entity_id: &str) -> Vec<Signal> {
549 self.recent_signals
550 .get(entity_id)
551 .map(|r| r.value().clone())
552 .unwrap_or_default()
553 }
554
555 fn handle_anomaly(&self, anomaly: Anomaly) {
556 if self.anomalies.contains_key(&anomaly.id) {
558 return;
559 }
560
561 if let Some(risk) = anomaly.risk_applied {
563 if risk > 0 {
564 if let Some(ref apply_risk) = self.dependencies.apply_risk {
565 for entity_id in &anomaly.entities {
566 apply_risk(
567 entity_id,
568 risk,
569 &format!("Anomaly: {}", anomaly.anomaly_type),
570 );
571 }
572 }
573 }
574 }
575
576 tracing::info!(
577 "Anomaly detected: {} ({:?}) - {}",
578 anomaly.anomaly_type,
579 anomaly.severity,
580 anomaly.description
581 );
582
583 self.anomalies.insert(anomaly.id.clone(), anomaly);
584 }
585
586 fn cleanup_old_anomalies(&self) {
587 let cutoff = chrono::Utc::now().timestamp_millis()
588 - (self.config.retention_hours as i64 * 60 * 60 * 1000);
589
590 self.anomalies.retain(|_, v| v.detected_at >= cutoff);
591
592 if self.anomalies.len() > self.config.max_anomalies {
594 let mut entries: Vec<_> = self
595 .anomalies
596 .iter()
597 .map(|r| (r.key().clone(), r.value().detected_at))
598 .collect();
599 entries.sort_by(|a, b| b.1.cmp(&a.1));
600
601 let to_remove: Vec<_> = entries
602 .into_iter()
603 .skip(self.config.max_anomalies)
604 .map(|(k, _)| k)
605 .collect();
606
607 for key in to_remove {
608 self.anomalies.remove(&key);
609 }
610 }
611 }
612
613 fn cleanup_recent_signals(&self) {
614 let cutoff = chrono::Utc::now().timestamp_millis() - 10 * 60 * 1000; self.recent_signals.retain(|_, signals| {
617 signals.retain(|s| s.timestamp > cutoff);
618 !signals.is_empty()
619 });
620 }
621}
622
/// Detailed statistics returned by `TrendsManager::stats`.
#[derive(Debug, Clone)]
pub struct TrendsManagerStats {
    /// Value of the `enabled` configuration flag.
    pub enabled: bool,
    /// Statistics reported by the time store itself.
    pub store: TimeStoreStats,
    /// Number of anomalies currently retained.
    pub anomaly_count: usize,
    /// Number of entities with a recent-signal cache entry.
    pub recent_signals_cached: usize,
    /// Configured bucket size, in milliseconds.
    pub bucket_size_ms: u64,
    /// Configured retention, in hours.
    pub retention_hours: u32,
}
633
/// Serializable statistics summary returned by `TrendsManager::stats_snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendsStats {
    /// Value of the `enabled` configuration flag.
    pub enabled: bool,
    /// Total signal count reported by the time store.
    pub total_signals: usize,
    /// Bucket count reported by the time store.
    pub bucket_count: usize,
    /// Entity count reported by the time store.
    pub entity_count: usize,
    /// Number of anomalies currently retained.
    pub anomaly_count: usize,
}
643
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager from the default configuration.
    fn default_manager() -> TrendsManager {
        TrendsManager::new(TrendsConfig::default())
    }

    /// Records a request for `entity-1` that carries only a client IP.
    fn record_ip_only(manager: &TrendsManager, ip: &str) -> Vec<Signal> {
        manager.record_request("entity-1", None, None, None, Some(ip), None, None, None)
    }

    /// A default-configured manager reports itself as enabled.
    #[test]
    fn test_manager_creation() {
        assert!(default_manager().is_enabled());
    }

    /// A disabled manager reports itself as disabled and extracts no signals.
    #[test]
    fn test_disabled_manager() {
        let manager = TrendsManager::new(TrendsConfig::disabled());
        assert!(!manager.is_enabled());

        let signals = record_ip_only(&manager, "192.168.1.1");
        assert!(signals.is_empty());
    }

    /// Recording a request with a user agent, IP and JA4 value stores signals.
    #[test]
    fn test_record_and_query() {
        let manager = default_manager();

        manager.record_request(
            "entity-1",
            None,
            Some("Mozilla/5.0"),
            None,
            Some("192.168.1.100"),
            Some("t13d1516h2_abc123"),
            None,
            None,
        );

        assert!(manager.stats().store.total_signals > 0);
    }

    /// A fresh manager has no anomalies to return.
    #[test]
    fn test_anomaly_query() {
        let manager = default_manager();

        let anomalies = manager.get_anomalies(AnomalyQueryOptions::default());
        assert!(anomalies.is_empty());
    }

    /// Running cleanup after recording a request leaves the manager enabled.
    #[test]
    fn test_cleanup() {
        let manager = default_manager();

        record_ip_only(&manager, "192.168.1.1");
        manager.cleanup();

        assert!(manager.stats().enabled);
    }
}