use crate::probe::{ProbeInfo, ProbeResponse};
use crate::socket::icmp;
use crate::socket::traits::{ProbeMode, ProbeSocket};
use crate::traceroute::TracerouteError;
use crate::TimingConfig;
use socket2::{Domain, Protocol, Socket as Socket2, Type};
use std::future::Future;
use std::mem::MaybeUninit;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
const ICMP_ECHO_PAYLOAD_SIZE: usize = 16;
const ICMP_ERROR_HEADER_LEN_BYTES: usize = 8;
const IPV4_HEADER_MIN_LEN_BYTES: usize = 20;
pub struct MacOSAsyncIcmpSocket {
icmp_identifier: u16,
destination_reached: Arc<AtomicBool>,
pending_count: Arc<AtomicUsize>,
timing_config: TimingConfig,
verbose: u8,
}
impl MacOSAsyncIcmpSocket {
pub fn new() -> Result<Self, TracerouteError> {
Self::new_with_config(TimingConfig::default())
}
pub fn new_with_config(timing_config: TimingConfig) -> Result<Self, TracerouteError> {
let verbose = std::env::var("FTR_VERBOSE")
.ok()
.and_then(|v| v.parse::<u8>().ok())
.unwrap_or(0);
trace_time!(
verbose,
"Creating macOS async ICMP socket (per-probe version)"
);
Ok(Self {
icmp_identifier: std::process::id() as u16,
destination_reached: Arc::new(AtomicBool::new(false)),
pending_count: Arc::new(AtomicUsize::new(0)),
timing_config,
verbose,
})
}
fn parse_response(
&self,
packet_data: &[u8],
from_addr: IpAddr,
recv_time: Instant,
expected_sequence: u16,
dest: IpAddr,
) -> Option<ProbeResponse> {
let icmp_data = icmp::ipv4_payload(packet_data)?;
let hdr = icmp::parse_icmp_header(icmp_data)?;
trace_time!(
self.verbose,
"Received ICMP type {} code {} from {}",
hdr.icmp_type,
hdr.icmp_code,
from_addr
);
match hdr.icmp_type {
icmp::ICMP_TIME_EXCEEDED | icmp::ICMP_DEST_UNREACHABLE => {
let original_datagram_bytes = if icmp_data.len() >= ICMP_ERROR_HEADER_LEN_BYTES {
&icmp_data[ICMP_ERROR_HEADER_LEN_BYTES..]
} else {
return None;
};
if original_datagram_bytes.len() < IPV4_HEADER_MIN_LEN_BYTES {
return None;
}
let (inner_hdr_len, _) = icmp::parse_ipv4_header(original_datagram_bytes)?;
let original_icmp_bytes = &original_datagram_bytes[inner_hdr_len..];
if original_icmp_bytes.len() < 8 {
return None;
}
let original_type = original_icmp_bytes[0];
let original_id =
u16::from_be_bytes([original_icmp_bytes[4], original_icmp_bytes[5]]);
let original_seq =
u16::from_be_bytes([original_icmp_bytes[6], original_icmp_bytes[7]]);
if original_type == icmp::ICMP_ECHO_REQUEST
&& original_id == self.icmp_identifier
&& original_seq == expected_sequence
{
let is_destination = hdr.icmp_type == icmp::ICMP_DEST_UNREACHABLE;
return Some(ProbeResponse {
from_addr,
sequence: expected_sequence,
ttl: 0, rtt: recv_time.duration_since(recv_time), received_at: recv_time,
is_destination,
is_timeout: false,
});
}
}
icmp::ICMP_ECHO_REPLY => {
if let Some((reply_id, reply_seq)) = icmp::parse_echo_reply(icmp_data) {
if reply_id == self.icmp_identifier && reply_seq == expected_sequence {
let is_destination = from_addr == dest;
return Some(ProbeResponse {
from_addr,
sequence: expected_sequence,
ttl: 0, rtt: recv_time.duration_since(recv_time), received_at: recv_time,
is_destination,
is_timeout: false,
});
}
}
}
_ => {}
}
None
}
async fn send_and_recv_probe(
&self,
dest: Ipv4Addr,
probe: ProbeInfo,
) -> Result<ProbeResponse, TracerouteError> {
let send_start = probe.sent_at;
let socket =
Socket2::new(Domain::IPV4, Type::DGRAM, Some(Protocol::ICMPV4)).map_err(|e| {
TracerouteError::SocketError(format!("Failed to create ICMP socket: {e}"))
})?;
socket
.set_ttl_v4(probe.ttl as u32)
.map_err(|e| TracerouteError::SocketError(format!("Failed to set TTL: {e}")))?;
socket.set_nonblocking(true).map_err(|e| {
TracerouteError::SocketError(format!("Failed to set non-blocking: {e}"))
})?;
let payload_data = (self.icmp_identifier as u32) << 16 | (probe.sequence as u32);
let payload_bytes = payload_data.to_be_bytes();
let mut final_payload = vec![0u8; ICMP_ECHO_PAYLOAD_SIZE];
let bytes_to_copy = payload_bytes.len().min(ICMP_ECHO_PAYLOAD_SIZE);
final_payload[..bytes_to_copy].copy_from_slice(&payload_bytes[..bytes_to_copy]);
let icmp_buf =
icmp::build_echo_request(self.icmp_identifier, probe.sequence, &final_payload);
let dest_addr = SocketAddr::new(IpAddr::V4(dest), 0);
socket.send_to(&icmp_buf, &dest_addr.into()).map_err(|e| {
TracerouteError::ProbeSendError(format!("Failed to send ICMP packet: {e}"))
})?;
trace_time!(
self.verbose,
"Sent ICMP echo seq={} ttl={} to {}",
probe.sequence,
probe.ttl,
dest
);
let (tx, rx) = oneshot::channel();
let socket_clone = socket
.try_clone()
.map_err(|e| TracerouteError::SocketError(format!("Failed to clone socket: {e}")))?;
let icmp_identifier = self.icmp_identifier;
let sequence = probe.sequence;
let ttl = probe.ttl;
let dest_ip = IpAddr::V4(dest);
let verbose = self.verbose;
let destination_reached = Arc::clone(&self.destination_reached);
let pending_count = Arc::clone(&self.pending_count);
let socket_timeout = self.timing_config.socket_read_timeout;
tokio::spawn(async move {
let mut buf = vec![MaybeUninit::uninit(); 1500];
let timeout = socket_timeout;
let deadline = Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
trace_time!(verbose, "Timeout waiting for response to seq={}", sequence);
break;
}
let result = tokio::time::timeout(remaining, async {
loop {
match socket_clone.recv_from(&mut buf[..]) {
Ok((size, addr)) => {
return Ok((size, addr));
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
}
Err(e) => return Err(e),
}
}
})
.await;
match result {
Ok(Ok((size, addr))) => {
let recv_time = Instant::now();
let from_addr = match addr.as_socket_ipv4() {
Some(ipv4) => IpAddr::V4(*ipv4.ip()),
None => continue,
};
trace_time!(verbose, "Received {} bytes from {}", size, from_addr);
let initialized_buf =
unsafe { std::slice::from_raw_parts(buf.as_ptr() as *const u8, size) };
let parser = MacOSAsyncIcmpSocket {
icmp_identifier,
destination_reached: Arc::new(AtomicBool::new(false)),
pending_count: Arc::new(AtomicUsize::new(0)),
timing_config: TimingConfig::default(),
verbose,
};
if let Some(mut response) = parser.parse_response(
initialized_buf,
from_addr,
recv_time,
sequence,
dest_ip,
) {
response.ttl = ttl;
response.rtt = recv_time.duration_since(send_start);
trace_time!(
verbose,
"Matched response for seq={} from {} rtt={:?}",
sequence,
from_addr,
response.rtt
);
if response.is_destination {
destination_reached.store(true, Ordering::Relaxed);
}
pending_count.fetch_sub(1, Ordering::Relaxed);
let _ = tx.send(response);
return;
}
}
Ok(Err(e)) => {
trace_time!(verbose, "Error receiving: {}", e);
break;
}
Err(_) => {
break;
}
}
}
pending_count.fetch_sub(1, Ordering::Relaxed);
let _ = tx.send(ProbeResponse {
from_addr: dest_ip,
sequence,
ttl,
rtt: Duration::from_millis(1000),
received_at: Instant::now(),
is_destination: false,
is_timeout: true,
});
});
match tokio::time::timeout(self.timing_config.socket_read_timeout, rx).await {
Ok(Ok(response)) => Ok(response),
Ok(Err(_)) => {
Err(TracerouteError::SocketError(
"Response channel closed unexpectedly".to_string(),
))
}
Err(_) => {
Ok(ProbeResponse {
from_addr: IpAddr::V4(dest),
sequence: probe.sequence,
ttl: probe.ttl,
rtt: self.timing_config.socket_read_timeout,
received_at: Instant::now(),
is_destination: false,
is_timeout: true,
})
}
}
}
}
impl ProbeSocket for MacOSAsyncIcmpSocket {
fn mode(&self) -> ProbeMode {
ProbeMode::DgramIcmp
}
fn send_probe_and_recv(
&self,
dest: IpAddr,
probe: ProbeInfo,
) -> Pin<Box<dyn Future<Output = Result<ProbeResponse, TracerouteError>> + Send + '_>> {
Box::pin(async move {
let dest_v4 = match dest {
IpAddr::V4(addr) => addr,
_ => return Err(TracerouteError::Ipv6NotSupported),
};
self.pending_count.fetch_add(1, Ordering::Relaxed);
self.send_and_recv_probe(dest_v4, probe).await
})
}
fn destination_reached(&self) -> bool {
self.destination_reached.load(Ordering::Relaxed)
}
fn pending_count(&self) -> usize {
self.pending_count.load(Ordering::Relaxed)
}
}
unsafe impl Send for MacOSAsyncIcmpSocket {}
unsafe impl Sync for MacOSAsyncIcmpSocket {}