#![cfg(feature = "net")]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use net::adapter::net::behavior::capability::{CapabilityFilter, CapabilitySet};
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
const TEST_BUFFER_SIZE: usize = 256 * 1024;
const PSK: [u8; 32] = [0x42u8; 32];
fn test_config() -> MeshNodeConfig {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut cfg = MeshNodeConfig::new(addr, PSK)
.with_heartbeat_interval(Duration::from_millis(200))
.with_session_timeout(Duration::from_secs(5))
.with_handshake(3, Duration::from_secs(2))
.with_capability_gc_interval(Duration::from_millis(250));
cfg.socket_buffers = SocketBufferConfig {
send_buffer_size: TEST_BUFFER_SIZE,
recv_buffer_size: TEST_BUFFER_SIZE,
};
cfg
}
async fn build_node() -> Arc<MeshNode> {
let cfg = test_config();
let keypair = EntityKeypair::generate();
Arc::new(MeshNode::new(keypair, cfg).await.expect("MeshNode::new"))
}
async fn handshake_no_start(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
let a_id = a.node_id();
let b_id = b.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id)
.await
.expect("connect failed");
accept
.await
.expect("accept task panicked")
.expect("accept failed");
}
fn start_all(nodes: &[&Arc<MeshNode>]) {
for n in nodes {
n.start();
}
}
async fn wait_until<F>(node: &Arc<MeshNode>, mut cond: F) -> bool
where
F: FnMut(&MeshNode) -> bool,
{
let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
while tokio::time::Instant::now() < deadline {
if cond(node) {
return true;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
cond(node)
}
#[tokio::test]
async fn three_node_chain_propagates() {
let a = build_node().await;
let b = build_node().await;
let c = build_node().await;
handshake_no_start(&a, &b).await;
handshake_no_start(&b, &c).await;
start_all(&[&a, &b, &c]);
a.announce_capabilities(CapabilitySet::new().add_tag("far-gpu"))
.await
.expect("A announce failed");
let filter = CapabilityFilter::new().require_tag("far-gpu");
let a_id = a.node_id();
assert!(
wait_until(&b, |n| n.find_nodes_by_filter(&filter).contains(&a_id)).await,
"B (direct peer of A) did not receive announcement",
);
assert!(
wait_until(&c, |n| n.find_nodes_by_filter(&filter).contains(&a_id)).await,
"C did not receive A's announcement via B's re-broadcast",
);
}
#[tokio::test]
async fn origin_rate_limit_coalesces_bursts() {
let a = {
let cfg = test_config().with_min_announce_interval(Duration::from_secs(5));
let keypair = EntityKeypair::generate();
Arc::new(MeshNode::new(keypair, cfg).await.expect("MeshNode::new"))
};
let b = build_node().await;
handshake_no_start(&a, &b).await;
start_all(&[&a, &b]);
a.announce_capabilities(CapabilitySet::new().add_tag("burst-v1"))
.await
.expect("v1 announce");
let a_id = a.node_id();
let v1_filter = CapabilityFilter::new().require_tag("burst-v1");
assert!(
wait_until(&b, |n| n.find_nodes_by_filter(&v1_filter).contains(&a_id)).await,
"B did not receive the first announcement",
);
for i in 2..=10u32 {
a.announce_capabilities(CapabilitySet::new().add_tag(format!("burst-v{}", i)))
.await
.expect("rapid announce");
}
let v10_filter = CapabilityFilter::new().require_tag("burst-v10");
assert!(
a.find_nodes_by_filter(&v10_filter).contains(&a_id),
"A's self-index doesn't reflect the latest caps",
);
tokio::time::sleep(Duration::from_millis(100)).await;
for i in 2..=10u32 {
let tag = format!("burst-v{}", i);
let filter = CapabilityFilter::new().require_tag(tag);
assert!(
!b.find_nodes_by_filter(&filter).contains(&a_id),
"B received coalesced announcement (tag burst-v{}) that should have been rate-limited",
i
);
}
}
#[tokio::test]
async fn route_install_from_multihop_receipt() {
let a = build_node().await;
let b = build_node().await;
let c = build_node().await;
handshake_no_start(&a, &b).await;
handshake_no_start(&b, &c).await;
start_all(&[&a, &b, &c]);
a.announce_capabilities(CapabilitySet::new().add_tag("route-probe"))
.await
.expect("announce");
let filter = CapabilityFilter::new().require_tag("route-probe");
let a_id = a.node_id();
assert!(
wait_until(&c, |n| n.find_nodes_by_filter(&filter).contains(&a_id)).await,
"C never saw A's announcement — upstream forwarding failed",
);
let route = c.router().routing_table().lookup(a.node_id());
assert_eq!(
route,
Some(b.local_addr()),
"C's routing table didn't install the multi-hop route to A via B"
);
}
#[tokio::test]
async fn dedup_drops_duplicate_at_converge_point() {
let a = build_node().await;
let b = build_node().await;
let c = build_node().await;
let d = build_node().await;
handshake_no_start(&a, &b).await;
handshake_no_start(&a, &c).await;
handshake_no_start(&b, &d).await;
handshake_no_start(&c, &d).await;
start_all(&[&a, &b, &c, &d]);
a.announce_capabilities(CapabilitySet::new().add_tag("diamond"))
.await
.expect("A announce");
let a_id = a.node_id();
let filter = CapabilityFilter::new().require_tag("diamond");
assert!(
wait_until(&d, |n| n.find_nodes_by_filter(&filter).contains(&a_id)).await,
"D did not see A's diamond announcement",
);
tokio::time::sleep(Duration::from_millis(200)).await;
let hits = d.find_nodes_by_filter(&filter);
assert_eq!(
hits.iter().filter(|&&id| id == a_id).count(),
1,
"D's index accidentally registered A twice",
);
}
#[tokio::test]
async fn late_joiner_converges_via_multihop_rebroadcast() {
let build_with_interval = |interval: Duration| async move {
let cfg = test_config().with_min_announce_interval(interval);
let keypair = EntityKeypair::generate();
Arc::new(MeshNode::new(keypair, cfg).await.expect("MeshNode::new"))
};
let a = build_with_interval(Duration::from_millis(50)).await;
let b = build_node().await;
handshake_no_start(&a, &b).await;
start_all(&[&a, &b]);
a.announce_capabilities(CapabilitySet::new().add_tag("pre-late"))
.await
.expect("initial announce");
let a_id = a.node_id();
let filter = CapabilityFilter::new().require_tag("pre-late");
assert!(
wait_until(&b, |n| n.find_nodes_by_filter(&filter).contains(&a_id)).await,
"B didn't see initial announcement",
);
let c = build_node().await;
handshake_no_start(&b, &c).await;
c.start();
assert!(
!c.find_nodes_by_filter(&filter).contains(&a_id),
"C unexpectedly converged before the re-announce"
);
tokio::time::sleep(Duration::from_millis(100)).await;
a.announce_capabilities(CapabilitySet::new().add_tag("pre-late"))
.await
.expect("re-announce");
assert!(
wait_until(&c, |n| n.find_nodes_by_filter(&filter).contains(&a_id)).await,
"C never converged after A's re-announce",
);
}
#[tokio::test]
async fn wire_replay_is_dropped_at_dedup_cache() {
use net::adapter::net::behavior::SUBPROTOCOL_CAPABILITY_ANN;
let a = build_node().await;
let b = build_node().await;
handshake_no_start(&a, &b).await;
start_all(&[&a, &b]);
a.announce_capabilities(CapabilitySet::new().add_tag("dedup-test"))
.await
.expect("A announce");
let a_id = a.node_id();
let filter = CapabilityFilter::new().require_tag("dedup-test");
assert!(
wait_until(&b, |n| n.find_nodes_by_filter(&filter).contains(&a_id)).await,
"B should index A's first announce",
);
let baseline = b.capability_fold().stats().entries as u64;
let ann = a
.local_announcement_for_test()
.expect("A should have a stored announcement after announce");
let bytes = ann.to_bytes();
let b_addr = b.local_addr();
for _ in 0..3 {
a.send_subprotocol(b_addr, SUBPROTOCOL_CAPABILITY_ANN, &bytes)
.await
.expect("replay send");
}
tokio::time::sleep(Duration::from_millis(150)).await;
let after = b.capability_fold().stats().entries as u64;
assert_eq!(
after, baseline,
"total_indexed must not bump on wire-replay — dedup cache \
should short-circuit all three replays BEFORE index() runs. \
baseline={baseline}, after={after}",
);
assert!(
b.find_nodes_by_filter(&filter).contains(&a_id),
"A's tag should still be indexed after the replay barrage",
);
}
#[tokio::test]
async fn version_collision_from_same_origin_is_dropped_at_dedup_cache() {
use net::adapter::net::behavior::capability::CapabilityAnnouncement;
use net::adapter::net::behavior::SUBPROTOCOL_CAPABILITY_ANN;
let a_keypair = EntityKeypair::generate();
let a = Arc::new(
MeshNode::new(a_keypair.clone(), test_config())
.await
.expect("A"),
);
let b = build_node().await;
handshake_no_start(&a, &b).await;
start_all(&[&a, &b]);
let a_id = a.node_id();
let a_entity_id = a.entity_id().clone();
let mut ann1 = CapabilityAnnouncement::new(
a_id,
a_entity_id.clone(),
42,
CapabilitySet::new().add_tag("first"),
);
ann1.sign(&a_keypair);
let bytes1 = ann1.to_bytes();
let mut ann2 = CapabilityAnnouncement::new(
a_id,
a_entity_id,
42,
CapabilitySet::new().add_tag("second"),
);
ann2.sign(&a_keypair);
let bytes2 = ann2.to_bytes();
let b_addr = b.local_addr();
a.send_subprotocol(b_addr, SUBPROTOCOL_CAPABILITY_ANN, &bytes1)
.await
.expect("send #1");
let first_filter = CapabilityFilter::new().require_tag("first");
assert!(
wait_until(&b, |n| n
.find_nodes_by_filter(&first_filter)
.contains(&a_id))
.await,
"B should index announcement #1 (tag=first, v=42)",
);
let indexed_after_first = b.capability_fold().stats().entries as u64;
a.send_subprotocol(b_addr, SUBPROTOCOL_CAPABILITY_ANN, &bytes2)
.await
.expect("send #2");
tokio::time::sleep(Duration::from_millis(150)).await;
let indexed_after_second = b.capability_fold().stats().entries as u64;
assert_eq!(
indexed_after_second, indexed_after_first,
"second same-version announcement must NOT increment total_indexed",
);
let second_filter = CapabilityFilter::new().require_tag("second");
assert!(
!b.find_nodes_by_filter(&second_filter).contains(&a_id),
"B must not have #2's tag — dedup dropped the collision",
);
assert!(
b.find_nodes_by_filter(&first_filter).contains(&a_id),
"B must still have #1's tag (first-writer-wins)",
);
}