use super::*;
use anyhow::Error;
use itertools::Itertools;
use rand::Rng;
use std::collections;
/// Closing a session closes its receiver and invalidates its handle.
///
/// After `close_session`, the receiver reports closed, `send_ping` on the old
/// handle fails with `SendPingError::InvalidSessionHandle`, and closing the
/// same handle a second time still returns `Ok`.
#[tokio::test]
async fn closing_session_closes_things() -> anyhow::Result<()> {
    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;

    // Use the id from `platform_echo_id` when it yields one, otherwise a random id.
    let echo_id = multiplexer
        .platform_echo_id(IpVersion::V4)
        .unwrap_or_else(rand::random);
    let payload = rand::random::<[u8; 32]>().to_vec();
    let (handle, rx) = multiplexer
        .add_session(net::Ipv4Addr::LOCALHOST.into(), echo_id, payload)
        .await?;
    assert!(!rx.is_closed());

    multiplexer.close_session(handle).await?;
    assert!(rx.is_closed());

    // Sending on the closed session must fail with exactly this error variant.
    let other = multiplexer
        .send_ping(handle, EchoSeq::from_be(3))
        .await
        .unwrap_err();
    if !matches!(other, SendPingError::InvalidSessionHandle) {
        panic!("Unexpected error: {other}");
    }

    // A second close of the same handle is expected to succeed as well.
    multiplexer.close_session(handle).await?;
    Ok(())
}
/// Calling `shutdown` twice in a row on the same multiplexer completes
/// without panicking.
#[tokio::test]
async fn shutdown_twice_is_ok() -> anyhow::Result<()> {
    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;
    // The second iteration runs against an already shut-down multiplexer.
    for _ in 0..2 {
        multiplexer.shutdown().await;
    }
    Ok(())
}
/// Dropping the receiver does not invalidate the session immediately.
///
/// The first ping after the drop is still accepted; after a short wait a
/// further ping is rejected with `SendPingError::InvalidSessionHandle`.
/// NOTE(review): per the test name, the session is presumably torn down when
/// the reply to the first ping cannot be delivered — confirm against the
/// multiplexer implementation.
#[tokio::test]
async fn dropping_rx_closes_session_at_next_reply() -> anyhow::Result<()> {
    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;

    let echo_id = multiplexer
        .platform_echo_id(IpVersion::V4)
        .unwrap_or_else(rand::random);
    let payload = rand::random::<[u8; 32]>().to_vec();
    let (handle, rx) = multiplexer
        .add_session(net::Ipv4Addr::LOCALHOST.into(), echo_id, payload)
        .await?;
    drop(rx);

    // Still accepted: the handle is valid until something notices the dropped receiver.
    multiplexer.send_ping(handle, EchoSeq::from_be(3)).await?;
    // Leave time for that ping to be processed before probing the handle again.
    tokio::time::sleep(time::Duration::from_millis(10)).await;

    let other = multiplexer
        .send_ping(handle, EchoSeq::from_be(3))
        .await
        .unwrap_err();
    if !matches!(other, SendPingError::InvalidSessionHandle) {
        panic!("Unexpected error: {other}");
    }
    Ok(())
}
/// Registering a second session with the same address and echo id fails
/// with `AddSessionError::Duplicate`.
#[tokio::test]
async fn duplicate_session_rejected() -> anyhow::Result<()> {
    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;
    let addr = net::IpAddr::from(net::Ipv4Addr::LOCALHOST);
    let echo_id = multiplexer
        .platform_echo_id(IpVersion::V4)
        .unwrap_or_else(rand::random);
    let payload = vec![1, 2, 3, 4];

    // The first registration must succeed; its handle and receiver are
    // discarded right away by the `_` pattern.
    let _ = multiplexer
        .add_session(addr, echo_id, payload.clone())
        .await?;

    let second_attempt = multiplexer.add_session(addr, echo_id, payload).await;
    // Exhaustive on purpose: a new `AddSessionError` variant should force this
    // test to be revisited.
    match second_attempt.unwrap_err() {
        AddSessionError::Duplicate => Ok(()),
        AddSessionError::Lifecycle(_) => panic!(),
    }
}
#[tokio::test]
async fn not_reading_causes_dropped_timestamps_but_can_be_resumed() -> anyhow::Result<()> {
let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;
let ip_version = IpVersion::V4;
let (handle, mut rx) = multiplexer
.add_session(
net::Ipv4Addr::LOCALHOST.into(),
multiplexer
.platform_echo_id(ip_version)
.unwrap_or_else(rand::random),
rand::random::<[u8; 32]>().to_vec(),
)
.await?;
for seq in (0..100).map(EchoSeq::from_be) {
multiplexer.send_ping(handle, seq).await?;
tokio::time::sleep(time::Duration::from_millis(1)).await;
}
let recv_handle = tokio::spawn(async move {
let mut vec = vec![];
while let Some(ts) = rx.recv().await {
vec.push(ts);
}
vec
});
tokio::time::sleep(time::Duration::from_millis(10)).await;
for seq in (100..200).map(EchoSeq::from_be) {
multiplexer.send_ping(handle, seq).await?;
tokio::time::sleep(time::Duration::from_millis(1)).await;
}
tokio::time::sleep(time::Duration::from_millis(10)).await;
multiplexer.close_session(handle).await?;
let recv_ts = recv_handle.await?;
assert!(
recv_ts.len() > 100 && recv_ts.len() < 200,
"len: {}",
recv_ts.len()
);
let recv_map = recv_ts
.iter()
.map(|ts| (ts.seq, ts.received_at))
.collect::<collections::HashMap<_, _>>();
assert_eq!(recv_ts.len(), recv_map.len());
assert!(
(0..100)
.map(EchoSeq::from_be)
.filter(|seq| recv_map.contains_key(seq))
.count()
> 0
);
let missing = (100..200)
.map(EchoSeq::from_be)
.filter(|seq| !recv_map.contains_key(seq))
.collect_vec();
assert_eq!(Vec::<EchoSeq>::new(), missing);
Ok(())
}
/// Runs `recv_with_long_data` against the IPv4 loopback address.
#[tokio::test]
async fn recv_with_long_data_gets_all_ipv4() -> anyhow::Result<()> {
    let loopback = net::IpAddr::from(net::Ipv4Addr::LOCALHOST);
    // The helper returns a nested `Result`: `?` unwraps the outer layer and
    // the inner one becomes this test's result.
    recv_with_long_data(loopback).await?
}
/// Runs `recv_with_long_data` against the IPv6 loopback address.
#[tokio::test]
async fn recv_with_long_data_gets_all_ipv6() -> anyhow::Result<()> {
    let loopback = net::IpAddr::from(net::Ipv6Addr::LOCALHOST);
    // The helper returns a nested `Result`: `?` unwraps the outer layer and
    // the inner one becomes this test's result.
    recv_with_long_data(loopback).await?
}
/// Sends a single ping with a 100-byte random payload to `addr` and checks
/// that a timestamp with the same sequence number is received within one
/// second.
///
/// The return type is a nested `Result`. On success the body always yields
/// `Ok(Ok(()))`; every failure is propagated through the outer layer via `?`.
/// Callers therefore apply `?` once and return the inner value.
///
/// # Panics
///
/// Panics if the receiver yields `None` before a timestamp arrives, or if the
/// received sequence number differs from the one sent.
async fn recv_with_long_data(addr: net::IpAddr) -> Result<Result<(), Error>, Error> {
    // The init result is deliberately ignored — presumably because another
    // test in the same process may already have installed the logger.
    let _ = env_logger::builder()
        .is_test(true)
        .format_timestamp_millis()
        .try_init();

    let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;

    let mut payload = vec![0; 100];
    rand::thread_rng().fill(&mut payload[..]);

    let echo_id = multiplexer
        .platform_echo_id(IpVersion::from(addr))
        .unwrap_or_else(rand::random);
    let (handle, mut rx) = multiplexer.add_session(addr, echo_id, payload).await?;

    // Use a different sequence number for each IP version (4 or 6).
    let raw_seq = match IpVersion::from(addr) {
        IpVersion::V4 => 4,
        IpVersion::V6 => 6,
    };
    let seq = EchoSeq::from_be(raw_seq);
    multiplexer.send_ping(handle, seq).await?;

    // Timing out surfaces as an error through `?`; a `None` from the receiver panics.
    let reply = tokio::time::timeout(time::Duration::from_secs(1), rx.recv()).await?;
    let ts = reply.unwrap();
    assert_eq!(seq, ts.seq);
    Ok(Ok(()))
}