async-icmp 0.2.1

Async ICMP library
Documentation
use async_icmp::{
    message::echo::EchoSeq,
    ping::{PingMultiplexer, SessionHandle},
    socket::SocketConfig,
};
use clap::Parser as _;
use itertools::Itertools;
use log::{error, info, warn};
use std::{collections, net, process, time};
use tokio::{select, sync::mpsc as tmpsc, task};

/// A ping CLI tool.
///
/// Returns nonzero exit code if there is any packet loss.
///
/// Flow:
/// 1. Parse the CLI arguments and open one ping session to `dest` on a [`PingMultiplexer`].
/// 2. Spawn a task that sends the pings and reports `(seq, send timestamp)` over a channel.
/// 3. Pair send timestamps with reply timestamps from the session's receive channel (they can be
///    observed in either order), recording the round trip duration of every matched pair.
/// 4. Once sending is finished and every outstanding reply has either arrived or exceeded
///    `timeout`, close the session, log a summary and pick the exit code.
///
/// # Errors
///
/// Propagates errors from creating the multiplexer, adding or closing the session, and from
/// joining the send task.
#[tokio::main]
async fn main() -> anyhow::Result<process::ExitCode> {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("INFO"))
        .format_timestamp_millis()
        .init();

    let cli = Cli::parse();
    // How long a sent ping stays eligible for a reply. Converted once, used on every loop turn.
    let timeout: time::Duration = cli.timeout.into();

    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;
    let (handle, mut recv_channel) = multiplexer
        .add_session(
            cli.dest,
            multiplexer
                .platform_echo_id(cli.dest.into())
                .unwrap_or_else(rand::random),
            rand::random::<[u8; 32]>().to_vec(),
        )
        .await?;

    info!(
        "Pinging {} every {}, {} times",
        cli.dest, cli.between_pings, cli.count
    );

    // One slot per ping so the send task's `try_send` never finds the channel full.
    // tokio panics on a zero-capacity channel, so `--count 0` still needs one slot.
    let (send_tx, mut send_rx) = tmpsc::channel(usize::from(cli.count).max(1));
    let send_task = spawn_send_task(
        cli.count,
        cli.between_pings.into(),
        send_tx,
        handle,
        multiplexer.clone(),
    );

    // got sent tstamp but no reply tstamp yet
    let mut send_unpaired_timestamps = collections::HashMap::<EchoSeq, time::Instant>::new();
    // got reply tstamp but no sent tstamp yet
    let mut recv_unpaired_timestamps = collections::HashMap::<EchoSeq, time::Instant>::new();
    // send tstamps with not yet timed out pending replies.
    // Like `send_unpaired_timestamps` but filtered out entries that are too old.
    let mut waitable_seqs = collections::HashMap::<EchoSeq, time::Instant>::new();
    // durations for matching send/recv pairs
    let mut durations = collections::HashMap::new();

    let mut drained_send = false;
    let mut drained_recv = false;

    loop {
        // keep the set from growing without bound during packet loss:
        // retain only send tstamps that haven't expired.
        // Comparing elapsed time (instead of computing `now - timeout`) cannot panic, no matter
        // how large a timeout the user asked for.
        let now = time::Instant::now();
        waitable_seqs.retain(|_seq, tstamp| now.saturating_duration_since(*tstamp) <= timeout);

        // Wait up to `timeout` past the latest send tstamp
        let recv_timeout_at = waitable_seqs
            .values()
            .max()
            .and_then(|ts| ts.checked_add(timeout))
            .unwrap_or_else(||
                // If there's no response to wait for, use effectively infinite timeout so we don't
                // prematurely bail when we could be waiting for the send channel
                now
                    .checked_add(time::Duration::from_secs(3600 * 24 * 365))
                    .expect("1yr in the future should fit in Instant"));

        // We've stopped sending, and we've received every reply we expect or given up waiting,
        // so the session can end. This will close the recv channel.
        // With sending drained and nothing waitable, both `select!` branches below are disabled,
        // so the `else` arm breaks out right after this.
        if drained_send && (send_unpaired_timestamps.is_empty() || waitable_seqs.is_empty()) {
            info!("closing session");
            multiplexer.close_session(handle).await?;
        }

        let opt_dur = select! {
            send = send_rx.recv(), if !drained_send => {
                match send {
                    None => {
                        // channel just closed
                        drained_send = true;
                        continue;
                    }
                    Some((seq, tstamp)) =>  {
                        // task scheduling might lead to finding the send after the recv
                        if let Some(recv_ts) = recv_unpaired_timestamps.remove(&seq) {
                            Some((seq, recv_ts.saturating_duration_since(tstamp)))
                        } else {
                            send_unpaired_timestamps.insert(seq, tstamp);
                            waitable_seqs.insert(seq, tstamp);
                            None
                        }
                    }
                }
            }
            // include waitable_seqs in condition so we eventually stop waiting when a dropped
            // response is filtered out, even though we haven't seen the final None in the channel
            recv = tokio::time::timeout_at(recv_timeout_at.into(), recv_channel.recv()), if !waitable_seqs.is_empty() && !drained_recv => {
                match recv {
                    Ok(opt) => match opt {
                        None => {
                            // channel just closed
                            drained_recv = true;
                            continue;
                        }
                        Some(reply_ts) => {
                            if let Some(send_ts) = send_unpaired_timestamps.remove(&reply_ts.seq) {
                                waitable_seqs.remove(&reply_ts.seq);
                                // Saturate like the send-after-recv branch above: the two
                                // timestamps are taken by different tasks.
                                Some((
                                    reply_ts.seq,
                                    reply_ts.received_at.saturating_duration_since(send_ts),
                                ))
                            } else {
                                recv_unpaired_timestamps.insert(reply_ts.seq, reply_ts.received_at);
                                None
                            }
                        }
                    }
                    Err(_e) => {
                        // we timed out while waiting for recv.
                        // loop again to let waitable set filtering apply
                        continue;
                    }
                }

            }
            else => {
                // no send seq #'s left, and nothing to wait for: we're done
                break;
            }
        };

        if let Some((seq, dur)) = opt_dur {
            info!("Ping seq={seq:?} response received in {dur:?}");
            durations.insert(seq, dur);
        }
    }

    drop(send_rx);
    send_task.await?;
    multiplexer.shutdown().await;

    let received = durations.len();
    // With zero pings requested the ratio would be 0/0 (NaN); report 0 instead.
    let percent_received = if cli.count == 0 {
        0.0_f64
    } else {
        received as f64 / f64::from(cli.count) * 100.0_f64
    };
    let mean = durations.values().sum::<time::Duration>()
        / if durations.is_empty() {
            1 // sum will be zero anyway
        } else {
            received.try_into().expect("# pings will fit in u32")
        };

    info!(
        "{} pings total, {} responses received ({}%{}), min {:?}, mean {:?}, max {:?}",
        cli.count,
        received,
        percent_received,
        if send_unpaired_timestamps.is_empty() {
            "".to_string()
        } else {
            format!(
                ", missing seqs {:?}",
                send_unpaired_timestamps
                    .keys()
                    .sorted_by_key(|seq| seq.as_be())
                    .collect_vec()
            )
        },
        durations.values().min().unwrap_or(&time::Duration::ZERO),
        mean,
        durations.values().max().unwrap_or(&time::Duration::ZERO),
    );

    // Success only when every requested ping got a reply.
    if received == usize::from(cli.count) {
        Ok(process::ExitCode::SUCCESS)
    } else {
        Ok(process::ExitCode::FAILURE)
    }
}

fn spawn_send_task(
    count: u16,
    between_pings: time::Duration,
    tx: tmpsc::Sender<(EchoSeq, time::Instant)>,
    session_handle: SessionHandle,
    multiplexer: PingMultiplexer,
) -> task::JoinHandle<()> {
    tokio::spawn(async move {
        let start = time::Instant::now();

        for seq in (0..count).map(EchoSeq::from_be) {
            tokio::time::sleep_until((start + between_pings * seq.as_be().into()).into()).await;

            let tstamp = match multiplexer.send_ping(session_handle, seq).await {
                Err(e) => {
                    warn!("Could not send ping seq={seq:?}: {}", e);
                    // don't send a timestamp for a message we didn't send
                    continue;
                }
                Ok(ts) => {
                    info!("Ping seq={:?} sent", seq);
                    ts
                }
            };

            if let Err(e) = tx.try_send((seq, tstamp)) {
                match e {
                    tmpsc::error::TrySendError::Full(_) => {
                        error!(
                            "Dropped send timestamp -- consumer dead? seq={:?} time={:?}",
                            seq, tstamp
                        );
                    }
                    tmpsc::error::TrySendError::Closed(_) => {
                        // upstream must have shut down
                        return;
                    }
                }
            }
        }
    })
}

// Command-line arguments for the ping tool, parsed via `clap`'s derive API (`Cli::parse()`).
//
// NOTE(review): plain `//` comments are used for these notes on purpose. With clap's derive,
// `///` doc comments on the struct and its fields are picked up as help text, so adding or
// editing them would change the `--help` output -- confirm against the clap derive reference
// before converting any of these to doc comments.
#[derive(clap::Parser)]
struct Cli {
    /// The number of times to ping
    // `-c` / `--count`; defaults to 3.
    #[arg(short, long, default_value_t = 3)]
    count: u16,

    /// How long to wait between sending pings
    // `--between-pings`; a `humantime` duration, defaults to 250ms.
    #[arg(long, default_value_t = time::Duration::from_millis(250).into())]
    between_pings: humantime::Duration,

    /// Wait at least this long for a response
    // `--timeout`; a `humantime` duration, defaults to 2s.
    #[arg(long, default_value_t = time::Duration::from_secs(2).into())]
    timeout: humantime::Duration,

    /// Ip address to ping
    // Positional argument (no `#[arg]` attribute), parsed as a `std::net::IpAddr`.
    dest: net::IpAddr,
}