#![cfg(feature = "net")]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use net::adapter::net::behavior::capability::{
CapabilityAnnouncement, CapabilityFilter, CapabilitySet,
};
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
/// Size in bytes (256 KiB) that `test_config` applies to both the send and the
/// receive socket buffer.
const TEST_BUFFER_SIZE: usize = 256 * 1024;
/// Pre-shared key that `test_config` hands to `MeshNodeConfig::new`, so every
/// node built by the helpers in this file is configured with the same value.
const PSK: [u8; 32] = [0x42u8; 32];
/// Produces the baseline configuration every test node in this file starts from.
///
/// The config is addressed at `127.0.0.1:0` (port 0 — presumably so the OS
/// assigns a free port; confirm against `MeshNode::new`) with the shared
/// [`PSK`], a 200 ms heartbeat interval, a 5 s session timeout, handshake
/// parameters `(3, 2 s)`, and a 250 ms capability GC interval. Both socket
/// buffers are sized to [`TEST_BUFFER_SIZE`].
///
/// # Panics
///
/// Panics if the hard-coded loopback address fails to parse.
fn test_config() -> MeshNodeConfig {
    let loopback = "127.0.0.1:0".parse::<SocketAddr>().unwrap();

    let mut config = MeshNodeConfig::new(loopback, PSK)
        .with_heartbeat_interval(Duration::from_millis(200))
        .with_session_timeout(Duration::from_secs(5))
        .with_handshake(3, Duration::from_secs(2))
        .with_capability_gc_interval(Duration::from_millis(250));

    // Same size in both directions.
    let buffers = SocketBufferConfig {
        send_buffer_size: TEST_BUFFER_SIZE,
        recv_buffer_size: TEST_BUFFER_SIZE,
    };
    config.socket_buffers = buffers;

    config
}
/// Builds a node from the unmodified [`test_config`].
///
/// Equivalent to calling [`build_node_with`] with a tweak that returns the
/// config untouched.
async fn build_node() -> Arc<MeshNode> {
    build_node_with(std::convert::identity).await
}
/// Builds a node with a freshly generated keypair and a customised config.
///
/// `tweak` receives the baseline [`test_config`] by value and returns the
/// config that is actually passed to `MeshNode::new`.
///
/// # Panics
///
/// Panics with `"MeshNode::new"` if node construction fails.
async fn build_node_with<F>(tweak: F) -> Arc<MeshNode>
where
    F: FnOnce(MeshNodeConfig) -> MeshNodeConfig,
{
    let config = tweak(test_config());
    let identity = EntityKeypair::generate();

    let node = MeshNode::new(identity, config)
        .await
        .expect("MeshNode::new");
    Arc::new(node)
}
/// Establishes a session with `a` as the connecting side and `b` as the
/// accepting side, then calls `start()` on both nodes.
///
/// `b.accept(..)` runs on a spawned task so it can proceed while `a.connect(..)`
/// is awaited on the current task.
///
/// # Panics
///
/// Panics if the connect call fails, if the accept task panics, or if the
/// accept call returns an error.
async fn handshake(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
    let initiator_id = a.node_id();
    let responder_id = b.node_id();
    let responder_pub = *b.public_key();
    let responder_addr = b.local_addr();

    // The spawned task needs its own handle to the responder.
    let responder = b.clone();
    let accept_task = tokio::spawn(async move { responder.accept(initiator_id).await });

    a.connect(responder_addr, &responder_pub, responder_id)
        .await
        .expect("connect failed");

    let accept_outcome = accept_task.await.expect("accept task panicked");
    accept_outcome.expect("accept failed");

    a.start();
    b.start();
}
/// Polls `cond` against `node` every 25 ms for up to 2 s.
///
/// Returns `true` as soon as `cond` reports `true`. Once the 2 s budget is
/// used up, `cond` is evaluated one final time and that result is returned.
async fn wait_until<F>(node: &Arc<MeshNode>, mut cond: F) -> bool
where
    F: FnMut(&MeshNode) -> bool,
{
    let give_up_at = tokio::time::Instant::now() + Duration::from_secs(2);
    loop {
        if tokio::time::Instant::now() >= give_up_at {
            // Out of time: one last probe decides the outcome.
            return cond(node);
        }
        if cond(node) {
            return true;
        }
        tokio::time::sleep(Duration::from_millis(25)).await;
    }
}
/// Checks what `peer_static_x25519` reports on each side of a handshake.
///
/// The initiator must get the responder's public key back, the responder must
/// get `None` for the initiator (the assertion message attributes this to the
/// NKpsk0 pattern), and an id with no session must also yield `None`.
#[tokio::test]
async fn peer_static_x25519_returns_peer_noise_pubkey_after_handshake() {
    let initiator = build_node().await;
    let responder = build_node().await;
    handshake(&initiator, &responder).await;

    let initiator_id = initiator.node_id();
    let responder_id = responder.node_id();

    let seen_by_initiator = initiator.peer_static_x25519(responder_id);
    assert_eq!(
        seen_by_initiator,
        Some(*responder.public_key()),
        "initiator (A) should recover B's Noise static pubkey",
    );

    let seen_by_responder = responder.peer_static_x25519(initiator_id);
    assert!(
        seen_by_responder.is_none(),
        "responder (B) should see None under NKpsk0 — pattern discloses only -> e",
    );

    // An id that never took part in a handshake has nothing to report.
    let unknown_peer = 0xDEAD_BEEF_CAFE_F00D;
    assert!(initiator.peer_static_x25519(unknown_peer).is_none());
}
/// Exercises both callbacks of `MigrationIdentityContext` and pins its size.
///
/// 1. An `IdentityEnvelope` sealed to B's public key is placed in a
///    `StateSnapshot`; B's `unseal_snapshot` callback must return a keypair
///    with the daemon's entity id and origin hash.
/// 2. A's `peer_static_lookup` callback must return B's public key.
/// 3. The context type must be exactly two fat pointers wide; the assertion
///    message explains this guards against a key field being added.
#[tokio::test]
async fn migration_identity_context_unseals_envelope_without_exposing_key() {
    use net::adapter::net::identity::{EntityKeypair, IdentityEnvelope};
    use net::adapter::net::state::causal::CausalLink;
    use net::adapter::net::state::snapshot::StateSnapshot;

    let a = build_node().await;
    let b = build_node().await;
    handshake(&a, &b).await;

    let daemon = EntityKeypair::generate();
    let target_static = *b.public_key();

    // Minimal chain link: only the origin hash carries a meaningful value.
    let link = CausalLink {
        origin_hash: daemon.origin_hash(),
        horizon_encoded: 0,
        sequence: 0,
        parent_hash: 0,
    };
    let sealed = IdentityEnvelope::new(&daemon, target_static, &link).expect("seal envelope");

    let snapshot = StateSnapshot {
        version: net::adapter::net::state::snapshot::SNAPSHOT_VERSION,
        entity_id: daemon.entity_id().clone(),
        chain_link: link,
        through_seq: 0,
        state: bytes::Bytes::new(),
        horizon: Default::default(),
        created_at: 0,
        bindings_bytes: Vec::new(),
        identity_envelope: Some(sealed),
        head_payload: None,
    };

    // B unseals through the context callback.
    let b_ctx = b.migration_identity_context();
    let unseal_result = (b_ctx.unseal_snapshot)(&snapshot);
    let recovered = unseal_result
        .expect("unseal succeeds")
        .expect("envelope present → Some(keypair)");
    assert_eq!(recovered.entity_id(), daemon.entity_id());
    assert_eq!(recovered.origin_hash(), daemon.origin_hash());

    // A resolves B's static key through its own context.
    let a_ctx = a.migration_identity_context();
    let looked_up = (a_ctx.peer_static_lookup)(b.node_id());
    assert_eq!(
        looked_up,
        Some(target_static),
        "peer_static_lookup on A's context must find B's static \
         (A initiated to B; initiator learns responder's static)",
    );

    // Layout guard: exactly two fat pointers, nothing else.
    use std::mem::size_of;
    let fat_pointer_bytes = 2 * size_of::<usize>();
    let context_bytes = size_of::<net::adapter::net::subprotocol::MigrationIdentityContext>();
    assert_eq!(
        context_bytes,
        2 * fat_pointer_bytes,
        "MigrationIdentityContext must stay two Arc<dyn Fn ...> — a \
         size bump means a new field, most likely the Noise static \
         private key leaking back as a readable [u8; 32]",
    );
}
/// A capability announcement made on one node becomes queryable on its peer.
///
/// A announces the tags `gpu` and `inference`; B must list A's node id for a
/// filter requiring `gpu` within the `wait_until` budget.
#[tokio::test]
async fn two_node_announce_is_visible() {
    let announcer = build_node().await;
    let observer = build_node().await;
    handshake(&announcer, &observer).await;

    let announced = CapabilitySet::new().add_tag("gpu").add_tag("inference");
    announcer
        .announce_capabilities(announced)
        .await
        .expect("announce failed");

    let gpu_only = CapabilityFilter::new().require_tag("gpu");
    let announcer_id = announcer.node_id();
    let seen = wait_until(&observer, |n| {
        let matches = n.find_nodes_by_filter(&gpu_only);
        matches.contains(&announcer_id)
    })
    .await;

    assert!(seen, "B did not observe A's capability announcement");
}
/// An announcement with a 1 s TTL must disappear from the peer's index.
///
/// A announces the tag `ephemeral` with a one-second TTL (third argument
/// `true` — presumably "signed", judging by the unsigned tests passing
/// `false`; confirm against `announce_capabilities_with`). The test first
/// waits until B has indexed the announcement, then waits until B no longer
/// returns A for the same filter.
#[tokio::test]
async fn announcement_expires_after_ttl() {
    let a = build_node().await;
    let b = build_node().await;
    handshake(&a, &b).await;

    let caps = CapabilitySet::new().add_tag("ephemeral");
    a.announce_capabilities_with(caps, Duration::from_secs(1), true)
        .await
        .expect("announce failed");

    let filter = CapabilityFilter::new().require_tag("ephemeral");
    let a_id = a.node_id();
    assert!(
        wait_until(&b, |n| n.find_nodes_by_filter(&filter).contains(&a_id)).await,
        "B never indexed A's announcement in the first place"
    );

    // Poll for eviction rather than sleeping a fixed 1.5 s: the 1 s TTL plus
    // the 250 ms GC interval from `test_config` left only ~250 ms of slack,
    // which is tight on a loaded machine. `wait_until` allows up to 2 s and
    // returns as soon as the entry is gone.
    let expired = wait_until(&b, |n| !n.find_nodes_by_filter(&filter).contains(&a_id)).await;
    assert!(
        expired,
        "B still returns A after TTL expiry (GC not running?)"
    );
}
/// A peer that connects after an announcement was made still learns about it.
///
/// A announces the tag `preannounced` before B even exists; once the two have
/// completed a handshake, B must list A for that tag. The assertion message
/// names the delivery mechanism as a session-open push.
#[tokio::test]
async fn late_joiner_receives_session_open_push() {
    let early = build_node().await;
    early
        .announce_capabilities(CapabilitySet::new().add_tag("preannounced"))
        .await
        .expect("announce failed");

    // The late joiner is created only after the announcement went out.
    let late = build_node().await;
    handshake(&early, &late).await;

    let early_id = early.node_id();
    let wanted = CapabilityFilter::new().require_tag("preannounced");
    let delivered = wait_until(&late, |n| {
        n.find_nodes_by_filter(&wanted).contains(&early_id)
    })
    .await;

    assert!(
        delivered,
        "session-open push did not deliver the pre-announcement"
    );
}
/// A receiver configured with `with_require_signed_capabilities(true)` must not
/// index an announcement sent with the third argument `false`.
///
/// The sender must still find itself in its own index; after a 100 ms grace
/// period the receiver must not list the sender for the `classified` tag.
#[tokio::test]
async fn require_signed_capabilities_drops_unsigned_announcements() {
    let sender = build_node().await;
    let strict_receiver =
        build_node_with(|cfg| cfg.with_require_signed_capabilities(true)).await;
    handshake(&sender, &strict_receiver).await;

    let caps = CapabilitySet::new().add_tag("classified");
    sender
        .announce_capabilities_with(caps, Duration::from_secs(60), false)
        .await
        .expect("announce failed");

    let classified = CapabilityFilter::new().require_tag("classified");

    let self_indexed = sender
        .find_nodes_by_filter(&classified)
        .contains(&sender.node_id());
    assert!(self_indexed, "sender lost its own self-index");

    // Give the receiver time to process (and discard) the packet.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let leaked = strict_receiver
        .find_nodes_by_filter(&classified)
        .contains(&sender.node_id());
    assert!(
        !leaked,
        "receiver accepted an unsigned announcement despite require_signed_capabilities=true"
    );
}
#[tokio::test]
async fn stale_versions_are_ignored_by_index() {
use net::adapter::net::behavior::fold::{capability_bridge, CapabilityFold, Fold};
use net::adapter::net::EntityId;
let fold = Fold::<CapabilityFold>::with_sweep_interval(std::time::Duration::ZERO);
let caps_v1 = CapabilitySet::new().add_tag("v1");
let caps_v2 = CapabilitySet::new().add_tag("v2");
let eid = EntityId::from_bytes([0u8; 32]);
let v1 = CapabilityAnnouncement::new( 0xAA, eid.clone(), 1, caps_v1);
let v2 = CapabilityAnnouncement::new(0xAA, eid, 2, caps_v2);
capability_bridge::apply_legacy_announcement(&fold, v2)
.expect("apply legacy announcement in fixture");
capability_bridge::apply_legacy_announcement(&fold, v1)
.expect("apply legacy announcement in fixture");
let v2_filter = CapabilityFilter::new().require_tag("v2");
assert_eq!(
capability_bridge::find_nodes_matching(&fold, &v2_filter),
vec![0xAA]
);
let v1_filter = CapabilityFilter::new().require_tag("v1");
assert!(
capability_bridge::find_nodes_matching(&fold, &v1_filter).is_empty(),
"older version overwrote the newer one"
);
}
/// A validly signed announcement whose `node_id` does not match the signer's
/// own node id must not be indexed by the receiver.
///
/// The fixture first asserts that the forged id differs from the attacker
/// keypair's node id and that the signature still verifies, so that the only
/// thing left to reject is the id mismatch.
#[tokio::test]
async fn forged_node_id_rejected_even_with_valid_signature() {
    use net::adapter::net::behavior::SUBPROTOCOL_CAPABILITY_ANN;

    let a = build_node().await;
    let b = build_node().await;
    handshake(&a, &b).await;

    let attacker = EntityKeypair::generate();
    let claimed_id: u64 = 0x1234_5678_9ABC_DEF0;
    assert_ne!(
        claimed_id,
        attacker.node_id(),
        "fixture: forged_node_id must differ from the signer's real node_id",
    );

    let probe_caps = CapabilitySet::new().add_tag("forged-node-id-probe");
    let mut forged =
        CapabilityAnnouncement::new(claimed_id, attacker.entity_id().clone(), 1, probe_caps);
    forged.sign(&attacker);
    assert!(
        forged.verify().is_ok(),
        "forged announcement still carries a valid signature",
    );

    // Inject the raw bytes over A's existing session with B.
    let wire = forged.to_bytes();
    a.send_subprotocol(b.local_addr(), SUBPROTOCOL_CAPABILITY_ANN, &wire)
        .await
        .expect("send forged announcement");

    let probe = CapabilityFilter::new().require_tag("forged-node-id-probe");
    tokio::time::sleep(Duration::from_millis(100)).await;

    let indexed = b.find_nodes_by_filter(&probe).contains(&claimed_id);
    assert!(
        !indexed,
        "receiver indexed a forged node_id despite derivation mismatch — \
         node_id must be bound to entity_id cryptographically",
    );
}
/// Relaying someone else's signed announcement must not bind the relay's
/// session to that someone's entity id.
///
/// The attacker sends an announcement signed by a separate victim keypair,
/// with `hop_count` set to 1 after signing. The receiver is expected to index
/// the victim by node id, while `peer_entity_id` for the attacker's node id
/// must not be the victim's entity id.
#[tokio::test]
async fn forwarded_announcement_does_not_tofu_pin_forwarder_to_victim_entity() {
    use net::adapter::net::behavior::SUBPROTOCOL_CAPABILITY_ANN;

    let attacker = build_node().await;
    let receiver = build_node().await;
    handshake(&attacker, &receiver).await;

    let victim_kp = EntityKeypair::generate();
    let victim_entity = victim_kp.entity_id().clone();
    let victim_node_id = victim_kp.node_id();

    let caps = CapabilitySet::new().add_tag("forwarded-tofu-probe");
    let mut ann = CapabilityAnnouncement::new(victim_node_id, victim_entity.clone(), 1, caps);
    ann.sign(&victim_kp);
    assert!(ann.verify().is_ok(), "victim's signature is valid");
    // Set after signing and verifying, before serialisation.
    ann.hop_count = 1;

    let payload = ann.to_bytes();
    attacker
        .send_subprotocol(receiver.local_addr(), SUBPROTOCOL_CAPABILITY_ANN, &payload)
        .await
        .expect("send forwarded announcement");

    let filter = CapabilityFilter::new().require_tag("forwarded-tofu-probe");
    let arrived = wait_until(&receiver, |n| {
        n.find_nodes_by_filter(&filter).contains(&victim_node_id)
    })
    .await;
    assert!(
        arrived,
        "receiver should still index the victim by node_id — signature is valid",
    );

    let attacker_node_id = attacker.node_id();
    // Last use of `victim_entity`: move it into the comparison instead of cloning.
    assert!(
        receiver.peer_entity_id(attacker_node_id) != Some(victim_entity),
        "attacker's session got TOFU-pinned to the victim's entity_id via a forwarded announcement — \
         forwarder can now impersonate origin for channel auth",
    );
}
/// An announcement sent with the third argument `false` may be indexed by a
/// receiver that opted out of required signatures, but must not produce a
/// `peer_entity_id` entry for the sender.
#[tokio::test]
async fn unsigned_announcement_does_not_tofu_pin_entity() {
    let sender = build_node().await;
    let lenient_receiver =
        build_node_with(|cfg| cfg.with_require_signed_capabilities(false)).await;
    handshake(&sender, &lenient_receiver).await;

    let probe_caps = CapabilitySet::new().add_tag("unsigned-tofu-probe");
    sender
        .announce_capabilities_with(probe_caps, Duration::from_secs(60), false)
        .await
        .expect("announce");

    let sender_id = sender.node_id();
    let probe = CapabilityFilter::new().require_tag("unsigned-tofu-probe");
    let indexed = wait_until(&lenient_receiver, |n| {
        n.find_nodes_by_filter(&probe).contains(&sender_id)
    })
    .await;
    assert!(indexed, "unsigned announcement should still index");

    let pinned = lenient_receiver.peer_entity_id(sender_id);
    assert!(
        pinned.is_none(),
        "TOFU pin established from an unsigned announcement — \
         unauthenticated entity_id is attacker-controlled input",
    );
}
/// Relaying a victim's signed announcement must not assign a subnet to the
/// relaying peer.
///
/// The receiver runs a subnet policy with a `region:` rule that maps
/// `privileged`. The attacker sends the victim's `region:privileged`
/// announcement with `hop_count` set to 1. The victim must be indexed, and
/// `peer_subnet` for the attacker must stay `None`.
#[tokio::test]
async fn forwarded_announcement_does_not_write_relay_peer_subnet() {
    use net::adapter::net::behavior::SUBPROTOCOL_CAPABILITY_ANN;
    use net::adapter::net::{SubnetPolicy, SubnetRule};

    let attacker = build_node().await;

    let region_rule = SubnetRule::new("region:", 0).map("privileged", 1);
    let region_policy = SubnetPolicy::new().add_rule(region_rule);
    let receiver =
        build_node_with(|cfg| cfg.with_subnet_policy(Arc::new(region_policy))).await;
    handshake(&attacker, &receiver).await;

    let victim = EntityKeypair::generate();
    let privileged = CapabilitySet::new().add_tag("region:privileged");
    let mut relayed =
        CapabilityAnnouncement::new(victim.node_id(), victim.entity_id().clone(), 1, privileged);
    relayed.sign(&victim);
    assert!(relayed.verify().is_ok(), "victim's signature is valid");
    // Set after signing and verifying, before serialisation.
    relayed.hop_count = 1;

    attacker
        .send_subprotocol(
            receiver.local_addr(),
            SUBPROTOCOL_CAPABILITY_ANN,
            &relayed.to_bytes(),
        )
        .await
        .expect("send forwarded announcement");

    let wants_privileged = CapabilityFilter::new().require_tag("region:privileged");
    let victim_id = victim.node_id();
    let indexed = wait_until(&receiver, |n| {
        n.find_nodes_by_filter(&wants_privileged).contains(&victim_id)
    })
    .await;
    assert!(indexed, "receiver should still index the victim by node_id");

    // Extra settle time before the negative check.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let relay_subnet = receiver.peer_subnet(attacker.node_id());
    assert!(
        relay_subnet.is_none(),
        "forwarded announcement wrote the relay's subnet — a crafted last \
         hop can reshape any legitimate peer's SubnetLocal visibility",
    );
}
/// An announcement sent with the third argument `false` must not assign a
/// subnet to the sender, even when the receiver accepts such announcements.
///
/// The receiver disables required signatures and runs a `region:` subnet
/// policy. The sender's `region:privileged` announcement must be indexed, yet
/// `peer_subnet` for the sender must stay `None`.
#[tokio::test]
async fn unsigned_announcement_does_not_write_peer_subnet() {
    use net::adapter::net::{SubnetPolicy, SubnetRule};

    let sender = build_node().await;

    let region_rule = SubnetRule::new("region:", 0).map("privileged", 1);
    let region_policy = SubnetPolicy::new().add_rule(region_rule);
    let receiver = build_node_with(|cfg| {
        let lenient = cfg.with_require_signed_capabilities(false);
        lenient.with_subnet_policy(Arc::new(region_policy))
    })
    .await;
    handshake(&sender, &receiver).await;

    let privileged = CapabilitySet::new().add_tag("region:privileged");
    sender
        .announce_capabilities_with(privileged, Duration::from_secs(60), false)
        .await
        .expect("announce");

    let sender_id = sender.node_id();
    let wants_privileged = CapabilityFilter::new().require_tag("region:privileged");
    let indexed = wait_until(&receiver, |n| {
        n.find_nodes_by_filter(&wants_privileged).contains(&sender_id)
    })
    .await;
    assert!(
        indexed,
        "unsigned announcement should still index under opt-out",
    );

    // Extra settle time before the negative check.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let assigned = receiver.peer_subnet(sender_id);
    assert!(
        assigned.is_none(),
        "unsigned announcement was allowed to pick the peer's subnet — \
         subnet_visible decisions become attacker-controlled",
    );
}
/// A signed announcement whose `node_id` is not the one belonging to its
/// `entity_id` must be rejected outright.
///
/// The announcement carries the victim's entity id and signature but a node id
/// that is off by one. After a 150 ms wait the receiver must have indexed
/// neither the bogus id nor the victim's real id, and `peer_entity_id` for the
/// attacker must not be the victim's entity id.
#[tokio::test]
async fn signed_announcement_with_mismatched_node_id_entity_id_is_rejected() {
    use net::adapter::net::behavior::SUBPROTOCOL_CAPABILITY_ANN;

    let attacker = build_node().await;
    let receiver = build_node().await;
    handshake(&attacker, &receiver).await;

    let victim = EntityKeypair::generate();
    let victim_entity = victim.entity_id().clone();
    let real_id = victim.node_id();
    let spoofed_id = real_id.wrapping_add(1);
    assert_ne!(
        spoofed_id, real_id,
        "bogus_node_id must differ from the legitimate derivation",
    );

    let probe_caps = CapabilitySet::new().add_tag("binding-mismatch-probe");
    let mut spoofed =
        CapabilityAnnouncement::new(spoofed_id, victim_entity.clone(), 1, probe_caps);
    spoofed.sign(&victim);
    assert!(
        spoofed.verify().is_ok(),
        "signature must verify — we want to isolate the binding check",
    );
    // Set after signing and verifying, before serialisation.
    spoofed.hop_count = 1;

    let wire = spoofed.to_bytes();
    attacker
        .send_subprotocol(receiver.local_addr(), SUBPROTOCOL_CAPABILITY_ANN, &wire)
        .await
        .expect("send bogus forwarded announcement");

    tokio::time::sleep(Duration::from_millis(150)).await;

    let probe = CapabilityFilter::new().require_tag("binding-mismatch-probe");
    let indexed = receiver.find_nodes_by_filter(&probe);

    let has_spoofed = indexed.contains(&spoofed_id);
    assert!(
        !has_spoofed,
        "receiver indexed the spoofed node_id despite the binding mismatch — \
         the entity_id→node_id binding check failed to reject the announcement",
    );

    let has_real = indexed.contains(&real_id);
    assert!(
        !has_real,
        "receiver indexed the victim's legitimate node_id using data from the \
         spoofed announcement — binding check should reject BEFORE index runs",
    );

    let pinned = receiver.peer_entity_id(attacker.node_id());
    assert!(
        pinned != Some(victim_entity),
        "attacker's session got TOFU-pinned to victim's entity_id via a \
         binding-mismatched announcement — forwarder / sender can impersonate \
         the victim for channel-auth purposes",
    );
}