1use super::{DetectionCategory, RecommendedAction, ScanResult, Severity};
11use super::threat_intel::ThreatIntelDB;
12use parking_lot::RwLock;
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Instant;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct NetworkMonitorConfig {
22 pub poll_interval_ms: u64,
23 pub exfil_threshold_bytes: u64,
24 pub beacon_jitter_pct: f64,
25 pub beacon_min_count: u32,
26 pub suspicious_ports: Vec<u16>,
27}
28
29impl Default for NetworkMonitorConfig {
30 fn default() -> Self {
31 Self {
32 poll_interval_ms: 5000,
33 exfil_threshold_bytes: 52_428_800, beacon_jitter_pct: 15.0,
35 beacon_min_count: 10,
36 suspicious_ports: vec![4444, 5555, 8888, 6667, 6697, 1337, 31337, 9001, 1234],
37 }
38 }
39}
40
/// One socket row decoded from a `/proc/net/tcp`-style table.
#[derive(Debug, Clone)]
pub struct TcpEntry {
    /// Local address in textual form (dotted quad for IPv4 rows).
    pub local_ip: String,
    /// Local port number.
    pub local_port: u16,
    /// Remote (peer) address in textual form.
    pub remote_ip: String,
    /// Remote (peer) port number.
    pub remote_port: u16,
    /// Raw value of the table's `st` column, parsed from hexadecimal.
    pub state: u8,
    /// Value of the table's `uid` column.
    pub uid: u32,
}
51
52pub struct NetworkMonitor {
54 config: NetworkMonitorConfig,
55 threat_intel: Arc<ThreatIntelDB>,
56 conn_history: RwLock<HashMap<String, Vec<Instant>>>,
57 running: Arc<AtomicBool>,
58}
59
60impl NetworkMonitor {
61 pub fn new(config: NetworkMonitorConfig, threat_intel: Arc<ThreatIntelDB>) -> Self {
62 Self {
63 config,
64 threat_intel,
65 conn_history: RwLock::new(HashMap::new()),
66 running: Arc::new(AtomicBool::new(true)),
67 }
68 }
69
70 pub fn parse_proc_net_tcp(content: &str) -> Vec<TcpEntry> {
72 let mut entries = Vec::new();
73
74 for (i, line) in content.lines().enumerate() {
75 if i == 0 {
77 continue;
78 }
79
80 let line = line.trim();
81 if line.is_empty() {
82 continue;
83 }
84
85 let fields: Vec<&str> = line.split_whitespace().collect();
86 if fields.len() < 8 {
87 continue;
88 }
89
90 let local = fields[1];
93 let remote = fields[2];
94 let state_hex = fields[3];
95 let uid_str = fields.get(7).unwrap_or(&"0");
96
97 let (local_ip, local_port) = match parse_hex_addr(local) {
98 Some(v) => v,
99 None => continue,
100 };
101
102 let (remote_ip, remote_port) = match parse_hex_addr(remote) {
103 Some(v) => v,
104 None => continue,
105 };
106
107 let state = u8::from_str_radix(state_hex, 16).unwrap_or(0);
108 let uid: u32 = uid_str.parse().unwrap_or(0);
109
110 entries.push(TcpEntry {
111 local_ip,
112 local_port,
113 remote_ip,
114 remote_port,
115 state,
116 uid,
117 });
118 }
119
120 entries
121 }
122
123 pub fn scan_once(&self) -> Vec<ScanResult> {
125 let mut results = Vec::new();
126
127 let tcp4 = std::fs::read_to_string("/proc/net/tcp").unwrap_or_default();
129 let tcp6 = std::fs::read_to_string("/proc/net/tcp6").unwrap_or_default();
130
131 let mut entries = Self::parse_proc_net_tcp(&tcp4);
132 let _ = tcp6;
135
136 entries.retain(|e| e.state == 1);
138
139 entries.retain(|e| e.remote_ip != "127.0.0.1" && e.remote_ip != "0.0.0.0");
141
142 let now = Instant::now();
143
144 for entry in &entries {
145 let conn_key = format!("{}:{}", entry.remote_ip, entry.remote_port);
146
147 if self.threat_intel.check_ip(&entry.remote_ip) {
149 results.push(ScanResult::new(
150 "network_monitor",
151 &conn_key,
152 Severity::High,
153 DetectionCategory::NetworkAnomaly {
154 connection: conn_key.clone(),
155 },
156 format!(
157 "Connection to known malicious IP {} on port {} — threat intel match",
158 entry.remote_ip, entry.remote_port
159 ),
160 0.95,
161 RecommendedAction::BlockConnection {
162 addr: conn_key.clone(),
163 },
164 ));
165 }
166
167 if self.config.suspicious_ports.contains(&entry.remote_port) {
169 results.push(ScanResult::new(
170 "network_monitor",
171 &conn_key,
172 Severity::Medium,
173 DetectionCategory::NetworkAnomaly {
174 connection: conn_key.clone(),
175 },
176 format!(
177 "Outbound connection to suspicious port {} (IP: {}) — common C2/backdoor port",
178 entry.remote_port, entry.remote_ip
179 ),
180 0.6,
181 RecommendedAction::Alert,
182 ));
183 }
184
185 let mut history = self.conn_history.write();
187 let timestamps = history.entry(entry.remote_ip.clone()).or_default();
188 timestamps.push(now);
189
190 if timestamps.len() > 100 {
192 timestamps.drain(..timestamps.len() - 100);
193 }
194
195 if timestamps.len() >= self.config.beacon_min_count as usize {
197 if let Some(score) = detect_beaconing(timestamps, self.config.beacon_jitter_pct) {
198 if score > 0.7 {
199 results.push(ScanResult::new(
200 "network_monitor",
201 &entry.remote_ip,
202 Severity::Critical,
203 DetectionCategory::NetworkAnomaly {
204 connection: format!("beacon:{}", entry.remote_ip),
205 },
206 format!(
207 "C2 beaconing detected — {} connections to {} at regular intervals (score: {:.2})",
208 timestamps.len(), entry.remote_ip, score
209 ),
210 score,
211 RecommendedAction::BlockConnection {
212 addr: entry.remote_ip.clone(),
213 },
214 ));
215 }
216 }
217 }
218 }
219
220 results
221 }
222
223 pub fn start(
225 self: Arc<Self>,
226 detection_tx: tokio::sync::mpsc::UnboundedSender<ScanResult>,
227 ) -> tokio::task::JoinHandle<()> {
228 let running = Arc::clone(&self.running);
229 let interval_ms = self.config.poll_interval_ms;
230
231 tokio::spawn(async move {
232 let mut interval =
233 tokio::time::interval(std::time::Duration::from_millis(interval_ms));
234
235 while running.load(Ordering::Relaxed) {
236 interval.tick().await;
237 let results = self.scan_once();
238 for result in results {
239 if detection_tx.send(result).is_err() {
240 return;
241 }
242 }
243 }
244 })
245 }
246
247 pub fn stop(&self) {
248 self.running.store(false, Ordering::Relaxed);
249 }
250}
251
/// Decodes an `ADDRESS:PORT` pair as printed in `/proc/net/tcp` and
/// `/proc/net/tcp6` into a textual IP address and a port number.
///
/// * `ADDRESS` is either 8 hex digits (IPv4) or 32 hex digits (IPv6). Each
///   group of 8 digits is one 32-bit word whose least significant byte is the
///   first address byte, e.g. `0100007F` is `127.0.0.1` and
///   `00000000000000000000000001000000` is `::1`.
/// * `PORT` is hexadecimal, e.g. `1F90` is 8080.
///
/// Returns `None` when the input is not exactly two `:`-separated parts, when
/// either part contains a non-hex character, when the port does not fit in
/// 16 bits, or when the address has any other length.
pub fn parse_hex_addr(addr: &str) -> Option<(String, u16)> {
    let (ip_hex, port_hex) = addr.split_once(':')?;

    // Rejects empty parts, signs accepted by `from_str_radix`, and a second
    // ':' (which would end up inside `port_hex`). It also guarantees ASCII,
    // which makes the byte-index slicing below safe.
    let is_hex = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_hexdigit());
    if !is_hex(ip_hex) || !is_hex(port_hex) {
        return None;
    }

    let port = u16::from_str_radix(port_hex, 16).ok()?;

    match ip_hex.len() {
        8 => {
            let word = u32::from_str_radix(ip_hex, 16).ok()?;
            let ip = std::net::Ipv4Addr::from(word.to_le_bytes());
            Some((ip.to_string(), port))
        }
        32 => {
            let mut octets = [0u8; 16];
            for (i, chunk) in octets.chunks_exact_mut(4).enumerate() {
                let word = u32::from_str_radix(&ip_hex[i * 8..i * 8 + 8], 16).ok()?;
                chunk.copy_from_slice(&word.to_le_bytes());
            }
            let ip = std::net::Ipv6Addr::from(octets);
            Some((ip.to_string(), port))
        }
        _ => None,
    }
}
281
/// Scores how regular the spacing between `timestamps` is.
///
/// The score is derived from the coefficient of variation (standard
/// deviation divided by mean) of the gaps between consecutive timestamps:
/// `1 - cv / (max_jitter_pct / 100)`, limited to `0.0..=1.0`. A score of 1.0
/// means perfectly even spacing; 0.0 means the jitter reaches or exceeds the
/// tolerated percentage.
///
/// `timestamps` is expected in ascending order; a gap that would be negative
/// is counted as zero.
///
/// Returns `None` when fewer than three timestamps are given or when the mean
/// gap is below one millisecond.
///
/// A `max_jitter_pct` that is zero, negative, or NaN is treated as "no jitter
/// tolerated": the score is 1.0 only for exactly even spacing and 0.0
/// otherwise. (Without this guard a negative percentage scored every input
/// as 1.0.)
fn detect_beaconing(timestamps: &[Instant], max_jitter_pct: f64) -> Option<f64> {
    if timestamps.len() < 3 {
        return None;
    }

    let intervals: Vec<f64> = timestamps
        .windows(2)
        .map(|pair| pair[1].saturating_duration_since(pair[0]).as_secs_f64())
        .collect();
    let count = intervals.len() as f64;

    let mean = intervals.iter().sum::<f64>() / count;
    if mean < 0.001 {
        return None;
    }

    let variance = intervals.iter().map(|gap| (gap - mean).powi(2)).sum::<f64>() / count;
    let cv = variance.sqrt() / mean;

    let jitter_threshold = max_jitter_pct / 100.0;
    // Written as a negated `>` so that NaN also takes this branch.
    if !(jitter_threshold > 0.0) {
        return Some(if cv == 0.0 { 1.0 } else { 0.0 });
    }

    Some((1.0 - cv / jitter_threshold).clamp(0.0, 1.0))
}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// `0100007F` is 127.0.0.1 with the first address byte stored last;
    /// `1F90` is port 8080 in hex.
    #[test]
    fn parse_loopback() {
        let result = parse_hex_addr("0100007F:1F90").unwrap();
        assert_eq!(result.0, "127.0.0.1");
        assert_eq!(result.1, 0x1F90);
    }

    #[test]
    fn parse_null_addr() {
        let result = parse_hex_addr("00000000:0000").unwrap();
        assert_eq!(result.0, "0.0.0.0");
        assert_eq!(result.1, 0);
    }

    #[test]
    fn parse_real_addr() {
        let result = parse_hex_addr("0101A8C0:0050").unwrap();
        assert_eq!(result.0, "192.168.1.1");
        assert_eq!(result.1, 80);
    }

    #[test]
    fn port_parsing() {
        let result = parse_hex_addr("00000000:01BB").unwrap();
        assert_eq!(result.1, 443);
    }

    /// A header line followed by one listening and one established socket.
    #[test]
    fn parse_proc_net_tcp_sample() {
        let sample = r#"  sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
   0: 0100007F:1F90 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 12345 1 0000000000000000 100 0 0 10 0
   1: 0100007F:0035 0101A8C0:D431 01 00000000:00000000 00:00000000 00000000 1000 0 23456 1 0000000000000000 100 0 0 10 0"#;

        let entries = NetworkMonitor::parse_proc_net_tcp(sample);
        assert_eq!(entries.len(), 2);

        assert_eq!(entries[0].local_ip, "127.0.0.1");
        assert_eq!(entries[0].local_port, 8080);
        assert_eq!(entries[0].state, 0x0A);
        assert_eq!(entries[1].state, 0x01);
        assert_eq!(entries[1].remote_ip, "192.168.1.1");
    }

    #[test]
    fn suspicious_port_detection() {
        let config = NetworkMonitorConfig::default();
        assert!(config.suspicious_ports.contains(&4444));
        assert!(config.suspicious_ports.contains(&6667));
        assert!(!config.suspicious_ports.contains(&80));
        assert!(!config.suspicious_ports.contains(&443));
    }

    /// Fifteen events spaced exactly five seconds apart must score high.
    #[test]
    fn beaconing_detection_regular() {
        let base = Instant::now();
        let timestamps: Vec<Instant> = (0..15)
            .map(|i| base + std::time::Duration::from_secs(i * 5))
            .collect();

        let score = detect_beaconing(&timestamps, 15.0);
        assert!(score.is_some());
        assert!(
            score.unwrap() > 0.8,
            "Score should be high for regular intervals, got {}",
            score.unwrap()
        );
    }

    /// Widely varying gaps must score low (or produce no score at all).
    #[test]
    fn beaconing_detection_irregular() {
        let base = Instant::now();
        let offsets = [0, 1, 5, 6, 20, 21, 50, 51, 100, 200];
        let timestamps: Vec<Instant> = offsets
            .iter()
            .map(|&s| base + std::time::Duration::from_secs(s))
            .collect();

        let score = detect_beaconing(&timestamps, 15.0);
        if let Some(s) = score {
            assert!(s < 0.5, "Score should be low for irregular intervals, got {}", s);
        }
    }

    #[test]
    fn config_defaults() {
        let config = NetworkMonitorConfig::default();
        assert_eq!(config.poll_interval_ms, 5000);
        assert!(config.exfil_threshold_bytes > 0);
    }

    /// Smoke test: a scan must complete without panicking, whatever the
    /// host's connection table contains (or if it cannot be read).
    #[test]
    fn scan_once_no_crash() {
        let ti = Arc::new(ThreatIntelDB::new(
            super::super::threat_intel::ThreatIntelConfig::new(
                std::env::temp_dir().join("nexus-netmon-test"),
            ),
        ));
        let monitor = NetworkMonitor::new(NetworkMonitorConfig::default(), ti);
        let results = monitor.scan_once();
        let _ = results;
    }
}