#![cfg(feature = "net")]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
use net::error::AdapterError;
const TEST_BUFFER_SIZE: usize = 256 * 1024;
const PSK: [u8; 32] = [0x42u8; 32];
fn test_config() -> MeshNodeConfig {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut cfg = MeshNodeConfig::new(addr, PSK)
.with_heartbeat_interval(Duration::from_millis(200))
.with_session_timeout(Duration::from_secs(5))
.with_handshake(3, Duration::from_secs(2));
cfg.socket_buffers = SocketBufferConfig {
send_buffer_size: TEST_BUFFER_SIZE,
recv_buffer_size: TEST_BUFFER_SIZE,
};
cfg
}
async fn build_node() -> Arc<MeshNode> {
Arc::new(
MeshNode::new(EntityKeypair::generate(), test_config())
.await
.expect("MeshNode::new"),
)
}
async fn connect_pair(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
let a_id = a.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_id = b.node_id();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id)
.await
.expect("connect failed");
accept
.await
.expect("accept task panicked")
.expect("accept failed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_to_unknown_peer_returns_clean_error() {
let a = build_node().await;
let bogus: SocketAddr = "127.0.0.1:1".parse().unwrap();
let err = a
.send_subprotocol(bogus, 0x0500, b"payload")
.await
.expect_err("send to unknown peer must fail");
match err {
AdapterError::Connection(msg) => {
assert!(
msg.contains("unknown peer"),
"error message should mention unknown peer; got {msg:?}",
);
}
other => panic!("expected Connection error; got {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_send_to_unknown_peer_is_consistently_errored() {
let a = build_node().await;
let bogus: SocketAddr = "127.0.0.1:2".parse().unwrap();
let mut handles = Vec::with_capacity(16);
for _ in 0..16 {
let a_clone = a.clone();
handles.push(tokio::spawn(async move {
a_clone.send_subprotocol(bogus, 0x0500, b"payload").await
}));
}
let mut ok_count = 0;
let mut err_count = 0;
for h in handles {
match h.await.expect("task panicked") {
Ok(()) => ok_count += 1,
Err(AdapterError::Connection(msg)) => {
assert!(
msg.contains("unknown peer"),
"every concurrent failure must be the unknown-peer error; got {msg:?}",
);
err_count += 1;
}
Err(other) => panic!("unexpected error variant: {other:?}"),
}
}
assert_eq!(
ok_count, 0,
"no concurrent send should have spuriously succeeded"
);
assert_eq!(
err_count, 16,
"every concurrent send should have errored cleanly"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_to_partition_filtered_peer_returns_ok_silently() {
let a = build_node().await;
let b = build_node().await;
connect_pair(&a, &b).await;
a.start();
b.start();
let b_addr = b.local_addr();
a.block_peer(b_addr);
assert!(a.is_blocked(&b_addr));
let res = a.send_subprotocol(b_addr, 0x0500, b"payload").await;
assert!(
res.is_ok(),
"send to partition-filtered peer must be a silent Ok; got {res:?}",
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_empty_payload_does_not_panic() {
let a = build_node().await;
let b = build_node().await;
connect_pair(&a, &b).await;
a.start();
b.start();
let b_addr = b.local_addr();
a.send_subprotocol(b_addr, 0x0500, b"")
.await
.expect("empty payload send should succeed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_with_reserved_subprotocol_id_succeeds() {
let a = build_node().await;
let b = build_node().await;
connect_pair(&a, &b).await;
a.start();
b.start();
let b_addr = b.local_addr();
a.send_subprotocol(b_addr, 0x0002, b"payload")
.await
.expect("reserved-range ID must not error at send time");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_with_vendor_range_subprotocol_id_succeeds() {
let a = build_node().await;
let b = build_node().await;
connect_pair(&a, &b).await;
a.start();
b.start();
let b_addr = b.local_addr();
a.send_subprotocol(b_addr, 0x7777, b"vendor payload")
.await
.expect("vendor-range ID must not error at send time");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_with_max_subprotocol_id_succeeds() {
let a = build_node().await;
let b = build_node().await;
connect_pair(&a, &b).await;
a.start();
b.start();
let b_addr = b.local_addr();
a.send_subprotocol(b_addr, u16::MAX, b"edge")
.await
.expect("u16::MAX subprotocol ID must not overflow or panic");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn receiver_survives_unknown_subprotocol_id() {
let a = build_node().await;
let b = build_node().await;
connect_pair(&a, &b).await;
a.start();
b.start();
let b_addr = b.local_addr();
for id in [0x0002u16, 0x03FF, 0x1234, 0x7FFF, 0xEFFF] {
a.send_subprotocol(b_addr, id, b"junk")
.await
.expect("send with unknown ID must not error");
}
tokio::time::sleep(Duration::from_millis(100)).await;
let a_id = a.node_id();
a.announce_capabilities(net::adapter::net::behavior::capability::CapabilitySet::new())
.await
.expect("real announce after junk barrage");
let mut propagated = false;
for _ in 0..40 {
if b.test_capability_fold_has(a_id) {
propagated = true;
break;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
assert!(
propagated,
"B's receive loop must survive the unknown-ID barrage and still \
dispatch a follow-up capability announcement — if B's recv loop \
had died this would time out",
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn receiver_drops_malformed_capability_announcement_without_panic() {
let a = build_node().await;
let b = build_node().await;
connect_pair(&a, &b).await;
a.start();
b.start();
let b_addr = b.local_addr();
a.send_subprotocol(b_addr, 0x0C00, b"not-an-announcement")
.await
.expect("send should succeed");
a.send_subprotocol(b_addr, 0x0C00, &[0u8; 3])
.await
.expect("send should succeed on tiny payload");
a.send_subprotocol(b_addr, 0x0C00, &[0xFFu8; 4096])
.await
.expect("send should succeed on large junk payload");
tokio::time::sleep(Duration::from_millis(100)).await;
let a_id = a.node_id();
assert!(
!b.test_capability_fold_has(a_id),
"malformed announcement must not populate B's capability index",
);
a.announce_capabilities(net::adapter::net::behavior::capability::CapabilitySet::new())
.await
.expect("A real announce");
let mut propagated = false;
for _ in 0..40 {
if b.test_capability_fold_has(a_id) {
propagated = true;
break;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
assert!(
propagated,
"after malformed-payload barrage, B should still index a valid announcement",
);
}