1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
extern crate pnet;
extern crate pnet_macros_support;
#[macro_use]
extern crate log;
extern crate rand;

mod ping;

use pnet::packet::ip::IpNextHeaderProtocols;
use pnet::transport::{icmp_packet_iter, icmpv6_packet_iter};
use pnet::transport::transport_channel;
use pnet::transport::{TransportSender, TransportReceiver};
use pnet::transport::TransportChannelType::Layer4;
use pnet::transport::TransportProtocol::{Ipv4, Ipv6};
use std::net::{IpAddr};
use ::ping::{send_pings};
use std::time::{Duration, Instant};
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::thread;
use std::sync::{Arc, Mutex, RwLock};


// result type returned by fastping_rs::Pinger::new()
pub type NewPingerResult = Result<(Pinger, Receiver<PingResult>), String>;

// ping result type.  Idle represents pings that have not received a repsonse within the max_rtt.
// Receive represents pings which have received a repsonse
pub enum PingResult {
    Idle{addr: IpAddr},
    Receive{addr: IpAddr, rtt: Duration},
}

pub struct Pinger {
    // Number of milliseconds of an idle timeout. Once it passed,
	// the library calls an idle callback function.  Default is 2000
    max_rtt: Arc<Duration>,

    // map of addresses to ping on each run
    addrs: Arc<Mutex<BTreeMap<IpAddr, bool>>>,

    // Size in bytes of the payload to send.  Default is 16 bytes
    size: i32,

    // sender end of the channel for piping results to client
    results_sender: Sender<PingResult>,

    // sender end of libpnet icmp v4 transport channel
    tx: Arc<Mutex<TransportSender>>,

    // receiver end of libpnet icmp v4 transport channel
    rx: Arc<Mutex<TransportReceiver>>,

    // sender end of libpnet icmp v6 transport channel
    txv6: Arc<Mutex<TransportSender>>,

    // receiver end of libpnet icmp v6 transport channel
    rxv6: Arc<Mutex<TransportReceiver>>,

    // sender for internal result passing beween threads
    thread_tx: Sender<PingResult>,

    // receiver for internal result passing beween threads
    thread_rx: Arc<Mutex<Receiver<PingResult>>>,

    // timer for tracking round trip times
    timer: Arc<RwLock<Instant>>,

    // flag to stop pinging
    stop: Arc<Mutex<bool>>,
}

impl Pinger {
    // initialize the pinger and start the icmp and icmpv6 listeners
    pub fn new(_max_rtt: Option<u64>, _size: Option<i32>) -> NewPingerResult {
        let addrs = BTreeMap::new();
        let (sender, receiver) = channel();

        let protocol = Layer4(Ipv4(IpNextHeaderProtocols::Icmp));
        let (tx, rx) = match transport_channel(4096, protocol) {
            Ok((tx, rx)) => (tx, rx),
            Err(e) => return Err(e.to_string()),
        };

        let protocolv6 = Layer4(Ipv6(IpNextHeaderProtocols::Icmpv6));
        let (txv6, rxv6) = match transport_channel(4096, protocolv6) {
            Ok((txv6, rxv6)) => (txv6, rxv6),
            Err(e) => return Err(e.to_string()),
        };

        let (thread_tx, thread_rx) = channel();

        let mut pinger = Pinger{
            max_rtt: Arc::new(Duration::from_millis(2000)),
            addrs: Arc::new(Mutex::new(addrs)),
            size: 16,
            results_sender: sender,
            tx: Arc::new(Mutex::new(tx)),
            rx: Arc::new(Mutex::new(rx)),
            txv6: Arc::new(Mutex::new(txv6)),
            rxv6: Arc::new(Mutex::new(rxv6)),
            thread_rx: Arc::new(Mutex::new(thread_rx)),
            thread_tx: thread_tx,
            timer: Arc::new(RwLock::new(Instant::now())),
            stop: Arc::new(Mutex::new(false)),
        };
        if let Some(rtt_value) = _max_rtt {
            pinger.max_rtt = Arc::new(Duration::from_millis(rtt_value));
        }
        if let Some(size_value) = _size {
            pinger.size = size_value;
        }

        pinger.start_listener();
        Ok((pinger, receiver))
    }

    // add either an ipv4 or ipv6 target address for pinging
    pub fn add_ipaddr(&self, ipaddr: &str) {
        let addr = ipaddr.parse::<IpAddr>();
        match addr {
            Ok(valid_addr) => {
                debug!("Address added {}", valid_addr);
                self.addrs.lock().unwrap().insert(valid_addr, true);
            }
            Err(e) => {
                error!("Error adding ip address {}. Error: {}", ipaddr, e);
            },
        };
    }

    // remove a previously added ipv4 or ipv6 target address
    pub fn remove_ipaddr(&self, ipaddr: &str) {
        let addr = ipaddr.parse::<IpAddr>();
        match addr {
            Ok(valid_addr) => {
                debug!("Address removed {}", valid_addr);
                self.addrs.lock().unwrap().remove(&valid_addr);
            }
            Err(e) => {
                error!("Error removing ip address {}. Error: {}", ipaddr, e);
            },
        };
    }

    // stop running the continous pinger
    pub fn stop_pinger(&self) {
        let mut stop = self.stop.lock().unwrap();
        *stop = true;
    }

    // run one round of pinging and stop
    pub fn ping_once(&self) {
        self.run_pings(true)
    }

    // run the continuous pinger
    pub fn run_pinger(&self) {
        self.run_pings(false)
    }

    // run pinger either once or continuously
    fn run_pings(&self, run_once: bool) {
        let thread_rx = self.thread_rx.clone();
        let tx = self.tx.clone();
        let txv6 = self.txv6.clone();
        let results_sender = self.results_sender.clone();
        let stop = self.stop.clone();
        let addrs = self.addrs.clone();
        let timer = self.timer.clone();
        let max_rtt = self.max_rtt.clone();

        {
            let mut stop = self.stop.lock().unwrap();
            if run_once {
                debug!("Running pinger for one round");
                *stop = true;
            } else {
                *stop = false;
            }
        }
        
        if run_once {
            send_pings(timer, stop, results_sender, thread_rx, tx, txv6, addrs, max_rtt);
        } else {
            thread::spawn(move ||{
                send_pings(timer, stop, results_sender, thread_rx, tx, txv6, addrs, max_rtt);
            });
        }
    }

    fn start_listener(&self) {
        // start icmp listeners in the background and use internal channels for results

        // setup ipv4 listener
        let thread_tx = self.thread_tx.clone();
        let rx = self.rx.clone();
        let timer = self.timer.clone();
        let stop = self.stop.clone();

        thread::spawn(move || {
            let mut receiver = rx.lock().unwrap();
            let mut iter = icmp_packet_iter(&mut receiver);
            loop {
                match iter.next() {
                    Ok((_, addr)) => {
                        let start_time = timer.read().unwrap();
                        match thread_tx.send(PingResult::Receive{addr: addr, rtt: Instant::now().duration_since(*start_time)}) {
                            Ok(_) => {},
                            Err(e) => {
                                if !*stop.lock().unwrap() {
                                    error!("Error sending ping result on channel: {}", e)
                                }
                            }
                        }
                    },
                    Err(e) => {
                        error!("An error occurred while reading: {}", e);
                    }
                }
            }
        });

        // setup ipv6 listener
        let thread_txv6 = self.thread_tx.clone();
        let rxv6 = self.rxv6.clone();
        let timerv6 = self.timer.clone();
        let stopv6 = self.stop.clone();

        thread::spawn(move || {
            let mut receiver = rxv6.lock().unwrap();
            let mut iter = icmpv6_packet_iter(&mut receiver);
            loop {
                match iter.next() {
                    Ok((_, addr)) => {
                        let start_time = timerv6.read().unwrap();
                        match thread_txv6.send(PingResult::Receive{addr: addr, rtt: Instant::now().duration_since(*start_time)}) {
                            Ok(_) => {},
                            Err(e) => {
                                if !*stopv6.lock().unwrap() {
                                    error!("Error sending ping result on channel: {}", e)
                                }
                            }
                        }
                    },
                    Err(e) => {
                        error!("An error occurred while reading: {}", e);
                    }
                }
            }
        });
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_newpinger() {
        // test we can create a new pinger with optional arguments,
        // test it returns the new pinger and a client channel
        // test we can use the client channel
        match Pinger::new(Some(3000 as u64), Some(24 as i32)) {
            Ok((test_pinger, test_channel)) => {
                assert_eq!(test_pinger.max_rtt, Arc::new(Duration::new(3, 0)));
                assert_eq!(test_pinger.size, 24 as i32);

                match test_pinger.results_sender.send(PingResult::Idle{addr: "127.0.0.1".parse::<IpAddr>().unwrap()}) {
                    Ok(_) => {
                        match test_channel.recv() {
                            Ok(result) => {
                                match result {
                                    PingResult::Idle{addr} => {
                                        assert_eq!(addr, "127.0.0.1".parse::<IpAddr>().unwrap());
                                    },
                                    _ => {}
                                }
                            },
                            Err(_) => assert!(false),
                        }
                    }
                    Err(_) => assert!(false)
                }
            },
            Err(e) => {
                println!("Test failed: {}", e);
                assert!(false)
            }
        };
    }

    #[test]
    fn test_add_remove_addrs() {
        match Pinger::new(None, None) {
            Ok((test_pinger, _)) => {
                test_pinger.add_ipaddr("127.0.0.1");
                assert_eq!(test_pinger.addrs.lock().unwrap().len(), 1);
                assert!(test_pinger.addrs.lock().unwrap().contains_key(&"127.0.0.1".parse::<IpAddr>().unwrap()));

                test_pinger.remove_ipaddr("127.0.0.1");
                assert_eq!(test_pinger.addrs.lock().unwrap().len(), 0);
                assert_eq!(test_pinger.addrs.lock().unwrap().contains_key(&"127.0.0.1".parse::<IpAddr>().unwrap()), false);
            }
            Err(e) => {
                println!("Test failed: {}", e);
                assert!(false)
            }
        }
    }

    #[test]
    fn test_stop() {
        match Pinger::new(None, None) {
            Ok((test_pinger, _)) => {
                assert_eq!(*test_pinger.stop.lock().unwrap(), false);
                test_pinger.stop_pinger();
                assert_eq!(*test_pinger.stop.lock().unwrap(), true);
            }
            Err(e) => {
                println!("Test failed: {}", e);
                assert!(false)
            }
        }
    }

    #[test]
    fn test_integration() {
        // more comprehensive integration test
        match Pinger::new(None, None) {
            Ok((test_pinger, test_channel)) => {
                let test_addrs = vec!["127.0.0.1", "7.7.7.7", "::1"];
                for addr in test_addrs.iter() {
                    test_pinger.add_ipaddr(addr);
                }
                test_pinger.ping_once();
                for _ in test_addrs.iter() {
                    match test_channel.recv() {
                        Ok(result) => {
                            match result {
                                PingResult::Idle{addr} => {
                                    assert_eq!("7.7.7.7".parse::<IpAddr>().unwrap(), addr);
                                },
                                PingResult::Receive{addr, rtt: _} => {
                                    if addr == "::1".parse::<IpAddr>().unwrap() {
                                        assert!(true)
                                    } else if addr == "127.0.0.1".parse::<IpAddr>().unwrap() {
                                        assert!(true)
                                    } else {
                                        assert!(false)
                                    }
                                }
                            }
                        },
                        Err(_) => assert!(false),
                    }
                }
            }
            Err(e) => {
                println!("Test failed: {}", e);
                assert!(false)
            }
        }
    }
}