1use serde::{Deserialize, Serialize};
2use std::collections::{HashMap, HashSet, VecDeque};
3use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
4use std::time::{Duration, SystemTime};
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct GeoIpInfo {
8 pub country: String,
9 pub country_code: String,
10 pub city: String,
11 pub region: String,
12 pub is_internal: bool,
13 pub is_suspicious: bool,
14 pub threat_level: ThreatLevel,
15 pub organization: String,
16 pub asn: u32,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20pub enum ThreatLevel {
21 Clean,
22 Suspicious,
23 Malicious,
24 Critical,
25}
26
27#[derive(Debug, Clone)]
28pub struct ConnectionIntelligence {
29 pub remote_ip: IpAddr,
30 pub local_port: u16,
31 pub remote_port: u16,
32 pub protocol: String,
33 pub service_name: String,
34 pub geo_info: Option<GeoIpInfo>,
35 pub connection_duration: Duration,
36 pub bytes_transferred: u64,
37 pub packet_count: u64,
38 pub first_seen: SystemTime,
39 pub last_activity: SystemTime,
40 pub is_outbound: bool,
41 pub threat_indicators: Vec<ThreatIndicator>,
42}
43
44#[derive(Debug, Clone)]
45pub enum ThreatIndicator {
46 PortScanAttempt {
47 ports_scanned: u16,
48 time_window: Duration,
49 },
50 UnusualTrafficVolume {
51 bytes_per_second: u64,
52 baseline: u64,
53 },
54 SuspiciousPort {
55 port: u16,
56 reason: String,
57 },
58 GeoAnomalyConnection {
59 country: String,
60 reason: String,
61 },
62 RapidConnections {
63 count: u32,
64 time_window: Duration,
65 },
66 LongLivedConnection {
67 duration: Duration,
68 },
69 HighBandwidthUsage {
70 bandwidth: u64,
71 threshold: u64,
72 },
73}
74
75#[derive(Debug, Clone)]
76pub struct PortScanDetection {
77 pub scanner_ip: IpAddr,
78 pub ports_scanned: HashSet<u16>,
79 pub scan_start_time: SystemTime,
80 pub scan_duration: Duration,
81 pub scan_rate: f64, pub confidence: f64, }
84
85#[derive(Debug, Clone)]
86pub struct NetworkAnomaly {
87 pub anomaly_type: AnomalyType,
88 pub severity: Severity,
89 pub description: String,
90 pub affected_ip: Option<IpAddr>,
91 pub affected_port: Option<u16>,
92 pub detected_at: SystemTime,
93 pub confidence: f64,
94 pub metrics: HashMap<String, f64>,
95}
96
97#[derive(Debug, Clone)]
98pub enum AnomalyType {
99 PortScan,
100 TrafficSpike,
101 UnusualGeoLocation,
102 SuspiciousProtocol,
103 BandwidthAnomaly,
104 ConnectionFlood,
105 DnsAnomaly,
106 TunnelDetection,
107}
108
109#[derive(Debug, Clone)]
110pub enum Severity {
111 Info,
112 Low,
113 Medium,
114 High,
115 Critical,
116}
117
118pub struct NetworkIntelligenceEngine {
119 connection_history: VecDeque<ConnectionIntelligence>,
120 geo_cache: HashMap<IpAddr, GeoIpInfo>,
121 port_scan_detectors: HashMap<IpAddr, PortScanDetection>,
122 anomalies: VecDeque<NetworkAnomaly>,
123 #[allow(dead_code)]
124 traffic_baselines: HashMap<String, TrafficBaseline>,
125 known_services: HashMap<u16, String>,
126 suspicious_ips: HashSet<IpAddr>,
127 internal_networks: Vec<(IpAddr, u8)>, }
129
130#[derive(Debug, Clone)]
131#[allow(dead_code)]
132struct TrafficBaseline {
133 average_bps: f64,
134 std_deviation: f64,
135 samples: VecDeque<(SystemTime, u64)>,
136 last_updated: SystemTime,
137}
138
139impl NetworkIntelligenceEngine {
140 pub fn new() -> Self {
141 let mut engine = Self {
142 connection_history: VecDeque::with_capacity(10000),
143 geo_cache: HashMap::new(),
144 port_scan_detectors: HashMap::new(),
145 anomalies: VecDeque::with_capacity(1000),
146 traffic_baselines: HashMap::new(),
147 known_services: Self::initialize_known_services(),
148 suspicious_ips: HashSet::new(),
149 internal_networks: Self::initialize_internal_networks(),
150 };
151
152 engine.load_threat_intelligence();
154
155 engine
156 }
157
158 fn initialize_known_services() -> HashMap<u16, String> {
159 let mut services = HashMap::new();
160
161 services.insert(22, "SSH".to_string());
163 services.insert(23, "Telnet".to_string());
164 services.insert(25, "SMTP".to_string());
165 services.insert(53, "DNS".to_string());
166 services.insert(80, "HTTP".to_string());
167 services.insert(110, "POP3".to_string());
168 services.insert(143, "IMAP".to_string());
169 services.insert(443, "HTTPS".to_string());
170 services.insert(993, "IMAPS".to_string());
171 services.insert(995, "POP3S".to_string());
172 services.insert(3389, "RDP".to_string());
173 services.insert(5432, "PostgreSQL".to_string());
174 services.insert(3306, "MySQL".to_string());
175 services.insert(27017, "MongoDB".to_string());
176 services.insert(6379, "Redis".to_string());
177 services.insert(9200, "Elasticsearch".to_string());
178 services.insert(8080, "HTTP-Alt".to_string());
179 services.insert(8443, "HTTPS-Alt".to_string());
180
181 services.insert(1337, "Elite/Leet (Suspicious)".to_string());
183 services.insert(31337, "Back Orifice (Malware)".to_string());
184 services.insert(12345, "NetBus (Malware)".to_string());
185 services.insert(54321, "Back Orifice 2000 (Malware)".to_string());
186
187 services
188 }
189
190 fn initialize_internal_networks() -> Vec<(IpAddr, u8)> {
191 vec![
192 (IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 8), (IpAddr::V4(Ipv4Addr::new(172, 16, 0, 0)), 12), (IpAddr::V4(Ipv4Addr::new(192, 168, 0, 0)), 16), (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 0)), 8), (IpAddr::V6(Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 0)), 7), ]
198 }
199
200 fn load_threat_intelligence(&mut self) {
201 }
208
209 pub fn analyze_connection(
210 &mut self,
211 connection: &crate::connections::NetworkConnection,
212 ) -> ConnectionIntelligence {
213 let remote_ip = connection.remote_addr.ip();
214 let local_port = connection.local_addr.port();
215 let remote_port = connection.remote_addr.port();
216
217 let is_outbound = self.is_internal_ip(&connection.local_addr.ip());
219
220 let geo_info = self.get_geo_info(&remote_ip);
222
223 let (protocol, service_name) = self.identify_service(
225 local_port,
226 remote_port,
227 &format!("{:?}", connection.protocol),
228 );
229
230 let connection_duration = connection
232 .socket_info
233 .duration
234 .as_ref()
235 .and_then(|d| parse_duration(d))
236 .unwrap_or_else(|| Duration::from_secs(0));
237
238 let mut threat_indicators = Vec::new();
240
241 if let Some(scan_detection) = self.detect_port_scan(&remote_ip, remote_port) {
243 threat_indicators.push(ThreatIndicator::PortScanAttempt {
244 ports_scanned: scan_detection.ports_scanned.len() as u16,
245 time_window: scan_detection.scan_duration,
246 });
247 }
248
249 if self.is_suspicious_port(remote_port) {
251 threat_indicators.push(ThreatIndicator::SuspiciousPort {
252 port: remote_port,
253 reason: "Known malicious or uncommon port".to_string(),
254 });
255 }
256
257 let bytes_per_second = if connection_duration.as_secs() > 0 {
259 connection.bytes_sent + connection.bytes_received / connection_duration.as_secs()
260 } else {
261 0
262 };
263
264 if bytes_per_second > 10_000_000 {
265 threat_indicators.push(ThreatIndicator::HighBandwidthUsage {
267 bandwidth: bytes_per_second,
268 threshold: 10_000_000,
269 });
270 }
271
272 if let Some(ref geo) = geo_info {
274 if geo.is_suspicious || geo.threat_level != ThreatLevel::Clean {
275 threat_indicators.push(ThreatIndicator::GeoAnomalyConnection {
276 country: geo.country.clone(),
277 reason: "Connection from suspicious geographic location".to_string(),
278 });
279 }
280 }
281
282 ConnectionIntelligence {
283 remote_ip,
284 local_port,
285 remote_port,
286 protocol: protocol.clone(),
287 service_name,
288 geo_info,
289 connection_duration,
290 bytes_transferred: connection.bytes_sent + connection.bytes_received,
291 packet_count: 0, first_seen: SystemTime::now() - connection_duration, last_activity: SystemTime::now(),
294 is_outbound,
295 threat_indicators,
296 }
297 }
298
299 fn get_geo_info(&mut self, ip: &IpAddr) -> Option<GeoIpInfo> {
300 if let Some(cached) = self.geo_cache.get(ip) {
302 return Some(cached.clone());
303 }
304
305 if self.is_internal_ip(ip) {
307 let internal_info = GeoIpInfo {
308 country: "Internal".to_string(),
309 country_code: "INT".to_string(),
310 city: "Local Network".to_string(),
311 region: "Private".to_string(),
312 is_internal: true,
313 is_suspicious: false,
314 threat_level: ThreatLevel::Clean,
315 organization: "Internal Network".to_string(),
316 asn: 0,
317 };
318 self.geo_cache.insert(*ip, internal_info.clone());
319 return Some(internal_info);
320 }
321
322 let geo_info = self.mock_geo_lookup(ip);
324 self.geo_cache.insert(*ip, geo_info.clone());
325 Some(geo_info)
326 }
327
328 fn mock_geo_lookup(&self, ip: &IpAddr) -> GeoIpInfo {
329 let is_suspicious = self.suspicious_ips.contains(ip);
332
333 let threat_level = if is_suspicious {
334 ThreatLevel::Malicious
335 } else {
336 ThreatLevel::Clean
337 };
338
339 GeoIpInfo {
340 country: "Unknown".to_string(),
341 country_code: "UN".to_string(),
342 city: "Unknown".to_string(),
343 region: "Unknown".to_string(),
344 is_internal: false,
345 is_suspicious,
346 threat_level,
347 organization: "Unknown".to_string(),
348 asn: 0, }
350 }
351
352 fn is_internal_ip(&self, ip: &IpAddr) -> bool {
353 for (network, prefix_len) in &self.internal_networks {
354 if self.ip_in_cidr(ip, network, *prefix_len) {
355 return true;
356 }
357 }
358 false
359 }
360
361 fn ip_in_cidr(&self, ip: &IpAddr, network: &IpAddr, prefix_len: u8) -> bool {
362 match (ip, network) {
363 (IpAddr::V4(ip), IpAddr::V4(net)) => {
364 let ip_u32 = u32::from(*ip);
365 let net_u32 = u32::from(*net);
366 let mask = if prefix_len == 0 {
367 0
368 } else {
369 !((1u32 << (32 - prefix_len)) - 1)
370 };
371 (ip_u32 & mask) == (net_u32 & mask)
372 }
373 (IpAddr::V6(ip), IpAddr::V6(net)) => {
374 let ip_bytes = ip.octets();
375 let net_bytes = net.octets();
376 let full_bytes = prefix_len / 8;
377 let remaining_bits = prefix_len % 8;
378
379 for i in 0..full_bytes as usize {
381 if ip_bytes[i] != net_bytes[i] {
382 return false;
383 }
384 }
385
386 if remaining_bits > 0 && full_bytes < 16 {
388 let mask = 0xFF << (8 - remaining_bits);
389 if (ip_bytes[full_bytes as usize] & mask)
390 != (net_bytes[full_bytes as usize] & mask)
391 {
392 return false;
393 }
394 }
395
396 true
397 }
398 _ => false,
399 }
400 }
401
402 fn identify_service(
403 &self,
404 local_port: u16,
405 remote_port: u16,
406 protocol: &str,
407 ) -> (String, String) {
408 let port_to_check = if local_port < 1024 {
409 local_port
410 } else {
411 remote_port
412 };
413
414 let service_name = self
415 .known_services
416 .get(&port_to_check)
417 .cloned()
418 .unwrap_or_else(|| {
419 if port_to_check < 1024 {
420 "System Service".to_string()
421 } else if port_to_check < 49152 {
422 "Registered Service".to_string()
423 } else {
424 "Dynamic/Ephemeral".to_string()
425 }
426 });
427
428 (protocol.to_uppercase(), service_name)
429 }
430
431 fn detect_port_scan(&mut self, ip: &IpAddr, port: u16) -> Option<PortScanDetection> {
432 let now = SystemTime::now();
433
434 let mut updated_detector = if let Some(existing) = self.port_scan_detectors.get(ip) {
436 let mut detector = existing.clone();
437 detector.ports_scanned.insert(port);
438 detector.scan_duration = now
439 .duration_since(detector.scan_start_time)
440 .unwrap_or_default();
441
442 if detector.scan_duration.as_secs() > 0 {
444 detector.scan_rate =
445 detector.ports_scanned.len() as f64 / detector.scan_duration.as_secs_f64();
446 }
447
448 detector
449 } else {
450 let mut detector = PortScanDetection {
452 scanner_ip: *ip,
453 ports_scanned: HashSet::new(),
454 scan_start_time: now,
455 scan_duration: Duration::from_secs(0),
456 scan_rate: 0.0,
457 confidence: 0.0,
458 };
459 detector.ports_scanned.insert(port);
460 detector
461 };
462
463 updated_detector.confidence = self.calculate_port_scan_confidence(&updated_detector);
465
466 self.port_scan_detectors
468 .insert(*ip, updated_detector.clone());
469
470 let cutoff = now - Duration::from_secs(300);
472 self.port_scan_detectors
473 .retain(|_, detector| detector.scan_start_time > cutoff);
474
475 if updated_detector.confidence > 0.7 {
477 Some(updated_detector)
478 } else {
479 None
480 }
481 }
482
483 fn calculate_port_scan_confidence(&self, detector: &PortScanDetection) -> f64 {
484 let mut confidence = 0.0;
485
486 let port_count = detector.ports_scanned.len() as f64;
488 confidence += (port_count / 20.0).min(0.4); if detector.scan_rate > 10.0 {
492 confidence += 0.3; } else if detector.scan_rate > 1.0 {
494 confidence += 0.2; }
496
497 let mut ports: Vec<u16> = detector.ports_scanned.iter().cloned().collect();
499 ports.sort();
500 let sequential_count = self.count_sequential_ports(&ports);
501 if sequential_count > 5 {
502 confidence += 0.2;
503 }
504
505 let common_scan_ports: HashSet<u16> =
507 [22, 23, 80, 443, 21, 25, 53, 110, 143, 993, 995, 3389]
508 .iter()
509 .cloned()
510 .collect();
511 let scan_common_count = detector
512 .ports_scanned
513 .intersection(&common_scan_ports)
514 .count();
515 if scan_common_count > 3 {
516 confidence += 0.1;
517 }
518
519 confidence.min(1.0)
520 }
521
522 fn count_sequential_ports(&self, ports: &[u16]) -> usize {
523 if ports.len() < 2 {
524 return 0;
525 }
526
527 let mut max_sequential = 0;
528 let mut current_sequential = 1;
529
530 for i in 1..ports.len() {
531 if ports[i] == ports[i - 1] + 1 {
532 current_sequential += 1;
533 } else {
534 max_sequential = max_sequential.max(current_sequential);
535 current_sequential = 1;
536 }
537 }
538
539 max_sequential.max(current_sequential)
540 }
541
542 fn is_suspicious_port(&self, port: u16) -> bool {
543 matches!(port, 1337 | 31337 | 12345 | 54321 | 6667 | 6668 | 6669)
545 }
546
547 pub fn get_recent_anomalies(&self, limit: usize) -> Vec<&NetworkAnomaly> {
548 self.anomalies.iter().rev().take(limit).collect()
549 }
550
551 pub fn get_port_scan_alerts(&self) -> Vec<PortScanDetection> {
552 self.port_scan_detectors
553 .values()
554 .filter(|detector| detector.confidence > 0.7)
555 .cloned()
556 .collect()
557 }
558
559 pub fn get_connection_stats(&self) -> ConnectionStats {
560 let total_connections = self.connection_history.len();
561 let external_connections = self
562 .connection_history
563 .iter()
564 .filter(|conn| !conn.geo_info.as_ref().map_or(true, |geo| geo.is_internal))
565 .count();
566
567 let suspicious_connections = self
568 .connection_history
569 .iter()
570 .filter(|conn| !conn.threat_indicators.is_empty())
571 .count();
572
573 let countries: HashSet<String> = self
574 .connection_history
575 .iter()
576 .filter_map(|conn| conn.geo_info.as_ref())
577 .filter(|geo| !geo.is_internal)
578 .map(|geo| geo.country.clone())
579 .collect();
580
581 ConnectionStats {
582 total_connections,
583 external_connections,
584 suspicious_connections,
585 unique_countries: countries.len(),
586 active_port_scans: self.port_scan_detectors.len(),
587 }
588 }
589}
590
591#[derive(Debug, Clone)]
592pub struct ConnectionStats {
593 pub total_connections: usize,
594 pub external_connections: usize,
595 pub suspicious_connections: usize,
596 pub unique_countries: usize,
597 pub active_port_scans: usize,
598}
599
600impl Default for NetworkIntelligenceEngine {
601 fn default() -> Self {
602 Self::new()
603 }
604}
605
606fn parse_duration(duration_str: &str) -> Option<Duration> {
608 let mut total_secs = 0u64;
610 let mut current_num = String::new();
611
612 for ch in duration_str.chars() {
613 if ch.is_ascii_digit() {
614 current_num.push(ch);
615 } else {
616 if let Ok(num) = current_num.parse::<u64>() {
617 match ch {
618 'h' => total_secs += num * 3600,
619 'm' => total_secs += num * 60,
620 's' => total_secs += num,
621 _ => {}
622 }
623 }
624 current_num.clear();
625 }
626 }
627
628 if total_secs > 0 {
629 Some(Duration::from_secs(total_secs))
630 } else {
631 None
632 }
633}