#![allow(clippy::unwrap_used, clippy::expect_used)]
use ant_quic::{NatConfig, P2pConfig, P2pEndpoint, PqcConfig};
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};
use tokio::time::timeout;
use tracing_subscriber::EnvFilter;
const TIMEOUT: Duration = Duration::from_secs(5);
fn normalize(addr: SocketAddr) -> SocketAddr {
if addr.ip().is_unspecified() {
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), addr.port())
} else {
addr
}
}
async fn make_node(known: Vec<SocketAddr>) -> P2pEndpoint {
P2pEndpoint::new(
P2pConfig::builder()
.bind_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
.known_peers(known)
.nat(NatConfig {
enable_relay_fallback: false,
..Default::default()
})
.pqc(PqcConfig::default())
.build()
.expect("test config"),
)
.await
.expect("node creation")
}
#[tokio::test]
async fn recv_after_reconnect() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_test_writer()
.try_init();
let b = Arc::new(make_node(vec![]).await);
let b_addr = normalize(b.local_addr().expect("bound addr"));
let b_id = b.peer_id();
let a = Arc::new(make_node(vec![b_addr]).await);
let a_id = a.peer_id();
let b2 = Arc::clone(&b);
let accept1 = tokio::spawn(async move { timeout(TIMEOUT, b2.accept()).await });
tokio::time::sleep(Duration::from_millis(100)).await;
let conn = a.connect_addr(b_addr).await.expect("connect");
assert_eq!(conn.peer_id, b_id);
let accepted1 = accept1
.await
.expect("accept1 join")
.expect("accept1 timeout")
.expect("accept1 none");
assert_eq!(accepted1.peer_id, a_id);
tokio::time::sleep(Duration::from_millis(200)).await;
a.send(&b_id, b"msg1").await.expect("send1");
let (from, data) = timeout(TIMEOUT, b.recv())
.await
.expect("recv1 timeout")
.expect("recv1 error");
assert_eq!(from, a_id);
assert_eq!(&data, b"msg1");
a.disconnect(&b_id).await.expect("disconnect a→b");
let _ = b.disconnect(&a_id).await;
tokio::time::sleep(Duration::from_millis(500)).await;
let b3 = Arc::clone(&b);
let accept2 = tokio::spawn(async move { timeout(TIMEOUT, b3.accept()).await });
tokio::time::sleep(Duration::from_millis(100)).await;
let reconnect = a.connect_addr(b_addr).await.expect("reconnect");
assert_eq!(reconnect.peer_id, b_id);
let accepted2 = accept2
.await
.expect("accept2 join")
.expect("accept2 timeout")
.expect("accept2 none");
assert_eq!(accepted2.peer_id, a_id);
tokio::time::sleep(Duration::from_millis(200)).await;
a.send(&b_id, b"msg2").await.expect("send2");
let (from2, data2) = timeout(TIMEOUT, b.recv())
.await
.expect("recv2 TIMED OUT — reader task replacement bug regressed!")
.expect("recv2 error");
assert_eq!(from2, a_id);
assert_eq!(&data2, b"msg2");
let _ = timeout(Duration::from_secs(2), a.shutdown()).await;
let _ = timeout(Duration::from_secs(2), b.shutdown()).await;
}