#![cfg(all(feature = "net", feature = "nat-traversal"))]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use net::adapter::net::behavior::capability::CapabilitySet;
use net::adapter::net::traversal::TraversalError;
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
/// Size in bytes (256 KiB) used for both the send and receive socket buffers
/// of every test node (see `test_config`).
const TEST_BUFFER_SIZE: usize = 256 * 1024;
/// Fixed 32-byte key handed to `MeshNodeConfig::new` for every test node, so
/// all nodes in a test share the same value.
const PSK: [u8; 32] = [0x42u8; 32];
/// Builds the `MeshNodeConfig` shared by every node in this file.
///
/// The config binds to `127.0.0.1:0`, uses the shared `PSK`, a 200 ms
/// heartbeat interval, a 5 s session timeout and `with_handshake(3, 2 s)`.
/// Both socket buffers are set to `TEST_BUFFER_SIZE`.
fn test_config() -> MeshNodeConfig {
    // Port 0 on loopback — presumably so the OS assigns each node a free port.
    let bind: SocketAddr = "127.0.0.1:0".parse().unwrap();

    let buffers = SocketBufferConfig {
        send_buffer_size: TEST_BUFFER_SIZE,
        recv_buffer_size: TEST_BUFFER_SIZE,
    };

    // NOTE(review): the `3` passed to `with_handshake` looks like a retry
    // count and the 2 s like a per-attempt timeout — confirm against the API.
    let mut config = MeshNodeConfig::new(bind, PSK)
        .with_heartbeat_interval(Duration::from_millis(200))
        .with_session_timeout(Duration::from_secs(5))
        .with_handshake(3, Duration::from_secs(2));
    config.socket_buffers = buffers;
    config
}
/// Creates a `MeshNode` with a freshly generated keypair and the shared test
/// configuration, and returns it wrapped in an `Arc`.
///
/// # Panics
///
/// Panics with `"MeshNode::new"` if node construction returns an error.
async fn build_node() -> Arc<MeshNode> {
    let keypair = EntityKeypair::generate();
    let node = MeshNode::new(keypair, test_config())
        .await
        .expect("MeshNode::new");
    Arc::new(node)
}
async fn connect_pair(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
let a_id = a.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_id = b.node_id();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id)
.await
.expect("connect failed");
accept
.await
.expect("accept task panicked")
.expect("accept failed");
}
async fn rendezvous_topology() -> (Arc<MeshNode>, Arc<MeshNode>, Arc<MeshNode>, Arc<MeshNode>) {
let a = build_node().await;
let r = build_node().await;
let b = build_node().await;
let x = build_node().await;
connect_pair(&a, &r).await;
connect_pair(&b, &r).await;
connect_pair(&a, &x).await;
connect_pair(&b, &x).await;
connect_pair(&r, &x).await;
a.start();
r.start();
b.start();
x.start();
(a, r, b, x)
}
/// Polls `check` every 50 ms until it returns `true` or `limit` has elapsed.
///
/// Returns `true` as soon as `check` succeeds. Once `limit` has elapsed,
/// `check` is evaluated one final time and that result is returned, so a
/// condition that becomes true during the last sleep is still observed.
async fn wait_for<F: Fn() -> bool>(limit: Duration, check: F) -> bool {
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    let started = tokio::time::Instant::now();
    loop {
        if started.elapsed() >= limit {
            // Out of time: one last look before giving up.
            return check();
        }
        if check() {
            return true;
        }
        tokio::time::sleep(POLL_INTERVAL).await;
    }
}
/// Happy path: A asks coordinator R to introduce it to B, and both A and B
/// receive a matching introduce.
///
/// Steps:
/// 1. A and B reclassify their NAT and announce capabilities.
/// 2. Wait until R reports a reflex address for *both* B and A equal to
///    their local bind addresses.
/// 3. B waits for an introduce (from A, via R) on a spawned task while A
///    calls `request_punch`.
/// 4. Each side's introduce must name the other peer and its reflex address,
///    and both must carry the same `fire_at_ms`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn coordinator_fans_out_to_both_endpoints() {
    let (a, r, b, _x) = rendezvous_topology().await;

    a.reclassify_nat().await;
    b.reclassify_nat().await;
    a.announce_capabilities(CapabilitySet::new())
        .await
        .expect("A announce");
    b.announce_capabilities(CapabilitySet::new())
        .await
        .expect("B announce");

    let a_id = a.node_id();
    let b_id = b.node_id();
    let r_id = r.node_id();
    let a_bind = a.local_addr();
    let b_bind = b.local_addr();

    // R learns the announcements asynchronously, so poll for each endpoint.
    let b_reflex_ready = wait_for(Duration::from_secs(3), || {
        r.peer_reflex_addr(b_id) == Some(b_bind)
    })
    .await;
    assert!(
        b_reflex_ready,
        "R should see B's reflex in its capability index; got {:?}",
        r.peer_reflex_addr(b_id),
    );

    // B being indexed does not guarantee A is indexed too, so poll for A as
    // well instead of asserting once. Returns immediately if A is already
    // there.
    let a_reflex_ready = wait_for(Duration::from_secs(3), || {
        r.peer_reflex_addr(a_id) == Some(a_bind)
    })
    .await;
    assert!(
        a_reflex_ready,
        "R should see A's reflex too; got {:?}",
        r.peer_reflex_addr(a_id),
    );

    // B must be waiting before A's request triggers the fan-out.
    let b_clone = b.clone();
    let b_wait = tokio::spawn(async move { b_clone.await_punch_introduce(a_id, r_id).await });
    // Presumably gives the spawned task time to register its waiter before
    // A's request is sent — TODO confirm there is no deterministic hook.
    tokio::time::sleep(Duration::from_millis(50)).await;

    let a_intro = a
        .request_punch(r_id, b_id, a_bind)
        .await
        .expect("A should receive PunchIntroduce");
    let b_intro = b_wait
        .await
        .expect("B wait task panicked")
        .expect("B should receive PunchIntroduce");

    assert_eq!(a_intro.peer, b_id, "A's introduce.peer should be B");
    assert_eq!(
        a_intro.peer_reflex, b_bind,
        "A's introduce.peer_reflex should be B's reflex",
    );
    assert_eq!(b_intro.peer, a_id, "B's introduce.peer should be A");
    assert_eq!(
        b_intro.peer_reflex, a_bind,
        "B's introduce.peer_reflex should be A's reflex",
    );
    assert_eq!(
        a_intro.fire_at_ms, b_intro.fire_at_ms,
        "A and B should see the same fire_at_ms (single coordinator compute)",
    );
}
/// When the target (B) never announced, A's `request_punch` via R must fail
/// with `TraversalError::PunchFailed`, and only after roughly the punch
/// deadline: the test accepts an elapsed time in `[4 s, 6 s)`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn request_punch_times_out_when_target_has_no_cached_reflex() {
    let (a, r, b, _x) = rendezvous_topology().await;

    // Only A announces; B stays silent, so R never learns a reflex for it.
    a.reclassify_nat().await;
    a.announce_capabilities(CapabilitySet::new())
        .await
        .expect("A announce");

    let relay = r.node_id();
    let target = b.node_id();
    let own_reflex = a.local_addr();

    let clock = tokio::time::Instant::now();
    let outcome = a.request_punch(relay, target, own_reflex).await;
    let elapsed = clock.elapsed();

    match outcome {
        Err(TraversalError::PunchFailed) => {}
        other => panic!("expected PunchFailed, got {other:?}"),
    }

    let earliest = Duration::from_secs(4);
    let latest = Duration::from_secs(6);
    assert!(
        elapsed >= earliest,
        "should wait ~punch_deadline (5s) before failing; elapsed {elapsed:?}",
    );
    assert!(
        elapsed < latest,
        "should not wait much past punch_deadline; elapsed {elapsed:?}",
    );
}
/// Requesting a punch through a relay id the node has no session with must
/// fail with `TraversalError::PeerNotReachable` in under 500 ms, i.e. without
/// waiting out any punch deadline.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn request_punch_unknown_relay_fails_fast() {
    // A lone node: it is never connected to anything, so both ids are unknown.
    let node = build_node().await;
    node.start();
    let own_reflex = node.local_addr();

    let clock = tokio::time::Instant::now();
    let outcome = node.request_punch(0xDEAD_BEEF, 0xCAFE, own_reflex).await;
    let err = outcome.expect_err("unknown relay should fail");
    let elapsed = clock.elapsed();

    match err {
        TraversalError::PeerNotReachable => {}
        other => panic!("expected PeerNotReachable, got {other:?}"),
    }

    let budget = Duration::from_millis(500);
    assert!(
        elapsed < budget,
        "fast-fail took {elapsed:?}; want < 500 ms",
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn request_punch_times_out_when_targets_reflex_was_evicted_by_ttl_gc() {
let r = build_node().await;
let a = build_node().await;
let b = build_node().await;
let x = build_node().await;
connect_pair(&a, &r).await;
connect_pair(&b, &r).await;
connect_pair(&a, &x).await;
connect_pair(&b, &x).await;
connect_pair(&r, &x).await;
a.start();
r.start();
b.start();
x.start();
a.reclassify_nat().await;
a.announce_capabilities(CapabilitySet::new())
.await
.expect("A announce");
b.reclassify_nat().await;
b.announce_capabilities_with(CapabilitySet::new(), Duration::from_secs(1), true)
.await
.expect("B short-TTL announce");
let r_for_poll = r.clone();
let b_id = b.node_id();
let b_bind = b.local_addr();
let indexed = wait_for(Duration::from_secs(3), || {
r_for_poll.peer_reflex_addr(b_id) == Some(b_bind)
})
.await;
assert!(
indexed,
"R must index B's announcement before its TTL expires"
);
let r_for_evict = r.clone();
let evicted = wait_for(Duration::from_secs(3), || {
r_for_evict.peer_reflex_addr(b_id).is_none()
})
.await;
assert!(
evicted,
"R's capability fold should have evicted B by now; got {:?}",
r.peer_reflex_addr(b_id),
);
let start = tokio::time::Instant::now();
let result = a
.request_punch(r.node_id(), b.node_id(), a.local_addr())
.await;
let elapsed = start.elapsed();
match result {
Err(TraversalError::PunchFailed) => {}
other => panic!("expected PunchFailed after TTL eviction, got {other:?}"),
}
assert!(
elapsed >= Duration::from_secs(4),
"should wait ~punch_deadline (5s) before failing; elapsed {elapsed:?}",
);
assert!(
elapsed < Duration::from_secs(6),
"should not wait much past punch_deadline; elapsed {elapsed:?}",
);
}