use std::time::{Duration, Instant};
use colored::*;
use serde::Serialize;
use crate::i18n::t;
use crate::output::{print_json, print_json_error, OutputMode};
use crate::table::print_table;
#[derive(Serialize, Clone, Default)]
pub struct TimingBreakdown {
pub dns_ms: f64,
pub connect_ms: f64,
pub tls_ms: f64,
pub ttfb_ms: f64,
pub total_ms: f64,
}
#[derive(Serialize, Clone)]
pub struct CheckProbe {
pub success: bool,
pub rtt_ms: f64,
pub status_code: Option<u16>,
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub timing: Option<TimingBreakdown>,
}
#[derive(Serialize)]
pub struct CheckOutput {
pub target: String,
pub check_type: String,
pub probes: Vec<CheckProbe>,
pub stats: CheckStats,
}
#[derive(Serialize, Clone)]
pub struct CheckStats {
pub total: usize,
pub success: usize,
pub failed: usize,
pub min_ms: Option<f64>,
pub max_ms: Option<f64>,
pub avg_ms: Option<f64>,
}
pub async fn run(target: &str, count: u32, timeout: Duration, timing: bool, proxy: Option<String>, no_proxy: bool, concurrency: usize, mode: OutputMode) {
if target.starts_with("http://") || target.starts_with("https://") {
run_http(target, count, timeout, timing, proxy, no_proxy, concurrency, mode).await;
} else {
run_tcp(target, count, timeout, mode).await;
}
}
pub(crate) fn parse_host_port(target: &str) -> Option<(String, u16)> {
if let Ok(addr) = target.parse::<std::net::SocketAddr>() {
return Some((addr.ip().to_string(), addr.port()));
}
if let Some(idx) = target.rfind(':') {
let host = &target[..idx];
let port_str = &target[idx + 1..];
if let Ok(port) = port_str.parse::<u16>() {
let host = host.trim_start_matches('[').trim_end_matches(']');
return Some((host.to_string(), port));
}
}
None
}
fn parse_url(url: &str) -> Option<(String, u16, bool)> {
let (scheme, rest) = url.split_once("://")?;
let is_https = scheme == "https";
let default_port = if is_https { 443 } else { 80 };
let host = rest.split('/').next()?;
let (host, port) = if let Some((h, p)) = host.rsplit_once(':') {
(h.to_string(), p.parse().unwrap_or(default_port))
} else {
(host.to_string(), default_port)
};
Some((host, port, is_https))
}
async fn run_tcp(target: &str, count: u32, connect_timeout: Duration, mode: OutputMode) {
use tokio::net::TcpStream;
use tokio::time::timeout;
let (host, port) = match parse_host_port(target) {
Some(hp) => hp,
None => {
if mode == OutputMode::Json {
print_json_error(&t("check.format_err"));
} else {
println!(" {}", t("check.format_err").red());
}
return;
}
};
let mut probes = Vec::new();
for i in 0..count {
let start = Instant::now();
let addr = format!("{}:{}", host, port);
let result = timeout(connect_timeout, TcpStream::connect(&addr)).await;
let elapsed = start.elapsed();
match result {
Ok(Ok(_stream)) => {
if mode == OutputMode::Table {
println!(
" {}",
t("check.tcp_ok")
.replace("{0}", &(i + 1).to_string())
.replace("{1}", &count.to_string())
.replace("{2}", &format!("{:.2}", elapsed.as_secs_f64() * 1000.0))
.green()
);
}
probes.push(CheckProbe {
success: true,
rtt_ms: elapsed.as_secs_f64() * 1000.0,
status_code: None,
error: None,
timing: None,
});
}
Ok(Err(e)) => {
if mode == OutputMode::Table {
println!(
" {}",
t("check.tcp_fail")
.replace("{0}", &(i + 1).to_string())
.replace("{1}", &count.to_string())
.replace("{2}", &e.to_string())
.red()
);
}
probes.push(CheckProbe {
success: false,
rtt_ms: elapsed.as_secs_f64() * 1000.0,
status_code: None,
error: Some(e.to_string()),
timing: None,
});
}
Err(_) => {
if mode == OutputMode::Table {
println!(
" {}",
t("check.tcp_timeout")
.replace("{0}", &(i + 1).to_string())
.replace("{1}", &count.to_string())
.replace("{2}", &connect_timeout.as_secs().to_string())
.red()
);
}
probes.push(CheckProbe {
success: false,
rtt_ms: connect_timeout.as_secs_f64() * 1000.0,
status_code: None,
error: Some(t("check.req_timeout")),
timing: None,
});
}
}
if i + 1 < count {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
let stats = compute_stats(&probes);
let output = CheckOutput {
target: target.to_string(),
check_type: "tcp".to_string(),
probes: probes.clone(),
stats: stats.clone(),
};
if mode == OutputMode::Json {
print_json(&output);
return;
}
print_stats(&stats, false);
}
async fn run_http(url: &str, count: u32, connect_timeout: Duration, timing: bool, proxy: Option<String>, no_proxy: bool, concurrency: usize, mode: OutputMode) {
let proxy_addr = if let Some(ref p) = proxy {
Some(p.clone())
} else if no_proxy {
None
} else {
crate::util::get_system_proxy_addr()
};
let can_breakdown = timing && proxy_addr.is_none() && url.starts_with("https://");
let mut builder = reqwest::Client::builder().timeout(connect_timeout);
builder = builder.no_proxy();
if let Some(ref proxy_url) = proxy_addr {
if let Ok(proxy) = reqwest::Proxy::all(proxy_url) {
builder = builder.proxy(proxy);
}
}
let client = builder.build().unwrap();
let proxy_tag = if proxy_addr.is_some() {
format!(" [{}]", t("diagnose.via_proxy"))
} else {
String::new()
};
let is_concurrent = concurrency > 1;
let mut probes = Vec::new();
if is_concurrent {
use tokio::sync::Semaphore;
let sem = std::sync::Arc::new(Semaphore::new(concurrency));
let url = std::sync::Arc::new(url.to_string());
let client = std::sync::Arc::new(client);
let mut handles = Vec::new();
for i in 0..count {
let sem = sem.clone();
let url = url.clone();
let client = client.clone();
let connect_timeout = connect_timeout;
let can_breakdown = can_breakdown;
handles.push(tokio::spawn(async move {
let _permit = sem.acquire_owned().await.unwrap();
if can_breakdown {
run_http_timing(&url, connect_timeout, OutputMode::Json, i, count).await
} else {
run_http_single(&client, &url, i, count).await
}
}));
}
for handle in handles {
if let Ok(probe) = handle.await {
probes.push(probe);
}
}
} else {
for i in 0..count {
if can_breakdown {
let probe = run_http_timing(url, connect_timeout, mode, i, count).await;
probes.push(probe);
} else {
let probe = run_http_single(&client, url, i, count).await;
if mode == OutputMode::Table {
print_probe(&probe, i, count, &proxy_tag);
}
probes.push(probe);
}
if i + 1 < count {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
if is_concurrent && mode == OutputMode::Table {
probes.sort_by_key(|p| p.rtt_ms as u64); for (i, probe) in probes.iter().enumerate() {
print_probe(probe, i as u32, count as u32, &proxy_tag);
}
}
let stats = compute_stats(&probes);
let output = CheckOutput {
target: url.to_string(),
check_type: if is_concurrent { "http-concurrent".to_string() } else { "http".to_string() },
probes: probes.clone(),
stats: stats.clone(),
};
if mode == OutputMode::Json {
print_json(&output);
return;
}
print_stats(&stats, true);
if is_concurrent {
print_concurrent_stats(&probes, concurrency);
}
}
async fn run_http_single(client: &reqwest::Client, url: &str, _i: u32, _count: u32) -> CheckProbe {
let start = Instant::now();
let result = client.get(url).send().await;
let elapsed = start.elapsed();
match result {
Ok(resp) => {
let status_code = resp.status().as_u16();
let is_success = resp.status().is_success();
CheckProbe {
success: is_success,
rtt_ms: elapsed.as_secs_f64() * 1000.0,
status_code: Some(status_code),
error: None,
timing: None,
}
}
Err(e) => {
let msg = if e.is_connect() {
t("check.conn_fail")
} else if e.is_timeout() {
t("check.req_timeout")
} else {
e.to_string()
};
CheckProbe {
success: false,
rtt_ms: elapsed.as_secs_f64() * 1000.0,
status_code: None,
error: Some(msg),
timing: None,
}
}
}
}
fn print_probe(probe: &CheckProbe, i: u32, count: u32, proxy_tag: &str) {
if probe.success {
let status = probe.status_code.unwrap_or(0);
let symbol = if (200..300).contains(&status) { "✓".green() } else { "⚠".yellow() };
println!(
" [{}/{}] {} {} {:.2}ms{}",
i + 1, count, symbol, status, probe.rtt_ms, proxy_tag
);
if let Some(ref tm) = probe.timing {
println!(" {:<10} {:.2}ms", t("check.timing_dns"), tm.dns_ms);
println!(" {:<10} {:.2}ms", t("check.timing_connect"), tm.connect_ms);
println!(" {:<10} {:.2}ms", t("check.timing_tls"), tm.tls_ms);
println!(" {:<10} {:.2}ms", t("check.timing_ttfb"), tm.ttfb_ms);
println!(" {:<10} {:.2}ms", t("check.timing_total"), tm.total_ms);
}
} else {
let msg = probe.error.as_deref().unwrap_or("unknown");
println!(
" {}",
format!("[{}/{}] ✗ {} {:.2}ms{}", i + 1, count, msg, probe.rtt_ms, proxy_tag).red()
);
}
}
fn print_concurrent_stats(probes: &[CheckProbe], concurrency: usize) {
println!();
let success = probes.iter().filter(|p| p.success).count();
let total = probes.len();
let rtts: Vec<f64> = probes.iter().filter(|p| p.success).map(|p| p.rtt_ms).collect();
let h_metric = t("common.metric");
let h_value = t("proxy.value");
let headers = [h_metric.as_str(), h_value.as_str()];
let mut rows = Vec::new();
rows.push(vec![t("check.concurrency"), concurrency.to_string()]);
rows.push(vec![t("check.total_reqs"), total.to_string()]);
rows.push(vec![t("check.success_reqs"), success.to_string()]);
rows.push(vec![t("check.fail_reqs"), (total - success).to_string()]);
if !rtts.is_empty() {
let stats = crate::util::compute_stats(&rtts);
if let (Some(min), Some(max), Some(avg)) = (stats.min_ms, stats.max_ms, stats.avg_ms) {
rows.push(vec![t("ping.min"), format!("{:.2}ms", min)]);
rows.push(vec![t("ping.max"), format!("{:.2}ms", max)]);
rows.push(vec![t("ping.avg"), format!("{:.2}ms", avg)]);
let max_sec = max / 1000.0;
if max_sec > 0.0 {
let qps = success as f64 / max_sec;
rows.push(vec![t("check.qps"), format!("{:.1}", qps)]);
}
}
}
print_table(&headers, &rows);
}
async fn run_http_timing(url: &str, timeout: Duration, mode: OutputMode, i: u32, count: u32) -> CheckProbe {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_rustls::TlsConnector;
let (host, port, _) = match parse_url(url) {
Some(hp) => hp,
None => {
return CheckProbe {
success: false,
rtt_ms: 0.0,
status_code: None,
error: Some("URL parse error".to_string()),
timing: None,
};
}
};
let path = url.split("://").nth(1).and_then(|s| s.find('/').map(|idx| &s[idx..])).unwrap_or("/");
let t0 = Instant::now();
let ip = match crate::util::resolve_host(&host).await {
Some(ip) => ip,
None => {
return CheckProbe {
success: false,
rtt_ms: t0.elapsed().as_secs_f64() * 1000.0,
status_code: None,
error: Some("DNS resolve failed".to_string()),
timing: None,
};
}
};
let dns_ms = t0.elapsed().as_secs_f64() * 1000.0;
let t1 = Instant::now();
let tcp_result = tokio::time::timeout(timeout, TcpStream::connect((ip, port))).await;
let tcp_stream = match tcp_result {
Ok(Ok(s)) => s,
Ok(Err(e)) => {
return CheckProbe {
success: false,
rtt_ms: t0.elapsed().as_secs_f64() * 1000.0,
status_code: None,
error: Some(format!("TCP: {}", e)),
timing: Some(TimingBreakdown {
dns_ms,
connect_ms: t1.elapsed().as_secs_f64() * 1000.0,
..Default::default()
}),
};
}
Err(_) => {
return CheckProbe {
success: false,
rtt_ms: t0.elapsed().as_secs_f64() * 1000.0,
status_code: None,
error: Some("TCP: timeout".to_string()),
timing: Some(TimingBreakdown {
dns_ms,
connect_ms: t1.elapsed().as_secs_f64() * 1000.0,
..Default::default()
}),
};
}
};
let connect_ms = t1.elapsed().as_secs_f64() * 1000.0;
let t2 = Instant::now();
let root_store = rustls::RootCertStore {
roots: webpki_roots::TLS_SERVER_ROOTS.iter().cloned().collect(),
};
let config = rustls::ClientConfig::builder_with_provider(std::sync::Arc::new(rustls::crypto::ring::default_provider()))
.with_safe_default_protocol_versions()
.unwrap()
.with_root_certificates(root_store)
.with_no_client_auth();
let connector = TlsConnector::from(std::sync::Arc::new(config));
let server_name = match rustls::pki_types::ServerName::try_from(host.clone()) {
Ok(n) => n,
Err(e) => {
return CheckProbe {
success: false,
rtt_ms: t0.elapsed().as_secs_f64() * 1000.0,
status_code: None,
error: Some(format!("TLS: {}", e)),
timing: Some(TimingBreakdown {
dns_ms,
connect_ms,
..Default::default()
}),
};
}
};
let tls_stream = match connector.connect(server_name, tcp_stream).await {
Ok(s) => s,
Err(e) => {
return CheckProbe {
success: false,
rtt_ms: t0.elapsed().as_secs_f64() * 1000.0,
status_code: None,
error: Some(format!("TLS: {}", e)),
timing: Some(TimingBreakdown {
dns_ms,
connect_ms,
tls_ms: t2.elapsed().as_secs_f64() * 1000.0,
..Default::default()
}),
};
}
};
let tls_ms = t2.elapsed().as_secs_f64() * 1000.0;
let t3 = Instant::now();
let mut tls_stream = tls_stream;
let request = format!(
"GET {} HTTP/1.1\r\nHost: {}\r\nUser-Agent: netutils/0.3\r\nConnection: close\r\n\r\n",
path, host
);
if tls_stream.write_all(request.as_bytes()).await.is_err() {
return CheckProbe {
success: false,
rtt_ms: t0.elapsed().as_secs_f64() * 1000.0,
status_code: None,
error: Some("HTTP: write failed".to_string()),
timing: Some(TimingBreakdown {
dns_ms,
connect_ms,
tls_ms,
..Default::default()
}),
};
}
let mut buf = [0u8; 4096];
let n = match tls_stream.read(&mut buf).await {
Ok(n) => n,
Err(e) => {
return CheckProbe {
success: false,
rtt_ms: t0.elapsed().as_secs_f64() * 1000.0,
status_code: None,
error: Some(format!("HTTP: {}", e)),
timing: Some(TimingBreakdown {
dns_ms,
connect_ms,
tls_ms,
..Default::default()
}),
};
}
};
let ttfb_ms = t3.elapsed().as_secs_f64() * 1000.0;
let total_ms = t0.elapsed().as_secs_f64() * 1000.0;
let response = String::from_utf8_lossy(&buf[..n]);
let status_code = response
.lines()
.next()
.and_then(|line| line.split_whitespace().nth(1))
.and_then(|s| s.parse::<u16>().ok())
.unwrap_or(0);
let is_success = (200..300).contains(&status_code);
let timing_bd = TimingBreakdown {
dns_ms,
connect_ms,
tls_ms,
ttfb_ms,
total_ms,
};
if mode == OutputMode::Table {
let symbol = if is_success { "✓".green() } else { "⚠".yellow() };
println!(
" [{}/{}] {} {} {:.2}ms",
i + 1,
count,
symbol,
status_code,
total_ms
);
println!(" {:<10} {:.2}ms", t("check.timing_dns"), dns_ms);
println!(" {:<10} {:.2}ms", t("check.timing_connect"), connect_ms);
println!(" {:<10} {:.2}ms", t("check.timing_tls"), tls_ms);
println!(" {:<10} {:.2}ms", t("check.timing_ttfb"), ttfb_ms);
println!(" {:<10} {:.2}ms", t("check.timing_total"), total_ms);
}
CheckProbe {
success: is_success,
rtt_ms: total_ms,
status_code: Some(status_code),
error: None,
timing: Some(timing_bd),
}
}
fn compute_stats(probes: &[CheckProbe]) -> CheckStats {
let total = probes.len();
let success = probes.iter().filter(|p| p.success).count();
let rtts: Vec<f64> = probes.iter().filter(|p| p.success).map(|p| p.rtt_ms).collect();
let stats = crate::util::compute_stats(&rtts);
CheckStats {
total,
success,
failed: total - success,
min_ms: stats.min_ms,
max_ms: stats.max_ms,
avg_ms: stats.avg_ms,
}
}
fn print_stats(stats: &CheckStats, is_http: bool) {
println!();
println!("{}", t("ping.stats").bold());
let h_metric = t("common.metric");
let h_value = t("proxy.value");
let headers = [h_metric.as_str(), h_value.as_str()];
let mut rows = Vec::new();
rows.push(vec![t("check.count"), stats.total.to_string()]);
if is_http {
rows.push(vec![t("check.ok_2xx"), stats.success.to_string()]);
} else {
rows.push(vec![t("check.ok"), stats.success.to_string()]);
}
rows.push(vec![t("check.fail_count"), stats.failed.to_string()]);
if let (Some(min), Some(max), Some(avg)) = (stats.min_ms, stats.max_ms, stats.avg_ms) {
rows.push(vec![t("ping.min"), format!("{:.2}ms", min)]);
rows.push(vec![t("ping.max"), format!("{:.2}ms", max)]);
rows.push(vec![t("ping.avg"), format!("{:.2}ms", avg)]);
}
print_table(&headers, &rows);
}