Skip to main content

r_lanlib/scanners/
arp_scanner.rs

1//! Provides Scanner implementation for ARP scanning
2
3use derive_builder::Builder;
4use pnet::packet::{Packet, arp, ethernet};
5use std::{
6    collections::HashMap,
7    net::Ipv4Addr,
8    sync::{self, Arc, Mutex},
9    thread::{self, JoinHandle},
10    time::{Duration, SystemTime},
11};
12use threadpool::ThreadPool;
13
14use crate::{
15    error::{RLanLibError, Result},
16    network::NetworkInterface,
17    packet::arp_packet::ArpPacketBuilder,
18    scanners::{Device, PortSet, Scanning},
19    targets::ips::IPTargets,
20    wire::{DEFAULT_PACKET_SEND_TIMING, PacketMetadata, Wire},
21};
22
23use super::{ScanMessage, Scanner, heartbeat::HeartBeat};
24
25/// Data structure representing an ARP scanner
26#[derive(Clone, Builder)]
27#[builder(setter(into))]
28pub struct ARPScanner {
29    /// Network interface to use for scanning
30    interface: Arc<NetworkInterface>,
31    /// Wire for reading and sending packets on the wire
32    wire: Wire,
33    /// IP targets to scan
34    targets: Arc<IPTargets>,
35    /// Source port for packet listener and incoming packet identification
36    source_port: u16,
37    /// Whether to include vendor lookups for discovered devices
38    include_vendor: bool,
39    /// Whether to include hostname lookups for discovered devices
40    include_host_names: bool,
41    /// Duration to wait for responses after scanning completes
42    idle_timeout: Duration,
43    /// Channel for sending scan results and status messages
44    notifier: sync::mpsc::Sender<ScanMessage>,
45    /// Throttles speed at which packets are sent. Higher throttles result
46    /// in more accurate scans
47    #[builder(default = DEFAULT_PACKET_SEND_TIMING)]
48    throttle: Duration,
49    /// Default gateway IP, used to mark the gateway device in scan results
50    #[builder(default)]
51    gateway: Option<Ipv4Addr>,
52    /// Tracks the SystemTime at which each ARP request was sent, keyed by
53    /// target IP. Used to compute RTT from send time to kernel capture of
54    /// the reply (metadata.timestamp).
55    #[builder(default = "Arc::new(Mutex::new(HashMap::new()))")]
56    send_times: Arc<Mutex<HashMap<Ipv4Addr, SystemTime>>>,
57}
58
59impl ARPScanner {
60    /// Returns builder for ARPScanner
61    pub fn builder() -> ARPScannerBuilder {
62        ARPScannerBuilder::default()
63    }
64
65    fn process_target(&self, target: Ipv4Addr) -> Result<()> {
66        // throttle packet sending to prevent packet loss
67        thread::sleep(self.throttle);
68
69        log::debug!("scanning ARP target: {}", target);
70
71        // The OS never sends an ARP reply to its own IP, so synthesize the
72        // device entry immediately rather than waiting for a reply that will
73        // never arrive.
74        if target == self.interface.ipv4 {
75            self.notifier
76                .send(ScanMessage::ARPScanDevice(Device {
77                    hostname: String::new(),
78                    ip: self.interface.ipv4,
79                    mac: self.interface.mac,
80                    vendor: String::new(),
81                    is_current_host: true,
82                    is_gateway: self
83                        .gateway
84                        .is_some_and(|gw| gw == self.interface.ipv4),
85                    open_ports: PortSet::new(),
86                    latency_ms: Some(0),
87                    response_ttl: None,
88                }))
89                .map_err(RLanLibError::from_channel_send_error)?;
90            return Ok(());
91        }
92
93        let arp_packet = ArpPacketBuilder::default()
94            .source_ip(self.interface.ipv4)
95            .source_mac(self.interface.mac)
96            .dest_ip(target)
97            .build()?;
98
99        let pkt_buf = arp_packet.to_raw();
100
101        // inform consumer we are scanning this target (ignore error on failure to notify)
102        self.notifier
103            .send(ScanMessage::Info(Scanning {
104                ip: target,
105                port: None,
106            }))
107            .map_err(RLanLibError::from_channel_send_error)?;
108
109        let send_time = {
110            let mut pkt_sender = self.wire.0.lock()?;
111            // Capture timestamp inside the lock, immediately before the send,
112            // so it is as close to the actual wire event as possible.
113            let t = SystemTime::now();
114            pkt_sender.send(&pkt_buf)?;
115            t
116        };
117
118        if let Ok(mut times) = self.send_times.lock() {
119            times.insert(target, send_time);
120        }
121
122        Ok(())
123    }
124
125    fn process_incoming_packet(
126        &self,
127        pkt: &[u8],
128        metadata: PacketMetadata,
129        pool: &ThreadPool,
130    ) -> Result<()> {
131        let Some(eth) = ethernet::EthernetPacket::new(pkt) else {
132            return Ok(());
133        };
134
135        let Some(header) = arp::ArpPacket::new(eth.payload()) else {
136            return Ok(());
137        };
138
139        // Capture ANY ARP reply as it's an indication that there's a
140        // device on the network
141        if header.get_operation() != arp::ArpOperations::Reply {
142            return Ok(());
143        }
144
145        let ip4 = header.get_sender_proto_addr();
146        let mac = eth.get_source();
147
148        // RTT = kernel capture time of reply − SystemTime recorded just
149        // before the send. Both are fixed points so mutex contention during
150        // this lookup cannot inflate the measurement.
151        let send_time = self
152            .send_times
153            .lock()
154            .ok()
155            .and_then(|mut times| times.remove(&ip4));
156
157        let latency_ms = match (send_time, metadata.timestamp) {
158            (Some(sent), Some(recv)) => {
159                recv.duration_since(sent).map(|d| d.as_millis()).ok()
160            }
161            _ => None,
162        };
163
164        let notification_sender = self.notifier.clone();
165        let interface = Arc::clone(&self.interface);
166        let include_host_names = self.include_host_names;
167        let include_vendor = self.include_vendor;
168        let gateway = self.gateway;
169
170        // use a thread pool here so we don't slow down packet
171        // processing while limiting concurrent threads
172        pool.execute(move || {
173            let hostname = if include_host_names {
174                log::debug!("looking up hostname for {}", ip4);
175                dns_lookup::lookup_addr(&ip4.into()).unwrap_or_default()
176            } else {
177                String::new()
178            };
179
180            let vendor = if include_vendor {
181                oui_data::lookup(&mac.to_string())
182                    .map(|v| v.organization().to_owned())
183                    .unwrap_or_default()
184            } else {
185                String::new()
186            };
187
188            let _ =
189                notification_sender.send(ScanMessage::ARPScanDevice(Device {
190                    hostname,
191                    ip: ip4,
192                    mac,
193                    vendor,
194                    is_current_host: ip4 == interface.ipv4,
195                    is_gateway: gateway.is_some_and(|gw| gw == ip4),
196                    open_ports: PortSet::new(),
197                    latency_ms,
198                    response_ttl: None,
199                }));
200        });
201
202        Ok(())
203    }
204
205    // Implements packet reading in a separate thread so we can send and
206    // receive packets simultaneously
207    fn read_packets(
208        &self,
209        done: sync::mpsc::Receiver<()>,
210    ) -> Result<JoinHandle<Result<()>>> {
211        let (heartbeat_tx, heartbeat_rx) = sync::mpsc::channel::<()>();
212
213        let heartbeat = HeartBeat::builder()
214            .source_mac(self.interface.mac)
215            .source_ipv4(self.interface.ipv4)
216            .source_port(self.source_port)
217            .packet_sender(Arc::clone(&self.wire.0))
218            .build()?;
219
220        heartbeat.start_in_thread(heartbeat_rx)?;
221
222        let self_clone = self.clone();
223
224        Ok(thread::spawn(move || -> Result<()> {
225            let mut reader = self_clone.wire.1.lock()?;
226            // Use a bounded thread pool for DNS/vendor lookups to prevent
227            // spawning thousands of threads on large networks
228            let lookup_pool = ThreadPool::new(8);
229
230            loop {
231                if done.try_recv().is_ok() {
232                    log::debug!("exiting arp packet reader");
233                    if let Err(e) = heartbeat_tx.send(()) {
234                        log::error!("failed to stop heartbeat: {}", e);
235                    }
236                    break;
237                }
238
239                let (pkt, metadata) = reader.next_packet_with_metadata()?;
240
241                self_clone.process_incoming_packet(
242                    pkt,
243                    metadata,
244                    &lookup_pool,
245                )?;
246            }
247
248            Ok(())
249        }))
250    }
251}
252
253// Implements the Scanner trait for ARPScanner
254impl Scanner for ARPScanner {
255    fn scan(&self) -> Result<JoinHandle<Result<()>>> {
256        log::debug!("performing ARP scan on targets: {:?}", self.targets);
257        log::debug!("include_vendor: {}", self.include_vendor);
258        log::debug!("include_host_names: {}", self.include_host_names);
259        log::debug!("starting arp packet reader");
260
261        let self_clone = self.clone();
262        let (done_tx, done_rx) = sync::mpsc::channel::<()>();
263
264        let read_handle = self.read_packets(done_rx)?;
265
266        // prevent blocking thread so messages can be freely sent to consumer
267        let scan_handle = thread::spawn(move || -> Result<()> {
268            let mut scan_error: Option<RLanLibError> = None;
269
270            if let Err(err) = self_clone
271                .targets
272                .lazy_loop(|t| self_clone.process_target(t))
273            {
274                scan_error = Some(err);
275            }
276
277            thread::sleep(self_clone.idle_timeout);
278
279            self_clone
280                .notifier
281                .send(ScanMessage::Done)
282                .map_err(RLanLibError::from_channel_send_error)?;
283
284            // ignore errors here as the thread may already be dead due to error
285            // we'll catch any errors from that thread below and report
286            let _ = done_tx.send(());
287
288            let read_result = read_handle.join()?;
289
290            if let Some(err) = scan_error {
291                return Err(err);
292            }
293
294            read_result
295        });
296
297        Ok(scan_handle)
298    }
299}
300
301#[cfg(test)]
302#[path = "./arp_scanner_tests.rs"]
303mod tests;