r_lanlib/scanners/
arp_scanner.rs1use derive_builder::Builder;
4use pnet::packet::{Packet, arp, ethernet};
5use std::{
6 collections::HashMap,
7 net::Ipv4Addr,
8 sync::{self, Arc, Mutex},
9 thread::{self, JoinHandle},
10 time::{Duration, Instant},
11};
12use threadpool::ThreadPool;
13
14use crate::{
15 error::{RLanLibError, Result},
16 network::NetworkInterface,
17 packet::{self, arp_packet::ArpPacketBuilder, wire::Wire},
18 scanners::{Device, PortSet, Scanning},
19 targets::ips::IPTargets,
20};
21
22use super::{ScanMessage, Scanner, heartbeat::HeartBeat};
23
24#[derive(Clone, Builder)]
26#[builder(setter(into))]
27pub struct ARPScanner {
28 interface: Arc<NetworkInterface>,
30 wire: Wire,
32 targets: Arc<IPTargets>,
34 source_port: u16,
36 include_vendor: bool,
38 include_host_names: bool,
40 idle_timeout: Duration,
42 notifier: sync::mpsc::Sender<ScanMessage>,
44 #[builder(default = "Arc::new(Mutex::new(HashMap::new()))")]
46 send_times: Arc<Mutex<HashMap<Ipv4Addr, Instant>>>,
47}
48
49impl ARPScanner {
50 pub fn builder() -> ARPScannerBuilder {
52 ARPScannerBuilder::default()
53 }
54
55 fn process_target(&self, target: Ipv4Addr) -> Result<()> {
56 thread::sleep(packet::DEFAULT_PACKET_SEND_TIMING);
58
59 log::debug!("scanning ARP target: {}", target);
60
61 let arp_packet = ArpPacketBuilder::default()
62 .source_ip(self.interface.ipv4)
63 .source_mac(self.interface.mac)
64 .dest_ip(target)
65 .build()?;
66
67 let pkt_buf = arp_packet.to_raw();
68
69 self.notifier
71 .send(ScanMessage::Info(Scanning {
72 ip: target,
73 port: None,
74 }))
75 .map_err(RLanLibError::from_channel_send_error)?;
76
77 let mut pkt_sender = self.wire.0.lock()?;
78
79 if let Ok(mut times) = self.send_times.lock() {
81 times.insert(target, Instant::now());
82 }
83
84 pkt_sender.send(&pkt_buf)?;
86
87 Ok(())
88 }
89
90 fn process_incoming_packet(
91 &self,
92 pkt: &[u8],
93 pool: &ThreadPool,
94 ) -> Result<()> {
95 let Some(eth) = ethernet::EthernetPacket::new(pkt) else {
96 return Ok(());
97 };
98
99 let Some(header) = arp::ArpPacket::new(eth.payload()) else {
100 return Ok(());
101 };
102
103 if header.get_operation() != arp::ArpOperations::Reply {
106 return Ok(());
107 }
108
109 let ip4 = header.get_sender_proto_addr();
110 let mac = eth.get_source();
111
112 let latency_ms = self.send_times.lock().ok().and_then(|mut times| {
113 times.remove(&ip4).map(|t| t.elapsed().as_millis())
114 });
115
116 let notification_sender = self.notifier.clone();
117 let interface = Arc::clone(&self.interface);
118 let include_host_names = self.include_host_names;
119 let include_vendor = self.include_vendor;
120
121 pool.execute(move || {
124 let hostname = if include_host_names {
125 log::debug!("looking up hostname for {}", ip4);
126 dns_lookup::lookup_addr(&ip4.into()).unwrap_or_default()
127 } else {
128 String::new()
129 };
130
131 let vendor = if include_vendor {
132 oui_data::lookup(&mac.to_string())
133 .map(|v| v.organization().to_owned())
134 .unwrap_or_default()
135 } else {
136 String::new()
137 };
138
139 let _ =
140 notification_sender.send(ScanMessage::ARPScanDevice(Device {
141 hostname,
142 ip: ip4,
143 mac,
144 vendor,
145 is_current_host: ip4 == interface.ipv4,
146 open_ports: PortSet::new(),
147 latency_ms,
148 }));
149 });
150
151 Ok(())
152 }
153
154 fn read_packets(
157 &self,
158 done: sync::mpsc::Receiver<()>,
159 ) -> Result<JoinHandle<Result<()>>> {
160 let (heartbeat_tx, heartbeat_rx) = sync::mpsc::channel::<()>();
161
162 let heartbeat = HeartBeat::builder()
163 .source_mac(self.interface.mac)
164 .source_ipv4(self.interface.ipv4)
165 .source_port(self.source_port)
166 .packet_sender(Arc::clone(&self.wire.0))
167 .build()?;
168
169 heartbeat.start_in_thread(heartbeat_rx)?;
170
171 let self_clone = self.clone();
172
173 Ok(thread::spawn(move || -> Result<()> {
174 let mut reader = self_clone.wire.1.lock()?;
175 let lookup_pool = ThreadPool::new(8);
178
179 loop {
180 if done.try_recv().is_ok() {
181 log::debug!("exiting arp packet reader");
182 if let Err(e) = heartbeat_tx.send(()) {
183 log::error!("failed to stop heartbeat: {}", e);
184 }
185 break;
186 }
187
188 let pkt = reader.next_packet()?;
189
190 self_clone.process_incoming_packet(pkt, &lookup_pool)?;
191 }
192
193 Ok(())
194 }))
195 }
196}
197
198impl Scanner for ARPScanner {
200 fn scan(&self) -> Result<JoinHandle<Result<()>>> {
201 log::debug!("performing ARP scan on targets: {:?}", self.targets);
202 log::debug!("include_vendor: {}", self.include_vendor);
203 log::debug!("include_host_names: {}", self.include_host_names);
204 log::debug!("starting arp packet reader");
205
206 let self_clone = self.clone();
207 let (done_tx, done_rx) = sync::mpsc::channel::<()>();
208
209 let read_handle = self.read_packets(done_rx)?;
210
211 let scan_handle = thread::spawn(move || -> Result<()> {
213 let mut scan_error: Option<RLanLibError> = None;
214
215 if let Err(err) = self_clone
216 .targets
217 .lazy_loop(|t| self_clone.process_target(t))
218 {
219 scan_error = Some(err);
220 }
221
222 thread::sleep(self_clone.idle_timeout);
223
224 self_clone
225 .notifier
226 .send(ScanMessage::Done)
227 .map_err(RLanLibError::from_channel_send_error)?;
228
229 let _ = done_tx.send(());
232
233 let read_result = read_handle.join()?;
234
235 if let Some(err) = scan_error {
236 return Err(err);
237 }
238
239 read_result
240 });
241
242 Ok(scan_handle)
243 }
244}
245
246#[cfg(test)]
247#[path = "./arp_scanner_tests.rs"]
248mod tests;