fastping_rs/
lib.rs

1extern crate pnet;
2extern crate pnet_macros_support;
3#[macro_use]
4extern crate log;
5extern crate rand;
6
7mod ping;
8
9use ping::{send_pings, Ping, ReceivedPing};
10use pnet::packet::icmp::echo_reply::EchoReplyPacket as IcmpEchoReplyPacket;
11use pnet::packet::icmpv6::echo_reply::EchoReplyPacket as Icmpv6EchoReplyPacket;
12use pnet::packet::ip::IpNextHeaderProtocols;
13use pnet::packet::Packet;
14use pnet::packet::{icmp, icmpv6};
15use pnet::transport::transport_channel;
16use pnet::transport::TransportChannelType::Layer4;
17use pnet::transport::TransportProtocol::{Ipv4, Ipv6};
18use pnet::transport::{icmp_packet_iter, icmpv6_packet_iter};
19use pnet::transport::{TransportReceiver, TransportSender};
20use std::collections::BTreeMap;
21use std::net::IpAddr;
22use std::sync::mpsc::{channel, Receiver, Sender};
23use std::sync::{Arc, Mutex, RwLock};
24use std::thread;
25use std::time::{Duration, Instant};
26
27// result type returned by fastping_rs::Pinger::new()
28pub type NewPingerResult = Result<(Pinger, Receiver<PingResult>), String>;
29
30// ping result type.  Idle represents pings that have not received a repsonse within the max_rtt.
31// Receive represents pings which have received a repsonse
32pub enum PingResult {
33    Idle { addr: IpAddr },
34    Receive { addr: IpAddr, rtt: Duration },
35}
36
37pub struct Pinger {
38    // Number of milliseconds of an idle timeout. Once it passed,
39    // the library calls an idle callback function.  Default is 2000
40    max_rtt: Arc<Duration>,
41
42    // map of addresses to ping on each run
43    targets: Arc<Mutex<BTreeMap<IpAddr, Ping>>>,
44
45    // Size in bytes of the payload to send.  Default is 16 bytes
46    size: usize,
47
48    // sender end of the channel for piping results to client
49    results_sender: Sender<PingResult>,
50
51    // sender end of libpnet icmp v4 transport channel
52    tx: Arc<Mutex<TransportSender>>,
53
54    // receiver end of libpnet icmp v4 transport channel
55    rx: Arc<Mutex<TransportReceiver>>,
56
57    // sender end of libpnet icmp v6 transport channel
58    txv6: Arc<Mutex<TransportSender>>,
59
60    // receiver end of libpnet icmp v6 transport channel
61    rxv6: Arc<Mutex<TransportReceiver>>,
62
63    // sender for internal result passing beween threads
64    thread_tx: Sender<ReceivedPing>,
65
66    // receiver for internal result passing beween threads
67    thread_rx: Arc<Mutex<Receiver<ReceivedPing>>>,
68
69    // timer for tracking round trip times
70    timer: Arc<RwLock<Instant>>,
71
72    // flag to stop pinging
73    stop: Arc<Mutex<bool>>,
74}
75
76impl Pinger {
77    // initialize the pinger and start the icmp and icmpv6 listeners
78    pub fn new(_max_rtt: Option<u64>, _size: Option<usize>) -> NewPingerResult {
79        let targets = BTreeMap::new();
80        let (sender, receiver) = channel();
81
82        let protocol = Layer4(Ipv4(IpNextHeaderProtocols::Icmp));
83        let (tx, rx) = match transport_channel(4096, protocol) {
84            Ok((tx, rx)) => (tx, rx),
85            Err(e) => return Err(e.to_string()),
86        };
87
88        let protocolv6 = Layer4(Ipv6(IpNextHeaderProtocols::Icmpv6));
89        let (txv6, rxv6) = match transport_channel(4096, protocolv6) {
90            Ok((txv6, rxv6)) => (txv6, rxv6),
91            Err(e) => return Err(e.to_string()),
92        };
93
94        let (thread_tx, thread_rx) = channel();
95
96        let mut pinger = Pinger {
97            max_rtt: Arc::new(Duration::from_millis(2000)),
98            targets: Arc::new(Mutex::new(targets)),
99            size: _size.unwrap_or(16),
100            results_sender: sender,
101            tx: Arc::new(Mutex::new(tx)),
102            rx: Arc::new(Mutex::new(rx)),
103            txv6: Arc::new(Mutex::new(txv6)),
104            rxv6: Arc::new(Mutex::new(rxv6)),
105            thread_rx: Arc::new(Mutex::new(thread_rx)),
106            thread_tx,
107            timer: Arc::new(RwLock::new(Instant::now())),
108            stop: Arc::new(Mutex::new(false)),
109        };
110        if let Some(rtt_value) = _max_rtt {
111            pinger.max_rtt = Arc::new(Duration::from_millis(rtt_value));
112        }
113        if let Some(size_value) = _size {
114            pinger.size = size_value;
115        }
116
117        pinger.start_listener();
118        Ok((pinger, receiver))
119    }
120
121    // add either an ipv4 or ipv6 target address for pinging
122    pub fn add_ipaddr(&self, ipaddr: &str) {
123        let addr = ipaddr.parse::<IpAddr>();
124        match addr {
125            Ok(valid_addr) => {
126                debug!("Address added {}", valid_addr);
127                let new_ping = Ping::new(valid_addr);
128                self.targets.lock().unwrap().insert(valid_addr, new_ping);
129            }
130            Err(e) => {
131                error!("Error adding ip address {}. Error: {}", ipaddr, e);
132            }
133        };
134    }
135
136    // remove a previously added ipv4 or ipv6 target address
137    pub fn remove_ipaddr(&self, ipaddr: &str) {
138        let addr = ipaddr.parse::<IpAddr>();
139        match addr {
140            Ok(valid_addr) => {
141                debug!("Address removed {}", valid_addr);
142                self.targets.lock().unwrap().remove(&valid_addr);
143            }
144            Err(e) => {
145                error!("Error removing ip address {}. Error: {}", ipaddr, e);
146            }
147        };
148    }
149
150    // stop running the continous pinger
151    pub fn stop_pinger(&self) {
152        let mut stop = self.stop.lock().unwrap();
153        *stop = true;
154    }
155
156    // run one round of pinging and stop
157    pub fn ping_once(&self) {
158        self.run_pings(true)
159    }
160
161    // run the continuous pinger
162    pub fn run_pinger(&self) {
163        self.run_pings(false)
164    }
165
166    // run pinger either once or continuously
167    fn run_pings(&self, run_once: bool) {
168        let thread_rx = self.thread_rx.clone();
169        let tx = self.tx.clone();
170        let txv6 = self.txv6.clone();
171        let results_sender = self.results_sender.clone();
172        let stop = self.stop.clone();
173        let targets = self.targets.clone();
174        let timer = self.timer.clone();
175        let max_rtt = self.max_rtt.clone();
176        let size = self.size;
177
178        {
179            let mut stop = self.stop.lock().unwrap();
180            if run_once {
181                debug!("Running pinger for one round");
182                *stop = true;
183            } else {
184                *stop = false;
185            }
186        }
187
188        if run_once {
189            send_pings(
190                size,
191                timer,
192                stop,
193                results_sender,
194                thread_rx,
195                tx,
196                txv6,
197                targets,
198                max_rtt,
199            );
200        } else {
201            thread::spawn(move || {
202                send_pings(
203                    size,
204                    timer,
205                    stop,
206                    results_sender,
207                    thread_rx,
208                    tx,
209                    txv6,
210                    targets,
211                    max_rtt,
212                );
213            });
214        }
215    }
216
217    fn start_listener(&self) {
218        // start icmp listeners in the background and use internal channels for results
219
220        // setup ipv4 listener
221        let thread_tx = self.thread_tx.clone();
222        let rx = self.rx.clone();
223        let timer = self.timer.clone();
224        let stop = self.stop.clone();
225
226        thread::spawn(move || {
227            let mut receiver = rx.lock().unwrap();
228            let mut iter = icmp_packet_iter(&mut receiver);
229            loop {
230                match iter.next() {
231                    Ok((packet, addr)) => match IcmpEchoReplyPacket::new(packet.packet()) {
232                        Some(echo_reply) => {
233                            if packet.get_icmp_type() == icmp::IcmpTypes::EchoReply {
234                                let start_time = timer.read().unwrap();
235                                match thread_tx.send(ReceivedPing {
236                                    addr,
237                                    identifier: echo_reply.get_identifier(),
238                                    sequence_number: echo_reply.get_sequence_number(),
239                                    rtt: Instant::now().duration_since(*start_time),
240                                }) {
241                                    Ok(_) => {}
242                                    Err(e) => {
243                                        if !*stop.lock().unwrap() {
244                                            error!("Error sending ping result on channel: {}", e)
245                                        } else {
246                                            return;
247                                        }
248                                    }
249                                }
250                            } else {
251                                debug!(
252                                    "ICMP type other than reply (0) received from {:?}: {:?}",
253                                    addr,
254                                    packet.get_icmp_type()
255                                );
256                            }
257                        }
258                        None => {}
259                    },
260                    Err(e) => {
261                        error!("An error occurred while reading: {}", e);
262                    }
263                }
264            }
265        });
266
267        // setup ipv6 listener
268        let thread_txv6 = self.thread_tx.clone();
269        let rxv6 = self.rxv6.clone();
270        let timerv6 = self.timer.clone();
271        let stopv6 = self.stop.clone();
272
273        thread::spawn(move || {
274            let mut receiver = rxv6.lock().unwrap();
275            let mut iter = icmpv6_packet_iter(&mut receiver);
276            loop {
277                match iter.next() {
278                    Ok((packet, addr)) => match Icmpv6EchoReplyPacket::new(packet.packet()) {
279                        Some(echo_reply) => {
280                            if packet.get_icmpv6_type() == icmpv6::Icmpv6Types::EchoReply {
281                                let start_time = timerv6.read().unwrap();
282                                match thread_txv6.send(ReceivedPing {
283                                    addr,
284                                    identifier: echo_reply.get_identifier(),
285                                    sequence_number: echo_reply.get_sequence_number(),
286                                    rtt: Instant::now().duration_since(*start_time),
287                                }) {
288                                    Ok(_) => {}
289                                    Err(e) => {
290                                        if !*stopv6.lock().unwrap() {
291                                            error!("Error sending ping result on channel: {}", e)
292                                        } else {
293                                            return;
294                                        }
295                                    }
296                                }
297                            } else {
298                                debug!(
299                                    "ICMPv6 type other than reply (129) received from {:?}: {:?}",
300                                    addr,
301                                    packet.get_icmpv6_type()
302                                );
303                            }
304                        }
305                        None => {}
306                    },
307                    Err(e) => {
308                        error!("An error occurred while reading: {}", e);
309                    }
310                }
311            }
312        });
313    }
314}
315
316#[cfg(test)]
317mod tests {
318    use super::*;
319
320    #[test]
321    fn test_newpinger() {
322        // test we can create a new pinger with optional arguments,
323        // test it returns the new pinger and a client channel
324        // test we can use the client channel
325        match Pinger::new(Some(3000 as u64), Some(24)) {
326            Ok((test_pinger, test_channel)) => {
327                assert_eq!(test_pinger.max_rtt, Arc::new(Duration::new(3, 0)));
328                assert_eq!(test_pinger.size, 24);
329
330                match test_pinger.results_sender.send(PingResult::Idle {
331                    addr: "127.0.0.1".parse::<IpAddr>().unwrap(),
332                }) {
333                    Ok(_) => match test_channel.recv() {
334                        Ok(result) => match result {
335                            PingResult::Idle { addr } => {
336                                assert_eq!(addr, "127.0.0.1".parse::<IpAddr>().unwrap());
337                            }
338                            _ => {}
339                        },
340                        Err(_) => assert!(false),
341                    },
342                    Err(_) => assert!(false),
343                }
344            }
345            Err(e) => {
346                println!("Test failed: {}", e);
347                assert!(false)
348            }
349        };
350    }
351
352    #[test]
353    fn test_add_remove_addrs() {
354        match Pinger::new(None, None) {
355            Ok((test_pinger, _)) => {
356                test_pinger.add_ipaddr("127.0.0.1");
357                assert_eq!(test_pinger.targets.lock().unwrap().len(), 1);
358                assert!(test_pinger
359                    .targets
360                    .lock()
361                    .unwrap()
362                    .contains_key(&"127.0.0.1".parse::<IpAddr>().unwrap()));
363
364                test_pinger.remove_ipaddr("127.0.0.1");
365                assert_eq!(test_pinger.targets.lock().unwrap().len(), 0);
366                assert_eq!(
367                    test_pinger
368                        .targets
369                        .lock()
370                        .unwrap()
371                        .contains_key(&"127.0.0.1".parse::<IpAddr>().unwrap()),
372                    false
373                );
374            }
375            Err(e) => {
376                println!("Test failed: {}", e);
377                assert!(false)
378            }
379        }
380    }
381
382    #[test]
383    fn test_stop() {
384        match Pinger::new(None, None) {
385            Ok((test_pinger, _)) => {
386                assert_eq!(*test_pinger.stop.lock().unwrap(), false);
387                test_pinger.stop_pinger();
388                assert_eq!(*test_pinger.stop.lock().unwrap(), true);
389            }
390            Err(e) => {
391                println!("Test failed: {}", e);
392                assert!(false)
393            }
394        }
395    }
396
397    #[test]
398    fn test_integration() {
399        // more comprehensive integration test
400        match Pinger::new(None, None) {
401            Ok((test_pinger, test_channel)) => {
402                let test_addrs = vec!["127.0.0.1", "7.7.7.7", "::1"];
403                for target in test_addrs.iter() {
404                    test_pinger.add_ipaddr(target);
405                }
406                test_pinger.ping_once();
407                for _ in test_addrs.iter() {
408                    match test_channel.recv() {
409                        Ok(result) => match result {
410                            PingResult::Idle { addr } => {
411                                assert_eq!("7.7.7.7".parse::<IpAddr>().unwrap(), addr);
412                            }
413                            PingResult::Receive { addr, rtt: _ } => {
414                                if addr == "::1".parse::<IpAddr>().unwrap()
415                                    || addr == "127.0.0.1".parse::<IpAddr>().unwrap()
416                                {
417                                    assert!(true)
418                                } else {
419                                    assert!(false)
420                                }
421                            }
422                            _ => {
423                                assert!(false)
424                            }
425                        },
426                        Err(_) => assert!(false),
427                    }
428                }
429            }
430            Err(e) => {
431                println!("Test failed: {}", e);
432                assert!(false)
433            }
434        }
435    }
436}