1extern crate pnet;
2extern crate pnet_macros_support;
3#[macro_use]
4extern crate log;
5extern crate rand;
6
7mod ping;
8
9use ping::{send_pings, Ping, ReceivedPing};
10use pnet::packet::icmp::echo_reply::EchoReplyPacket as IcmpEchoReplyPacket;
11use pnet::packet::icmpv6::echo_reply::EchoReplyPacket as Icmpv6EchoReplyPacket;
12use pnet::packet::ip::IpNextHeaderProtocols;
13use pnet::packet::Packet;
14use pnet::packet::{icmp, icmpv6};
15use pnet::transport::transport_channel;
16use pnet::transport::TransportChannelType::Layer4;
17use pnet::transport::TransportProtocol::{Ipv4, Ipv6};
18use pnet::transport::{icmp_packet_iter, icmpv6_packet_iter};
19use pnet::transport::{TransportReceiver, TransportSender};
20use std::collections::BTreeMap;
21use std::net::IpAddr;
22use std::sync::mpsc::{channel, Receiver, Sender};
23use std::sync::{Arc, Mutex, RwLock};
24use std::thread;
25use std::time::{Duration, Instant};
26
27pub type NewPingerResult = Result<(Pinger, Receiver<PingResult>), String>;
29
30pub enum PingResult {
33 Idle { addr: IpAddr },
34 Receive { addr: IpAddr, rtt: Duration },
35}
36
37pub struct Pinger {
38 max_rtt: Arc<Duration>,
41
42 targets: Arc<Mutex<BTreeMap<IpAddr, Ping>>>,
44
45 size: usize,
47
48 results_sender: Sender<PingResult>,
50
51 tx: Arc<Mutex<TransportSender>>,
53
54 rx: Arc<Mutex<TransportReceiver>>,
56
57 txv6: Arc<Mutex<TransportSender>>,
59
60 rxv6: Arc<Mutex<TransportReceiver>>,
62
63 thread_tx: Sender<ReceivedPing>,
65
66 thread_rx: Arc<Mutex<Receiver<ReceivedPing>>>,
68
69 timer: Arc<RwLock<Instant>>,
71
72 stop: Arc<Mutex<bool>>,
74}
75
76impl Pinger {
77 pub fn new(_max_rtt: Option<u64>, _size: Option<usize>) -> NewPingerResult {
79 let targets = BTreeMap::new();
80 let (sender, receiver) = channel();
81
82 let protocol = Layer4(Ipv4(IpNextHeaderProtocols::Icmp));
83 let (tx, rx) = match transport_channel(4096, protocol) {
84 Ok((tx, rx)) => (tx, rx),
85 Err(e) => return Err(e.to_string()),
86 };
87
88 let protocolv6 = Layer4(Ipv6(IpNextHeaderProtocols::Icmpv6));
89 let (txv6, rxv6) = match transport_channel(4096, protocolv6) {
90 Ok((txv6, rxv6)) => (txv6, rxv6),
91 Err(e) => return Err(e.to_string()),
92 };
93
94 let (thread_tx, thread_rx) = channel();
95
96 let mut pinger = Pinger {
97 max_rtt: Arc::new(Duration::from_millis(2000)),
98 targets: Arc::new(Mutex::new(targets)),
99 size: _size.unwrap_or(16),
100 results_sender: sender,
101 tx: Arc::new(Mutex::new(tx)),
102 rx: Arc::new(Mutex::new(rx)),
103 txv6: Arc::new(Mutex::new(txv6)),
104 rxv6: Arc::new(Mutex::new(rxv6)),
105 thread_rx: Arc::new(Mutex::new(thread_rx)),
106 thread_tx,
107 timer: Arc::new(RwLock::new(Instant::now())),
108 stop: Arc::new(Mutex::new(false)),
109 };
110 if let Some(rtt_value) = _max_rtt {
111 pinger.max_rtt = Arc::new(Duration::from_millis(rtt_value));
112 }
113 if let Some(size_value) = _size {
114 pinger.size = size_value;
115 }
116
117 pinger.start_listener();
118 Ok((pinger, receiver))
119 }
120
121 pub fn add_ipaddr(&self, ipaddr: &str) {
123 let addr = ipaddr.parse::<IpAddr>();
124 match addr {
125 Ok(valid_addr) => {
126 debug!("Address added {}", valid_addr);
127 let new_ping = Ping::new(valid_addr);
128 self.targets.lock().unwrap().insert(valid_addr, new_ping);
129 }
130 Err(e) => {
131 error!("Error adding ip address {}. Error: {}", ipaddr, e);
132 }
133 };
134 }
135
136 pub fn remove_ipaddr(&self, ipaddr: &str) {
138 let addr = ipaddr.parse::<IpAddr>();
139 match addr {
140 Ok(valid_addr) => {
141 debug!("Address removed {}", valid_addr);
142 self.targets.lock().unwrap().remove(&valid_addr);
143 }
144 Err(e) => {
145 error!("Error removing ip address {}. Error: {}", ipaddr, e);
146 }
147 };
148 }
149
150 pub fn stop_pinger(&self) {
152 let mut stop = self.stop.lock().unwrap();
153 *stop = true;
154 }
155
156 pub fn ping_once(&self) {
158 self.run_pings(true)
159 }
160
161 pub fn run_pinger(&self) {
163 self.run_pings(false)
164 }
165
166 fn run_pings(&self, run_once: bool) {
168 let thread_rx = self.thread_rx.clone();
169 let tx = self.tx.clone();
170 let txv6 = self.txv6.clone();
171 let results_sender = self.results_sender.clone();
172 let stop = self.stop.clone();
173 let targets = self.targets.clone();
174 let timer = self.timer.clone();
175 let max_rtt = self.max_rtt.clone();
176 let size = self.size;
177
178 {
179 let mut stop = self.stop.lock().unwrap();
180 if run_once {
181 debug!("Running pinger for one round");
182 *stop = true;
183 } else {
184 *stop = false;
185 }
186 }
187
188 if run_once {
189 send_pings(
190 size,
191 timer,
192 stop,
193 results_sender,
194 thread_rx,
195 tx,
196 txv6,
197 targets,
198 max_rtt,
199 );
200 } else {
201 thread::spawn(move || {
202 send_pings(
203 size,
204 timer,
205 stop,
206 results_sender,
207 thread_rx,
208 tx,
209 txv6,
210 targets,
211 max_rtt,
212 );
213 });
214 }
215 }
216
217 fn start_listener(&self) {
218 let thread_tx = self.thread_tx.clone();
222 let rx = self.rx.clone();
223 let timer = self.timer.clone();
224 let stop = self.stop.clone();
225
226 thread::spawn(move || {
227 let mut receiver = rx.lock().unwrap();
228 let mut iter = icmp_packet_iter(&mut receiver);
229 loop {
230 match iter.next() {
231 Ok((packet, addr)) => match IcmpEchoReplyPacket::new(packet.packet()) {
232 Some(echo_reply) => {
233 if packet.get_icmp_type() == icmp::IcmpTypes::EchoReply {
234 let start_time = timer.read().unwrap();
235 match thread_tx.send(ReceivedPing {
236 addr,
237 identifier: echo_reply.get_identifier(),
238 sequence_number: echo_reply.get_sequence_number(),
239 rtt: Instant::now().duration_since(*start_time),
240 }) {
241 Ok(_) => {}
242 Err(e) => {
243 if !*stop.lock().unwrap() {
244 error!("Error sending ping result on channel: {}", e)
245 } else {
246 return;
247 }
248 }
249 }
250 } else {
251 debug!(
252 "ICMP type other than reply (0) received from {:?}: {:?}",
253 addr,
254 packet.get_icmp_type()
255 );
256 }
257 }
258 None => {}
259 },
260 Err(e) => {
261 error!("An error occurred while reading: {}", e);
262 }
263 }
264 }
265 });
266
267 let thread_txv6 = self.thread_tx.clone();
269 let rxv6 = self.rxv6.clone();
270 let timerv6 = self.timer.clone();
271 let stopv6 = self.stop.clone();
272
273 thread::spawn(move || {
274 let mut receiver = rxv6.lock().unwrap();
275 let mut iter = icmpv6_packet_iter(&mut receiver);
276 loop {
277 match iter.next() {
278 Ok((packet, addr)) => match Icmpv6EchoReplyPacket::new(packet.packet()) {
279 Some(echo_reply) => {
280 if packet.get_icmpv6_type() == icmpv6::Icmpv6Types::EchoReply {
281 let start_time = timerv6.read().unwrap();
282 match thread_txv6.send(ReceivedPing {
283 addr,
284 identifier: echo_reply.get_identifier(),
285 sequence_number: echo_reply.get_sequence_number(),
286 rtt: Instant::now().duration_since(*start_time),
287 }) {
288 Ok(_) => {}
289 Err(e) => {
290 if !*stopv6.lock().unwrap() {
291 error!("Error sending ping result on channel: {}", e)
292 } else {
293 return;
294 }
295 }
296 }
297 } else {
298 debug!(
299 "ICMPv6 type other than reply (129) received from {:?}: {:?}",
300 addr,
301 packet.get_icmpv6_type()
302 );
303 }
304 }
305 None => {}
306 },
307 Err(e) => {
308 error!("An error occurred while reading: {}", e);
309 }
310 }
311 }
312 });
313 }
314}
315
316#[cfg(test)]
317mod tests {
318 use super::*;
319
320 #[test]
321 fn test_newpinger() {
322 match Pinger::new(Some(3000 as u64), Some(24)) {
326 Ok((test_pinger, test_channel)) => {
327 assert_eq!(test_pinger.max_rtt, Arc::new(Duration::new(3, 0)));
328 assert_eq!(test_pinger.size, 24);
329
330 match test_pinger.results_sender.send(PingResult::Idle {
331 addr: "127.0.0.1".parse::<IpAddr>().unwrap(),
332 }) {
333 Ok(_) => match test_channel.recv() {
334 Ok(result) => match result {
335 PingResult::Idle { addr } => {
336 assert_eq!(addr, "127.0.0.1".parse::<IpAddr>().unwrap());
337 }
338 _ => {}
339 },
340 Err(_) => assert!(false),
341 },
342 Err(_) => assert!(false),
343 }
344 }
345 Err(e) => {
346 println!("Test failed: {}", e);
347 assert!(false)
348 }
349 };
350 }
351
352 #[test]
353 fn test_add_remove_addrs() {
354 match Pinger::new(None, None) {
355 Ok((test_pinger, _)) => {
356 test_pinger.add_ipaddr("127.0.0.1");
357 assert_eq!(test_pinger.targets.lock().unwrap().len(), 1);
358 assert!(test_pinger
359 .targets
360 .lock()
361 .unwrap()
362 .contains_key(&"127.0.0.1".parse::<IpAddr>().unwrap()));
363
364 test_pinger.remove_ipaddr("127.0.0.1");
365 assert_eq!(test_pinger.targets.lock().unwrap().len(), 0);
366 assert_eq!(
367 test_pinger
368 .targets
369 .lock()
370 .unwrap()
371 .contains_key(&"127.0.0.1".parse::<IpAddr>().unwrap()),
372 false
373 );
374 }
375 Err(e) => {
376 println!("Test failed: {}", e);
377 assert!(false)
378 }
379 }
380 }
381
382 #[test]
383 fn test_stop() {
384 match Pinger::new(None, None) {
385 Ok((test_pinger, _)) => {
386 assert_eq!(*test_pinger.stop.lock().unwrap(), false);
387 test_pinger.stop_pinger();
388 assert_eq!(*test_pinger.stop.lock().unwrap(), true);
389 }
390 Err(e) => {
391 println!("Test failed: {}", e);
392 assert!(false)
393 }
394 }
395 }
396
397 #[test]
398 fn test_integration() {
399 match Pinger::new(None, None) {
401 Ok((test_pinger, test_channel)) => {
402 let test_addrs = vec!["127.0.0.1", "7.7.7.7", "::1"];
403 for target in test_addrs.iter() {
404 test_pinger.add_ipaddr(target);
405 }
406 test_pinger.ping_once();
407 for _ in test_addrs.iter() {
408 match test_channel.recv() {
409 Ok(result) => match result {
410 PingResult::Idle { addr } => {
411 assert_eq!("7.7.7.7".parse::<IpAddr>().unwrap(), addr);
412 }
413 PingResult::Receive { addr, rtt: _ } => {
414 if addr == "::1".parse::<IpAddr>().unwrap()
415 || addr == "127.0.0.1".parse::<IpAddr>().unwrap()
416 {
417 assert!(true)
418 } else {
419 assert!(false)
420 }
421 }
422 _ => {
423 assert!(false)
424 }
425 },
426 Err(_) => assert!(false),
427 }
428 }
429 }
430 Err(e) => {
431 println!("Test failed: {}", e);
432 assert!(false)
433 }
434 }
435 }
436}