use async_icmp::{
message::echo::EchoSeq,
ping::{PingMultiplexer, SessionHandle},
socket::SocketConfig,
};
use clap::Parser as _;
use itertools::Itertools;
use log::{error, info, warn};
use std::{collections, net, process, time};
use tokio::{select, sync::mpsc as tmpsc, task};
/// Pings `cli.dest` `cli.count` times, logs each round-trip time, then logs a summary.
///
/// A background task (see `spawn_send_task`) sends the echo requests and reports every
/// send timestamp over a channel. This function pairs those timestamps with the replies
/// arriving on the session's receive channel, regardless of which side shows up first.
///
/// Returns `ExitCode::SUCCESS` only when a reply was paired for every requested ping,
/// `ExitCode::FAILURE` otherwise.
///
/// # Errors
///
/// Fails if the multiplexer or session cannot be created, if closing the session fails,
/// or if joining the send task fails.
#[tokio::main]
async fn main() -> anyhow::Result<process::ExitCode> {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("INFO"))
        .format_timestamp_millis()
        .init();

    let cli = Cli::parse();
    // Converted once; used on every loop iteration below.
    let timeout: time::Duration = cli.timeout.into();

    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;
    let (handle, mut recv_channel) = multiplexer
        .add_session(
            cli.dest,
            // Fall back to a random echo id when `platform_echo_id` yields none.
            multiplexer
                .platform_echo_id(cli.dest.into())
                .unwrap_or_else(rand::random),
            // 32 random bytes of echo payload.
            rand::random::<[u8; 32]>().to_vec(),
        )
        .await?;

    info!(
        "Pinging {} every {}, {} times",
        cli.dest, cli.between_pings, cli.count
    );

    // One slot per ping so the sender's `try_send` never finds the channel full.
    // A bounded tokio channel panics on capacity 0, so `--count 0` still needs one slot.
    let (send_tx, mut send_rx) = tmpsc::channel(usize::from(cli.count).max(1));
    let send_task = spawn_send_task(
        cli.count,
        cli.between_pings.into(),
        send_tx,
        handle,
        multiplexer.clone(),
    );

    // Send timestamps that have not been matched with a reply yet.
    let mut send_unpaired_timestamps = collections::HashMap::<EchoSeq, time::Instant>::new();
    // Replies whose send timestamp has not been delivered through the channel yet.
    let mut recv_unpaired_timestamps = collections::HashMap::<EchoSeq, time::Instant>::new();
    // Unmatched sends that are still within the timeout, i.e. still worth waiting for.
    let mut waitable_seqs = collections::HashMap::<EchoSeq, time::Instant>::new();
    // Round-trip time per sequence number.
    let mut durations = collections::HashMap::new();
    let mut drained_send = false;
    let mut drained_recv = false;

    loop {
        // Stop waiting for sends that have been outstanding longer than the timeout.
        // Comparing elapsed time (instead of computing `now - timeout`) cannot fail,
        // whatever timeout the user passed.
        let now = time::Instant::now();
        waitable_seqs.retain(|_seq, sent_at| now.saturating_duration_since(*sent_at) <= timeout);

        // Deadline for the next receive: when the most recently sent waitable ping expires.
        // The far-future fallback is used when nothing is waitable or the sum overflows.
        let recv_timeout_at = waitable_seqs
            .values()
            .max()
            .and_then(|newest| newest.checked_add(timeout))
            .unwrap_or_else(|| {
                time::Instant::now()
                    .checked_add(time::Duration::from_secs(3600 * 24 * 365))
                    .expect("1yr in the future should fit in Instant")
            });

        // Sending is finished and nothing is left to wait for: close the session. Both
        // select branches are disabled in that state, so the loop ends via `else` below.
        if drained_send && (send_unpaired_timestamps.is_empty() || waitable_seqs.is_empty()) {
            info!("closing session");
            multiplexer.close_session(handle).await?;
        }

        let opt_dur = select! {
            send = send_rx.recv(), if !drained_send => {
                match send {
                    None => {
                        // Sender side dropped: no more send timestamps will arrive.
                        drained_send = true;
                        continue;
                    }
                    Some((seq, tstamp)) => {
                        if let Some(recv_ts) = recv_unpaired_timestamps.remove(&seq) {
                            // The reply was seen before its send timestamp reached us.
                            Some((seq, recv_ts.saturating_duration_since(tstamp)))
                        } else {
                            send_unpaired_timestamps.insert(seq, tstamp);
                            waitable_seqs.insert(seq, tstamp);
                            None
                        }
                    }
                }
            }
            recv = tokio::time::timeout_at(recv_timeout_at.into(), recv_channel.recv()), if !waitable_seqs.is_empty() && !drained_recv => {
                match recv {
                    Ok(None) => {
                        // Receive channel closed: no more replies will arrive.
                        drained_recv = true;
                        continue;
                    }
                    Ok(Some(reply_ts)) => {
                        if let Some(send_ts) = send_unpaired_timestamps.remove(&reply_ts.seq) {
                            waitable_seqs.remove(&reply_ts.seq);
                            Some((reply_ts.seq, reply_ts.received_at.duration_since(send_ts)))
                        } else {
                            recv_unpaired_timestamps.insert(reply_ts.seq, reply_ts.received_at);
                            None
                        }
                    }
                    Err(_elapsed) => {
                        // Deadline hit: loop around so expired entries get pruned.
                        continue;
                    }
                }
            }
            else => {
                break;
            }
        };

        if let Some((seq, dur)) = opt_dur {
            info!("Ping seq={seq:?} response received in {dur:?}");
            durations.insert(seq, dur);
        }
    }

    // Dropping the receiver makes the sender's `try_send` report `Closed`, so the task exits.
    drop(send_rx);
    send_task.await?;
    multiplexer.shutdown().await;

    // Guard the division so that `--count 0` reports 0% rather than NaN%.
    let response_pct = if cli.count == 0 {
        0.0_f64
    } else {
        durations.len() as f64 / f64::from(cli.count) * 100.0_f64
    };
    let missing = if send_unpaired_timestamps.is_empty() {
        "".to_string()
    } else {
        format!(
            ", missing seqs {:?}",
            send_unpaired_timestamps
                .keys()
                .sorted_by_key(|seq| seq.as_be())
                .collect_vec()
        )
    };
    let min_rtt = durations
        .values()
        .min()
        .copied()
        .unwrap_or(time::Duration::ZERO);
    let max_rtt = durations
        .values()
        .max()
        .copied()
        .unwrap_or(time::Duration::ZERO);
    // Divide by at least 1 so an empty result set yields a zero mean instead of panicking.
    let mean_divisor: u32 = durations
        .len()
        .max(1)
        .try_into()
        .expect("# pings will fit in u32");
    let mean_rtt = durations.values().sum::<time::Duration>() / mean_divisor;

    info!(
        "{} pings total, {} responses received ({}%{}), min {:?}, mean {:?}, max {:?}",
        cli.count,
        durations.len(),
        response_pct,
        missing,
        min_rtt,
        mean_rtt,
        max_rtt,
    );

    if durations.len() == usize::from(cli.count) {
        Ok(process::ExitCode::SUCCESS)
    } else {
        Ok(process::ExitCode::FAILURE)
    }
}
fn spawn_send_task(
count: u16,
between_pings: time::Duration,
tx: tmpsc::Sender<(EchoSeq, time::Instant)>,
session_handle: SessionHandle,
multiplexer: PingMultiplexer,
) -> task::JoinHandle<()> {
tokio::spawn(async move {
let start = time::Instant::now();
for seq in (0..count).map(EchoSeq::from_be) {
tokio::time::sleep_until((start + between_pings * seq.as_be().into()).into()).await;
let tstamp = match multiplexer.send_ping(session_handle, seq).await {
Err(e) => {
warn!("Could not send ping seq={seq:?}: {}", e);
continue;
}
Ok(ts) => {
info!("Ping seq={:?} sent", seq);
ts
}
};
if let Err(e) = tx.try_send((seq, tstamp)) {
match e {
tmpsc::error::TrySendError::Full(_) => {
error!(
"Dropped send timestamp -- consumer dead? seq={:?} time={:?}",
seq, tstamp
);
}
tmpsc::error::TrySendError::Closed(_) => {
return;
}
}
}
}
})
}
/// Send ICMP echo requests to a host and report round-trip times.
// The doc comments in this struct are picked up by clap's derive and shown in `--help`.
#[derive(clap::Parser)]
struct Cli {
    /// Number of echo requests to send.
    #[arg(short, long, default_value_t = 3)]
    count: u16,
    /// Interval between consecutive echo requests, in humantime syntax (e.g. 250ms).
    #[arg(long, default_value_t = time::Duration::from_millis(250).into())]
    between_pings: humantime::Duration,
    /// How long to keep waiting for a reply to a sent request, in humantime syntax (e.g. 2s).
    #[arg(long, default_value_t = time::Duration::from_secs(2).into())]
    timeout: humantime::Duration,
    /// IP address of the host to ping.
    dest: net::IpAddr,
}