use anyhow::{Context, Result};
use clap::{Parser, Subcommand, ValueEnum};
use netpulse::{
config::{NetPulseConfig, ProbeType},
export::{prometheus::serve, CsvExporter, JsonExporter},
probers::{icmp::IcmpProber, tcp::TcpProber, udp::UdpProber, ProbeResult, Prober},
state::{new_app_state, AppState, TargetState},
stats::engine::StatsEngine,
};
use std::{
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::time;
// Top-level command-line interface, parsed by clap.
//
// Plain `//` comments are used on purpose: `///` doc comments on clap-derived
// items are picked up as help text and would change the generated `--help`.
#[derive(Parser)]
#[command(
    name = "netpulse",
    version,
    about = "Continuous network quality monitor — RTT percentiles, jitter, and burst-loss in one binary",
    long_about = "Monitor network quality continuously with statistical RTT percentiles,\n\
                  jitter (RFC 3393), burst-loss detection, and packet reorder analysis.\n\
                  Supports ICMP, TCP, and UDP probing with TUI, JSON, and text output."
)]
struct Cli {
    // Optional path to a configuration file. `global = true` lets the flag be
    // given before or after the subcommand.
    #[arg(short, long, global = true, value_name = "FILE")]
    config: Option<PathBuf>,
    // The subcommand to execute.
    #[command(subcommand)]
    command: Commands,
}
// Subcommands accepted by the CLI.
//
// NOTE: comments here are deliberately `//` rather than `///`; clap's derive
// turns doc comments into help text, so `///` would alter `--help` output.
#[derive(Subcommand)]
enum Commands {
// Continuously probe one or more targets. `main` dispatches this to
// `run_tui_mode` (output = tui) or `run_stream_mode` (output = json/text).
Monitor {
// One or more targets; each is passed through `resolve_target_ip` in `main`.
#[arg(required = true, value_name = "TARGET")]
targets: Vec<String>,
// Period of the per-target probe ticker, in milliseconds.
#[arg(short, long, default_value = "1000", value_name = "MS")]
interval: u64,
// Millisecond value handed to the prober constructors
// (presumably the per-probe timeout — confirm against the probers).
#[arg(short = 'T', long, default_value = "3000", value_name = "MS")]
timeout: u64,
// Forwarded to `TargetState::new`; presumably the number of samples kept
// per target — TODO confirm against `TargetState`.
#[arg(short, long, default_value = "300", value_name = "N")]
window: usize,
// Stream mode only: a statistics report is emitted each time a target's
// probe count is a multiple of this value. Not passed to the TUI mode.
#[arg(short, long, default_value = "10", value_name = "N")]
every: u64,
// Which prober to use (icmp, tcp or udp).
#[arg(short, long, default_value = "icmp", value_name = "TYPE")]
probe: ProbeTypeArg,
// Passed to `TcpProber::new` and appended to TCP targets that contain no ':'.
#[arg(short = 'P', long, default_value = "80", value_name = "PORT")]
port: u16,
// Selects TUI, JSON or text output.
#[arg(short, long, default_value = "tui", value_name = "MODE")]
output: OutputMode,
// Forwarded to `JsonExporter::new` (presumably enables pretty-printed JSON
// — confirm against the exporter). Only used in stream mode.
#[arg(long)]
pretty: bool,
// Stream mode only: when set, a `CsvExporter` for this path also receives
// every statistics report.
#[arg(long, value_name = "FILE")]
csv: Option<PathBuf>,
// When set, the Prometheus `serve` task is spawned with this port.
#[arg(long, value_name = "PORT")]
prometheus: Option<u16>,
},
// Trace a single target via `netpulse::trace::run_trace_mode`.
// `main` rejects `--probe tcp` for this subcommand.
Trace {
// The target to trace; resolved with `resolve_target_ip` before tracing.
#[arg(required = true, value_name = "TARGET")]
target: String,
// Probe type; only icmp and udp are accepted by `main` for tracing.
#[arg(short, long, default_value = "icmp", value_name = "TYPE")]
probe: ProbeTypeArg,
// Millisecond interval forwarded to `run_trace_mode`.
#[arg(short, long, default_value = "1000", value_name = "MS")]
interval: u64,
// Millisecond timeout forwarded to `run_trace_mode`.
#[arg(short = 'T', long, default_value = "1000", value_name = "MS")]
timeout: u64,
// Window size forwarded to `run_trace_mode`.
#[arg(short, long, default_value = "300", value_name = "N")]
window: usize,
},
}
// Command-line representation of the probe type. `main` maps each variant to
// the corresponding `netpulse::config::ProbeType` variant.
// (`//` comments are used so clap's generated value help stays unchanged.)
#[derive(Copy, Clone, PartialEq, ValueEnum)]
enum ProbeTypeArg {
// Mapped to `ProbeType::Icmp`.
Icmp,
// Mapped to `ProbeType::Tcp`; rejected by `main` for the `trace` subcommand.
Tcp,
// Mapped to `ProbeType::Udp`.
Udp,
}
// Output mode chosen with `--output` on the `monitor` subcommand.
// (`//` comments are used so clap's generated value help stays unchanged.)
#[derive(Copy, Clone, PartialEq, ValueEnum)]
enum OutputMode {
// Interactive terminal UI; handled by `run_tui_mode`.
Tui,
// Streamed statistics through `JsonExporter`; handled by `run_stream_mode`.
Json,
// Streamed per-probe and statistics lines on stdout; handled by `run_stream_mode`.
Text,
}
/// Resolves `target` to the textual form of a single IP address.
///
/// A string that already parses as an IP address is returned unchanged.
/// Otherwise it is looked up with `tokio::net::lookup_host`; when `target`
/// contains no `:`, `":0"` is appended first so the lookup receives a
/// `host:port` string. Of the returned addresses the first IPv4 one is
/// preferred; if there is none, the first address of any family is used.
///
/// # Errors
///
/// Returns an error when the lookup fails or yields no addresses.
async fn resolve_target_ip(target: &str) -> Result<String> {
    use std::net::{IpAddr, SocketAddr};

    // Literal addresses need no lookup at all.
    if target.parse::<IpAddr>().is_ok() {
        return Ok(target.to_string());
    }

    // Supply a dummy port unless the target already has a ':' in it.
    let mut query = target.to_string();
    if !target.contains(':') {
        query.push_str(":0");
    }

    let candidates: Vec<SocketAddr> = tokio::net::lookup_host(&query)
        .await
        .with_context(|| format!("DNS resolution failed for '{}'", target))?
        .collect();

    // Prefer IPv4, fall back to whatever came first, fail on an empty answer.
    let picked = candidates
        .iter()
        .find(|addr| addr.is_ipv4())
        .or_else(|| candidates.first())
        .ok_or_else(|| anyhow::anyhow!("no addresses found for '{}'", target))?;

    Ok(picked.ip().to_string())
}
/// Program entry point: parses the command line and runs the chosen subcommand.
///
/// * `monitor` resolves every target (falling back to the raw target string
///   when resolution fails) and then hands over to the TUI or stream runner,
///   depending on `--output`.
/// * `trace` resolves its single target and calls
///   `netpulse::trace::run_trace_mode`; TCP is rejected with exit code 1.
///
/// The configuration file is loaded (an explicitly given path must load
/// successfully; the implicit `netpulse.toml` falls back to defaults), but the
/// resulting value is not consulted anywhere in this function.
#[tokio::main]
async fn main() -> Result<()> {
    let cli = Cli::parse();

    let _file_config = match &cli.config {
        Some(path) => NetPulseConfig::from_file(path)
            .with_context(|| format!("failed to load config from {:?}", path))?,
        None => NetPulseConfig::from_file(&PathBuf::from("netpulse.toml")).unwrap_or_default(),
    };

    match cli.command {
        Commands::Trace {
            target,
            probe,
            interval,
            timeout,
            window,
        } => {
            // Reject unsupported probe types before doing any network work.
            let probe_type = match probe {
                ProbeTypeArg::Tcp => {
                    eprintln!("TCP tracing is not currently supported. Use 'icmp' or 'udp'.");
                    std::process::exit(1);
                }
                ProbeTypeArg::Icmp => ProbeType::Icmp,
                ProbeTypeArg::Udp => ProbeType::Udp,
            };

            let resolved_ip = resolve_target_ip(&target)
                .await
                .with_context(|| format!("Failed to resolve trace target {:?}", target))?;

            netpulse::trace::run_trace_mode(
                target,
                resolved_ip,
                probe_type,
                timeout,
                interval,
                window,
            )
            .await
        }
        Commands::Monitor {
            targets,
            interval,
            timeout,
            window,
            every,
            probe,
            port,
            output,
            pretty,
            csv,
            prometheus,
        } => {
            // Library-side probe type together with its display label.
            let (probe_type, probe_type_str): (ProbeType, &'static str) = match probe {
                ProbeTypeArg::Icmp => (ProbeType::Icmp, "icmp"),
                ProbeTypeArg::Tcp => (ProbeType::Tcp, "tcp"),
                ProbeTypeArg::Udp => (ProbeType::Udp, "udp"),
            };

            // Progress messages are suppressed when the TUI is about to start.
            let verbose = output != OutputMode::Tui;
            if verbose {
                eprintln!(
                    "netpulse v{} · resolving targets…",
                    env!("CARGO_PKG_VERSION")
                );
            }

            // Pairs of (display name, resolved address); targets are resolved
            // one after another.
            let mut resolved: Vec<(String, String)> = Vec::with_capacity(targets.len());
            for name in &targets {
                let addr = match resolve_target_ip(name).await {
                    Ok(ip) => {
                        if verbose && ip != *name {
                            eprintln!(" {} → {}", name, ip);
                        }
                        ip
                    }
                    Err(err) => {
                        if verbose {
                            eprintln!(" {} → ERROR: {}", name, err);
                        }
                        // Keep the target anyway, using its own text as address.
                        name.clone()
                    }
                };
                resolved.push((name.clone(), addr));
            }

            match output {
                OutputMode::Json | OutputMode::Text => {
                    run_stream_mode(
                        resolved, probe_type, port, timeout, interval, window, every, output,
                        pretty, csv, prometheus,
                    )
                    .await
                }
                OutputMode::Tui => {
                    run_tui_mode(
                        resolved,
                        probe_type,
                        port,
                        timeout,
                        interval,
                        window,
                        probe_type_str,
                        prometheus,
                    )
                    .await
                }
            }
        }
    }
}
/// Runs the `monitor` subcommand with the interactive terminal UI.
///
/// Registers a `TargetState` for every `(display name, resolved address)` pair,
/// optionally spawns the Prometheus `serve` task, starts one probe task per
/// target and then drives `netpulse::tui::run` until it returns or Ctrl-C is
/// received. Afterwards the probe tasks are told to stop and aborted.
///
/// ICMP and UDP probes target the resolved address. TCP probes are given the
/// display name plus a port: a name that is a bare IPv6 literal becomes
/// `[addr]:tcp_port`, a name that already contains `:` is used as-is
/// (`host:port`), and anything else becomes `name:tcp_port`.
///
/// Returns whatever the TUI returned, or `Ok(())` when stopped by Ctrl-C.
#[allow(clippy::too_many_arguments)]
async fn run_tui_mode(
    resolved: Vec<(String, String)>,
    probe_type: ProbeType,
    tcp_port: u16,
    timeout_ms: u64,
    interval_ms: u64,
    window: usize,
    probe_type_str: &'static str,
    prometheus: Option<u16>,
) -> Result<()> {
    let prober: Arc<dyn Prober> = match probe_type {
        ProbeType::Icmp => Arc::new(IcmpProber::new(timeout_ms)),
        ProbeType::Tcp => Arc::new(TcpProber::new(tcp_port, timeout_ms)),
        ProbeType::Udp => Arc::new(UdpProber::new(timeout_ms)),
    };

    // Shared per-target state, keyed by display name. The lock is released
    // before any task is spawned.
    let state = new_app_state();
    {
        let mut guard = state.lock().unwrap();
        for (name, addr) in &resolved {
            guard.insert(name.clone(), TargetState::new(name, addr, window));
        }
    }

    let display_names: Vec<String> = resolved.iter().map(|(n, _)| n.clone()).collect();
    if let Some(prom_port) = prometheus {
        tokio::spawn(serve(state.clone(), display_names.clone(), prom_port));
    }

    let running = Arc::new(AtomicBool::new(true));
    let mut handles = Vec::with_capacity(resolved.len());
    for (display_name, resolved_addr) in &resolved {
        let probe_addr = match probe_type {
            ProbeType::Icmp | ProbeType::Udp => resolved_addr.clone(),
            ProbeType::Tcp => {
                if display_name.parse::<std::net::Ipv6Addr>().is_ok() {
                    // A bare IPv6 literal contains ':' but carries no port;
                    // bracket it so the port can be appended unambiguously.
                    format!("[{}]:{}", display_name, tcp_port)
                } else if display_name.contains(':') {
                    display_name.clone()
                } else {
                    format!("{}:{}", display_name, tcp_port)
                }
            }
        };
        handles.push(tokio::spawn(tui_probe_loop(
            display_name.clone(),
            probe_addr,
            Arc::clone(&prober),
            Arc::clone(&state),
            interval_ms,
            Arc::clone(&running),
        )));
    }

    // Whichever finishes first ends the session: the TUI itself or Ctrl-C.
    let tui_result = tokio::select! {
        result = netpulse::tui::run(display_names, state, probe_type_str, interval_ms) => result,
        _ = tokio::signal::ctrl_c() => Ok(()),
    };

    // Ask the probe loops to stop, then abort them so none lingers in an await.
    running.store(false, Ordering::Relaxed);
    for handle in handles {
        handle.abort();
    }
    tui_result
}
/// Probe loop for one target in TUI mode.
///
/// While `running` is set, sends one probe to `probe_addr` per tick and stores
/// the result in the `state` entry keyed by `display_name`. A probe error is
/// recorded as a loss, except `InsufficientPrivileges`, which prints a hint to
/// stderr and ends the loop.
///
/// `interval_ms` is clamped to at least 1 ms, because `tokio::time::interval`
/// panics when given a zero period.
async fn tui_probe_loop(
    display_name: String,
    probe_addr: String,
    prober: Arc<dyn Prober>,
    state: AppState,
    interval_ms: u64,
    running: Arc<AtomicBool>,
) {
    // Guard against `--interval 0`, which would otherwise panic this task.
    let period = Duration::from_millis(interval_ms.max(1));
    let mut ticker = time::interval(period);
    let mut seq: u64 = 0;
    while running.load(Ordering::Relaxed) {
        ticker.tick().await;
        let probe = match prober.probe(&probe_addr, seq, None).await {
            Ok(p) => p,
            Err(netpulse::NetPulseError::InsufficientPrivileges) => {
                // NOTE(review): this writes to stderr while the TUI may be
                // drawing — confirm that is acceptable.
                eprintln!(
                    "[{}] ICMP requires root or CAP_NET_RAW. Try: sudo netpulse monitor {}",
                    display_name, display_name
                );
                break;
            }
            // Any other failure counts as a lost probe.
            Err(_) => ProbeResult::loss(&display_name, seq),
        };
        let last_rtt_us = probe.rtt_us;
        {
            // The lock is held only for the update, never across an await.
            let mut guard = state.lock().unwrap();
            if let Some(ts) = guard.get_mut(&display_name) {
                ts.buffer.push(probe);
                ts.seq = seq + 1;
                ts.last_rtt_us = last_rtt_us;
            }
        }
        seq += 1;
    }
}
/// Runs the `monitor` subcommand in streaming (JSON or text) mode.
///
/// Registers a `TargetState` for every `(display name, resolved address)` pair,
/// optionally spawns the Prometheus `serve` task, prints a start banner to
/// stderr and starts one `stream_probe_loop` task per target. It then waits
/// for Ctrl-C and aborts the probe tasks.
///
/// ICMP and UDP probes target the resolved address. TCP probes are given the
/// display name plus a port: a name that is a bare IPv6 literal becomes
/// `[addr]:tcp_port`, a name that already contains `:` is used as-is
/// (`host:port`), and anything else becomes `name:tcp_port`.
///
/// # Errors
///
/// Fails only when listening for Ctrl-C fails.
#[allow(clippy::too_many_arguments)]
async fn run_stream_mode(
    resolved: Vec<(String, String)>,
    probe_type: ProbeType,
    tcp_port: u16,
    timeout_ms: u64,
    interval_ms: u64,
    window: usize,
    report_every: u64,
    output: OutputMode,
    pretty: bool,
    csv_path: Option<PathBuf>,
    prometheus: Option<u16>,
) -> Result<()> {
    let prober: Arc<dyn Prober> = match probe_type {
        ProbeType::Icmp => Arc::new(IcmpProber::new(timeout_ms)),
        ProbeType::Tcp => Arc::new(TcpProber::new(tcp_port, timeout_ms)),
        ProbeType::Udp => Arc::new(UdpProber::new(timeout_ms)),
    };

    // Shared per-target state, keyed by display name. The lock is released
    // before any task is spawned.
    let state = new_app_state();
    {
        let mut guard = state.lock().unwrap();
        for (name, addr) in &resolved {
            guard.insert(name.clone(), TargetState::new(name, addr, window));
        }
    }

    let display_names: Vec<String> = resolved.iter().map(|(n, _)| n.clone()).collect();
    if let Some(prom_port) = prometheus {
        tokio::spawn(serve(state.clone(), display_names.clone(), prom_port));
    }

    let json_exporter = Arc::new(JsonExporter::new(pretty));
    let csv_exporter = csv_path.map(|p| Arc::new(CsvExporter::new(p)));

    eprintln!(
        "netpulse starting — probe={} interval={}ms targets={}",
        match probe_type {
            ProbeType::Icmp => "icmp",
            ProbeType::Tcp => "tcp",
            ProbeType::Udp => "udp",
        },
        interval_ms,
        resolved
            .iter()
            .map(|(n, _)| n.as_str())
            .collect::<Vec<_>>()
            .join(", ")
    );

    let mut handles = Vec::with_capacity(resolved.len());
    for (display_name, resolved_addr) in resolved {
        let probe_addr = match probe_type {
            ProbeType::Icmp | ProbeType::Udp => resolved_addr,
            ProbeType::Tcp => {
                if display_name.parse::<std::net::Ipv6Addr>().is_ok() {
                    // A bare IPv6 literal contains ':' but carries no port;
                    // bracket it so the port can be appended unambiguously.
                    format!("[{}]:{}", display_name, tcp_port)
                } else if display_name.contains(':') {
                    display_name.clone()
                } else {
                    format!("{}:{}", display_name, tcp_port)
                }
            }
        };
        handles.push(tokio::spawn(stream_probe_loop(
            display_name,
            probe_addr,
            Arc::clone(&prober),
            Arc::clone(&state),
            Arc::clone(&json_exporter),
            csv_exporter.clone(),
            interval_ms,
            report_every,
            output,
        )));
    }

    // Probe tasks run until the user interrupts the program.
    tokio::signal::ctrl_c()
        .await
        .context("failed to listen for Ctrl-C")?;
    eprintln!("\nnetpulse stopping.");
    for h in handles {
        h.abort();
    }
    Ok(())
}
/// Probe loop for one target in streaming (JSON or text) mode.
///
/// On every tick one probe is sent to `probe_addr`; a probe error is recorded
/// as a loss, except `InsufficientPrivileges`, which prints a hint to stderr
/// and ends the loop. In text mode each probe is printed as soon as it
/// completes. The result is stored in the `state` entry keyed by
/// `display_name`, and whenever the number of probes sent is a multiple of
/// `report_every` a statistics snapshot is computed and emitted as JSON or
/// text, plus CSV when `csv_exporter` is set. Export errors are reported on
/// stderr and do not stop the loop.
///
/// `interval_ms` is clamped to at least 1 ms, because `tokio::time::interval`
/// panics when given a zero period. With `report_every == 0` no report is ever
/// emitted (a non-zero count is never a multiple of zero).
///
/// Must not be called with `OutputMode::Tui`.
#[allow(clippy::too_many_arguments)]
async fn stream_probe_loop(
    display_name: String,
    probe_addr: String,
    prober: Arc<dyn Prober>,
    state: AppState,
    json_exporter: Arc<JsonExporter>,
    csv_exporter: Option<Arc<CsvExporter>>,
    interval_ms: u64,
    report_every: u64,
    output: OutputMode,
) {
    // Guard against `--interval 0`, which would otherwise panic this task.
    let period = Duration::from_millis(interval_ms.max(1));
    let mut ticker = time::interval(period);
    let mut seq: u64 = 0;
    loop {
        ticker.tick().await;
        let probe = match prober.probe(&probe_addr, seq, None).await {
            Ok(p) => p,
            Err(netpulse::NetPulseError::InsufficientPrivileges) => {
                eprintln!(
                    "[{}] ICMP/UDP requires root. Try: sudo netpulse monitor {}",
                    display_name, display_name
                );
                break;
            }
            // Any other failure counts as a lost probe.
            Err(_) => ProbeResult::loss(&display_name, seq),
        };
        if output == OutputMode::Text {
            print_probe_text(&probe);
        }
        let last_rtt_us = probe.rtt_us;
        {
            // The lock is held only for the update, never across an await.
            let mut guard = state.lock().unwrap();
            if let Some(ts) = guard.get_mut(&display_name) {
                ts.buffer.push(probe);
                ts.seq = seq + 1;
                ts.last_rtt_us = last_rtt_us;
            }
        }
        seq += 1;

        if !seq.is_multiple_of(report_every) {
            continue;
        }

        // Copy the samples out under the lock; statistics are computed after
        // it is released. A missing entry skips the report instead of
        // panicking while the shared mutex is held.
        let snapshot = {
            let guard = state.lock().unwrap();
            guard.get(&display_name).map(|ts| ts.buffer.snapshot())
        };
        let Some(snapshot) = snapshot else {
            continue;
        };
        let stats = StatsEngine::compute(&display_name, snapshot);
        match output {
            OutputMode::Json => {
                if let Err(e) = json_exporter.emit(&stats) {
                    eprintln!("json export error: {}", e);
                }
            }
            OutputMode::Text => print_stats_text(&stats),
            OutputMode::Tui => unreachable!(),
        }
        if let Some(ref csv) = csv_exporter {
            if let Err(e) = csv.emit(&stats) {
                eprintln!("csv export error: {}", e);
            }
        }
    }
}
/// Prints one probe result as a single line on stdout.
///
/// A probe with a round-trip time is shown with the RTT converted from
/// microseconds to milliseconds (three decimals); a probe without one is
/// shown as `TIMEOUT`.
fn print_probe_text(probe: &ProbeResult) {
    if let Some(rtt) = probe.rtt_us {
        let millis = rtt as f64 / 1000.0;
        println!("[{}] seq={} rtt={:.3}ms", probe.target, probe.seq, millis);
    } else {
        println!("[{}] seq={} TIMEOUT", probe.target, probe.seq);
    }
}
/// Prints a one-line statistics summary for a target on stdout.
///
/// The line contains the sample count, loss percentage, the p50/p95/p99 RTT
/// values, jitter and the maximum burst loss. RTT and jitter values are
/// converted from microseconds to milliseconds; absent values print as `n/a`.
fn print_stats_text(stats: &netpulse::StatsSnapshot) {
    let p50 = fmt_us(stats.rtt_p50_us);
    let p95 = fmt_us(stats.rtt_p95_us);
    let p99 = fmt_us(stats.rtt_p99_us);
    let jitter = match stats.jitter_us {
        Some(j) => format!("{:.2}ms", j / 1000.0),
        None => "n/a".to_string(),
    };
    println!(
        "─── {} | n={} loss={:.1}% | p50={} p95={} p99={} | jitter={} | burst={}",
        stats.target,
        stats.sample_count,
        stats.loss_pct,
        p50,
        p95,
        p99,
        jitter,
        stats.max_burst_loss,
    );
}
/// Formats an optional microsecond value as milliseconds with two decimals
/// (for example `1500` becomes `"1.50ms"`); `None` is rendered as `"n/a"`.
fn fmt_us(val: Option<u64>) -> String {
    match val {
        Some(micros) => format!("{:.2}ms", micros as f64 / 1000.0),
        None => String::from("n/a"),
    }
}