ftr 0.7.0

A fast, parallel ICMP traceroute with ASN lookup, reverse DNS, and ISP detection
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
//! Linux async socket implementation for traceroute
//!
//! This module provides an async UDP traceroute socket that uses IP_RECVERR for
//! non-root operation, and an async raw ICMP socket for use with root or CAP_NET_RAW.

use super::traits::{ProbeMode, ProbeSocket};
use crate::probe::{ProbeInfo, ProbeResponse};
use crate::socket::icmp;
use crate::traceroute::TracerouteError;
use std::future::Future;
use std::mem;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::sync::oneshot;

// Linux specific constants for IP_RECVERR
//
// Value of `SockExtendedErr::ee_origin` that `check_icmp_error` accepts: only
// error-queue entries with this origin are treated as ICMP responses to a probe.
const SO_EE_ORIGIN_ICMP: u8 = 2;

// sock_extended_err structure from Linux
//
// `check_icmp_error` reads this structure (unaligned) straight out of the data
// area of an IP_RECVERR control message, so `#[repr(C)]` and the field order
// below must not change. Field descriptions follow the Linux ip(7) man page;
// only `ee_origin`, `ee_type` and `ee_code` are read by this module.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
struct SockExtendedErr {
    // Error number of the queued error (not read here).
    ee_errno: u32,
    // Where the error originated; compared against `SO_EE_ORIGIN_ICMP`.
    ee_origin: u8,
    // For ICMP-origin errors, the ICMP type (3 is checked for the destination case).
    ee_type: u8,
    // For ICMP-origin errors, the ICMP code (3 is checked for the destination case).
    ee_code: u8,
    // Padding byte that keeps the following fields aligned.
    ee_pad: u8,
    // Additional error information (not read here).
    ee_info: u32,
    // Other error data (not read here).
    ee_data: u32,
}

/// Async UDP socket for Linux using IP_RECVERR
///
/// The struct itself holds no file descriptor: `send_probe_and_recv` creates a
/// dedicated UDP socket for every probe and reads that socket's error queue.
pub struct LinuxAsyncUdpSocket {
    /// Probe mode reported by `mode()`; set to `ProbeMode::UdpWithRecverr` on construction.
    mode: ProbeMode,
    /// Set to `true` once any probe receives a response classified as coming
    /// from the destination.
    destination_reached: Arc<AtomicBool>,
    /// Number of probes currently in flight (incremented when a probe starts,
    /// decremented when it finishes).
    pending_count: Arc<AtomicUsize>,
    /// UDP destination port every probe is sent to.
    dest_port: u16,
}

/// Result of checking for ICMP error
///
/// Produced by one non-blocking read of a probe socket's error queue.
enum IcmpCheckResult {
    /// Found a matching response: an ICMP-origin IP_RECVERR entry was read.
    /// Carries the address that reported the error and whether the error was
    /// ICMP type 3 / code 3 (port unreachable, i.e. the destination answered).
    Found(IpAddr, bool), // (from_addr, is_destination)
    /// No data available yet: the error queue was empty (EAGAIN) or the message
    /// read contained no ICMP-origin IP_RECVERR entry.
    NoData,
    /// Error occurred: `recvmsg` failed with an errno other than EAGAIN.
    Error,
}

impl LinuxAsyncUdpSocket {
    /// Check for ICMP error using MSG_ERRQUEUE
    unsafe fn check_icmp_error(fd: i32, _sequence: u16) -> IcmpCheckResult {
        let mut buf = [0u8; 512];
        let mut control_buf = [0u8; 512];
        let mut from_addr: libc::sockaddr_in = std::mem::zeroed();
        let from_len = std::mem::size_of::<libc::sockaddr_in>() as libc::socklen_t;

        // Debug logging in CI
        if std::env::var("CI").is_ok() {
            use std::sync::atomic::{AtomicU32, Ordering};
            static CHECK_COUNT: AtomicU32 = AtomicU32::new(0);
            let count = CHECK_COUNT.fetch_add(1, Ordering::Relaxed) + 1;
            if count <= 5 || count % 100 == 0 {
                eprintln!("[DEBUG] check_icmp_error: attempt {} on fd {}", count, fd);
            }
        }

        let mut iovec = libc::iovec {
            iov_base: buf.as_mut_ptr() as *mut libc::c_void,
            iov_len: buf.len(),
        };

        // Initialize msghdr - musl and glibc have different field types
        let mut msg: libc::msghdr = unsafe { std::mem::zeroed() };
        msg.msg_name = &mut from_addr as *mut _ as *mut libc::c_void;
        msg.msg_namelen = from_len;
        msg.msg_iov = &mut iovec;
        msg.msg_iovlen = 1;
        msg.msg_control = control_buf.as_mut_ptr() as *mut libc::c_void;
        msg.msg_controllen = control_buf.len() as _; // Cast to handle u32 vs usize
        msg.msg_flags = 0;

        let ret = libc::recvmsg(fd, &mut msg, libc::MSG_ERRQUEUE | libc::MSG_DONTWAIT);

        if ret >= 0 {
            // Debug logging in CI
            if std::env::var("CI").is_ok() {
                eprintln!("[DEBUG] recvmsg returned {} bytes", ret);
            }

            // Parse control messages to find IP_RECVERR
            let mut cmsg: *const libc::cmsghdr = libc::CMSG_FIRSTHDR(&msg);

            while !cmsg.is_null() {
                let cmsg_data = std::ptr::read_unaligned(cmsg);

                if cmsg_data.cmsg_level == libc::IPPROTO_IP
                    && cmsg_data.cmsg_type == libc::IP_RECVERR
                {
                    let err_ptr = libc::CMSG_DATA(cmsg) as *const SockExtendedErr;
                    let sock_err = std::ptr::read_unaligned(err_ptr);

                    if sock_err.ee_origin == SO_EE_ORIGIN_ICMP {
                        // Get the offending address (follows the SockExtendedErr structure)
                        let addr_ptr = (err_ptr as *const u8).add(mem::size_of::<SockExtendedErr>())
                            as *const libc::sockaddr_in;
                        let offender_addr = std::ptr::read_unaligned(addr_ptr);
                        let from_ip =
                            IpAddr::V4(Ipv4Addr::from(u32::from_be(offender_addr.sin_addr.s_addr)));

                        // Since we're using a dedicated socket per probe, any error on this socket
                        // must be for our probe. We don't need to check identifier/sequence.
                        // Determine if this is destination (Port Unreachable)
                        let is_destination = sock_err.ee_type == 3 && sock_err.ee_code == 3;

                        // Debug logging in CI
                        if std::env::var("CI").is_ok() {
                            eprintln!(
                                "[DEBUG] ICMP response: from={}, type={}, code={}, is_dest={}",
                                from_ip, sock_err.ee_type, sock_err.ee_code, is_destination
                            );
                        }

                        return IcmpCheckResult::Found(from_ip, is_destination);
                    }
                }

                cmsg = libc::CMSG_NXTHDR(&msg, cmsg);
            }
            IcmpCheckResult::NoData
        } else {
            let err = std::io::Error::last_os_error();
            if err.raw_os_error() == Some(libc::EAGAIN) {
                IcmpCheckResult::NoData
            } else {
                // Debug logging in CI
                if std::env::var("CI").is_ok() {
                    use std::sync::atomic::{AtomicU32, Ordering};
                    static ERR_COUNT: AtomicU32 = AtomicU32::new(0);
                    let count = ERR_COUNT.fetch_add(1, Ordering::Relaxed) + 1;
                    if count <= 5 {
                        eprintln!("[DEBUG] recvmsg error: {}", err);
                    }
                }
                IcmpCheckResult::Error
            }
        }
    }

    /// Create a new async UDP socket with IP_RECVERR for Linux
    pub fn new() -> Result<Self, TracerouteError> {
        Self::new_with_config(crate::TimingConfig::default())
    }

    /// Create with timing configuration
    pub fn new_with_config(_timing_config: crate::TimingConfig) -> Result<Self, TracerouteError> {
        let dest_port = 33434; // Traditional traceroute port

        Ok(LinuxAsyncUdpSocket {
            mode: ProbeMode::UdpWithRecverr,
            destination_reached: Arc::new(AtomicBool::new(false)),
            pending_count: Arc::new(AtomicUsize::new(0)),
            dest_port,
        })
    }
}

impl ProbeSocket for LinuxAsyncUdpSocket {
    fn mode(&self) -> ProbeMode {
        self.mode
    }

    fn send_probe_and_recv(
        &self,
        dest: IpAddr,
        probe: ProbeInfo,
    ) -> Pin<Box<dyn Future<Output = Result<ProbeResponse, TracerouteError>> + Send + '_>> {
        Box::pin(async move {
            // Increment pending count
            self.pending_count.fetch_add(1, Ordering::Relaxed);

            // Create a new UDP socket for this probe
            let socket = std::net::UdpSocket::bind("0.0.0.0:0").map_err(|e| {
                TracerouteError::SocketError(format!("Failed to create UDP socket: {e}"))
            })?;

            let fd = socket.as_raw_fd();

            // Enable IP_RECVERR
            unsafe {
                let enable: i32 = 1;
                let ret = libc::setsockopt(
                    fd,
                    libc::IPPROTO_IP,
                    libc::IP_RECVERR,
                    &enable as *const _ as *const libc::c_void,
                    std::mem::size_of::<i32>() as libc::socklen_t,
                );
                if ret != 0 {
                    self.pending_count.fetch_sub(1, Ordering::Relaxed);
                    return Err(TracerouteError::SocketError(
                        "Failed to set IP_RECVERR".to_string(),
                    ));
                }
            }

            // Set TTL
            socket
                .set_ttl(probe.ttl as u32)
                .map_err(|e| TracerouteError::SocketError(format!("Failed to set TTL: {e}")))?;

            // Set non-blocking
            socket.set_nonblocking(true).map_err(|e| {
                TracerouteError::SocketError(format!("Failed to set non-blocking: {e}"))
            })?;

            // Convert to Tokio socket
            let async_socket = UdpSocket::from_std(socket).map_err(|e| {
                TracerouteError::SocketError(format!("Failed to convert to async socket: {e}"))
            })?;

            // Connect to destination
            let target_addr = SocketAddr::new(dest, self.dest_port);
            async_socket.connect(target_addr).await.map_err(|e| {
                TracerouteError::SocketError(format!("Failed to connect to destination: {e}"))
            })?;

            // Create payload
            let identifier = std::process::id() as u16;
            let mut payload = Vec::with_capacity(32);
            payload.extend_from_slice(&identifier.to_be_bytes());
            payload.extend_from_slice(&probe.sequence.to_be_bytes());
            payload.extend_from_slice(b"ftr-traceroute-probe-padding");

            // Record send time
            let sent_at = Instant::now();

            // Send probe
            let bytes_sent = async_socket.send(&payload).await.map_err(|e| {
                TracerouteError::SocketError(format!("Failed to send UDP probe: {e}"))
            })?;

            // Debug logging in CI
            if std::env::var("CI").is_ok() {
                eprintln!(
                    "[DEBUG] UDP probe sent: {} bytes to {}, TTL={}, seq={}",
                    bytes_sent, target_addr, probe.ttl, probe.sequence
                );
            }

            // Clone necessary data for the spawned task
            let destination_reached = self.destination_reached.clone();
            let pending_count = self.pending_count.clone();
            let sequence = probe.sequence;
            let ttl = probe.ttl;

            // Create oneshot channel for response
            let (tx, rx) = oneshot::channel();

            // Get the raw fd before moving the socket
            let fd = async_socket.as_raw_fd();

            // Spawn task to read from error queue
            tokio::spawn(async move {
                // Keep the socket alive in this task
                let _socket_guard = async_socket;
                let mut retry_count = 0;
                const MAX_RETRIES: u32 = 1000; // 1 second with 1ms delays

                loop {
                    // Check for ICMP error using MSG_ERRQUEUE
                    let result = unsafe { LinuxAsyncUdpSocket::check_icmp_error(fd, sequence) };

                    match result {
                        IcmpCheckResult::Found(from_addr, is_destination) => {
                            let rtt = Instant::now().duration_since(sent_at);

                            // Update destination reached
                            if is_destination {
                                destination_reached.store(true, Ordering::Relaxed);
                            }

                            // Decrement pending count
                            pending_count.fetch_sub(1, Ordering::Relaxed);

                            let response = ProbeResponse {
                                from_addr,
                                sequence,
                                ttl,
                                rtt,
                                received_at: Instant::now(),
                                is_destination,
                                is_timeout: false,
                            };

                            let _ = tx.send(response);
                            break;
                        }
                        IcmpCheckResult::Error => {
                            // Actual error
                            pending_count.fetch_sub(1, Ordering::Relaxed);
                            break;
                        }
                        IcmpCheckResult::NoData => {
                            // No data yet, continue polling
                        }
                    }

                    retry_count += 1;
                    if retry_count >= MAX_RETRIES {
                        // Timeout
                        pending_count.fetch_sub(1, Ordering::Relaxed);
                        let _ = tx.send(ProbeResponse {
                            from_addr: dest,
                            sequence,
                            ttl,
                            rtt: Duration::from_secs(1),
                            received_at: Instant::now(),
                            is_destination: false,
                            is_timeout: true,
                        });
                        break;
                    }

                    // Brief yield before retrying
                    tokio::time::sleep(Duration::from_millis(1)).await;
                }
            });

            // Wait for response
            match rx.await {
                Ok(response) => Ok(response),
                Err(_) => {
                    // Channel closed without response
                    self.pending_count.fetch_sub(1, Ordering::Relaxed);
                    Err(TracerouteError::SocketError(
                        "Failed to receive response".to_string(),
                    ))
                }
            }
        })
    }

    fn destination_reached(&self) -> bool {
        self.destination_reached.load(Ordering::Relaxed)
    }

    fn pending_count(&self) -> usize {
        self.pending_count.load(Ordering::Relaxed)
    }
}

/// Async ICMP socket for Linux (requires root or CAP_NET_RAW)
///
/// The struct itself holds no file descriptor: `send_probe_and_recv` opens a raw
/// ICMP socket for every probe.
pub struct LinuxAsyncIcmpSocket {
    /// Probe mode reported by `mode()`; set to `ProbeMode::RawIcmp` on construction.
    mode: ProbeMode,
    /// Identifier placed in outgoing echo requests and used to recognise replies;
    /// initialised from the process id truncated to 16 bits.
    icmp_identifier: u16,
    /// Set to `true` once any probe receives a response classified as coming
    /// from the destination.
    destination_reached: Arc<AtomicBool>,
    /// Number of probes currently in flight (incremented when a probe starts,
    /// decremented when it finishes).
    pending_count: Arc<AtomicUsize>,
}

impl LinuxAsyncIcmpSocket {
    /// Create a new async ICMP socket for Linux, using the default timing
    /// configuration.
    pub fn new() -> Result<Self, TracerouteError> {
        Self::new_with_config(crate::TimingConfig::default())
    }

    /// Create with timing configuration.
    ///
    /// The timing configuration is currently accepted but not used. No socket is
    /// opened here, so construction cannot fail at present; the `Result` return
    /// type is part of the interface.
    pub fn new_with_config(_timing_config: crate::TimingConfig) -> Result<Self, TracerouteError> {
        // The ICMP identifier field is 16 bits wide, so the process id is
        // deliberately truncated.
        let icmp_identifier = std::process::id() as u16;

        Ok(LinuxAsyncIcmpSocket {
            mode: ProbeMode::RawIcmp,
            icmp_identifier,
            destination_reached: Arc::new(AtomicBool::new(false)),
            pending_count: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Create ICMP echo request packet.
    ///
    /// The payload is 16 bytes: the ASCII tag `ftr-traceroute` followed by zero
    /// padding. Header construction is delegated to `icmp::build_echo_request`.
    fn create_echo_request(&self, sequence: u16) -> Vec<u8> {
        let mut payload = vec![0u8; 16];
        let tag = b"ftr-traceroute";
        payload[..tag.len()].copy_from_slice(tag);

        icmp::build_echo_request(self.icmp_identifier, sequence, &payload)
    }

    /// Parse ICMP response.
    ///
    /// `data` is a received IPv4 packet, `from_addr` the address it came from and
    /// `sequence` the sequence number of the probe being waited for.
    ///
    /// Returns
    /// * `Some((from_addr, true))` for an Echo Reply whose identifier and sequence
    ///   match this probe;
    /// * `Some((from_addr, false))` for a Time Exceeded or Destination Unreachable
    ///   message that quotes our Echo Request (matching identifier and sequence);
    /// * `None` for anything else, including truncated or malformed packets.
    fn parse_icmp_response(
        &self,
        data: &[u8],
        from_addr: IpAddr,
        sequence: u16,
    ) -> Option<(IpAddr, bool)> {
        // Length of the fixed header that precedes the quoted packet in ICMP errors.
        const ICMP_ERROR_HEADER_LEN: usize = 8;
        // Bytes of the quoted echo request needed to read type, id and sequence.
        const QUOTED_ECHO_MIN_LEN: usize = 8;

        let icmp_data = icmp::ipv4_payload(data)?;
        let hdr = icmp::parse_icmp_header(icmp_data)?;

        match hdr.icmp_type {
            icmp::ICMP_ECHO_REPLY => {
                let (id, seq) = icmp::parse_echo_reply(icmp_data)?;
                if id == self.icmp_identifier && seq == sequence {
                    Some((from_addr, true))
                } else {
                    None
                }
            }
            icmp::ICMP_TIME_EXCEEDED | icmp::ICMP_DEST_UNREACHABLE => {
                // Packets come from the network, so every offset is checked with
                // `get` instead of indexing; a short or inconsistent quoted packet
                // yields `None` rather than a panic.
                let inner_data = icmp_data.get(ICMP_ERROR_HEADER_LEN..)?;
                let (inner_hdr_len, _) = icmp::parse_ipv4_header(inner_data)?;
                let inner_icmp_data = inner_data.get(inner_hdr_len..)?;

                if inner_icmp_data.len() < QUOTED_ECHO_MIN_LEN
                    || inner_icmp_data[0] != icmp::ICMP_ECHO_REQUEST
                {
                    return None;
                }

                let identifier = u16::from_be_bytes([inner_icmp_data[4], inner_icmp_data[5]]);
                let seq = u16::from_be_bytes([inner_icmp_data[6], inner_icmp_data[7]]);

                if identifier == self.icmp_identifier && seq == sequence {
                    Some((from_addr, false))
                } else {
                    None
                }
            }
            _ => None,
        }
    }
}

impl ProbeSocket for LinuxAsyncIcmpSocket {
    /// Returns the probe mode this socket was constructed with.
    fn mode(&self) -> ProbeMode {
        self.mode
    }

    /// Send one ICMP Echo Request towards `dest` and wait for the matching response.
    ///
    /// A raw ICMP socket is opened for the probe (this needs root or CAP_NET_RAW),
    /// configured with the probe's TTL, and the echo request is sent. A spawned
    /// task then polls the socket roughly once per millisecond, ignoring packets
    /// that do not match this probe's identifier and sequence.
    ///
    /// # Returns
    ///
    /// * `Ok(response)` with `is_timeout == false` when a matching Echo Reply
    ///   (`is_destination == true`) or Time Exceeded / Destination Unreachable
    ///   (`is_destination == false`) arrives.
    /// * `Ok(response)` with `is_timeout == true`, `from_addr == dest` and a fixed
    ///   one-second `rtt` after 1000 polls without a match.
    ///
    /// # Errors
    ///
    /// Returns `TracerouteError::SocketError` when the socket cannot be created,
    /// configured or written to, or when the polling task stops without delivering
    /// a response (a receive error other than "would block").
    ///
    /// `pending_count` is incremented on first poll and decremented exactly once
    /// when the returned future finishes or is dropped, regardless of which path
    /// is taken.
    fn send_probe_and_recv(
        &self,
        dest: IpAddr,
        probe: ProbeInfo,
    ) -> Pin<Box<dyn Future<Output = Result<ProbeResponse, TracerouteError>> + Send + '_>> {
        // Owns the single decrement of the in-flight counter. Doing this in `Drop`
        // covers every early `?` return and cancellation, and prevents the counter
        // from being decremented twice (which would wrap the `usize` below zero).
        struct PendingGuard(Arc<AtomicUsize>);

        impl Drop for PendingGuard {
            fn drop(&mut self) {
                self.0.fetch_sub(1, Ordering::Relaxed);
            }
        }

        Box::pin(async move {
            use socket2::{Domain, Protocol, Socket as Socket2, Type};

            // Increment pending count; the guard undoes it when this future ends.
            self.pending_count.fetch_add(1, Ordering::Relaxed);
            let _pending = PendingGuard(Arc::clone(&self.pending_count));

            // Create raw ICMP socket
            let socket =
                Socket2::new(Domain::IPV4, Type::RAW, Some(Protocol::ICMPV4)).map_err(|e| {
                    TracerouteError::SocketError(format!("Failed to create raw ICMP socket: {e}"))
                })?;

            // Set TTL
            socket
                .set_ttl_v4(probe.ttl as u32)
                .map_err(|e| TracerouteError::SocketError(format!("Failed to set TTL: {e}")))?;

            // Set non-blocking so the polling task never stalls the runtime
            socket.set_nonblocking(true).map_err(|e| {
                TracerouteError::SocketError(format!("Failed to set non-blocking: {e}"))
            })?;

            // Create ICMP echo request packet
            let packet = self.create_echo_request(probe.sequence);

            // Send packet (the port is meaningless for raw ICMP, hence 0)
            let dest_addr: SocketAddr = SocketAddr::new(dest, 0);
            let sent_at = Instant::now();
            socket.send_to(&packet, &dest_addr.into()).map_err(|e| {
                TracerouteError::SocketError(format!("Failed to send ICMP packet: {e}"))
            })?;

            // Copy what the spawned task needs
            let destination_reached = self.destination_reached.clone();
            let sequence = probe.sequence;
            let ttl = probe.ttl;
            let icmp_identifier = self.icmp_identifier;

            // Create oneshot channel for response
            let (tx, rx) = oneshot::channel();

            // Spawn task to read responses; it takes ownership of the socket.
            tokio::spawn(async move {
                // `parse_icmp_response` only reads `icmp_identifier`, so one
                // stand-in instance built up front serves every received packet.
                let parser = LinuxAsyncIcmpSocket {
                    mode: ProbeMode::RawIcmp,
                    icmp_identifier,
                    destination_reached: Arc::new(AtomicBool::new(false)),
                    pending_count: Arc::new(AtomicUsize::new(0)),
                };

                let mut retry_count = 0;
                const MAX_RETRIES: u32 = 1000; // 1 second with 1ms delays

                loop {
                    // Try to receive response
                    let mut buf = [std::mem::MaybeUninit::uninit(); 1500];
                    match socket.recv_from(&mut buf) {
                        Ok((size, addr)) => {
                            if let Some(from_addr) = addr.as_socket_ipv4() {
                                let from_ip = IpAddr::V4(*from_addr.ip());

                                // SAFETY: `recv_from` reported that the first `size`
                                // bytes of `buf` were written, and `size` cannot
                                // exceed the buffer length.
                                let initialized_buf = unsafe {
                                    std::slice::from_raw_parts(buf.as_ptr() as *const u8, size)
                                };

                                // A raw ICMP socket sees unrelated ICMP traffic
                                // too; only a packet matching this probe ends
                                // the wait.
                                if let Some((resp_addr, is_destination)) =
                                    parser.parse_icmp_response(initialized_buf, from_ip, sequence)
                                {
                                    let rtt = Instant::now().duration_since(sent_at);

                                    // Update destination reached
                                    if is_destination {
                                        destination_reached.store(true, Ordering::Relaxed);
                                    }

                                    let _ = tx.send(ProbeResponse {
                                        from_addr: resp_addr,
                                        sequence,
                                        ttl,
                                        rtt,
                                        received_at: Instant::now(),
                                        is_destination,
                                        is_timeout: false,
                                    });
                                    break;
                                }
                            }
                        }
                        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                            // No data yet, continue
                        }
                        Err(_) => {
                            // Hard failure: dropping `tx` makes the waiting side
                            // report a socket error.
                            break;
                        }
                    }

                    retry_count += 1;
                    if retry_count >= MAX_RETRIES {
                        // Timeout
                        let _ = tx.send(ProbeResponse {
                            from_addr: dest,
                            sequence,
                            ttl,
                            rtt: Duration::from_secs(1),
                            received_at: Instant::now(),
                            is_destination: false,
                            is_timeout: true,
                        });
                        break;
                    }

                    // Brief yield before retrying
                    tokio::time::sleep(Duration::from_millis(1)).await;
                }
            });

            // Wait for response; a closed channel means the task gave up.
            rx.await.map_err(|_| {
                TracerouteError::SocketError("Failed to receive response".to_string())
            })
        })
    }

    /// Whether any probe so far received a response classified as the destination.
    fn destination_reached(&self) -> bool {
        self.destination_reached.load(Ordering::Relaxed)
    }

    /// Number of probes currently in flight.
    fn pending_count(&self) -> usize {
        self.pending_count.load(Ordering::Relaxed)
    }
}