#![cfg(all(feature = "net", feature = "cortex"))]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::Bytes;
use net::adapter::net::behavior::{
group::GroupId, subnet::SubnetId, CapabilityAnnouncement, CapabilitySet,
};
use net::adapter::net::cortex::{
RpcContext, RpcHandler, RpcHandlerError, RpcResponsePayload, RpcStatus,
};
use net::adapter::net::mesh_rpc::{CallOptions, RpcError};
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
/// Socket buffer size in bytes (256 KiB) used for both the send and the
/// receive buffer of every node built by `test_config`.
const TEST_BUFFER_SIZE: usize = 256 * 1024;
/// Fixed 32-byte value handed to `MeshNodeConfig::new` for every test node.
// NOTE(review): presumably the mesh pre-shared key, going by the name — confirm
// against `MeshNodeConfig::new`.
const PSK: [u8; 32] = [0x42u8; 32];
/// Builds the [`MeshNodeConfig`] shared by every node in this test file.
///
/// The node address is loopback with port 0, the timing knobs are set to
/// short values (sub-second heartbeat, capability GC and announce
/// intervals), and both socket buffers use [`TEST_BUFFER_SIZE`].
fn test_config() -> MeshNodeConfig {
    // Port 0: tests later discover the real address through `local_addr()`.
    let bind_addr = SocketAddr::from(([127, 0, 0, 1], 0));
    let buffers = SocketBufferConfig {
        send_buffer_size: TEST_BUFFER_SIZE,
        recv_buffer_size: TEST_BUFFER_SIZE,
    };

    let mut config = MeshNodeConfig::new(bind_addr, PSK)
        .with_heartbeat_interval(Duration::from_millis(200))
        .with_session_timeout(Duration::from_secs(5))
        .with_handshake(3, Duration::from_secs(2))
        .with_capability_gc_interval(Duration::from_millis(250))
        .with_min_announce_interval(Duration::from_millis(10));
    config.socket_buffers = buffers;
    config
}
/// Creates a fresh [`MeshNode`] with a newly generated keypair and the
/// shared test configuration, wrapped in an [`Arc`].
///
/// # Panics
///
/// Panics with `"MeshNode::new"` if node construction fails.
async fn build_node() -> Arc<MeshNode> {
    let config = test_config();
    let identity = EntityKeypair::generate();
    let node = MeshNode::new(identity, config)
        .await
        .expect("MeshNode::new");
    Arc::new(node)
}
/// Handshakes `a` (initiator) with `b` (acceptor) and then starts both
/// nodes, `a` first.
async fn handshake_pair(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
    handshake_only(a, b).await;
    for node in [a, b] {
        node.start();
    }
}
async fn handshake_only(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
let a_id = a.node_id();
let b_id = b.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id)
.await
.expect("connect failed");
accept
.await
.expect("accept task panicked")
.expect("accept failed");
}
/// Wires a star topology: every peer handshakes with `center` (peer as
/// initiator), one after another, and only then are the nodes started —
/// `center` first, followed by the peers in slice order.
async fn star(center: &Arc<MeshNode>, peers: &[&Arc<MeshNode>]) {
    for &peer in peers {
        handshake_only(peer, center).await;
    }

    center.start();
    peers.iter().for_each(|peer| {
        peer.start();
    });
}
/// RPC handler that answers every call with the request body it received.
struct EchoHandler;

#[async_trait::async_trait]
impl RpcHandler for EchoHandler {
    /// Returns an `Ok` response with no headers whose body is the body of
    /// the incoming payload, moved out of `ctx`.
    async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
        let echoed = ctx.payload.body;
        let response = RpcResponsePayload {
            status: RpcStatus::Ok,
            headers: Vec::new(),
            body: echoed,
        };
        Ok(response)
    }
}
/// Injects a clone of `ann` into each node of `nodes`, in slice order, via
/// the node's `test_inject_capability_announcement` hook.
fn fold_announcement_everywhere(nodes: &[&Arc<MeshNode>], ann: &CapabilityAnnouncement) {
    nodes.iter().for_each(|node| {
        node.test_inject_capability_announcement(ann.clone());
    });
}
/// Builds a [`CapabilityAnnouncement`] on behalf of `target`.
///
/// The announcement carries a capability set containing only
/// `capability_tag`, uses the given `version`, and has its three allow-lists
/// (`allowed_nodes`, `allowed_subnets`, `allowed_groups`) set to the
/// supplied vectors.
fn target_announcement(
    target: &Arc<MeshNode>,
    version: u64,
    capability_tag: &str,
    allowed_nodes: Vec<u64>,
    allowed_subnets: Vec<SubnetId>,
    allowed_groups: Vec<GroupId>,
) -> CapabilityAnnouncement {
    let tagged = CapabilitySet::new().add_tag(capability_tag);
    let node_id = target.node_id();
    let entity_id = target.entity_id().clone();

    let mut announcement = CapabilityAnnouncement::new(node_id, entity_id, version, tagged);
    announcement.allowed_nodes = allowed_nodes;
    announcement.allowed_subnets = allowed_subnets;
    announcement.allowed_groups = allowed_groups;
    announcement
}
/// Builds a [`CapabilityAnnouncement`] on behalf of `caller` whose
/// capability set encodes its memberships as tags.
///
/// The subnet tag (when `membership_subnet` is `Some`) is added first,
/// followed by one tag per entry of `membership_groups` in slice order.
/// The allow-lists are left as `CapabilityAnnouncement::new` produces them.
fn caller_announcement(
    caller: &Arc<MeshNode>,
    version: u64,
    membership_subnet: Option<SubnetId>,
    membership_groups: &[GroupId],
) -> CapabilityAnnouncement {
    let with_subnet = membership_subnet
        .into_iter()
        .fold(CapabilitySet::new(), |set, subnet| {
            set.add_tag(subnet.to_tag())
        });
    let memberships = membership_groups
        .iter()
        .fold(with_subnet, |set, group| set.add_tag(group.to_tag()));

    CapabilityAnnouncement::new(
        caller.node_id(),
        caller.entity_id().clone(),
        version,
        memberships,
    )
}
/// Scenario 1: a target that announces through the regular
/// `announce_capabilities` path (no allow-lists set by the test) must admit
/// any caller.
///
/// The test waits up to two seconds for the target to show up in the
/// caller's capability fold under the `nrpc:echo` tag, then expects the
/// echo call to round-trip.
#[tokio::test]
async fn scenario_1_permissive_baseline_admits_any_caller() {
    use net::adapter::net::behavior::fold::capability_bridge::find_nodes_matching;
    use net::adapter::net::behavior::CapabilityFilter;

    let target = build_node().await;
    let caller = build_node().await;
    handshake_pair(&caller, &target).await;

    // Keep the registration handle alive for the rest of the test.
    let _serve = target
        .serve_rpc("echo", Arc::new(EchoHandler))
        .expect("serve_rpc");
    target
        .announce_capabilities(CapabilitySet::new())
        .await
        .expect("target announce");

    // Poll the caller's fold until the target's announcement has arrived.
    let filter = CapabilityFilter::default().require_tag("nrpc:echo".to_string());
    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
    while !find_nodes_matching(caller.capability_fold(), &filter).contains(&target.node_id()) {
        if tokio::time::Instant::now() > deadline {
            panic!("propagation timeout");
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }

    let payload = Bytes::from_static(b"permissive");
    let reply = caller
        .call_service("echo", payload, CallOptions::default())
        .await
        .expect("permissive default must admit any caller");
    assert_eq!(reply.body.as_ref(), b"permissive");
}
/// Scenario 2: an allow-list by node id admits only the listed caller.
///
/// The target's announcement lists `allowed_caller`'s node id and is
/// injected into all three nodes. The listed caller must round-trip; the
/// other caller must fail with `RpcError::CapabilityDenied` naming the
/// target and the `"echo"` capability.
#[tokio::test]
async fn scenario_2_allow_by_node_admits_listed_only() {
    let target = build_node().await;
    let allowed_caller = build_node().await;
    let denied_caller = build_node().await;
    star(&target, &[&allowed_caller, &denied_caller]).await;

    // Keep the registration handle alive for the rest of the test.
    let _serve = target
        .serve_rpc("echo", Arc::new(EchoHandler))
        .expect("serve_rpc");

    let node_allow_list = vec![allowed_caller.node_id()];
    let ann = target_announcement(
        &target,
        100,
        "nrpc:echo",
        node_allow_list,
        Vec::new(),
        Vec::new(),
    );
    let everyone = [&target, &allowed_caller, &denied_caller];
    fold_announcement_everywhere(&everyone, &ann);

    // Admitted path: bounded by a deadline so a lost reply fails the test.
    let reply = allowed_caller
        .call_service(
            "echo",
            Bytes::from_static(b"hi"),
            CallOptions {
                deadline: Some(Instant::now() + Duration::from_millis(1500)),
                ..Default::default()
            },
        )
        .await
        .expect("allowed caller must complete the round-trip");
    assert_eq!(reply.body.as_ref(), b"hi");

    // Denied path: the error must identify both the target and the capability.
    let err = denied_caller
        .call_service("echo", Bytes::from_static(b"hi"), CallOptions::default())
        .await
        .expect_err("denied caller must hit the gate");
    match err {
        RpcError::CapabilityDenied {
            target: denied_target,
            capability,
        } => {
            assert_eq!(denied_target, target.node_id());
            assert_eq!(capability, "echo");
        }
        other => panic!("expected CapabilityDenied, got {other:?}"),
    }
}
/// Scenario 3: an allow-list by subnet admits only callers whose
/// announcement carries that subnet's tag.
///
/// The target's announcement goes to all three nodes. Each caller's own
/// announcement goes to the target and to that caller only.
#[tokio::test]
async fn scenario_3_allow_by_subnet_admits_subnet_members() {
    let target = build_node().await;
    let in_subnet = build_node().await;
    let out_of_subnet = build_node().await;
    star(&target, &[&in_subnet, &out_of_subnet]).await;

    // Keep the registration handle alive for the rest of the test.
    let _serve = target
        .serve_rpc("echo", Arc::new(EchoHandler))
        .expect("serve_rpc");

    let subnet = SubnetId::from_bytes([0x42; 16]);
    let target_ann = target_announcement(
        &target,
        200,
        "nrpc:echo",
        Vec::new(),
        vec![subnet],
        Vec::new(),
    );
    let member_ann = caller_announcement(&in_subnet, 1, Some(subnet), &[]);
    let outsider_ann = caller_announcement(&out_of_subnet, 1, None, &[]);

    fold_announcement_everywhere(&[&target, &in_subnet, &out_of_subnet], &target_ann);
    fold_announcement_everywhere(&[&target, &in_subnet], &member_ann);
    fold_announcement_everywhere(&[&target, &out_of_subnet], &outsider_ann);

    // Member path: must round-trip within the deadline.
    let reply = in_subnet
        .call_service(
            "echo",
            Bytes::from_static(b"in-subnet"),
            CallOptions {
                deadline: Some(Instant::now() + Duration::from_millis(1500)),
                ..Default::default()
            },
        )
        .await
        .expect("subnet member must complete the round-trip");
    assert_eq!(reply.body.as_ref(), b"in-subnet");

    // Non-member path: must be rejected with a capability denial.
    let outsider_payload = Bytes::from_static(b"out-of-subnet");
    let err = out_of_subnet
        .call_service("echo", outsider_payload, CallOptions::default())
        .await
        .expect_err("non-member must hit the gate");
    assert!(matches!(err, RpcError::CapabilityDenied { .. }));
}
/// Scenario 4: an allow-list by group admits only callers whose
/// announcement carries that group's tag.
///
/// The target's announcement goes to all three nodes. Each caller's own
/// announcement goes to the target and to that caller only.
#[tokio::test]
async fn scenario_4_allow_by_group_admits_group_claimants() {
    let target = build_node().await;
    let claimant = build_node().await;
    let non_claimant = build_node().await;
    star(&target, &[&claimant, &non_claimant]).await;

    // Keep the registration handle alive for the rest of the test.
    let _serve = target
        .serve_rpc("echo", Arc::new(EchoHandler))
        .expect("serve_rpc");

    let group = GroupId::from_bytes([0x77; 32]);
    let target_ann = target_announcement(
        &target,
        300,
        "nrpc:echo",
        Vec::new(),
        Vec::new(),
        vec![group],
    );
    let member_ann = caller_announcement(&claimant, 1, None, &[group]);
    let outsider_ann = caller_announcement(&non_claimant, 1, None, &[]);

    fold_announcement_everywhere(&[&target, &claimant, &non_claimant], &target_ann);
    fold_announcement_everywhere(&[&target, &claimant], &member_ann);
    fold_announcement_everywhere(&[&target, &non_claimant], &outsider_ann);

    // Claimant path: must round-trip within the deadline.
    let reply = claimant
        .call_service(
            "echo",
            Bytes::from_static(b"group-member"),
            CallOptions {
                deadline: Some(Instant::now() + Duration::from_millis(1500)),
                ..Default::default()
            },
        )
        .await
        .expect("group claimant must complete the round-trip");
    assert_eq!(reply.body.as_ref(), b"group-member");

    // Non-claimant path: must be rejected with a capability denial.
    let outsider_payload = Bytes::from_static(b"non-claimant");
    let err = non_claimant
        .call_service("echo", outsider_payload, CallOptions::default())
        .await
        .expect_err("non-claimant must hit the gate");
    assert!(matches!(err, RpcError::CapabilityDenied { .. }));
}
/// Scenario 5: a newer announcement with a restrictive allow-list
/// supersedes an older permissive one.
///
/// No handler is served in this test, so the version-1 call is still
/// expected to fail — just not with `CapabilityDenied`. After version 2
/// (allow-list containing only the target's own node id) is injected, the
/// same caller must be denied.
#[tokio::test]
async fn scenario_5_revocation_via_new_announcement_supersedes() {
    let target = build_node().await;
    let caller = build_node().await;
    handshake_pair(&caller, &target).await;

    // Version 1: empty allow-lists.
    let v1 = target_announcement(
        &target,
        1,
        "nrpc:echo",
        Vec::new(),
        Vec::new(),
        Vec::new(),
    );
    fold_announcement_everywhere(&[&target, &caller], &v1);

    let err = caller
        .call_service(
            "echo",
            Bytes::from_static(b"v1"),
            CallOptions {
                deadline: Some(Instant::now() + Duration::from_millis(500)),
                ..Default::default()
            },
        )
        .await
        .expect_err("no handler registered");
    assert!(
        !matches!(err, RpcError::CapabilityDenied { .. }),
        "v1 must admit; got {err:?}",
    );

    // Version 2: only the target itself is on the allow-list.
    let self_only = vec![target.node_id()];
    let v2 = target_announcement(&target, 2, "nrpc:echo", self_only, Vec::new(), Vec::new());
    fold_announcement_everywhere(&[&target, &caller], &v2);

    let err = caller
        .call_service("echo", Bytes::from_static(b"v2"), CallOptions::default())
        .await
        .expect_err("v2 must deny");
    assert!(matches!(err, RpcError::CapabilityDenied { .. }));
}
/// Scenario 6: the two nodes hold different views of the target's
/// announcement, and the call must still be denied.
///
/// The caller's fold receives only a permissive version 1. The target's
/// fold receives only a restrictive version 2 whose allow-list holds an
/// unrelated node id. The caller then addresses the target directly with
/// `call` and must get `CapabilityDenied` naming the target and `"echo"`.
// NOTE(review): the expectation strings say the denial comes from the callee
// side; this test cannot distinguish where the error originates — confirm.
#[tokio::test]
async fn scenario_6_callee_side_defense_in_depth_rejects() {
    let target = build_node().await;
    let caller = build_node().await;
    handshake_pair(&caller, &target).await;

    // Keep the registration handle alive for the rest of the test.
    let _serve = target
        .serve_rpc("echo", Arc::new(EchoHandler))
        .expect("serve_rpc");

    // Caller-side view: empty allow-lists.
    let permissive = target_announcement(
        &target,
        1,
        "nrpc:echo",
        Vec::new(),
        Vec::new(),
        Vec::new(),
    );
    caller.test_inject_capability_announcement(permissive);

    // Target-side view: allow-list that does not contain the caller.
    let unrelated_only = vec![0xDEAD_BEEF_BAAD_F00D];
    let restrictive = target_announcement(
        &target,
        2,
        "nrpc:echo",
        unrelated_only,
        Vec::new(),
        Vec::new(),
    );
    target.test_inject_capability_announcement(restrictive);

    let payload = Bytes::from_static(b"bypass");
    let err = caller
        .call(target.node_id(), "echo", payload, CallOptions::default())
        .await
        .expect_err("callee-side gate must deny");
    match err {
        RpcError::CapabilityDenied {
            target: denied_target,
            capability,
        } => {
            assert_eq!(denied_target, target.node_id());
            assert_eq!(capability, "echo");
        }
        other => panic!("expected CapabilityDenied surfaced from callee, got {other:?}"),
    }
}
/// Sanity check for `fold_announcement_everywhere`: after injecting an
/// announcement authored by `a` into both nodes, each node's capability
/// fold must report an entry for `a`'s node id. No handshake is performed.
#[tokio::test]
async fn helper_fold_announcement_lands_in_every_index() {
    let a = build_node().await;
    let b = build_node().await;

    let ann = target_announcement(&a, 1, "nrpc:probe", Vec::new(), Vec::new(), Vec::new());
    let recipients = [&a, &b];
    fold_announcement_everywhere(&recipients, &ann);

    assert!(a.test_capability_fold_has(a.node_id()));
    assert!(b.test_capability_fold_has(a.node_id()));
}
/// An announcement whose `allowed_nodes` list is one entry longer than
/// `MAX_ALLOW_LIST_LEN` must serialize with `to_bytes`, but
/// `CapabilityAnnouncement::from_bytes` must return `None` for those bytes.
#[tokio::test]
async fn from_bytes_rejects_oversized_allow_list() {
    use net::adapter::net::behavior::capability::MAX_ALLOW_LIST_LEN;

    let node = build_node().await;
    let caps = CapabilitySet::new().add_tag("nrpc:probe");
    let mut ann = CapabilityAnnouncement::new(node.node_id(), node.entity_id().clone(), 1, caps);

    // One past the limit; the ids themselves are just 0..=MAX.
    let oversized_len = MAX_ALLOW_LIST_LEN as u64 + 1;
    ann.allowed_nodes = (0..oversized_len).collect();

    let wire = ann.to_bytes();
    let decoded = CapabilityAnnouncement::from_bytes(&wire);
    assert!(
        decoded.is_none(),
        "wire-side deserializer must reject oversized allow-list",
    );
}
/// An announcement carrying two different subnet tags must yield no subnet
/// membership.
///
/// The test injects such an announcement into the node's own fold, reads
/// the stored capabilities back, and derives membership locally: exactly
/// one distinct parsed subnet gives `Some`, anything else gives `None`.
// NOTE(review): the derivation below is written out in the test itself, not
// taken from a production helper — confirm it mirrors the gate's real logic.
#[tokio::test]
async fn membership_parse_returns_no_subnet_when_announcement_has_multiple_subnet_tags() {
    let node = build_node().await;
    let s1 = SubnetId::from_bytes([0xAA; 16]);
    let s2 = SubnetId::from_bytes([0xBB; 16]);
    let announced = CapabilitySet::new()
        .add_tag(s1.to_tag())
        .add_tag(s2.to_tag())
        .add_tag("nrpc:probe");
    let ann =
        CapabilityAnnouncement::new(node.node_id(), node.entity_id().clone(), 1, announced);
    node.test_inject_capability_announcement(ann);

    let folded = node.test_capability_fold_get(node.node_id());

    // Collect each subnet id that parses out of a tag, without duplicates.
    let mut distinct: Vec<SubnetId> = Vec::new();
    for tag in &folded.tags {
        match SubnetId::from_tag(&tag.to_string()) {
            Some(subnet) if !distinct.contains(&subnet) => distinct.push(subnet),
            _ => {}
        }
    }

    let subnet_of = match distinct.as_slice() {
        [only] => Some(*only),
        _ => None,
    };
    assert_eq!(
        subnet_of, None,
        "two distinct subnet tags must collapse to no membership for deterministic gate verdicts",
    );
}