Skip to main content

synapse_pingora/trends/
manager.rs

1//! TrendsManager - Coordinator for fingerprint trends and anomaly detection.
2
3use dashmap::DashMap;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7use std::time::Duration;
8
9use super::anomaly_detector::AnomalyDetector;
10use super::config::TrendsConfig;
11use super::correlation::{Correlation, CorrelationEngine, CorrelationQueryOptions};
12use super::signal_extractor::SignalExtractor;
13use super::time_store::{TimeStore, TimeStoreStats};
14use super::types::{
15    Anomaly, AnomalyMetadata, AnomalyQueryOptions, AnomalySeverity, AnomalyType, Signal,
16    SignalCategory, SignalTrend, TrendQueryOptions, TrendsSummary,
17};
18use crate::geo::{GeoLocation, ImpossibleTravelDetector, LoginEvent, TravelConfig};
19
/// Callback to apply risk: (entity_id, risk_score, reason)
type RiskCallback = Box<dyn Fn(&str, u32, &str) + Send + Sync>;

/// Dependencies for the trends manager.
///
/// Every dependency is optional; `Default` yields a set with no callbacks.
#[derive(Default)]
pub struct TrendsManagerDependencies {
    /// Callback to apply risk to an entity, invoked as
    /// `(entity_id, risk_score, reason)`.
    pub apply_risk: Option<RiskCallback>,
}

// `Box<dyn Fn>` is not `Debug`, so the derive is unavailable. This manual impl
// reports only whether a callback is installed, which keeps the type (and any
// struct embedding it) printable.
impl std::fmt::Debug for TrendsManagerDependencies {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TrendsManagerDependencies")
            .field("apply_risk", &self.apply_risk.as_ref().map(|_| "<callback>"))
            .finish()
    }
}
29
30/// High-level trends manager.
31pub struct TrendsManager {
32    config: TrendsConfig,
33    store: RwLock<TimeStore>,
34    anomaly_detector: AnomalyDetector,
35    correlation_engine: CorrelationEngine,
36    anomalies: DashMap<String, Anomaly>,
37    recent_signals: DashMap<String, Vec<Signal>>,
38    dependencies: TrendsManagerDependencies,
39    shutdown: Arc<std::sync::atomic::AtomicBool>,
40    /// Impossible travel detector for geographic anomaly detection.
41    impossible_travel: RwLock<ImpossibleTravelDetector>,
42}
43
44impl TrendsManager {
45    /// Create a new trends manager.
46    pub fn new(config: TrendsConfig) -> Self {
47        let store = TimeStore::new(&config);
48        let anomaly_detector = AnomalyDetector::new(config.anomaly_risk.clone());
49        let correlation_engine = CorrelationEngine::new();
50        let impossible_travel = ImpossibleTravelDetector::new(TravelConfig::default());
51
52        Self {
53            config,
54            store: RwLock::new(store),
55            anomaly_detector,
56            correlation_engine,
57            anomalies: DashMap::new(),
58            recent_signals: DashMap::new(),
59            dependencies: TrendsManagerDependencies::default(),
60            shutdown: Arc::new(std::sync::atomic::AtomicBool::new(false)),
61            impossible_travel: RwLock::new(impossible_travel),
62        }
63    }
64
65    /// Create with dependencies.
66    pub fn with_dependencies(
67        config: TrendsConfig,
68        dependencies: TrendsManagerDependencies,
69    ) -> Self {
70        let mut manager = Self::new(config);
71        manager.dependencies = dependencies;
72        manager
73    }
74
75    /// Start background anomaly detection.
76    pub fn start_background_detection(&self) -> tokio::task::JoinHandle<()> {
77        let shutdown = Arc::clone(&self.shutdown);
78        let interval_ms = self.config.anomaly_check_interval_ms;
79
80        // Note: In production, this would spawn a task that runs detection
81        // For now, return a dummy task
82        tokio::spawn(async move {
83            let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
84
85            loop {
86                interval.tick().await;
87
88                if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
89                    break;
90                }
91
92                // Batch detection would run here
93            }
94        })
95    }
96
97    // --------------------------------------------------------------------------
98    // Signal Recording
99    // --------------------------------------------------------------------------
100
101    /// Extract and record signals from request context.
102    pub fn record_request(
103        &self,
104        entity_id: &str,
105        session_id: Option<&str>,
106        user_agent: Option<&str>,
107        authorization: Option<&str>,
108        client_ip: Option<&str>,
109        ja4: Option<&str>,
110        ja4h: Option<&str>,
111        last_request_time: Option<i64>,
112    ) -> Vec<Signal> {
113        if !self.config.enabled {
114            return Vec::new();
115        }
116
117        let signals = SignalExtractor::extract(
118            entity_id,
119            session_id,
120            user_agent,
121            authorization,
122            client_ip,
123            ja4,
124            ja4h,
125            last_request_time,
126        );
127
128        for signal in &signals {
129            self.record_signal(signal.clone());
130        }
131
132        signals
133    }
134
135    /// Record a single signal.
136    pub fn record_signal(&self, signal: Signal) {
137        if !self.config.enabled {
138            return;
139        }
140
141        let entity_id = signal.entity_id.clone();
142
143        // Store in time-series
144        {
145            let mut store = self.store.write();
146            store.record(signal.clone());
147        }
148
149        // Track recent signals for real-time anomaly detection
150        self.track_recent_signal(&entity_id, signal.clone());
151
152        // Real-time anomaly check
153        let recent = self.get_recent_signals(&entity_id);
154        if let Some(anomaly) = self.anomaly_detector.check_signal(&signal, &recent) {
155            self.handle_anomaly(anomaly);
156        }
157    }
158
159    /// Record a payload anomaly.
160    pub fn record_payload_anomaly(
161        &self,
162        id: String,
163        anomaly_type: AnomalyType,
164        severity: AnomalySeverity,
165        detected_at: i64,
166        template: String,
167        entity_id: String,
168        description: String,
169        metadata: super::types::AnomalyMetadata,
170    ) {
171        if !self.config.enabled {
172            return;
173        }
174
175        let mut full_metadata = metadata;
176        full_metadata.template = Some(template);
177        full_metadata.source = Some("payload_profiler".to_string());
178
179        let anomaly = Anomaly {
180            id,
181            detected_at,
182            category: super::types::SignalCategory::Behavioral,
183            anomaly_type,
184            severity,
185            description,
186            signals: Vec::new(),
187            entities: vec![entity_id],
188            metadata: full_metadata,
189            risk_applied: self.config.anomaly_risk.get(&anomaly_type).copied(),
190        };
191
192        self.handle_anomaly(anomaly);
193    }
194
195    // --------------------------------------------------------------------------
196    // Impossible Travel Detection
197    // --------------------------------------------------------------------------
198
199    /// Record a login event for impossible travel detection.
200    ///
201    /// Checks if the user's login pattern indicates geographically impossible travel
202    /// (e.g., logging in from NYC then London within 10 minutes).
203    ///
204    /// # Arguments
205    ///
206    /// * `user_id` - User identifier (session subject, user ID, or email)
207    /// * `timestamp_ms` - Unix timestamp in milliseconds
208    /// * `ip` - IP address of the login
209    /// * `latitude` - Latitude from GeoIP lookup
210    /// * `longitude` - Longitude from GeoIP lookup
211    /// * `country` - Country name
212    /// * `country_code` - ISO country code
213    /// * `city` - Optional city name
214    /// * `accuracy_km` - GeoIP accuracy radius in km
215    /// * `device_fingerprint` - Optional device fingerprint for correlation
216    ///
217    /// # Returns
218    ///
219    /// `true` if an impossible travel alert was generated.
220    #[allow(clippy::too_many_arguments)]
221    pub fn record_login(
222        &self,
223        user_id: &str,
224        timestamp_ms: u64,
225        ip: &str,
226        latitude: f64,
227        longitude: f64,
228        country: &str,
229        country_code: &str,
230        city: Option<&str>,
231        accuracy_km: u32,
232        device_fingerprint: Option<&str>,
233    ) -> bool {
234        if !self.config.enabled {
235            return false;
236        }
237
238        let location = GeoLocation {
239            ip: ip.to_string(),
240            latitude,
241            longitude,
242            city: city.map(String::from),
243            country: country.to_string(),
244            country_code: country_code.to_string(),
245            accuracy_radius_km: accuracy_km,
246        };
247
248        let mut event = LoginEvent::new(user_id, timestamp_ms, location);
249        if let Some(fp) = device_fingerprint {
250            event = event.with_fingerprint(fp);
251        }
252
253        let alert = {
254            let mut detector = self.impossible_travel.write();
255            detector.check_login(&event)
256        };
257
258        if let Some(alert) = alert {
259            let severity = match alert.severity {
260                crate::geo::Severity::Low => AnomalySeverity::Low,
261                crate::geo::Severity::Medium => AnomalySeverity::Medium,
262                crate::geo::Severity::High => AnomalySeverity::High,
263                crate::geo::Severity::Critical => AnomalySeverity::Critical,
264            };
265
266            let anomaly = Anomaly {
267                id: uuid::Uuid::new_v4().to_string(),
268                detected_at: chrono::Utc::now().timestamp_millis(),
269                category: SignalCategory::Network, // Geographic anomalies are network-related
270                anomaly_type: AnomalyType::ImpossibleTravel,
271                severity,
272                description: format!(
273                    "Impossible travel detected for {}: {} km in {:.2} hours ({:.0} km/h required)",
274                    alert.user_id,
275                    alert.distance_km as u64,
276                    alert.time_diff_hours,
277                    if alert.required_speed_kmh < 0.0 {
278                        f64::INFINITY
279                    } else {
280                        alert.required_speed_kmh
281                    }
282                ),
283                signals: Vec::new(),
284                entities: vec![ip.to_string()],
285                metadata: AnomalyMetadata {
286                    previous_value: Some(alert.from_location.ip.clone()),
287                    new_value: Some(alert.to_location.ip.clone()),
288                    time_delta_ms: Some((alert.to_time - alert.from_time) as i64),
289                    time_delta_minutes: Some(alert.time_diff_hours * 60.0),
290                    threshold: Some(1000.0), // max speed threshold
291                    actual: Some(alert.required_speed_kmh),
292                    detection_method: Some("impossible_travel".to_string()),
293                    ..Default::default()
294                },
295                risk_applied: self
296                    .config
297                    .anomaly_risk
298                    .get(&AnomalyType::ImpossibleTravel)
299                    .copied(),
300            };
301
302            self.handle_anomaly(anomaly);
303            return true;
304        }
305
306        false
307    }
308
309    /// Add a whitelisted travel route for a user.
310    ///
311    /// Known travel patterns (e.g., home <-> work across countries) can be whitelisted
312    /// to prevent false positives.
313    pub fn whitelist_travel_route(&self, user_id: &str, country1: &str, country2: &str) {
314        let mut detector = self.impossible_travel.write();
315        detector.add_whitelist_route(user_id, country1, country2);
316    }
317
318    /// Remove a whitelisted travel route.
319    pub fn remove_travel_whitelist(&self, user_id: &str, country1: &str, country2: &str) {
320        let mut detector = self.impossible_travel.write();
321        detector.remove_whitelist_route(user_id, country1, country2);
322    }
323
324    /// Get impossible travel detection statistics.
325    pub fn travel_stats(&self) -> crate::geo::TravelStats {
326        let detector = self.impossible_travel.read();
327        detector.stats()
328    }
329
330    /// Clear impossible travel history for a user.
331    pub fn clear_travel_history(&self, user_id: &str) {
332        let mut detector = self.impossible_travel.write();
333        detector.clear_user(user_id);
334    }
335
336    // --------------------------------------------------------------------------
337    // Trend Queries
338    // --------------------------------------------------------------------------
339
340    /// Get overall trends summary.
341    pub fn get_summary(&self, options: TrendQueryOptions) -> TrendsSummary {
342        let store = self.store.read();
343        let mut summary = store.get_summary(&options);
344        summary.anomaly_count = self.anomalies.len();
345        summary
346    }
347
348    /// Get detailed trends by type.
349    pub fn get_trends(&self, options: TrendQueryOptions) -> Vec<SignalTrend> {
350        let store = self.store.read();
351        store.get_trends(&options)
352    }
353
354    /// Get signals for an entity.
355    pub fn get_signals_for_entity(
356        &self,
357        entity_id: &str,
358        options: TrendQueryOptions,
359    ) -> Vec<Signal> {
360        let store = self.store.read();
361        store.get_signals_for_entity(entity_id, &options)
362    }
363
364    /// Get all signals matching criteria.
365    pub fn get_signals(&self, options: TrendQueryOptions) -> Vec<Signal> {
366        let store = self.store.read();
367        store.get_signals(&options)
368    }
369
370    // --------------------------------------------------------------------------
371    // Anomaly Queries
372    // --------------------------------------------------------------------------
373
374    /// Get anomalies matching criteria.
375    pub fn get_anomalies(&self, options: AnomalyQueryOptions) -> Vec<Anomaly> {
376        let mut anomalies: Vec<Anomaly> = self
377            .anomalies
378            .iter()
379            .map(|r| r.value().clone())
380            .filter(|a| {
381                if let Some(severity) = options.severity {
382                    if a.severity != severity {
383                        return false;
384                    }
385                }
386                if let Some(ref anomaly_type) = options.anomaly_type {
387                    if &a.anomaly_type != anomaly_type {
388                        return false;
389                    }
390                }
391                if let Some(ref category) = options.category {
392                    if &a.category != category {
393                        return false;
394                    }
395                }
396                if let Some(ref entity_id) = options.entity_id {
397                    if !a.entities.contains(entity_id) {
398                        return false;
399                    }
400                }
401                if let Some(from) = options.from {
402                    if a.detected_at < from {
403                        return false;
404                    }
405                }
406                if let Some(to) = options.to {
407                    if a.detected_at > to {
408                        return false;
409                    }
410                }
411                true
412            })
413            .collect();
414
415        // Sort by detection time (newest first)
416        anomalies.sort_by(|a, b| b.detected_at.cmp(&a.detected_at));
417
418        // Apply limit
419        if let Some(limit) = options.limit {
420            anomalies.truncate(limit);
421        }
422
423        anomalies
424    }
425
426    /// Get a specific anomaly by ID.
427    pub fn get_anomaly(&self, id: &str) -> Option<Anomaly> {
428        self.anomalies.get(id).map(|r| r.value().clone())
429    }
430
431    /// Get count of active anomalies.
432    pub fn active_anomaly_count(&self) -> usize {
433        self.anomalies.len()
434    }
435
436    // --------------------------------------------------------------------------
437    // Correlation Queries
438    // --------------------------------------------------------------------------
439
440    /// Get correlations matching criteria.
441    pub fn get_correlations(&self, options: CorrelationQueryOptions) -> Vec<Correlation> {
442        let signals = self.get_signals(TrendQueryOptions {
443            from: options.from,
444            to: options.to,
445            entity_id: options.entity_id.clone(),
446            signal_type: options.signal_type,
447            ..Default::default()
448        });
449
450        self.correlation_engine
451            .find_correlations(&signals, &options)
452    }
453
454    /// Get correlations for a specific entity.
455    pub fn get_entity_correlations(
456        &self,
457        entity_id: &str,
458        options: CorrelationQueryOptions,
459    ) -> Vec<Correlation> {
460        let mut opts = options;
461        opts.entity_id = Some(entity_id.to_string());
462        self.get_correlations(opts)
463    }
464
465    // --------------------------------------------------------------------------
466    // Statistics
467    // --------------------------------------------------------------------------
468
469    /// Get manager statistics.
470    pub fn stats(&self) -> TrendsManagerStats {
471        let store = self.store.read();
472        let store_stats = store.get_stats();
473
474        TrendsManagerStats {
475            enabled: self.config.enabled,
476            store: store_stats,
477            anomaly_count: self.anomalies.len(),
478            recent_signals_cached: self.recent_signals.len(),
479            bucket_size_ms: self.config.bucket_size_ms,
480            retention_hours: self.config.retention_hours,
481        }
482    }
483
484    /// Get a stats snapshot for API responses.
485    pub fn stats_snapshot(&self) -> TrendsStats {
486        let stats = self.stats();
487        TrendsStats {
488            enabled: stats.enabled,
489            total_signals: stats.store.total_signals,
490            bucket_count: stats.store.bucket_count,
491            entity_count: stats.store.entity_count,
492            anomaly_count: stats.anomaly_count,
493        }
494    }
495
496    // --------------------------------------------------------------------------
497    // Lifecycle
498    // --------------------------------------------------------------------------
499
500    /// Clear all data.
501    pub fn clear(&self) {
502        let mut store = self.store.write();
503        store.clear();
504        self.anomalies.clear();
505        self.recent_signals.clear();
506    }
507
508    /// Cleanup old data.
509    pub fn cleanup(&self) {
510        {
511            let mut store = self.store.write();
512            store.cleanup();
513        }
514        self.cleanup_old_anomalies();
515        self.cleanup_recent_signals();
516    }
517
518    /// Shutdown the manager.
519    pub fn destroy(&self) {
520        self.shutdown
521            .store(true, std::sync::atomic::Ordering::Relaxed);
522        let mut store = self.store.write();
523        store.destroy();
524    }
525
526    /// Check if enabled.
527    pub fn is_enabled(&self) -> bool {
528        self.config.enabled
529    }
530
531    // --------------------------------------------------------------------------
532    // Private
533    // --------------------------------------------------------------------------
534
535    fn track_recent_signal(&self, entity_id: &str, signal: Signal) {
536        let mut entry = self
537            .recent_signals
538            .entry(entity_id.to_string())
539            .or_insert_with(Vec::new);
540        entry.push(signal);
541
542        // Keep only recent signals
543        if entry.len() > self.config.max_recent_signals {
544            entry.remove(0);
545        }
546    }
547
548    fn get_recent_signals(&self, entity_id: &str) -> Vec<Signal> {
549        self.recent_signals
550            .get(entity_id)
551            .map(|r| r.value().clone())
552            .unwrap_or_default()
553    }
554
555    fn handle_anomaly(&self, anomaly: Anomaly) {
556        // Dedupe by ID
557        if self.anomalies.contains_key(&anomaly.id) {
558            return;
559        }
560
561        // Apply risk if configured
562        if let Some(risk) = anomaly.risk_applied {
563            if risk > 0 {
564                if let Some(ref apply_risk) = self.dependencies.apply_risk {
565                    for entity_id in &anomaly.entities {
566                        apply_risk(
567                            entity_id,
568                            risk,
569                            &format!("Anomaly: {}", anomaly.anomaly_type),
570                        );
571                    }
572                }
573            }
574        }
575
576        tracing::info!(
577            "Anomaly detected: {} ({:?}) - {}",
578            anomaly.anomaly_type,
579            anomaly.severity,
580            anomaly.description
581        );
582
583        self.anomalies.insert(anomaly.id.clone(), anomaly);
584    }
585
586    fn cleanup_old_anomalies(&self) {
587        let cutoff = chrono::Utc::now().timestamp_millis()
588            - (self.config.retention_hours as i64 * 60 * 60 * 1000);
589
590        self.anomalies.retain(|_, v| v.detected_at >= cutoff);
591
592        // Cap at max anomalies
593        if self.anomalies.len() > self.config.max_anomalies {
594            let mut entries: Vec<_> = self
595                .anomalies
596                .iter()
597                .map(|r| (r.key().clone(), r.value().detected_at))
598                .collect();
599            entries.sort_by(|a, b| b.1.cmp(&a.1));
600
601            let to_remove: Vec<_> = entries
602                .into_iter()
603                .skip(self.config.max_anomalies)
604                .map(|(k, _)| k)
605                .collect();
606
607            for key in to_remove {
608                self.anomalies.remove(&key);
609            }
610        }
611    }
612
613    fn cleanup_recent_signals(&self) {
614        let cutoff = chrono::Utc::now().timestamp_millis() - 10 * 60 * 1000; // 10 minutes
615
616        self.recent_signals.retain(|_, signals| {
617            signals.retain(|s| s.timestamp > cutoff);
618            !signals.is_empty()
619        });
620    }
621}
622
623/// Statistics for the trends manager.
624#[derive(Debug, Clone)]
625pub struct TrendsManagerStats {
626    pub enabled: bool,
627    pub store: TimeStoreStats,
628    pub anomaly_count: usize,
629    pub recent_signals_cached: usize,
630    pub bucket_size_ms: u64,
631    pub retention_hours: u32,
632}
633
634/// Lightweight stats snapshot.
635#[derive(Debug, Clone, Serialize, Deserialize)]
636pub struct TrendsStats {
637    pub enabled: bool,
638    pub total_signals: usize,
639    pub bucket_count: usize,
640    pub entity_count: usize,
641    pub anomaly_count: usize,
642}
643
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Record a payload anomaly with a fixed ID/entity through the public API.
    fn record_fixed_payload_anomaly(manager: &TrendsManager, id: &str, entity: &str) {
        manager.record_payload_anomaly(
            id.to_string(),
            AnomalyType::ImpossibleTravel,
            AnomalySeverity::High,
            chrono::Utc::now().timestamp_millis(),
            "template-1".to_string(),
            entity.to_string(),
            "test anomaly".to_string(),
            AnomalyMetadata::default(),
        );
    }

    #[test]
    fn test_manager_creation() {
        let config = TrendsConfig::default();
        let manager = TrendsManager::new(config);
        assert!(manager.is_enabled());
    }

    #[test]
    fn test_disabled_manager() {
        let config = TrendsConfig::disabled();
        let manager = TrendsManager::new(config);
        assert!(!manager.is_enabled());

        // Recording should be no-op
        let signals = manager.record_request(
            "entity-1",
            None,
            None,
            None,
            Some("192.168.1.1"),
            None,
            None,
            None,
        );
        assert!(signals.is_empty());
    }

    #[test]
    fn test_disabled_manager_ignores_logins() {
        let manager = TrendsManager::new(TrendsConfig::disabled());

        let alerted = manager.record_login(
            "user-1",
            0,
            "203.0.113.1",
            40.7128,
            -74.0060,
            "United States",
            "US",
            Some("New York"),
            10,
            None,
        );

        assert!(!alerted);
        assert_eq!(manager.active_anomaly_count(), 0);
    }

    #[test]
    fn test_record_and_query() {
        let config = TrendsConfig::default();
        let manager = TrendsManager::new(config);

        manager.record_request(
            "entity-1",
            None,
            Some("Mozilla/5.0"),
            None,
            Some("192.168.1.100"),
            Some("t13d1516h2_abc123"),
            None,
            None,
        );

        let stats = manager.stats();
        assert!(stats.store.total_signals > 0);
    }

    #[test]
    fn test_anomaly_query() {
        let config = TrendsConfig::default();
        let manager = TrendsManager::new(config);

        // Initially no anomalies
        let anomalies = manager.get_anomalies(AnomalyQueryOptions::default());
        assert!(anomalies.is_empty());
    }

    #[test]
    fn test_payload_anomaly_is_stored_and_deduped() {
        let applied = Arc::new(Mutex::new(Vec::<String>::new()));
        let sink = Arc::clone(&applied);
        let deps = TrendsManagerDependencies {
            apply_risk: Some(Box::new(move |entity: &str, _risk: u32, _reason: &str| {
                sink.lock().unwrap().push(entity.to_string());
            })),
        };
        let manager = TrendsManager::with_dependencies(TrendsConfig::default(), deps);

        record_fixed_payload_anomaly(&manager, "anomaly-1", "entity-1");
        record_fixed_payload_anomaly(&manager, "anomaly-1", "entity-1");

        assert_eq!(manager.active_anomaly_count(), 1);
        let stored = manager
            .get_anomaly("anomaly-1")
            .expect("anomaly should be stored");
        assert_eq!(stored.entities, vec!["entity-1".to_string()]);
        assert_eq!(stored.metadata.source.as_deref(), Some("payload_profiler"));
        assert_eq!(stored.metadata.template.as_deref(), Some("template-1"));

        // Risk depends on the configured table, but is never applied more than
        // once for the same anomaly ID.
        let guard = applied.lock().unwrap();
        assert!(guard.len() <= 1);
        assert!(guard.iter().all(|entity| entity == "entity-1"));
    }

    #[test]
    fn test_anomaly_query_filters() {
        let manager = TrendsManager::new(TrendsConfig::default());
        record_fixed_payload_anomaly(&manager, "a-1", "entity-1");
        record_fixed_payload_anomaly(&manager, "a-2", "entity-2");

        let only_first = manager.get_anomalies(AnomalyQueryOptions {
            entity_id: Some("entity-1".to_string()),
            ..Default::default()
        });
        assert_eq!(only_first.len(), 1);
        assert_eq!(only_first[0].id, "a-1");

        let limited = manager.get_anomalies(AnomalyQueryOptions {
            limit: Some(1),
            ..Default::default()
        });
        assert_eq!(limited.len(), 1);
    }

    #[test]
    fn test_clear_drops_anomalies() {
        let manager = TrendsManager::new(TrendsConfig::default());
        record_fixed_payload_anomaly(&manager, "a-1", "entity-1");
        assert_eq!(manager.active_anomaly_count(), 1);

        manager.clear();

        assert_eq!(manager.active_anomaly_count(), 0);
        assert!(manager.get_anomaly("a-1").is_none());
    }

    #[test]
    fn test_cleanup() {
        let config = TrendsConfig::default();
        let manager = TrendsManager::new(config);

        manager.record_request(
            "entity-1",
            None,
            None,
            None,
            Some("192.168.1.1"),
            None,
            None,
            None,
        );

        manager.cleanup();

        // Should still work after cleanup
        let stats = manager.stats();
        assert!(stats.enabled);
    }
}