r_lanlib/scanners/
arp_scanner.rs

1//! Provides Scanner implementation for ARP scanning
2
3use log::*;
4use pnet::packet::{Packet, arp, ethernet};
5use std::{
6    net,
7    sync::{self, Arc, Mutex},
8    thread::{self, JoinHandle},
9    time::Duration,
10};
11use threadpool::ThreadPool;
12
13use crate::{
14    error::{RLanLibError, Result},
15    network::NetworkInterface,
16    packet::{self, Reader, Sender, arp_packet::ArpPacketBuilder},
17    scanners::{Device, PortSet, Scanning},
18    targets::ips::IPTargets,
19};
20
21use super::{ScanMessage, Scanner, heartbeat::HeartBeat};
22
23/// Data structure representing an ARP scanner
24pub struct ARPScanner<'net> {
25    interface: &'net NetworkInterface,
26    packet_reader: Arc<Mutex<dyn Reader>>,
27    packet_sender: Arc<Mutex<dyn Sender>>,
28    targets: Arc<IPTargets>,
29    source_port: u16,
30    include_vendor: bool,
31    include_host_names: bool,
32    idle_timeout: Duration,
33    notifier: sync::mpsc::Sender<ScanMessage>,
34}
35
36/// Data structure holding parameters needed to create instance of ARPScanner
37pub struct ARPScannerArgs<'net> {
38    /// The network interface to use when scanning
39    pub interface: &'net NetworkInterface,
40    /// A packet Reader implementation (can use default provided in packet
41    /// crate)
42    pub packet_reader: Arc<Mutex<dyn Reader>>,
43    /// A packet Sender implementation (can use default provided in packet
44    /// crate)
45    pub packet_sender: Arc<Mutex<dyn Sender>>,
46    /// [`IPTargets`] to scan
47    pub targets: Arc<IPTargets>,
48    /// An open source port to listen for incoming packets (can use network
49    /// packet to find open port)
50    pub source_port: u16,
51    /// Whether or not to include vendor look-ups for detected devices
52    pub include_vendor: bool,
53    /// Whether or not to include hostname look-ups for detected devices
54    pub include_host_names: bool,
55    /// The amount of time to wait for incoming packets after scanning all
56    /// targets
57    pub idle_timeout: Duration,
58    /// Channel to send messages regarding devices being scanned, and detected
59    /// devices
60    pub notifier: sync::mpsc::Sender<ScanMessage>,
61}
62
63impl<'net> ARPScanner<'net> {
64    /// Returns an instance of ARPScanner
65    pub fn new(args: ARPScannerArgs<'net>) -> Self {
66        Self {
67            interface: args.interface,
68            packet_reader: args.packet_reader,
69            packet_sender: args.packet_sender,
70            targets: args.targets,
71            source_port: args.source_port,
72            include_vendor: args.include_vendor,
73            include_host_names: args.include_host_names,
74            idle_timeout: args.idle_timeout,
75            notifier: args.notifier,
76        }
77    }
78}
79
80impl ARPScanner<'_> {
81    // Implements packet reading in a separate thread so we can send and
82    // receive packets simultaneously
83    fn read_packets(&self, done: sync::mpsc::Receiver<()>) -> JoinHandle<Result<()>> {
84        let packet_reader = Arc::clone(&self.packet_reader);
85        let packet_sender = Arc::clone(&self.packet_sender);
86        let include_host_names = self.include_host_names;
87        let include_vendor = self.include_vendor;
88        let source_ipv4 = self.interface.ipv4;
89        let source_mac = self.interface.mac;
90        let source_port = self.source_port;
91        let notifier = self.notifier.clone();
92        let (heartbeat_tx, heartbeat_rx) = sync::mpsc::channel::<()>();
93
94        // since reading packets off the wire is a blocking operation, we
95        // won't be able to detect a "done" signal if no packets are being
96        // received as we'll be blocked on waiting for one to come it. To fix
97        // this we send periodic "heartbeat" packets so we can continue to
98        // check for "done" signals
99        thread::spawn(move || {
100            debug!("starting arp heartbeat thread");
101            let heartbeat = HeartBeat::new(source_mac, source_ipv4, source_port, packet_sender);
102            let interval = Duration::from_secs(1);
103            loop {
104                if heartbeat_rx.try_recv().is_ok() {
105                    debug!("stopping arp heartbeat");
106                    break;
107                }
108                debug!("sending arp heartbeat");
109                heartbeat.beat();
110                thread::sleep(interval);
111            }
112        });
113
114        thread::spawn(move || -> Result<()> {
115            let mut reader = packet_reader.lock()?;
116            // Use a bounded thread pool for DNS/vendor lookups to prevent
117            // spawning thousands of threads on large networks
118            let lookup_pool = ThreadPool::new(8);
119
120            loop {
121                if done.try_recv().is_ok() {
122                    debug!("exiting arp packet reader");
123                    if let Err(e) = heartbeat_tx.send(()) {
124                        error!("failed to stop heartbeat: {}", e);
125                    }
126                    break;
127                }
128
129                let pkt = reader.next_packet()?;
130
131                let Some(eth) = ethernet::EthernetPacket::new(pkt) else {
132                    continue;
133                };
134
135                let Some(header) = arp::ArpPacket::new(eth.payload()) else {
136                    continue;
137                };
138
139                // Capture ANY ARP reply as it's an indication that there's a
140                // device on the network
141                if header.get_operation() != arp::ArpOperations::Reply {
142                    continue;
143                }
144
145                let ip4 = header.get_sender_proto_addr();
146                let mac = eth.get_source();
147
148                let notification_sender = notifier.clone();
149
150                // use a thread pool here so we don't slow down packet
151                // processing while limiting concurrent threads
152                lookup_pool.execute(move || {
153                    let hostname = if include_host_names {
154                        debug!("looking up hostname for {}", ip4);
155                        dns_lookup::lookup_addr(&ip4.into()).unwrap_or_default()
156                    } else {
157                        String::new()
158                    };
159
160                    let vendor = if include_vendor {
161                        oui_data::lookup(&mac.to_string())
162                            .map(|v| v.organization().to_owned())
163                            .unwrap_or_default()
164                    } else {
165                        String::new()
166                    };
167
168                    let _ = notification_sender.send(ScanMessage::ARPScanDevice(Device {
169                        hostname,
170                        ip: ip4,
171                        mac,
172                        vendor,
173                        is_current_host: ip4 == source_ipv4,
174                        open_ports: PortSet::new(),
175                    }));
176                });
177            }
178
179            Ok(())
180        })
181    }
182}
183
184// Implements the Scanner trait for ARPScanner
185impl Scanner for ARPScanner<'_> {
186    fn scan(&self) -> JoinHandle<Result<()>> {
187        debug!("performing ARP scan on targets: {:?}", self.targets);
188        debug!("include_vendor: {}", self.include_vendor);
189        debug!("include_host_names: {}", self.include_host_names);
190        debug!("starting arp packet reader");
191        let (done_tx, done_rx) = sync::mpsc::channel::<()>();
192        let notifier = self.notifier.clone();
193        let packet_sender = Arc::clone(&self.packet_sender);
194        let idle_timeout = self.idle_timeout;
195        let source_ipv4 = self.interface.ipv4;
196        let source_mac = self.interface.mac;
197        let targets = Arc::clone(&self.targets);
198
199        let read_handle = self.read_packets(done_rx);
200
201        // prevent blocking thread so messages can be freely sent to consumer
202        thread::spawn(move || -> Result<()> {
203            let process_target = |target_ipv4: net::Ipv4Addr| {
204                // throttle packet sending to prevent packet loss
205                thread::sleep(packet::DEFAULT_PACKET_SEND_TIMING);
206
207                debug!("scanning ARP target: {}", target_ipv4);
208
209                let arp_packet = ArpPacketBuilder::default()
210                    .source_ip(source_ipv4)
211                    .source_mac(source_mac)
212                    .dest_ip(target_ipv4)
213                    .build()?;
214
215                let pkt_buf = arp_packet.to_raw();
216
217                // inform consumer we are scanning this target (ignore error on failure to notify)
218                notifier
219                    .send(ScanMessage::Info(Scanning {
220                        ip: target_ipv4,
221                        port: None,
222                    }))
223                    .map_err(RLanLibError::from_channel_send_error)?;
224
225                let mut pkt_sender = packet_sender.lock()?;
226
227                // Send to the broadcast address
228                pkt_sender.send(&pkt_buf)?;
229
230                Ok(())
231            };
232
233            let mut scan_error: Option<RLanLibError> = None;
234
235            if let Err(err) = targets.lazy_loop(process_target) {
236                scan_error = Some(err);
237            }
238
239            thread::sleep(idle_timeout);
240
241            notifier
242                .send(ScanMessage::Done)
243                .map_err(RLanLibError::from_channel_send_error)?;
244
245            // ignore errors here as the thread may already be dead due to error
246            // we'll catch any errors from that thread below and report
247            let _ = done_tx.send(());
248
249            let read_result = read_handle.join()?;
250
251            if let Some(err) = scan_error {
252                return Err(err);
253            }
254
255            read_result
256        })
257    }
258}
259
260#[cfg(test)]
261#[path = "./arp_scanner_tests.rs"]
262mod tests;