async-icmp 0.2.1

Async ICMP library
Documentation
use super::*;
use anyhow::Error;
use itertools::Itertools;
use rand::Rng;
use std::collections;

#[tokio::test]
async fn closing_session_closes_things() -> anyhow::Result<()> {
    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;

    let (handle, rx) = multiplexer
        .add_session(
            net::Ipv4Addr::LOCALHOST.into(),
            multiplexer
                .platform_echo_id(IpVersion::V4)
                .unwrap_or_else(rand::random),
            rand::random::<[u8; 32]>().to_vec(),
        )
        .await?;

    assert!(!rx.is_closed());

    multiplexer.close_session(handle).await?;

    // receive channel should be closed
    assert!(rx.is_closed());

    // sending should fail
    match multiplexer
        .send_ping(handle, EchoSeq::from_be(3))
        .await
        .unwrap_err()
    {
        SendPingError::InvalidSessionHandle => (),
        other => panic!("Unexpected error: {other}"),
    }

    // closing again is not an error
    multiplexer.close_session(handle).await?;

    Ok(())
}

#[tokio::test]
async fn shutdown_twice_is_ok() -> anyhow::Result<()> {
    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;

    multiplexer.shutdown().await;
    multiplexer.shutdown().await;

    Ok(())
}

#[tokio::test]
async fn dropping_rx_closes_session_at_next_reply() -> anyhow::Result<()> {
    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;

    let ip_version = IpVersion::V4;
    let (handle, rx) = multiplexer
        .add_session(
            net::Ipv4Addr::LOCALHOST.into(),
            multiplexer
                .platform_echo_id(ip_version)
                .unwrap_or_else(rand::random),
            rand::random::<[u8; 32]>().to_vec(),
        )
        .await?;

    drop(rx);

    // first send will work because the task hasn't yet realized its tx is now dead
    multiplexer.send_ping(handle, EchoSeq::from_be(3)).await?;

    // localhost is usually a few hundred micros
    tokio::time::sleep(time::Duration::from_millis(10)).await;

    match multiplexer
        .send_ping(handle, EchoSeq::from_be(3))
        .await
        .unwrap_err()
    {
        SendPingError::InvalidSessionHandle => (),
        other => panic!("Unexpected error: {other}"),
    }

    Ok(())
}

#[tokio::test]
async fn duplicate_session_rejected() -> anyhow::Result<()> {
    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;

    let ip_version = IpVersion::V4;
    let id = multiplexer
        .platform_echo_id(ip_version)
        .unwrap_or_else(rand::random);
    let ip = net::Ipv4Addr::LOCALHOST.into();
    let data = vec![1, 2, 3, 4];
    let _ = multiplexer.add_session(ip, id, data.clone()).await?;

    let res = multiplexer.add_session(ip, id, data).await;

    match res.unwrap_err() {
        AddSessionError::Duplicate => {
            // expected
        }
        AddSessionError::Lifecycle(_) => {
            panic!()
        }
    };

    Ok(())
}

#[tokio::test]
async fn not_reading_causes_dropped_timestamps_but_can_be_resumed() -> anyhow::Result<()> {
    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;

    let ip_version = IpVersion::V4;
    let (handle, mut rx) = multiplexer
        .add_session(
            net::Ipv4Addr::LOCALHOST.into(),
            multiplexer
                .platform_echo_id(ip_version)
                .unwrap_or_else(rand::random),
            rand::random::<[u8; 32]>().to_vec(),
        )
        .await?;

    for seq in (0..100).map(EchoSeq::from_be) {
        multiplexer.send_ping(handle, seq).await?;
        // slow down for macOS
        tokio::time::sleep(time::Duration::from_millis(1)).await;
    }

    // now start reading
    let recv_handle = tokio::spawn(async move {
        let mut vec = vec![];
        while let Some(ts) = rx.recv().await {
            vec.push(ts);
        }

        vec
    });

    tokio::time::sleep(time::Duration::from_millis(10)).await;

    // should get all of these
    for seq in (100..200).map(EchoSeq::from_be) {
        multiplexer.send_ping(handle, seq).await?;
        // slow down for macOS
        tokio::time::sleep(time::Duration::from_millis(1)).await;
    }

    // wait for the last replies to trickle in
    tokio::time::sleep(time::Duration::from_millis(10)).await;

    multiplexer.close_session(handle).await?;

    let recv_ts = recv_handle.await?;

    // should have a few buffered from the first, and all of the rest
    assert!(
        recv_ts.len() > 100 && recv_ts.len() < 200,
        "len: {}",
        recv_ts.len()
    );

    let recv_map = recv_ts
        .iter()
        .map(|ts| (ts.seq, ts.received_at))
        .collect::<collections::HashMap<_, _>>();
    assert_eq!(recv_ts.len(), recv_map.len());
    // at least some of the first batch
    assert!(
        (0..100)
            .map(EchoSeq::from_be)
            .filter(|seq| recv_map.contains_key(seq))
            .count()
            > 0
    );

    let missing = (100..200)
        .map(EchoSeq::from_be)
        .filter(|seq| !recv_map.contains_key(seq))
        .collect_vec();
    assert_eq!(Vec::<EchoSeq>::new(), missing);

    Ok(())
}

#[tokio::test]
async fn recv_with_long_data_gets_all_ipv4() -> anyhow::Result<()> {
    recv_with_long_data(net::Ipv4Addr::LOCALHOST.into()).await?
}

#[tokio::test]
async fn recv_with_long_data_gets_all_ipv6() -> anyhow::Result<()> {
    recv_with_long_data(net::Ipv6Addr::LOCALHOST.into()).await?
}

async fn recv_with_long_data(addr: net::IpAddr) -> Result<Result<(), Error>, Error> {
    let _ = env_logger::builder()
        .is_test(true)
        .format_timestamp_millis()
        .try_init();

    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;

    // force a receive buffer resize
    let mut data = vec![0; 100];
    rand::thread_rng().fill(&mut data[..]);
    let (handle, mut rx) = multiplexer
        .add_session(
            addr,
            multiplexer
                .platform_echo_id(addr.into())
                .unwrap_or_else(rand::random),
            data,
        )
        .await?;

    // different seqs to make logging more comprehensible
    let seq = EchoSeq::from_be(match IpVersion::from(addr) {
        IpVersion::V4 => 4,
        IpVersion::V6 => 6,
    });
    multiplexer.send_ping(handle, seq).await?;

    let ts = tokio::time::timeout(time::Duration::from_secs(1), rx.recv())
        .await?
        .unwrap();

    assert_eq!(seq, ts.seq);

    Ok(Ok(()))
}