1#![doc = include_str!("../README.md")]
2#![deny(missing_docs)]
3#![allow(
14 clippy::doc_markdown,
15 clippy::missing_errors_doc,
16 clippy::missing_panics_doc,
17 clippy::must_use_candidate,
18 clippy::cast_possible_truncation,
19 clippy::cast_possible_wrap,
20 clippy::cast_sign_loss
21)]
22
23pub mod addr;
25mod net;
26pub(crate) mod time;
27
28use std::{
29 mem::MaybeUninit,
30 net::{Ipv4Addr, Ipv6Addr, SocketAddrV6},
31 sync::{
32 atomic::{AtomicU16, Ordering},
33 LazyLock,
34 },
35 time::Duration,
36};
37
38pub use net::IcmpSocket;
39use socket2::{MaybeUninitSlice, SockAddr};
40use tokio::time::timeout;
41
42use crate::{addr::ToIpAddr, net::MsgHdrMut};
43
/// Number of leading bytes treated as the IP header when parsing a received IPv4 datagram.
const IP_HEADER_SIZE: usize = 20;
/// Size in bytes of the echo header: type, code, checksum, identifier and sequence number.
const ICMP_HEADER_SIZE: usize = 8;

/// ICMPv4 message type placed in outgoing echo requests.
const ICMP_ECHO_REQUEST: u8 = 8;
/// ICMPv4 message type a received message must carry to be accepted as an echo reply.
const ICMP_ECHO_REPLY: u8 = 0;
/// ICMPv6 message type placed in outgoing echo requests.
const ICMP6_ECHO_REQUEST: u8 = 128;
/// ICMPv6 message type a received message must carry to be accepted as an echo reply.
const ICMP6_ECHO_REPLY: u8 = 129;
51
/// Process-wide counter that hands out the identifier for every outgoing echo request.
///
/// The counter is created lazily on first use. Its starting value mixes the process id with
/// the sub-second nanoseconds of the system clock so that it is not a fixed constant; each
/// send then takes the current value and advances the counter by one.
static REQ_ID: LazyLock<AtomicU16> = LazyLock::new(|| {
    let pid = u64::from(std::process::id());
    // A system clock that reads earlier than the Unix epoch contributes 0 instead of failing.
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| u64::from(d.subsec_nanos()));
    // Fold the upper bits of the nanosecond value onto the lower ones, then keep only the
    // low 16 bits so the cast below cannot lose anything that was not already masked off.
    #[allow(clippy::cast_possible_truncation)]
    AtomicU16::new(((pid ^ nanos ^ (nanos >> 16)) & 0xffff) as u16)
});
67
/// Summary of a complete [`ping`] run.
///
/// All round-trip fields are [`Duration::ZERO`] when no reply was received.
#[derive(Clone, Copy, Debug)]
pub struct PingStats {
    /// Number of echo requests attempted; [`ping`] sets this to the requested `count`.
    pub packets_tx: u32,
    /// Number of echo requests for which a matching reply was received.
    pub packets_rx: u32,
    /// Shortest round-trip time among the received replies.
    pub rtt_min: Duration,
    /// Mean round-trip time of the received replies, rounded down to whole nanoseconds.
    pub rtt_avg: Duration,
    /// Longest round-trip time among the received replies.
    pub rtt_max: Duration,
    /// Population standard deviation of the round-trip times, rounded down to whole
    /// nanoseconds.
    pub rtt_std_dev: Duration,
}
87
/// A matched ICMPv4 echo reply, as returned by [`send_icmp_echo_v4`].
#[derive(Clone, Copy, Debug)]
pub struct IcmpEchoReply {
    /// Source address read from the IP header of the reply.
    pub src_addr: Ipv4Addr,
    /// Number of bytes received after the 20 bytes treated as the IP header, i.e. echo
    /// header, timestamp and payload.
    pub len: usize,
    /// Sequence number carried by the reply; equal to the one that was sent.
    pub seq: u16,
    /// Time-to-live value read from the IP header of the reply.
    pub ttl: u8,
    /// Time between the timestamp embedded in the request and the arrival of the reply.
    pub rtt: Duration,
}
102
103pub async fn ping<A: ToIpAddr>(
126 src: A,
127 dest: A,
128 count: u32,
129 interval: Duration,
130 size: u16,
131) -> std::io::Result<PingStats> {
132 use std::net::IpAddr;
133
134 let dest = dest.to_ip_addr().await?;
135 let ts_len = time::Timestamp::len();
136 if (size as usize) <= ts_len {
137 return Err(std::io::Error::new(
138 std::io::ErrorKind::InvalidInput,
139 format!("size must be greater than {ts_len} (timestamp bytes)"),
140 ));
141 }
142 let payload = generate_payload(size as usize - ts_len);
143 let tout = Duration::from_secs(5);
144
145 let socket = IcmpSocket::bind(src).await?;
146 socket.connect(dest).await?;
147
148 let mut packets_rx: u32 = 0;
149 let mut rtts: Vec<Duration> = Vec::with_capacity(count as usize);
150
151 for seq in 1..=count {
152 let result = match dest {
153 IpAddr::V4(_) => send_icmp_echo_v4(&socket, &payload, seq as u16, tout)
154 .await
155 .map(|r| r.rtt),
156 IpAddr::V6(_) => send_icmp_echo_v6(&socket, &payload, seq as u16, tout)
157 .await
158 .map(|r| r.rtt),
159 };
160 if let Ok(rtt) = result {
161 packets_rx += 1;
162 rtts.push(rtt);
163 }
164 if seq < count {
165 tokio::time::sleep(interval).await;
166 }
167 }
168
169 let packets_tx = count;
170 let mut stats = compute_rtt_stats(&rtts);
171 stats.packets_tx = packets_tx;
172 stats.packets_rx = packets_rx;
173 Ok(stats)
174}
175
176fn compute_rtt_stats(rtts: &[Duration]) -> PingStats {
181 let (rtt_min, rtt_avg, rtt_max, rtt_std_dev) = if rtts.is_empty() {
182 (
183 Duration::ZERO,
184 Duration::ZERO,
185 Duration::ZERO,
186 Duration::ZERO,
187 )
188 } else {
189 let min = *rtts.iter().min().unwrap();
190 let max = *rtts.iter().max().unwrap();
191 let avg_nanos = rtts.iter().map(|d| d.as_nanos() as u64).sum::<u64>() / rtts.len() as u64;
192 let avg = Duration::from_nanos(avg_nanos);
193 let variance = rtts
194 .iter()
195 .map(|d| {
196 let diff = d.as_nanos() as i64 - avg_nanos as i64;
197 (diff * diff) as u64
198 })
199 .sum::<u64>()
200 / rtts.len() as u64;
201 let std_dev = Duration::from_nanos(variance.isqrt());
202 (min, avg, max, std_dev)
203 };
204 PingStats {
205 packets_tx: 0,
206 packets_rx: 0,
207 rtt_min,
208 rtt_avg,
209 rtt_max,
210 rtt_std_dev,
211 }
212}
213
/// A matched ICMPv6 echo reply, as returned by [`send_icmp_echo_v6`].
#[derive(Clone, Copy, Debug)]
pub struct IcmpV6EchoReply {
    /// Source address reported by the socket for the received reply.
    pub src_addr: Ipv6Addr,
    /// Number of bytes received: echo header, timestamp and payload.
    pub len: usize,
    /// Sequence number carried by the reply; equal to the one that was sent.
    pub seq: u16,
    /// Hop limit of the reply, taken from the `IPV6_HOPLIMIT` control message.
    pub hlim: u8,
    /// Time between the timestamp embedded in the request and the arrival of the reply.
    pub rtt: Duration,
}
228
229pub async fn send_icmp_echo_v4(
244 socket: &IcmpSocket,
245 payload: &[u8],
246 seq: u16,
247 tout: Duration,
248) -> std::io::Result<IcmpEchoReply> {
249 let mut buf: Vec<u8> = Vec::with_capacity(
250 IP_HEADER_SIZE + ICMP_HEADER_SIZE + time::Timestamp::len() + payload.len(),
251 );
252 let req_id = REQ_ID.fetch_add(1, Ordering::Relaxed);
253 add_icmp_header(&mut buf, ICMP_ECHO_REQUEST, req_id, seq);
254 let sent_ts_bytes = time::Timestamp::now().as_bytes();
255 buf.extend_from_slice(&sent_ts_bytes);
256 buf.extend_from_slice(payload);
257
258 let checksum = calculate_checksum(&buf);
259 buf[2] = (checksum >> 8) as u8;
260 buf[3] = (checksum & 0xff) as u8;
261
262 socket.send(&buf).await?;
263 let overall = timeout(tout, async {
264 loop {
265 buf.clear();
266 let received = socket.recv(buf.spare_capacity_mut()).await?;
267 unsafe { buf.set_len(received) };
268 if received < IP_HEADER_SIZE + ICMP_HEADER_SIZE + time::Timestamp::len() {
269 continue;
270 }
271 let msg_type = buf[IP_HEADER_SIZE];
272 if msg_type != ICMP_ECHO_REPLY {
273 continue;
274 }
275 let reply_id = u16::from_be_bytes([buf[IP_HEADER_SIZE + 4], buf[IP_HEADER_SIZE + 5]]);
276 if req_id != reply_id {
277 continue;
278 }
279 let reply_seq = u16::from_be_bytes([buf[IP_HEADER_SIZE + 6], buf[IP_HEADER_SIZE + 7]]);
280 if reply_seq != seq {
281 continue;
282 }
283 let ts_start = IP_HEADER_SIZE + ICMP_HEADER_SIZE;
287 let ts_end = ts_start + time::Timestamp::len();
288 if buf[ts_start..ts_end] != sent_ts_bytes {
289 continue;
290 }
291 let now = time::Timestamp::now();
292 let src_addr = Ipv4Addr::new(
293 buf[IP_HEADER_SIZE - 8],
294 buf[IP_HEADER_SIZE - 7],
295 buf[IP_HEADER_SIZE - 6],
296 buf[IP_HEADER_SIZE - 5],
297 );
298 let reply_ttl = buf[8];
299 let reply_ts =
300 time::Timestamp::from(<[u8; 8]>::try_from(&buf[ts_start..ts_end]).unwrap());
301 let rtt = now - reply_ts;
302 return Ok(IcmpEchoReply {
303 src_addr,
304 len: received - IP_HEADER_SIZE,
305 seq: reply_seq,
306 ttl: reply_ttl,
307 rtt,
308 });
309 }
310 });
311
312 match overall.await {
313 Ok(result) => result,
314 Err(_) => Err(std::io::Error::new(
315 std::io::ErrorKind::TimedOut,
316 "timed out",
317 )),
318 }
319}
320
/// Sends a single ICMPv6 echo request on `socket` and waits for the reply that answers it.
///
/// The request consists of the echo header (with a fresh identifier from the process-wide
/// counter and the given `seq`), the current timestamp, and `payload`. Replies are read with
/// `recvmsg` so that the sender address and the hop limit control message are available.
/// Messages that are too short, are not echo replies, or do not carry the same identifier,
/// sequence number and timestamp as the request are skipped.
///
/// # Errors
///
/// Propagates I/O errors from sending or receiving. Fails if the control buffer was
/// truncated, or with [`std::io::ErrorKind::InvalidData`] if the matching reply has no IPv6
/// source address or no usable `IPV6_HOPLIMIT` control message. Returns an error of kind
/// [`std::io::ErrorKind::TimedOut`] when no matching reply arrives within `tout`.
pub async fn send_icmp_echo_v6(
    socket: &IcmpSocket,
    payload: &[u8],
    seq: u16,
    tout: Duration,
) -> std::io::Result<IcmpV6EchoReply> {
    let mut buf: Vec<u8> =
        Vec::with_capacity(ICMP_HEADER_SIZE + time::Timestamp::len() + payload.len());
    let req_id = REQ_ID.fetch_add(1, Ordering::Relaxed);
    add_icmp_header(&mut buf, ICMP6_ECHO_REQUEST, req_id, seq);
    let sent_ts_bytes = time::Timestamp::now().as_bytes();
    buf.extend_from_slice(&sent_ts_bytes);
    buf.extend_from_slice(payload);

    // Unlike the IPv4 path, the checksum field is left at zero here.
    // NOTE(review): presumably the kernel fills in the ICMPv6 checksum — confirm for the
    // socket type `IcmpSocket` opens.
    socket.send(&buf).await?;

    // Placeholder that `recvmsg` overwrites with the sender of each received message.
    let mut from: SockAddr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0u16, 0, 0).into();

    // 64 bytes of control-message space, declared as `u64`s so the storage is 8-byte aligned.
    let mut control_storage: [MaybeUninit<u64>; 8] = [MaybeUninit::uninit(); 8];

    let overall = timeout(tout, async {
        loop {
            buf.clear();

            // The message header borrows `buf`, `from` and the control storage; keeping it
            // inside this block ends those borrows before `buf` is touched again below.
            let (received, flags, reply_hlim_opt) = {
                let bufs = &mut [MaybeUninitSlice::new(buf.spare_capacity_mut())];
                // SAFETY: the pointer and byte length both come from `control_storage`, so
                // the slice covers exactly that array; viewing it as `MaybeUninit<u8>` makes
                // no claim that the bytes are initialised.
                let control_bytes: &mut [MaybeUninit<u8>] = unsafe {
                    std::slice::from_raw_parts_mut(
                        control_storage.as_mut_ptr().cast::<MaybeUninit<u8>>(),
                        std::mem::size_of_val(&control_storage),
                    )
                };
                let mut msg = MsgHdrMut::new()
                    .with_addr(&mut from)
                    .with_control(control_bytes)
                    .with_buffers(bufs);

                let received = socket.recvmsg(&mut msg).await?;
                let flags = msg.flags();
                let hlim = decode_hlim(&msg);
                (received, flags, hlim)
            };
            // SAFETY (assumed): relies on `recvmsg` having initialised the first `received`
            // bytes of the spare capacity it was handed — confirm against
            // `IcmpSocket::recvmsg`.
            unsafe { buf.set_len(received) };

            // A truncated control buffer is reported as an error instead of being skipped,
            // even though this message has not yet been checked against the request.
            if flags & libc::MSG_CTRUNC != 0 {
                return Err(std::io::Error::other(
                    "recvmsg control buffer truncated (MSG_CTRUNC)",
                ));
            }

            // Too short to hold the echo header and the timestamp.
            if received < ICMP_HEADER_SIZE + time::Timestamp::len() {
                continue;
            }
            // No IP header offset here: parsing starts at the ICMPv6 message itself.
            let msg_type = buf[0];
            if msg_type != ICMP6_ECHO_REPLY {
                continue;
            }
            let reply_id = u16::from_be_bytes([buf[4], buf[5]]);
            if req_id != reply_id {
                continue;
            }
            let reply_seq = u16::from_be_bytes([buf[6], buf[7]]);
            if reply_seq != seq {
                continue;
            }
            // The echoed timestamp must be byte-for-byte the one that was sent.
            let ts_end = ICMP_HEADER_SIZE + time::Timestamp::len();
            if buf[ICMP_HEADER_SIZE..ts_end] != sent_ts_bytes {
                continue;
            }
            let now = time::Timestamp::now();
            let src_addr = from.as_socket_ipv6().map(|s| *s.ip()).ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "recvmsg returned no source address",
                )
            })?;
            // The hop limit was decoded for every message but is only required, and only
            // turned into an error when absent, for the reply that matched.
            let reply_hlim = reply_hlim_opt.ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "reply missing IPV6_HOPLIMIT control message",
                )
            })?;
            let reply_ts =
                time::Timestamp::from(<[u8; 8]>::try_from(&buf[ICMP_HEADER_SIZE..ts_end]).unwrap());
            let rtt = now - reply_ts;
            return Ok(IcmpV6EchoReply {
                src_addr,
                len: received,
                seq: reply_seq,
                hlim: reply_hlim,
                rtt,
            });
        }
    });

    // The outer `Err` is the timeout elapsing; the inner result is the loop's own outcome.
    match overall.await {
        Ok(result) => result,
        Err(_) => Err(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            "timed out",
        )),
    }
}
453
/// Builds a payload of `size` bytes holding the repeating pattern `0, 1, …, 255, 0, 1, …`.
pub fn generate_payload(size: usize) -> Vec<u8> {
    (0..=u8::MAX).cycle().take(size).collect()
}
459
/// Appends an 8-byte echo header to `buf`.
///
/// Layout: message type `typ`, a zero code byte, a two-byte checksum field left at zero for
/// the caller to fill in, then `id` and `seq`, each as two bytes in network (big-endian)
/// byte order. Existing contents of `buf` are left untouched.
fn add_icmp_header(buf: &mut Vec<u8>, typ: u8, id: u16, seq: u16) {
    buf.push(typ);
    // Code is always zero for echo messages.
    buf.push(0);
    // Checksum placeholder.
    buf.extend_from_slice(&[0, 0]);
    // Wire order is big-endian regardless of the host: `to_be_bytes` already accounts for
    // host endianness, so no per-target branching is needed (or correct).
    buf.extend_from_slice(&id.to_be_bytes());
    buf.extend_from_slice(&seq.to_be_bytes());
}
497
/// Computes the 16-bit one's-complement checksum of `data`.
///
/// The input is summed as big-endian 16-bit words; a trailing odd byte is treated as the
/// high byte of a final word padded with zero. Carries out of the low 16 bits are folded
/// back in and the result is complemented. An empty slice yields `0xffff`.
fn calculate_checksum(data: &[u8]) -> u16 {
    // `chunks_exact` handles every length, including zero, without index arithmetic.
    let mut words = data.chunks_exact(2);
    // A 64-bit accumulator cannot overflow for any slice that fits in memory.
    let mut sum: u64 = words
        .by_ref()
        .map(|w| u64::from(u16::from_be_bytes([w[0], w[1]])))
        .sum();

    if let [last] = words.remainder() {
        sum += u64::from(*last) << 8;
    }

    // End-around carry: fold anything above 16 bits back into the low word.
    while sum >> 16 != 0 {
        sum = (sum & 0xffff) + (sum >> 16);
    }

    // The loop above guarantees `sum` fits in 16 bits, so the cast is lossless.
    !(sum as u16)
}
526
/// Extracts the hop limit from the control messages attached to `msg`.
///
/// Walks the control-message list and returns the value of the first entry at level
/// `IPPROTO_IPV6` with type `IPV6_HOPLIMIT` whose length is at least that of a control
/// message carrying one `c_int`. Returns `None` when there is no such entry or when the
/// value does not fit in a `u8`.
fn decode_hlim(msg: &MsgHdrMut<'_, '_, '_>) -> Option<u8> {
    let hdr = msg.as_msghdr();
    // Length a control message has when its data is exactly one `c_int`.
    // SAFETY (assumed): `CMSG_LEN` is a pure size computation — confirm against the libc
    // crate documentation.
    let want_len = unsafe { libc::CMSG_LEN(size_of::<libc::c_int>() as u32) } as usize;
    // SAFETY (assumed): requires `hdr` to describe a control buffer that a completed
    // `recvmsg` filled in — confirm that `MsgHdrMut::as_msghdr` guarantees this.
    let mut p = unsafe { libc::CMSG_FIRSTHDR(hdr) };
    while !p.is_null() {
        // SAFETY (assumed): `p` is non-null and was produced by the CMSG macros from `hdr`,
        // so it should point at a control-message header inside the control buffer.
        let h = unsafe { &*p };
        if h.cmsg_level == libc::IPPROTO_IPV6
            && h.cmsg_type == libc::IPV6_HOPLIMIT
            && h.cmsg_len as usize >= want_len
        {
            let mut value = MaybeUninit::<libc::c_int>::uninit();
            // The value is copied out byte-wise rather than read through a `c_int` pointer;
            // presumably because the control data is not guaranteed to be suitably aligned.
            // SAFETY: the length check above ensures the entry holds at least
            // `size_of::<c_int>()` data bytes, and the copy initialises all of `value`.
            let hlim = unsafe {
                std::ptr::copy_nonoverlapping(
                    libc::CMSG_DATA(p),
                    value.as_mut_ptr().cast::<u8>(),
                    size_of::<libc::c_int>(),
                );
                value.assume_init()
            };
            // Values outside `0..=255` are reported as absent.
            return u8::try_from(hlim).ok();
        }
        // SAFETY (assumed): same preconditions as for `CMSG_FIRSTHDR` above.
        p = unsafe { libc::CMSG_NXTHDR(hdr, p) };
    }
    None
}
562
// Unit tests for the header/checksum/statistics helpers, plus loopback round-trip tests.
#[cfg(test)]
mod tests {
    use super::*;

    // The echo header is always exactly eight bytes.
    #[test]
    fn add_icmp_header_writes_8_bytes() {
        let mut buf = Vec::new();
        add_icmp_header(&mut buf, 8, 0, 0);
        assert_eq!(buf.len(), 8);
    }

    // Byte 0 carries the message type.
    #[test]
    fn add_icmp_header_type_field() {
        let mut buf = Vec::new();
        add_icmp_header(&mut buf, 0x08, 0, 0);
        assert_eq!(buf[0], 0x08);
    }

    // Byte 1 (code) stays zero whatever the identifier and sequence number are.
    #[test]
    fn add_icmp_header_code_is_zero() {
        let mut buf = Vec::new();
        add_icmp_header(&mut buf, 8, 0xffff, 0xffff);
        assert_eq!(buf[1], 0);
    }

    // Bytes 4-5 hold the identifier, high byte first.
    #[test]
    fn add_icmp_header_id_big_endian() {
        let mut buf = Vec::new();
        add_icmp_header(&mut buf, 8, 0x1234, 0);
        assert_eq!(buf[4], 0x12);
        assert_eq!(buf[5], 0x34);
    }

    // Bytes 6-7 hold the sequence number, high byte first.
    #[test]
    fn add_icmp_header_seq_big_endian() {
        let mut buf = Vec::new();
        add_icmp_header(&mut buf, 8, 0, 1);
        assert_eq!(buf[6], 0);
        assert_eq!(buf[7], 1);
    }

    // The header is appended; bytes already in the buffer are preserved.
    #[test]
    fn add_icmp_header_appends_to_existing_content() {
        let mut buf = vec![0xde, 0xad];
        add_icmp_header(&mut buf, 8, 0, 0);
        assert_eq!(buf.len(), 10);
        assert_eq!(&buf[..2], &[0xde, 0xad]);
    }

    #[test]
    fn test_checksum() {
        // Even length: the only non-zero word is 0x0800, whose complement is 0xf7ff.
        let data = vec![0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
        let checksum = calculate_checksum(&data);
        assert_eq!(checksum, 0xf7ff);

        // Odd length: 0x0001 plus the trailing byte as 0x0200 gives 0x0201, complement 0xfdfe.
        let data = vec![0x00, 0x01, 0x02];
        let checksum = calculate_checksum(&data);
        assert_eq!(checksum, 0xfdfe);
    }

    // No samples: every round-trip statistic is zero.
    #[test]
    fn test_compute_rtt_stats_empty() {
        let stats = compute_rtt_stats(&[]);
        assert_eq!(stats.rtt_min, Duration::ZERO);
        assert_eq!(stats.rtt_avg, Duration::ZERO);
        assert_eq!(stats.rtt_max, Duration::ZERO);
        assert_eq!(stats.rtt_std_dev, Duration::ZERO);
    }

    // One sample: min, mean and max coincide and there is no spread.
    #[test]
    fn test_compute_rtt_stats_single() {
        let rtts = vec![Duration::from_millis(10)];
        let stats = compute_rtt_stats(&rtts);
        assert_eq!(stats.rtt_min, Duration::from_millis(10));
        assert_eq!(stats.rtt_avg, Duration::from_millis(10));
        assert_eq!(stats.rtt_max, Duration::from_millis(10));
        assert_eq!(stats.rtt_std_dev, Duration::ZERO);
    }

    #[test]
    fn test_compute_rtt_stats_multiple() {
        let rtts = vec![
            Duration::from_millis(10),
            Duration::from_millis(20),
            Duration::from_millis(30),
        ];
        let stats = compute_rtt_stats(&rtts);
        assert_eq!(stats.rtt_min, Duration::from_millis(10));
        assert_eq!(stats.rtt_max, Duration::from_millis(30));
        assert_eq!(stats.rtt_avg, Duration::from_millis(20));
        // Recompute the population standard deviation independently, in whole nanoseconds,
        // with the same integer rounding the implementation uses.
        let expected_std_dev_nanos: u64 = {
            let avg_ns: u64 = 20_000_000;
            let variance = [10_000_000u64, 20_000_000u64, 30_000_000u64]
                .iter()
                .map(|&d| {
                    let diff = d as i64 - avg_ns as i64;
                    (diff * diff) as u64
                })
                .sum::<u64>()
                / 3;
            variance.isqrt()
        };
        assert_eq!(
            stats.rtt_std_dev,
            Duration::from_nanos(expected_std_dev_nanos)
        );
    }

    // Identical samples: zero spread.
    #[test]
    fn test_compute_rtt_stats_identical() {
        let rtts = vec![
            Duration::from_millis(5),
            Duration::from_millis(5),
            Duration::from_millis(5),
        ];
        let stats = compute_rtt_stats(&rtts);
        assert_eq!(stats.rtt_min, Duration::from_millis(5));
        assert_eq!(stats.rtt_avg, Duration::from_millis(5));
        assert_eq!(stats.rtt_max, Duration::from_millis(5));
        assert_eq!(stats.rtt_std_dev, Duration::ZERO);
    }

    // Round trip over IPv4 loopback.
    // NOTE(review): presumably needs permission to open ICMP sockets on the test host.
    #[tokio::test]
    async fn test_send_icmp_echo_v4() {
        let sock = IcmpSocket::bind(Ipv4Addr::UNSPECIFIED).await.unwrap();
        sock.connect("127.0.0.1").await.unwrap();

        let payload = generate_payload(48);

        let reply = send_icmp_echo_v4(&sock, &payload, 1, Duration::from_secs(5))
            .await
            .unwrap();
        assert_eq!(reply.src_addr, Ipv4Addr::LOCALHOST);
        // 8 header bytes + 8 timestamp bytes + 48 payload bytes.
        assert_eq!(reply.len, 64);
        assert_eq!(reply.seq, 1);
        assert!(reply.ttl > 0);
        assert!(reply.rtt > Duration::ZERO);
    }

    // Round trip over IPv6 loopback with an empty payload.
    // NOTE(review): presumably needs permission to open ICMP sockets on the test host.
    #[tokio::test]
    async fn test_send_icmp_echo_v6() {
        let sock = IcmpSocket::bind(Ipv6Addr::UNSPECIFIED).await.unwrap();
        sock.connect("::1").await.unwrap();

        let payload = [];

        let reply = send_icmp_echo_v6(&sock, &payload, 1, Duration::from_secs(5))
            .await
            .unwrap();
        assert_eq!(reply.src_addr, Ipv6Addr::LOCALHOST);
        // 8 header bytes + 8 timestamp bytes, no payload.
        assert_eq!(reply.len, 16);
        assert_eq!(reply.seq, 1);
        assert!(reply.hlim > 0);
        assert!(reply.rtt > Duration::ZERO);
    }

    // Same round trip, but run inside a spawned task on the multi-threaded runtime, so this
    // test only compiles while the future returned by `send_icmp_echo_v6` is `Send`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_send_icmp_echo_v6_send() {
        let reply = tokio::task::spawn(async {
            let sock = IcmpSocket::bind(Ipv6Addr::UNSPECIFIED).await.unwrap();
            sock.connect("::1").await.unwrap();

            let payload = [];

            let reply = send_icmp_echo_v6(&sock, &payload, 1, Duration::from_secs(5))
                .await
                .unwrap();
            reply
        })
        .await
        .unwrap();

        assert_eq!(reply.src_addr, Ipv6Addr::LOCALHOST);
        assert_eq!(reply.len, 16);
        assert_eq!(reply.seq, 1);
        assert!(reply.hlim > 0);
        assert!(reply.rtt > Duration::ZERO);
    }
}