netpulse-cli 0.1.1

A zero-config, single-binary network quality monitor with percentile stats, jitter, and MTR-style traceroute
Documentation
// src/trace.rs — MTR-style Traceroute Orchestrator

use crate::{
    config::ProbeType,
    error::NetPulseError,
    probers::{icmp::IcmpProber, udp::UdpProber, Prober},
    state::{new_app_state, TargetState},
};
use anyhow::Result;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::task::JoinHandle;
use tokio::time::{self, Duration};

pub async fn run_trace_mode(
    target: String,
    resolved_ip: String,
    probe_type: ProbeType,
    timeout_ms: u64,
    interval_ms: u64,
    window: usize,
) -> Result<()> {
    let prober: Arc<dyn Prober> = match probe_type {
        ProbeType::Icmp => Arc::new(IcmpProber::new(timeout_ms)),
        ProbeType::Udp => Arc::new(UdpProber::new(timeout_ms)),
        ProbeType::Tcp => unreachable!("TCP tracing prevented by CLI"),
    };

    println!(
        "Tracing route to {} ({}), {} probes...",
        target,
        resolved_ip,
        prober.name()
    );

    let state = new_app_state();
    let running = Arc::new(AtomicBool::new(true));

    // Handle Ctrl-C gracefully
    let running_clone = Arc::clone(&running);
    tokio::spawn(async move {
        let _ = tokio::signal::ctrl_c().await;
        running_clone.store(false, Ordering::SeqCst);
    });

    // 1. Hop Discovery Phase
    // We send probes from TTL 1 up to 30 to figure out how many hops are in the path.
    let mut discovered_hops = Vec::new();
    let mut max_ttl = 30;

    for ttl in 1..=30 {
        if !running.load(Ordering::Relaxed) {
            return Ok(());
        }

        // Initialize state for this TTL so TUI can start showing something immediately
        {
            let mut guard = state.lock().unwrap();
            let mut ts = TargetState::new_hop(window);
            // We store the "name" as "Hop N: ???" initially
            ts.last_ip = Some(String::from("???"));
            guard.insert(ttl.to_string(), ts);
        }

        let seq = (ttl as u64) << 8;
        let result = prober.probe(&resolved_ip, seq, Some(ttl as u32)).await;

        match result {
            Ok(res) => {
                let ip = res
                    .responder_ip
                    .clone()
                    .unwrap_or_else(|| String::from("???"));

                {
                    let mut guard = state.lock().unwrap();
                    if let Some(ts) = guard.get_mut(&ttl.to_string()) {
                        ts.buffer.push(res.clone());
                        ts.last_rtt_us = res.rtt_us;
                        ts.last_ip = Some(ip.clone());
                        ts.seq += 1;
                    }
                }

                discovered_hops.push((ttl, ip.clone()));

                // If the responder is our actual final destination, we are done discovering!
                // For UDP, Port Unreachable comes from the final dest. For ICMP, Echo Reply comes from the final dest.
                // Our Prober abstraction might not explicitly flag "destination reached" versus "time exceeded" in the result struct,
                // BUT if the responder IP matches our resolved target IP, it's definitely the destination.
                if ip == resolved_ip {
                    max_ttl = ttl;
                    break;
                }
            }
            Err(e) => {
                if matches!(e, NetPulseError::InsufficientPrivileges) {
                    eprintln!("Fatal error: {}", e);
                    std::process::exit(1);
                }
                // Timeout or other error — might be a silent router.
                // We just continue to the next TTL.
                discovered_hops.push((ttl, String::from("???")));
                {
                    let mut guard = state.lock().unwrap();
                    if let Some(ts) = guard.get_mut(&ttl.to_string()) {
                        ts.seq += 1;
                    }
                }
            }
        }
    }

    // 2. Continuous Probing Phase
    // Now we spawn one async task per TTL and ping it continuously.
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for ttl in 1..=max_ttl {
        let prober_clone = Arc::clone(&prober);
        let target_clone = resolved_ip.clone();
        let state_clone = Arc::clone(&state);
        let running_task = Arc::clone(&running);

        let task = tokio::spawn(async move {
            let mut ticker = time::interval(Duration::from_millis(interval_ms));
            let mut local_seq = 1; // start from 1 since discovery used 0

            while running_task.load(Ordering::Relaxed) {
                ticker.tick().await;

                let seq_val = ((ttl as u64) << 8) | (local_seq % 256);
                let probe_res = match prober_clone
                    .probe(&target_clone, seq_val, Some(ttl as u32))
                    .await
                {
                    Ok(p) => p,
                    Err(_) => crate::probers::ProbeResult::loss(&target_clone, seq_val),
                };

                {
                    let mut guard = state_clone.lock().unwrap();
                    if let Some(ts) = guard.get_mut(&ttl.to_string()) {
                        ts.buffer.push(probe_res.clone());
                        ts.last_rtt_us = probe_res.rtt_us;
                        ts.seq += 1;
                        if let Some(ip) = probe_res.responder_ip {
                            ts.last_ip = Some(ip);
                        }
                    }
                }
                local_seq += 1;
            }
        });
        tasks.push(task);
    }

    // 3. Render Trace TUI
    crate::tui::trace_tui::run(
        state.clone(),
        max_ttl,
        &target,
        &resolved_ip,
        prober.name(),
        interval_ms,
    )
    .await?;

    // On TUI exit, signal shutdown and wait for tasks
    running.store(false, Ordering::SeqCst);
    for task in tasks {
        let _ = task.await;
    }

    Ok(())
}