Skip to main content

r_lanlib/scanners/
arp_scanner.rs

1//! Provides Scanner implementation for ARP scanning
2
3use derive_builder::Builder;
4use pnet::packet::{Packet, arp, ethernet};
5use std::{
6    net::Ipv4Addr,
7    sync::{self, Arc, Mutex},
8    thread::{self, JoinHandle},
9    time::Duration,
10};
11use threadpool::ThreadPool;
12
13use crate::{
14    error::{RLanLibError, Result},
15    network::NetworkInterface,
16    packet::{self, Reader, Sender, arp_packet::ArpPacketBuilder},
17    scanners::{Device, PortSet, Scanning},
18    targets::ips::IPTargets,
19};
20
21use super::{ScanMessage, Scanner, heartbeat::HeartBeat};
22
23/// Data structure representing an ARP scanner
24#[derive(Clone, Builder)]
25#[builder(setter(into))]
26pub struct ARPScanner {
27    /// Network interface to use for scanning
28    interface: Arc<NetworkInterface>,
29    /// Packet reader for receiving ARP replies
30    packet_reader: Arc<Mutex<dyn Reader>>,
31    /// Packet sender for transmitting ARP requests
32    packet_sender: Arc<Mutex<dyn Sender>>,
33    /// IP targets to scan
34    targets: Arc<IPTargets>,
35    /// Source port for packet listener and incoming packet identification
36    source_port: u16,
37    /// Whether to include vendor lookups for discovered devices
38    include_vendor: bool,
39    /// Whether to include hostname lookups for discovered devices
40    include_host_names: bool,
41    /// Duration to wait for responses after scanning completes
42    idle_timeout: Duration,
43    /// Channel for sending scan results and status messages
44    notifier: sync::mpsc::Sender<ScanMessage>,
45}
46
47impl ARPScanner {
48    /// Returns builder for ARPScanner
49    pub fn builder() -> ARPScannerBuilder {
50        ARPScannerBuilder::default()
51    }
52
53    fn process_target(&self, target: Ipv4Addr) -> Result<()> {
54        // throttle packet sending to prevent packet loss
55        thread::sleep(packet::DEFAULT_PACKET_SEND_TIMING);
56
57        log::debug!("scanning ARP target: {}", target);
58
59        let arp_packet = ArpPacketBuilder::default()
60            .source_ip(self.interface.ipv4)
61            .source_mac(self.interface.mac)
62            .dest_ip(target)
63            .build()?;
64
65        let pkt_buf = arp_packet.to_raw();
66
67        // inform consumer we are scanning this target (ignore error on failure to notify)
68        self.notifier
69            .send(ScanMessage::Info(Scanning {
70                ip: target,
71                port: None,
72            }))
73            .map_err(RLanLibError::from_channel_send_error)?;
74
75        let mut pkt_sender = self.packet_sender.lock()?;
76
77        // Send to the broadcast address
78        pkt_sender.send(&pkt_buf)?;
79
80        Ok(())
81    }
82
83    fn process_incoming_packet(
84        &self,
85        pkt: &[u8],
86        pool: &ThreadPool,
87    ) -> Result<()> {
88        let Some(eth) = ethernet::EthernetPacket::new(pkt) else {
89            return Ok(());
90        };
91
92        let Some(header) = arp::ArpPacket::new(eth.payload()) else {
93            return Ok(());
94        };
95
96        // Capture ANY ARP reply as it's an indication that there's a
97        // device on the network
98        if header.get_operation() != arp::ArpOperations::Reply {
99            return Ok(());
100        }
101
102        let ip4 = header.get_sender_proto_addr();
103        let mac = eth.get_source();
104
105        let notification_sender = self.notifier.clone();
106        let interface = Arc::clone(&self.interface);
107        let include_host_names = self.include_host_names;
108        let include_vendor = self.include_vendor;
109
110        // use a thread pool here so we don't slow down packet
111        // processing while limiting concurrent threads
112        pool.execute(move || {
113            let hostname = if include_host_names {
114                log::debug!("looking up hostname for {}", ip4);
115                dns_lookup::lookup_addr(&ip4.into()).unwrap_or_default()
116            } else {
117                String::new()
118            };
119
120            let vendor = if include_vendor {
121                oui_data::lookup(&mac.to_string())
122                    .map(|v| v.organization().to_owned())
123                    .unwrap_or_default()
124            } else {
125                String::new()
126            };
127
128            let _ =
129                notification_sender.send(ScanMessage::ARPScanDevice(Device {
130                    hostname,
131                    ip: ip4,
132                    mac,
133                    vendor,
134                    is_current_host: ip4 == interface.ipv4,
135                    open_ports: PortSet::new(),
136                }));
137        });
138
139        Ok(())
140    }
141
142    // Implements packet reading in a separate thread so we can send and
143    // receive packets simultaneously
144    fn read_packets(
145        &self,
146        done: sync::mpsc::Receiver<()>,
147    ) -> Result<JoinHandle<Result<()>>> {
148        let (heartbeat_tx, heartbeat_rx) = sync::mpsc::channel::<()>();
149
150        let heartbeat = HeartBeat::builder()
151            .source_mac(self.interface.mac)
152            .source_ipv4(self.interface.ipv4)
153            .source_port(self.source_port)
154            .packet_sender(Arc::clone(&self.packet_sender))
155            .build()?;
156
157        heartbeat.start_in_thread(heartbeat_rx)?;
158
159        let self_clone = self.clone();
160
161        Ok(thread::spawn(move || -> Result<()> {
162            let mut reader = self_clone.packet_reader.lock()?;
163            // Use a bounded thread pool for DNS/vendor lookups to prevent
164            // spawning thousands of threads on large networks
165            let lookup_pool = ThreadPool::new(8);
166
167            loop {
168                if done.try_recv().is_ok() {
169                    log::debug!("exiting arp packet reader");
170                    if let Err(e) = heartbeat_tx.send(()) {
171                        log::error!("failed to stop heartbeat: {}", e);
172                    }
173                    break;
174                }
175
176                let pkt = reader.next_packet()?;
177
178                self_clone.process_incoming_packet(pkt, &lookup_pool)?;
179            }
180
181            Ok(())
182        }))
183    }
184}
185
186// Implements the Scanner trait for ARPScanner
187impl Scanner for ARPScanner {
188    fn scan(&self) -> Result<JoinHandle<Result<()>>> {
189        log::debug!("performing ARP scan on targets: {:?}", self.targets);
190        log::debug!("include_vendor: {}", self.include_vendor);
191        log::debug!("include_host_names: {}", self.include_host_names);
192        log::debug!("starting arp packet reader");
193
194        let self_clone = self.clone();
195        let (done_tx, done_rx) = sync::mpsc::channel::<()>();
196
197        let read_handle = self.read_packets(done_rx)?;
198
199        // prevent blocking thread so messages can be freely sent to consumer
200        let scan_handle = thread::spawn(move || -> Result<()> {
201            let mut scan_error: Option<RLanLibError> = None;
202
203            if let Err(err) = self_clone
204                .targets
205                .lazy_loop(|t| self_clone.process_target(t))
206            {
207                scan_error = Some(err);
208            }
209
210            thread::sleep(self_clone.idle_timeout);
211
212            self_clone
213                .notifier
214                .send(ScanMessage::Done)
215                .map_err(RLanLibError::from_channel_send_error)?;
216
217            // ignore errors here as the thread may already be dead due to error
218            // we'll catch any errors from that thread below and report
219            let _ = done_tx.send(());
220
221            let read_result = read_handle.join()?;
222
223            if let Some(err) = scan_error {
224                return Err(err);
225            }
226
227            read_result
228        });
229
230        Ok(scan_handle)
231    }
232}
233
234#[cfg(test)]
235#[path = "./arp_scanner_tests.rs"]
236mod tests;