Skip to main content

netwatch_rs/
network_intelligence.rs

1use serde::{Deserialize, Serialize};
2use std::collections::{HashMap, HashSet, VecDeque};
3use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
4use std::time::{Duration, SystemTime};
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct GeoIpInfo {
8    pub country: String,
9    pub country_code: String,
10    pub city: String,
11    pub region: String,
12    pub is_internal: bool,
13    pub is_suspicious: bool,
14    pub threat_level: ThreatLevel,
15    pub organization: String,
16    pub asn: u32,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20pub enum ThreatLevel {
21    Clean,
22    Suspicious,
23    Malicious,
24    Critical,
25}
26
27#[derive(Debug, Clone)]
28pub struct ConnectionIntelligence {
29    pub remote_ip: IpAddr,
30    pub local_port: u16,
31    pub remote_port: u16,
32    pub protocol: String,
33    pub service_name: String,
34    pub geo_info: Option<GeoIpInfo>,
35    pub connection_duration: Duration,
36    pub bytes_transferred: u64,
37    pub packet_count: u64,
38    pub first_seen: SystemTime,
39    pub last_activity: SystemTime,
40    pub is_outbound: bool,
41    pub threat_indicators: Vec<ThreatIndicator>,
42}
43
44#[derive(Debug, Clone)]
45pub enum ThreatIndicator {
46    PortScanAttempt {
47        ports_scanned: u16,
48        time_window: Duration,
49    },
50    UnusualTrafficVolume {
51        bytes_per_second: u64,
52        baseline: u64,
53    },
54    SuspiciousPort {
55        port: u16,
56        reason: String,
57    },
58    GeoAnomalyConnection {
59        country: String,
60        reason: String,
61    },
62    RapidConnections {
63        count: u32,
64        time_window: Duration,
65    },
66    LongLivedConnection {
67        duration: Duration,
68    },
69    HighBandwidthUsage {
70        bandwidth: u64,
71        threshold: u64,
72    },
73}
74
75#[derive(Debug, Clone)]
76pub struct PortScanDetection {
77    pub scanner_ip: IpAddr,
78    pub ports_scanned: HashSet<u16>,
79    pub scan_start_time: SystemTime,
80    pub scan_duration: Duration,
81    pub scan_rate: f64,  // ports per second
82    pub confidence: f64, // 0.0 to 1.0
83}
84
85#[derive(Debug, Clone)]
86pub struct NetworkAnomaly {
87    pub anomaly_type: AnomalyType,
88    pub severity: Severity,
89    pub description: String,
90    pub affected_ip: Option<IpAddr>,
91    pub affected_port: Option<u16>,
92    pub detected_at: SystemTime,
93    pub confidence: f64,
94    pub metrics: HashMap<String, f64>,
95}
96
97#[derive(Debug, Clone)]
98pub enum AnomalyType {
99    PortScan,
100    TrafficSpike,
101    UnusualGeoLocation,
102    SuspiciousProtocol,
103    BandwidthAnomaly,
104    ConnectionFlood,
105    DnsAnomaly,
106    TunnelDetection,
107}
108
109#[derive(Debug, Clone)]
110pub enum Severity {
111    Info,
112    Low,
113    Medium,
114    High,
115    Critical,
116}
117
118pub struct NetworkIntelligenceEngine {
119    connection_history: VecDeque<ConnectionIntelligence>,
120    geo_cache: HashMap<IpAddr, GeoIpInfo>,
121    port_scan_detectors: HashMap<IpAddr, PortScanDetection>,
122    anomalies: VecDeque<NetworkAnomaly>,
123    #[allow(dead_code)]
124    traffic_baselines: HashMap<String, TrafficBaseline>,
125    known_services: HashMap<u16, String>,
126    suspicious_ips: HashSet<IpAddr>,
127    internal_networks: Vec<(IpAddr, u8)>, // CIDR notation
128}
129
130#[derive(Debug, Clone)]
131#[allow(dead_code)]
132struct TrafficBaseline {
133    average_bps: f64,
134    std_deviation: f64,
135    samples: VecDeque<(SystemTime, u64)>,
136    last_updated: SystemTime,
137}
138
139impl NetworkIntelligenceEngine {
140    pub fn new() -> Self {
141        let mut engine = Self {
142            connection_history: VecDeque::with_capacity(10000),
143            geo_cache: HashMap::new(),
144            port_scan_detectors: HashMap::new(),
145            anomalies: VecDeque::with_capacity(1000),
146            traffic_baselines: HashMap::new(),
147            known_services: Self::initialize_known_services(),
148            suspicious_ips: HashSet::new(),
149            internal_networks: Self::initialize_internal_networks(),
150        };
151
152        // Pre-populate with some threat intelligence
153        engine.load_threat_intelligence();
154
155        engine
156    }
157
158    fn initialize_known_services() -> HashMap<u16, String> {
159        let mut services = HashMap::new();
160
161        // Well-known ports
162        services.insert(22, "SSH".to_string());
163        services.insert(23, "Telnet".to_string());
164        services.insert(25, "SMTP".to_string());
165        services.insert(53, "DNS".to_string());
166        services.insert(80, "HTTP".to_string());
167        services.insert(110, "POP3".to_string());
168        services.insert(143, "IMAP".to_string());
169        services.insert(443, "HTTPS".to_string());
170        services.insert(993, "IMAPS".to_string());
171        services.insert(995, "POP3S".to_string());
172        services.insert(3389, "RDP".to_string());
173        services.insert(5432, "PostgreSQL".to_string());
174        services.insert(3306, "MySQL".to_string());
175        services.insert(27017, "MongoDB".to_string());
176        services.insert(6379, "Redis".to_string());
177        services.insert(9200, "Elasticsearch".to_string());
178        services.insert(8080, "HTTP-Alt".to_string());
179        services.insert(8443, "HTTPS-Alt".to_string());
180
181        // Common malicious ports
182        services.insert(1337, "Elite/Leet (Suspicious)".to_string());
183        services.insert(31337, "Back Orifice (Malware)".to_string());
184        services.insert(12345, "NetBus (Malware)".to_string());
185        services.insert(54321, "Back Orifice 2000 (Malware)".to_string());
186
187        services
188    }
189
190    fn initialize_internal_networks() -> Vec<(IpAddr, u8)> {
191        vec![
192            (IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 8), // 10.0.0.0/8
193            (IpAddr::V4(Ipv4Addr::new(172, 16, 0, 0)), 12), // 172.16.0.0/12
194            (IpAddr::V4(Ipv4Addr::new(192, 168, 0, 0)), 16), // 192.168.0.0/16
195            (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 0)), 8), // 127.0.0.0/8 (loopback)
196            (IpAddr::V6(Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 0)), 7), // fc00::/7 (ULA)
197        ]
198    }
199
200    fn load_threat_intelligence(&mut self) {
201        // No pre-populated threat intelligence - load from external sources only
202        // In production, this would load from real threat feeds:
203        // - Abuse.ch feeds
204        // - SANS ISC feeds
205        // - Custom threat intelligence feeds
206        // For now, keep empty - no fake data
207    }
208
209    pub fn analyze_connection(
210        &mut self,
211        connection: &crate::connections::NetworkConnection,
212    ) -> ConnectionIntelligence {
213        let remote_ip = connection.remote_addr.ip();
214        let local_port = connection.local_addr.port();
215        let remote_port = connection.remote_addr.port();
216
217        // Determine if connection is outbound
218        let is_outbound = self.is_internal_ip(&connection.local_addr.ip());
219
220        // Get or create GeoIP info
221        let geo_info = self.get_geo_info(&remote_ip);
222
223        // Determine protocol and service
224        let (protocol, service_name) = self.identify_service(
225            local_port,
226            remote_port,
227            &format!("{:?}", connection.protocol),
228        );
229
230        // Calculate metrics (use duration from socket_info if available)
231        let connection_duration = connection
232            .socket_info
233            .duration
234            .as_ref()
235            .and_then(|d| parse_duration(d))
236            .unwrap_or_else(|| Duration::from_secs(0));
237
238        // Detect threat indicators
239        let mut threat_indicators = Vec::new();
240
241        // Check for port scanning
242        if let Some(scan_detection) = self.detect_port_scan(&remote_ip, remote_port) {
243            threat_indicators.push(ThreatIndicator::PortScanAttempt {
244                ports_scanned: scan_detection.ports_scanned.len() as u16,
245                time_window: scan_detection.scan_duration,
246            });
247        }
248
249        // Check for suspicious ports
250        if self.is_suspicious_port(remote_port) {
251            threat_indicators.push(ThreatIndicator::SuspiciousPort {
252                port: remote_port,
253                reason: "Known malicious or uncommon port".to_string(),
254            });
255        }
256
257        // Check bandwidth usage
258        let bytes_per_second = if connection_duration.as_secs() > 0 {
259            connection.bytes_sent + connection.bytes_received / connection_duration.as_secs()
260        } else {
261            0
262        };
263
264        if bytes_per_second > 10_000_000 {
265            // >10MB/s threshold
266            threat_indicators.push(ThreatIndicator::HighBandwidthUsage {
267                bandwidth: bytes_per_second,
268                threshold: 10_000_000,
269            });
270        }
271
272        // Check for geo anomalies
273        if let Some(ref geo) = geo_info {
274            if geo.is_suspicious || geo.threat_level != ThreatLevel::Clean {
275                threat_indicators.push(ThreatIndicator::GeoAnomalyConnection {
276                    country: geo.country.clone(),
277                    reason: "Connection from suspicious geographic location".to_string(),
278                });
279            }
280        }
281
282        ConnectionIntelligence {
283            remote_ip,
284            local_port,
285            remote_port,
286            protocol: protocol.clone(),
287            service_name,
288            geo_info,
289            connection_duration,
290            bytes_transferred: connection.bytes_sent + connection.bytes_received,
291            packet_count: 0, // Not available in current connection struct
292            first_seen: SystemTime::now() - connection_duration, // Estimate based on duration
293            last_activity: SystemTime::now(),
294            is_outbound,
295            threat_indicators,
296        }
297    }
298
299    fn get_geo_info(&mut self, ip: &IpAddr) -> Option<GeoIpInfo> {
300        // Check cache first
301        if let Some(cached) = self.geo_cache.get(ip) {
302            return Some(cached.clone());
303        }
304
305        // Skip internal IPs
306        if self.is_internal_ip(ip) {
307            let internal_info = GeoIpInfo {
308                country: "Internal".to_string(),
309                country_code: "INT".to_string(),
310                city: "Local Network".to_string(),
311                region: "Private".to_string(),
312                is_internal: true,
313                is_suspicious: false,
314                threat_level: ThreatLevel::Clean,
315                organization: "Internal Network".to_string(),
316                asn: 0,
317            };
318            self.geo_cache.insert(*ip, internal_info.clone());
319            return Some(internal_info);
320        }
321
322        // Simplified GeoIP lookup (in real implementation, use MaxMind GeoIP2 or similar)
323        let geo_info = self.mock_geo_lookup(ip);
324        self.geo_cache.insert(*ip, geo_info.clone());
325        Some(geo_info)
326    }
327
328    fn mock_geo_lookup(&self, ip: &IpAddr) -> GeoIpInfo {
329        // No fake geo data - return unknown for all IPs
330        // In production, integrate with real GeoIP service like MaxMind
331        let is_suspicious = self.suspicious_ips.contains(ip);
332
333        let threat_level = if is_suspicious {
334            ThreatLevel::Malicious
335        } else {
336            ThreatLevel::Clean
337        };
338
339        GeoIpInfo {
340            country: "Unknown".to_string(),
341            country_code: "UN".to_string(),
342            city: "Unknown".to_string(),
343            region: "Unknown".to_string(),
344            is_internal: false,
345            is_suspicious,
346            threat_level,
347            organization: "Unknown".to_string(),
348            asn: 0, // Unknown ASN
349        }
350    }
351
352    fn is_internal_ip(&self, ip: &IpAddr) -> bool {
353        for (network, prefix_len) in &self.internal_networks {
354            if self.ip_in_cidr(ip, network, *prefix_len) {
355                return true;
356            }
357        }
358        false
359    }
360
361    fn ip_in_cidr(&self, ip: &IpAddr, network: &IpAddr, prefix_len: u8) -> bool {
362        match (ip, network) {
363            (IpAddr::V4(ip), IpAddr::V4(net)) => {
364                let ip_u32 = u32::from(*ip);
365                let net_u32 = u32::from(*net);
366                let mask = if prefix_len == 0 {
367                    0
368                } else {
369                    !((1u32 << (32 - prefix_len)) - 1)
370                };
371                (ip_u32 & mask) == (net_u32 & mask)
372            }
373            (IpAddr::V6(ip), IpAddr::V6(net)) => {
374                let ip_bytes = ip.octets();
375                let net_bytes = net.octets();
376                let full_bytes = prefix_len / 8;
377                let remaining_bits = prefix_len % 8;
378
379                // Check full bytes
380                for i in 0..full_bytes as usize {
381                    if ip_bytes[i] != net_bytes[i] {
382                        return false;
383                    }
384                }
385
386                // Check remaining bits
387                if remaining_bits > 0 && full_bytes < 16 {
388                    let mask = 0xFF << (8 - remaining_bits);
389                    if (ip_bytes[full_bytes as usize] & mask)
390                        != (net_bytes[full_bytes as usize] & mask)
391                    {
392                        return false;
393                    }
394                }
395
396                true
397            }
398            _ => false,
399        }
400    }
401
402    fn identify_service(
403        &self,
404        local_port: u16,
405        remote_port: u16,
406        protocol: &str,
407    ) -> (String, String) {
408        let port_to_check = if local_port < 1024 {
409            local_port
410        } else {
411            remote_port
412        };
413
414        let service_name = self
415            .known_services
416            .get(&port_to_check)
417            .cloned()
418            .unwrap_or_else(|| {
419                if port_to_check < 1024 {
420                    "System Service".to_string()
421                } else if port_to_check < 49152 {
422                    "Registered Service".to_string()
423                } else {
424                    "Dynamic/Ephemeral".to_string()
425                }
426            });
427
428        (protocol.to_uppercase(), service_name)
429    }
430
431    fn detect_port_scan(&mut self, ip: &IpAddr, port: u16) -> Option<PortScanDetection> {
432        let now = SystemTime::now();
433
434        // Check if we already have a detector for this IP
435        let mut updated_detector = if let Some(existing) = self.port_scan_detectors.get(ip) {
436            let mut detector = existing.clone();
437            detector.ports_scanned.insert(port);
438            detector.scan_duration = now
439                .duration_since(detector.scan_start_time)
440                .unwrap_or_default();
441
442            // Calculate scan rate
443            if detector.scan_duration.as_secs() > 0 {
444                detector.scan_rate =
445                    detector.ports_scanned.len() as f64 / detector.scan_duration.as_secs_f64();
446            }
447
448            detector
449        } else {
450            // Create new detector
451            let mut detector = PortScanDetection {
452                scanner_ip: *ip,
453                ports_scanned: HashSet::new(),
454                scan_start_time: now,
455                scan_duration: Duration::from_secs(0),
456                scan_rate: 0.0,
457                confidence: 0.0,
458            };
459            detector.ports_scanned.insert(port);
460            detector
461        };
462
463        // Calculate confidence
464        updated_detector.confidence = self.calculate_port_scan_confidence(&updated_detector);
465
466        // Update the detector in the map
467        self.port_scan_detectors
468            .insert(*ip, updated_detector.clone());
469
470        // Clean up old detectors (older than 5 minutes)
471        let cutoff = now - Duration::from_secs(300);
472        self.port_scan_detectors
473            .retain(|_, detector| detector.scan_start_time > cutoff);
474
475        // Return detector if confidence is high enough
476        if updated_detector.confidence > 0.7 {
477            Some(updated_detector)
478        } else {
479            None
480        }
481    }
482
483    fn calculate_port_scan_confidence(&self, detector: &PortScanDetection) -> f64 {
484        let mut confidence = 0.0;
485
486        // Number of ports scanned
487        let port_count = detector.ports_scanned.len() as f64;
488        confidence += (port_count / 20.0).min(0.4); // Max 0.4 for port count
489
490        // Scan rate (ports per second)
491        if detector.scan_rate > 10.0 {
492            confidence += 0.3; // Very fast scanning
493        } else if detector.scan_rate > 1.0 {
494            confidence += 0.2; // Moderate scanning
495        }
496
497        // Sequential port scanning pattern
498        let mut ports: Vec<u16> = detector.ports_scanned.iter().cloned().collect();
499        ports.sort();
500        let sequential_count = self.count_sequential_ports(&ports);
501        if sequential_count > 5 {
502            confidence += 0.2;
503        }
504
505        // Common port scan targets
506        let common_scan_ports: HashSet<u16> =
507            [22, 23, 80, 443, 21, 25, 53, 110, 143, 993, 995, 3389]
508                .iter()
509                .cloned()
510                .collect();
511        let scan_common_count = detector
512            .ports_scanned
513            .intersection(&common_scan_ports)
514            .count();
515        if scan_common_count > 3 {
516            confidence += 0.1;
517        }
518
519        confidence.min(1.0)
520    }
521
522    fn count_sequential_ports(&self, ports: &[u16]) -> usize {
523        if ports.len() < 2 {
524            return 0;
525        }
526
527        let mut max_sequential = 0;
528        let mut current_sequential = 1;
529
530        for i in 1..ports.len() {
531            if ports[i] == ports[i - 1] + 1 {
532                current_sequential += 1;
533            } else {
534                max_sequential = max_sequential.max(current_sequential);
535                current_sequential = 1;
536            }
537        }
538
539        max_sequential.max(current_sequential)
540    }
541
542    fn is_suspicious_port(&self, port: u16) -> bool {
543        // Known malicious ports
544        matches!(port, 1337 | 31337 | 12345 | 54321 | 6667 | 6668 | 6669)
545    }
546
547    pub fn get_recent_anomalies(&self, limit: usize) -> Vec<&NetworkAnomaly> {
548        self.anomalies.iter().rev().take(limit).collect()
549    }
550
551    pub fn get_port_scan_alerts(&self) -> Vec<PortScanDetection> {
552        self.port_scan_detectors
553            .values()
554            .filter(|detector| detector.confidence > 0.7)
555            .cloned()
556            .collect()
557    }
558
559    pub fn get_connection_stats(&self) -> ConnectionStats {
560        let total_connections = self.connection_history.len();
561        let external_connections = self
562            .connection_history
563            .iter()
564            .filter(|conn| !conn.geo_info.as_ref().map_or(true, |geo| geo.is_internal))
565            .count();
566
567        let suspicious_connections = self
568            .connection_history
569            .iter()
570            .filter(|conn| !conn.threat_indicators.is_empty())
571            .count();
572
573        let countries: HashSet<String> = self
574            .connection_history
575            .iter()
576            .filter_map(|conn| conn.geo_info.as_ref())
577            .filter(|geo| !geo.is_internal)
578            .map(|geo| geo.country.clone())
579            .collect();
580
581        ConnectionStats {
582            total_connections,
583            external_connections,
584            suspicious_connections,
585            unique_countries: countries.len(),
586            active_port_scans: self.port_scan_detectors.len(),
587        }
588    }
589}
590
591#[derive(Debug, Clone)]
592pub struct ConnectionStats {
593    pub total_connections: usize,
594    pub external_connections: usize,
595    pub suspicious_connections: usize,
596    pub unique_countries: usize,
597    pub active_port_scans: usize,
598}
599
600impl Default for NetworkIntelligenceEngine {
601    fn default() -> Self {
602        Self::new()
603    }
604}
605
606// Helper function to parse duration strings from ss command output
607fn parse_duration(duration_str: &str) -> Option<Duration> {
608    // Parse duration strings like "1h30m", "45m", "30s", etc.
609    let mut total_secs = 0u64;
610    let mut current_num = String::new();
611
612    for ch in duration_str.chars() {
613        if ch.is_ascii_digit() {
614            current_num.push(ch);
615        } else {
616            if let Ok(num) = current_num.parse::<u64>() {
617                match ch {
618                    'h' => total_secs += num * 3600,
619                    'm' => total_secs += num * 60,
620                    's' => total_secs += num,
621                    _ => {}
622                }
623            }
624            current_num.clear();
625        }
626    }
627
628    if total_secs > 0 {
629        Some(Duration::from_secs(total_secs))
630    } else {
631        None
632    }
633}