nullnet_traffic_monitor/
lib.rs

1use async_channel::{Receiver, Sender};
2use chrono::Utc;
3use pcap::Device;
4use std::net::ToSocketAddrs;
5use std::thread;
6
/// Configuration for the network traffic monitor.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so that a configuration can be
/// logged, duplicated and compared by callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MonitorConfig {
    /// The address to filter out from the captured packets (i.e., the server's IP address)
    pub addr: String,
    /// The maximum number of bytes to capture per packet
    pub snaplen: i32,
}
14
/// Information about a captured packet.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so that received packets can be
/// logged, duplicated and compared by consumers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PacketInfo {
    /// The interface name the packet was captured on
    pub interface: String,
    /// The raw packet data
    pub data: Vec<u8>,
    /// The link type of the packet (the numeric data-link type reported by the capture)
    pub link_type: i32,
    /// The timestamp of the packet capture, as an RFC 3339 string in UTC.
    ///
    /// This is the time at which the packet was processed by the monitor,
    /// not the timestamp from the capture header.
    pub timestamp: String,
}
26
27/// Monitor network traffic on all available network devices
28///
29/// # Arguments
30/// * `monitor_config` - The configuration for the network traffic monitor
31///
32/// # Returns
33/// A receiver channel for receiving captured packets
34#[must_use]
35pub fn monitor_devices(monitor_config: &MonitorConfig) -> Receiver<PacketInfo> {
36    let (tx, rx) = async_channel::bounded(10_000);
37
38    let bpf_program = bpf_program(&monitor_config.addr);
39    for device in Device::list().into_iter().flatten() {
40        let tx = tx.clone();
41        let snaplen = monitor_config.snaplen;
42        let bpf_program = bpf_program.clone();
43        thread::spawn(move || {
44            monitor_device(device, &tx, snaplen, &bpf_program);
45        });
46    }
47
48    rx
49}
50
51fn monitor_device(device: Device, tx: &Sender<PacketInfo>, snaplen: i32, bpf_program: &str) {
52    let device_name = device.name.clone();
53
54    let mut cap = match pcap::Capture::from_device(device)
55        .map(|c| c.promisc(true).snaplen(snaplen).immediate_mode(true).open())
56    {
57        Ok(Ok(cap)) => cap,
58        Ok(Err(err)) | Err(err) => {
59            log::warn!(
60                "Failed to initialize capture on {device_name}: {err}. Aborting monitoring..."
61            );
62            return;
63        }
64    };
65
66    if let Err(err) = cap.filter(bpf_program, true) {
67        log::error!("PCAP filter error on {device_name}: {err}. Aborting monitoring...");
68        return;
69    }
70
71    let mut savefile = if cfg!(feature = "export-pcap") {
72        let file_name = format!("./{device_name}.pcap");
73        let res = cap.savefile(&file_name);
74        match res {
75            Ok(savefile) => Some(savefile),
76            Err(err) => {
77                log::error!("Failed to create savefile '{file_name}': {err}");
78                None
79            }
80        }
81    } else {
82        None
83    };
84
85    let link_type = cap.get_datalink().0;
86
87    loop {
88        if let Ok(p) = cap.next_packet() {
89            let packet = PacketInfo {
90                interface: device_name.clone(),
91                data: p.data[..].to_vec(),
92                link_type,
93                timestamp: Utc::now().to_rfc3339(),
94            };
95            // send packet to caller, or exit if channel is closed
96            let Ok(()) = tx.send_blocking(packet) else {
97                return;
98            };
99            // Save packet to file
100            if let Some(file) = savefile.as_mut() {
101                file.write(&p);
102            }
103        }
104    }
105}
106
107fn bpf_program(addr: &str) -> String {
108    let ip_addr = format!("{addr}:0")
109        .to_socket_addrs()
110        .expect("Failed to resolve address")
111        .next()
112        .expect("Failed to get address")
113        .ip()
114        .to_string();
115
116    let bpf_program = format!("host not {ip_addr}");
117    log::info!("BPF Program: {bpf_program}");
118    bpf_program
119}