Skip to main content

r_lanlib/scanners/
arp_scanner.rs

1//! Provides Scanner implementation for ARP scanning
2
3use derive_builder::Builder;
4use pnet::packet::{Packet, arp, ethernet};
5use std::{
6    collections::HashMap,
7    net::Ipv4Addr,
8    sync::{self, Arc, Mutex},
9    thread::{self, JoinHandle},
10    time::{Duration, Instant},
11};
12use threadpool::ThreadPool;
13
14use crate::{
15    error::{RLanLibError, Result},
16    network::NetworkInterface,
17    packet::{self, arp_packet::ArpPacketBuilder, wire::Wire},
18    scanners::{Device, PortSet, Scanning},
19    targets::ips::IPTargets,
20};
21
22use super::{ScanMessage, Scanner, heartbeat::HeartBeat};
23
24/// Data structure representing an ARP scanner
25#[derive(Clone, Builder)]
26#[builder(setter(into))]
27pub struct ARPScanner {
28    /// Network interface to use for scanning
29    interface: Arc<NetworkInterface>,
30    /// Wire for reading and sending packets on the wire
31    wire: Wire,
32    /// IP targets to scan
33    targets: Arc<IPTargets>,
34    /// Source port for packet listener and incoming packet identification
35    source_port: u16,
36    /// Whether to include vendor lookups for discovered devices
37    include_vendor: bool,
38    /// Whether to include hostname lookups for discovered devices
39    include_host_names: bool,
40    /// Duration to wait for responses after scanning completes
41    idle_timeout: Duration,
42    /// Channel for sending scan results and status messages
43    notifier: sync::mpsc::Sender<ScanMessage>,
44    /// Tracks the send time for each ARP request by target IP
45    #[builder(default = "Arc::new(Mutex::new(HashMap::new()))")]
46    send_times: Arc<Mutex<HashMap<Ipv4Addr, Instant>>>,
47}
48
49impl ARPScanner {
50    /// Returns builder for ARPScanner
51    pub fn builder() -> ARPScannerBuilder {
52        ARPScannerBuilder::default()
53    }
54
55    fn process_target(&self, target: Ipv4Addr) -> Result<()> {
56        // throttle packet sending to prevent packet loss
57        thread::sleep(packet::DEFAULT_PACKET_SEND_TIMING);
58
59        log::debug!("scanning ARP target: {}", target);
60
61        let arp_packet = ArpPacketBuilder::default()
62            .source_ip(self.interface.ipv4)
63            .source_mac(self.interface.mac)
64            .dest_ip(target)
65            .build()?;
66
67        let pkt_buf = arp_packet.to_raw();
68
69        // inform consumer we are scanning this target (ignore error on failure to notify)
70        self.notifier
71            .send(ScanMessage::Info(Scanning {
72                ip: target,
73                port: None,
74            }))
75            .map_err(RLanLibError::from_channel_send_error)?;
76
77        let mut pkt_sender = self.wire.0.lock()?;
78
79        // Record send time immediately before putting the packet on the wire
80        if let Ok(mut times) = self.send_times.lock() {
81            times.insert(target, Instant::now());
82        }
83
84        // Send to the broadcast address
85        pkt_sender.send(&pkt_buf)?;
86
87        Ok(())
88    }
89
90    fn process_incoming_packet(
91        &self,
92        pkt: &[u8],
93        pool: &ThreadPool,
94    ) -> Result<()> {
95        let Some(eth) = ethernet::EthernetPacket::new(pkt) else {
96            return Ok(());
97        };
98
99        let Some(header) = arp::ArpPacket::new(eth.payload()) else {
100            return Ok(());
101        };
102
103        // Capture ANY ARP reply as it's an indication that there's a
104        // device on the network
105        if header.get_operation() != arp::ArpOperations::Reply {
106            return Ok(());
107        }
108
109        let ip4 = header.get_sender_proto_addr();
110        let mac = eth.get_source();
111
112        let latency_ms = self.send_times.lock().ok().and_then(|mut times| {
113            times.remove(&ip4).map(|t| t.elapsed().as_millis())
114        });
115
116        let notification_sender = self.notifier.clone();
117        let interface = Arc::clone(&self.interface);
118        let include_host_names = self.include_host_names;
119        let include_vendor = self.include_vendor;
120
121        // use a thread pool here so we don't slow down packet
122        // processing while limiting concurrent threads
123        pool.execute(move || {
124            let hostname = if include_host_names {
125                log::debug!("looking up hostname for {}", ip4);
126                dns_lookup::lookup_addr(&ip4.into()).unwrap_or_default()
127            } else {
128                String::new()
129            };
130
131            let vendor = if include_vendor {
132                oui_data::lookup(&mac.to_string())
133                    .map(|v| v.organization().to_owned())
134                    .unwrap_or_default()
135            } else {
136                String::new()
137            };
138
139            let _ =
140                notification_sender.send(ScanMessage::ARPScanDevice(Device {
141                    hostname,
142                    ip: ip4,
143                    mac,
144                    vendor,
145                    is_current_host: ip4 == interface.ipv4,
146                    open_ports: PortSet::new(),
147                    latency_ms,
148                }));
149        });
150
151        Ok(())
152    }
153
154    // Implements packet reading in a separate thread so we can send and
155    // receive packets simultaneously
156    fn read_packets(
157        &self,
158        done: sync::mpsc::Receiver<()>,
159    ) -> Result<JoinHandle<Result<()>>> {
160        let (heartbeat_tx, heartbeat_rx) = sync::mpsc::channel::<()>();
161
162        let heartbeat = HeartBeat::builder()
163            .source_mac(self.interface.mac)
164            .source_ipv4(self.interface.ipv4)
165            .source_port(self.source_port)
166            .packet_sender(Arc::clone(&self.wire.0))
167            .build()?;
168
169        heartbeat.start_in_thread(heartbeat_rx)?;
170
171        let self_clone = self.clone();
172
173        Ok(thread::spawn(move || -> Result<()> {
174            let mut reader = self_clone.wire.1.lock()?;
175            // Use a bounded thread pool for DNS/vendor lookups to prevent
176            // spawning thousands of threads on large networks
177            let lookup_pool = ThreadPool::new(8);
178
179            loop {
180                if done.try_recv().is_ok() {
181                    log::debug!("exiting arp packet reader");
182                    if let Err(e) = heartbeat_tx.send(()) {
183                        log::error!("failed to stop heartbeat: {}", e);
184                    }
185                    break;
186                }
187
188                let pkt = reader.next_packet()?;
189
190                self_clone.process_incoming_packet(pkt, &lookup_pool)?;
191            }
192
193            Ok(())
194        }))
195    }
196}
197
198// Implements the Scanner trait for ARPScanner
199impl Scanner for ARPScanner {
200    fn scan(&self) -> Result<JoinHandle<Result<()>>> {
201        log::debug!("performing ARP scan on targets: {:?}", self.targets);
202        log::debug!("include_vendor: {}", self.include_vendor);
203        log::debug!("include_host_names: {}", self.include_host_names);
204        log::debug!("starting arp packet reader");
205
206        let self_clone = self.clone();
207        let (done_tx, done_rx) = sync::mpsc::channel::<()>();
208
209        let read_handle = self.read_packets(done_rx)?;
210
211        // prevent blocking thread so messages can be freely sent to consumer
212        let scan_handle = thread::spawn(move || -> Result<()> {
213            let mut scan_error: Option<RLanLibError> = None;
214
215            if let Err(err) = self_clone
216                .targets
217                .lazy_loop(|t| self_clone.process_target(t))
218            {
219                scan_error = Some(err);
220            }
221
222            thread::sleep(self_clone.idle_timeout);
223
224            self_clone
225                .notifier
226                .send(ScanMessage::Done)
227                .map_err(RLanLibError::from_channel_send_error)?;
228
229            // ignore errors here as the thread may already be dead due to error
230            // we'll catch any errors from that thread below and report
231            let _ = done_tx.send(());
232
233            let read_result = read_handle.join()?;
234
235            if let Some(err) = scan_error {
236                return Err(err);
237            }
238
239            read_result
240        });
241
242        Ok(scan_handle)
243    }
244}
245
246#[cfg(test)]
247#[path = "./arp_scanner_tests.rs"]
248mod tests;