Skip to main content

netwatch_rs/
processes.rs

1use std::collections::HashMap;
2use std::fs;
3use std::path::Path;
4use std::time::{Duration, SystemTime};
5
6#[derive(Debug, Clone)]
7pub struct ProcessNetworkInfo {
8    pub pid: u32,
9    pub name: String,
10    pub command: String,
11    pub connections: u32,
12    pub bytes_sent: u64,
13    pub bytes_received: u64,
14    pub packets_sent: u64,
15    pub packets_received: u64,
16    pub established_connections: u32,
17    pub listening_ports: u32,
18    pub last_updated: SystemTime,
19}
20
21impl ProcessNetworkInfo {
22    pub fn total_bytes(&self) -> u64 {
23        self.bytes_sent + self.bytes_received
24    }
25
26    pub fn total_packets(&self) -> u64 {
27        self.packets_sent + self.packets_received
28    }
29}
30
31pub struct ProcessMonitor {
32    processes: HashMap<u32, ProcessNetworkInfo>,
33    previous_stats: HashMap<u32, ProcessNetworkStats>,
34    last_update: SystemTime,
35}
36
37#[derive(Debug, Clone)]
38pub struct ProcessNetworkStats {
39    bytes_sent: u64,
40    bytes_received: u64,
41    packets_sent: u64,
42    packets_received: u64,
43    timestamp: SystemTime,
44}
45
46impl ProcessMonitor {
47    pub fn new() -> Self {
48        Self {
49            processes: HashMap::new(),
50            previous_stats: HashMap::new(),
51            last_update: SystemTime::now(),
52        }
53    }
54
55    pub fn update(&mut self) -> Result<(), Box<dyn std::error::Error>> {
56        // Clear existing processes to get fresh data
57        self.processes.clear();
58
59        let now = SystemTime::now();
60
61        // Read all process network information
62        self.scan_processes()?;
63
64        // Update connection counts
65        self.update_connection_counts()?;
66
67        // Calculate network I/O rates
68        self.calculate_rates(now)?;
69
70        self.last_update = now;
71        Ok(())
72    }
73
74    fn scan_processes(&mut self) -> Result<(), Box<dyn std::error::Error>> {
75        if let Ok(entries) = fs::read_dir("/proc") {
76            for entry in entries.flatten() {
77                if let Ok(file_name) = entry.file_name().into_string() {
78                    if let Ok(pid) = file_name.parse::<u32>() {
79                        if let Some(process_info) = self.read_process_info(pid)? {
80                            self.processes.insert(pid, process_info);
81                        }
82                    }
83                }
84            }
85        } else {
86            // macOS fallback - get real process data from system commands
87            self.get_real_processes_from_system();
88        }
89        Ok(())
90    }
91
92    fn read_process_info(
93        &mut self,
94        pid: u32,
95    ) -> Result<Option<ProcessNetworkInfo>, Box<dyn std::error::Error>> {
96        let proc_path = format!("/proc/{pid}");
97
98        // Check if process directory exists and is accessible
99        if !Path::new(&proc_path).exists() {
100            return Ok(None);
101        }
102
103        // Read process name
104        let comm_path = format!("{proc_path}/comm");
105        let name = fs::read_to_string(comm_path)
106            .unwrap_or_else(|_| format!("process-{pid}"))
107            .trim()
108            .to_string();
109
110        // Read command line
111        let cmdline_path = format!("{proc_path}/cmdline");
112        let command = fs::read_to_string(cmdline_path)
113            .unwrap_or_else(|_| name.clone())
114            .replace('\0', " ")
115            .trim()
116            .to_string();
117
118        // Read network I/O statistics from /proc/pid/net/dev if available
119        let (bytes_sent, bytes_received, packets_sent, packets_received) =
120            self.read_process_network_stats(pid).unwrap_or((0, 0, 0, 0));
121
122        let process_info = ProcessNetworkInfo {
123            pid,
124            name,
125            command,
126            connections: 0, // Will be updated later
127            bytes_sent,
128            bytes_received,
129            packets_sent,
130            packets_received,
131            established_connections: 0,
132            listening_ports: 0,
133            last_updated: SystemTime::now(),
134        };
135
136        Ok(Some(process_info))
137    }
138
139    fn read_process_network_stats(
140        &self,
141        pid: u32,
142    ) -> Result<(u64, u64, u64, u64), Box<dyn std::error::Error>> {
143        // Try to read network statistics from various sources
144
145        // Method 1: Try /proc/pid/net/dev (process-specific network stats)
146        let net_dev_path = format!("/proc/{pid}/net/dev");
147        if let Ok(content) = fs::read_to_string(net_dev_path) {
148            if let Some((bytes_rx, bytes_tx, packets_rx, packets_tx)) = self.parse_net_dev(&content)
149            {
150                return Ok((bytes_tx, bytes_rx, packets_tx, packets_rx));
151            }
152        }
153
154        // Method 2: Try /proc/pid/io (process I/O stats)
155        let io_path = format!("/proc/{pid}/io");
156        if let Ok(content) = fs::read_to_string(io_path) {
157            if let Some((read_bytes, write_bytes)) = self.parse_io_stats(&content) {
158                // Estimate network I/O as a fraction of total I/O
159                // This is a rough approximation
160                return Ok((write_bytes / 4, read_bytes / 4, 0, 0));
161            }
162        }
163
164        // Method 3: Use system-wide stats as fallback
165        // Read from /proc/net/netstat for system-wide network stats
166        if let Ok(content) = fs::read_to_string("/proc/net/netstat") {
167            if let Some((bytes_sent, bytes_received)) = self.parse_netstat(&content) {
168                // Distribute proportionally based on number of connections
169                // This is a very rough estimate
170                return Ok((bytes_sent / 100, bytes_received / 100, 0, 0));
171            }
172        }
173
174        Ok((0, 0, 0, 0))
175    }
176
177    fn parse_net_dev(&self, content: &str) -> Option<(u64, u64, u64, u64)> {
178        let mut total_bytes_rx = 0;
179        let mut total_bytes_tx = 0;
180        let mut total_packets_rx = 0;
181        let mut total_packets_tx = 0;
182
183        for line in content.lines().skip(2) {
184            // Skip header lines
185            let fields: Vec<&str> = line.split_whitespace().collect();
186            if fields.len() >= 10 {
187                if let (Ok(bytes_rx), Ok(packets_rx), Ok(bytes_tx), Ok(packets_tx)) = (
188                    fields[1].parse::<u64>(),
189                    fields[2].parse::<u64>(),
190                    fields[9].parse::<u64>(),
191                    fields[10].parse::<u64>(),
192                ) {
193                    total_bytes_rx += bytes_rx;
194                    total_bytes_tx += bytes_tx;
195                    total_packets_rx += packets_rx;
196                    total_packets_tx += packets_tx;
197                }
198            }
199        }
200
201        if total_bytes_rx > 0 || total_bytes_tx > 0 {
202            Some((
203                total_bytes_rx,
204                total_bytes_tx,
205                total_packets_rx,
206                total_packets_tx,
207            ))
208        } else {
209            None
210        }
211    }
212
213    fn parse_io_stats(&self, content: &str) -> Option<(u64, u64)> {
214        let mut read_bytes = 0;
215        let mut write_bytes = 0;
216
217        for line in content.lines() {
218            if line.starts_with("read_bytes:") {
219                if let Some(value) = line.split_whitespace().nth(1) {
220                    read_bytes = value.parse().unwrap_or(0);
221                }
222            } else if line.starts_with("write_bytes:") {
223                if let Some(value) = line.split_whitespace().nth(1) {
224                    write_bytes = value.parse().unwrap_or(0);
225                }
226            }
227        }
228
229        if read_bytes > 0 || write_bytes > 0 {
230            Some((read_bytes, write_bytes))
231        } else {
232            None
233        }
234    }
235
236    fn parse_netstat(&self, content: &str) -> Option<(u64, u64)> {
237        // Parse system-wide network statistics
238        for line in content.lines() {
239            if line.starts_with("IpExt:") {
240                let fields: Vec<&str> = line.split_whitespace().collect();
241                if fields.len() > 6 {
242                    if let (Ok(bytes_sent), Ok(bytes_received)) =
243                        (fields[5].parse::<u64>(), fields[6].parse::<u64>())
244                    {
245                        return Some((bytes_sent, bytes_received));
246                    }
247                }
248            }
249        }
250        None
251    }
252
253    fn update_connection_counts(&mut self) -> Result<(), Box<dyn std::error::Error>> {
254        // Count connections per process by parsing /proc/net/tcp and /proc/net/udp
255        let mut pid_connections: HashMap<u32, (u32, u32, u32)> = HashMap::new(); // (total, established, listening)
256
257        // Parse TCP connections
258        if let Ok(content) = fs::read_to_string("/proc/net/tcp") {
259            self.count_connections_in_file(&content, &mut pid_connections)?;
260        }
261
262        // Parse UDP connections
263        if let Ok(content) = fs::read_to_string("/proc/net/udp") {
264            self.count_connections_in_file(&content, &mut pid_connections)?;
265        }
266
267        // Update process information
268        for (pid, (total, established, listening)) in pid_connections {
269            if let Some(process) = self.processes.get_mut(&pid) {
270                process.connections = total;
271                process.established_connections = established;
272                process.listening_ports = listening;
273            }
274        }
275
276        Ok(())
277    }
278
279    fn count_connections_in_file(
280        &self,
281        content: &str,
282        pid_connections: &mut HashMap<u32, (u32, u32, u32)>,
283    ) -> Result<(), Box<dyn std::error::Error>> {
284        for line in content.lines().skip(1) {
285            // Skip header
286            let fields: Vec<&str> = line.split_whitespace().collect();
287            if fields.len() >= 8 {
288                // Try to extract PID from inode (this is complex and may not always work)
289                // For now, we'll use a simplified approach
290                if let Ok(inode) = fields[9].parse::<u64>() {
291                    if let Some(pid) = self.find_pid_by_inode(inode) {
292                        let (total, established, listening) =
293                            pid_connections.entry(pid).or_insert((0, 0, 0));
294                        *total += 1;
295
296                        // Check connection state
297                        if fields[3] == "01" {
298                            // ESTABLISHED
299                            *established += 1;
300                        } else if fields[3] == "0A" {
301                            // LISTEN
302                            *listening += 1;
303                        }
304                    }
305                }
306            }
307        }
308        Ok(())
309    }
310
311    fn find_pid_by_inode(&self, _inode: u64) -> Option<u32> {
312        // This is a simplified implementation
313        // In reality, we'd need to scan /proc/*/fd/* to find which process owns this inode
314        // For now, return None to avoid complex filesystem scanning
315        None
316    }
317
318    fn calculate_rates(&mut self, now: SystemTime) -> Result<(), Box<dyn std::error::Error>> {
319        // Calculate network I/O rates by comparing with previous measurements
320        for (pid, process) in &mut self.processes {
321            if let Some(prev_stats) = self.previous_stats.get(pid) {
322                let time_diff = now
323                    .duration_since(prev_stats.timestamp)
324                    .unwrap_or(Duration::from_secs(1));
325                let time_secs = time_diff.as_secs_f64();
326
327                if time_secs > 0.0 {
328                    // Calculate rates (bytes per second)
329                    let bytes_sent_rate =
330                        ((process.bytes_sent.saturating_sub(prev_stats.bytes_sent)) as f64
331                            / time_secs) as u64;
332                    let bytes_received_rate = ((process
333                        .bytes_received
334                        .saturating_sub(prev_stats.bytes_received))
335                        as f64
336                        / time_secs) as u64;
337
338                    // Update with calculated rates
339                    process.bytes_sent = bytes_sent_rate;
340                    process.bytes_received = bytes_received_rate;
341                }
342            }
343
344            // Store current stats for next calculation
345            self.previous_stats.insert(
346                *pid,
347                ProcessNetworkStats {
348                    bytes_sent: process.bytes_sent,
349                    bytes_received: process.bytes_received,
350                    packets_sent: process.packets_sent,
351                    packets_received: process.packets_received,
352                    timestamp: now,
353                },
354            );
355        }
356
357        Ok(())
358    }
359
360    pub fn get_processes(&self) -> Vec<&ProcessNetworkInfo> {
361        let mut processes: Vec<&ProcessNetworkInfo> = self.processes.values().collect();
362
363        // Sort by total network activity (bytes sent + received)
364        processes.sort_by(|a, b| {
365            let a_total = a.total_bytes();
366            let b_total = b.total_bytes();
367            b_total.cmp(&a_total)
368        });
369
370        processes
371    }
372
373    pub fn get_top_network_processes(&self, limit: usize) -> Vec<&ProcessNetworkInfo> {
374        let mut processes = self.get_processes();
375        processes.truncate(limit);
376        processes
377    }
378
379    pub fn get_process_stats(&self) -> ProcessNetworkStats {
380        let mut stats = ProcessNetworkStats {
381            bytes_sent: 0,
382            bytes_received: 0,
383            packets_sent: 0,
384            packets_received: 0,
385            timestamp: SystemTime::now(),
386        };
387
388        for process in self.processes.values() {
389            stats.bytes_sent += process.bytes_sent;
390            stats.bytes_received += process.bytes_received;
391            stats.packets_sent += process.packets_sent;
392            stats.packets_received += process.packets_received;
393        }
394
395        stats
396    }
397
398    pub fn get_listening_processes(&self) -> Vec<&ProcessNetworkInfo> {
399        let mut processes: Vec<&ProcessNetworkInfo> = self
400            .processes
401            .values()
402            .filter(|p| p.listening_ports > 0)
403            .collect();
404
405        processes.sort_by(|a, b| b.listening_ports.cmp(&a.listening_ports));
406        processes
407    }
408
409    fn get_real_processes_from_system(&mut self) {
410        use std::process::Command;
411
412        // Use ps and lsof to get real process data with network activity
413        if let Ok(output) = Command::new("lsof").args(["-i", "-n", "-P"]).output() {
414            let stdout = String::from_utf8_lossy(&output.stdout);
415            self.parse_lsof_processes(&stdout);
416        } else {
417            // Fallback to ps if lsof is not available
418            if let Ok(output) = Command::new("ps").args(["-eo", "pid,comm,rss"]).output() {
419                let stdout = String::from_utf8_lossy(&output.stdout);
420                self.parse_ps_processes(&stdout);
421            }
422        }
423    }
424
425    fn parse_lsof_processes(&mut self, output: &str) {
426        let mut process_map: HashMap<u32, (String, String)> = HashMap::new(); // pid -> (name, command)
427        let mut process_connections: HashMap<u32, u32> = HashMap::new(); // pid -> total connections
428        let mut process_listening: HashMap<u32, u32> = HashMap::new(); // pid -> listening ports
429        let mut process_established: HashMap<u32, u32> = HashMap::new(); // pid -> established connections
430
431        for line in output.lines().skip(1) {
432            // Skip header
433            let parts: Vec<&str> = line.split_whitespace().collect();
434            if parts.len() < 10 {
435                continue;
436            }
437
438            let process_name = parts[0].to_string();
439            if let Ok(pid) = parts[1].parse::<u32>() {
440                // Count total connections per process
441                *process_connections.entry(pid).or_insert(0) += 1;
442
443                // Check if this is a listening port or established connection
444                // The connection state is in parts[9] like "(LISTEN)" or "(ESTABLISHED)"
445                let connection_state = parts.get(9).unwrap_or(&"");
446                if connection_state.contains("LISTEN") {
447                    *process_listening.entry(pid).or_insert(0) += 1;
448                } else if connection_state.contains("ESTABLISHED") {
449                    *process_established.entry(pid).or_insert(0) += 1;
450                }
451
452                // Store process info if not already seen
453                process_map.entry(pid).or_insert_with(|| {
454                    let command = process_name.to_string(); // lsof doesn't give full command
455                    (process_name, command)
456                });
457            }
458        }
459
460        // Convert to ProcessNetworkInfo
461        for (pid, total_connections) in process_connections {
462            if let Some((name, command)) = process_map.get(&pid) {
463                let listening_ports = process_listening.get(&pid).copied().unwrap_or(0);
464                let established_connections = process_established.get(&pid).copied().unwrap_or(0);
465
466                let process_info = ProcessNetworkInfo {
467                    pid,
468                    name: name.clone(),
469                    command: command.clone(),
470                    connections: total_connections,
471                    bytes_sent: 0, // lsof doesn't provide byte counts
472                    bytes_received: 0,
473                    packets_sent: 0,
474                    packets_received: 0,
475                    established_connections,
476                    listening_ports,
477                    last_updated: SystemTime::now(),
478                };
479                self.processes.insert(process_info.pid, process_info);
480            }
481        }
482    }
483
484    fn parse_ps_processes(&mut self, output: &str) {
485        // Basic fallback - just get running processes without network info
486        for line in output.lines().skip(1) {
487            // Skip header
488            let parts: Vec<&str> = line.split_whitespace().collect();
489            if parts.len() < 2 {
490                continue;
491            }
492
493            if let Ok(pid) = parts[0].parse::<u32>() {
494                let name = parts[1].to_string();
495                let process_info = ProcessNetworkInfo {
496                    pid,
497                    name: name.clone(),
498                    command: name,
499                    connections: 0,
500                    bytes_sent: 0,
501                    bytes_received: 0,
502                    packets_sent: 0,
503                    packets_received: 0,
504                    established_connections: 0,
505                    listening_ports: 0,
506                    last_updated: SystemTime::now(),
507                };
508                self.processes.insert(process_info.pid, process_info);
509            }
510        }
511    }
512}
513
514impl Default for ProcessMonitor {
515    fn default() -> Self {
516        Self::new()
517    }
518}