#![cfg(all(feature = "net", feature = "cortex"))]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use net::adapter::net::behavior::CapabilitySet;
use net::adapter::net::cortex::{
RpcContext, RpcHandler, RpcHandlerError, RpcResponsePayload, RpcStatus,
};
use net::adapter::net::mesh_rpc::{CallOptions, RpcError};
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
const TEST_BUFFER_SIZE: usize = 256 * 1024;
const PSK: [u8; 32] = [0x42u8; 32];
fn test_config() -> MeshNodeConfig {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut cfg = MeshNodeConfig::new(addr, PSK)
.with_heartbeat_interval(Duration::from_millis(200))
.with_session_timeout(Duration::from_secs(5))
.with_handshake(3, Duration::from_secs(2))
.with_capability_gc_interval(Duration::from_millis(250))
.with_min_announce_interval(Duration::from_millis(10));
cfg.socket_buffers = SocketBufferConfig {
send_buffer_size: TEST_BUFFER_SIZE,
recv_buffer_size: TEST_BUFFER_SIZE,
};
cfg
}
async fn build_node() -> Arc<MeshNode> {
let cfg = test_config();
let keypair = EntityKeypair::generate();
Arc::new(MeshNode::new(keypair, cfg).await.expect("MeshNode::new"))
}
async fn handshake_pair(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
let a_id = a.node_id();
let b_id = b.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id)
.await
.expect("connect failed");
accept
.await
.expect("accept task panicked")
.expect("accept failed");
a.start();
b.start();
}
struct EchoHandler;
#[async_trait::async_trait]
impl RpcHandler for EchoHandler {
async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
Ok(RpcResponsePayload {
status: RpcStatus::Ok,
headers: vec![],
body: ctx.payload.body,
})
}
}
async fn wait_for_service_visibility(node: &Arc<MeshNode>, target: u64, service: &str) {
use net::adapter::net::behavior::fold::capability_bridge::find_nodes_matching;
use net::adapter::net::behavior::CapabilityFilter;
let tag = format!("nrpc:{service}");
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
let filter = CapabilityFilter::default().require_tag(tag.clone());
while tokio::time::Instant::now() < deadline {
let nodes = find_nodes_matching(node.capability_fold(), &filter);
if nodes.contains(&target) {
return;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
panic!(
"timed out waiting for target {target:#x} to advertise `{tag}` in caller's capability index",
);
}
#[tokio::test]
async fn call_service_permissive_announcement_admits_any_caller() {
let server = build_node().await;
let caller = build_node().await;
handshake_pair(&caller, &server).await;
let _serve = server
.serve_rpc("echo", Arc::new(EchoHandler))
.expect("serve_rpc");
server
.announce_capabilities(CapabilitySet::new())
.await
.expect("server announce");
caller
.announce_capabilities(CapabilitySet::new())
.await
.expect("caller announce");
wait_for_service_visibility(&caller, server.node_id(), "echo").await;
let reply = caller
.call_service(
"echo",
Bytes::from_static(b"permissive hello"),
CallOptions::default(),
)
.await
.expect("permissive default must admit any caller");
assert_eq!(reply.body.as_ref(), b"permissive hello");
}
#[tokio::test]
async fn call_service_filters_unauthorized_candidates_before_target_selection() {
use net::adapter::net::behavior::CapabilityAnnouncement;
let caller = build_node().await;
let denying_server = build_node().await;
let allowing_server = build_node().await;
for (server, allow) in [
(&denying_server, vec![0xDEAD_BEEF_BAAD_F00D]),
(&allowing_server, vec![]),
] {
let caps = CapabilitySet::new().add_tag("nrpc:echo");
let mut ann =
CapabilityAnnouncement::new(server.node_id(), server.entity_id().clone(), 100, caps);
ann.allowed_nodes = allow;
caller.test_inject_capability_announcement(ann);
}
let err = caller
.call_service(
"echo",
Bytes::from_static(b"x"),
CallOptions {
deadline: Some(std::time::Instant::now() + Duration::from_millis(500)),
..Default::default()
},
)
.await
.expect_err("no handler registered → call must error somehow");
assert!(
!matches!(err, RpcError::CapabilityDenied { .. }),
"filter must steer call_service to allowing_server; \
instead got CapabilityDenied which means the denying \
candidate was picked. err={err:?}",
);
}
#[tokio::test]
async fn call_service_denies_when_every_candidate_rejects_caller() {
use net::adapter::net::behavior::CapabilityAnnouncement;
let caller = build_node().await;
let server_a = build_node().await;
let server_b = build_node().await;
for server in [&server_a, &server_b] {
let caps = CapabilitySet::new().add_tag("nrpc:echo");
let mut ann =
CapabilityAnnouncement::new(server.node_id(), server.entity_id().clone(), 100, caps);
ann.allowed_nodes = vec![0xDEAD_BEEF_BAAD_F00D];
caller.test_inject_capability_announcement(ann);
}
let err = caller
.call_service("echo", Bytes::from_static(b"x"), CallOptions::default())
.await
.expect_err("every candidate denies → CapabilityDenied");
match err {
RpcError::CapabilityDenied { capability, .. } => {
assert_eq!(capability, "echo");
}
other => panic!("expected CapabilityDenied, got {other:?}"),
}
}
#[tokio::test]
async fn serve_rpc_self_indexes_announcement_with_nrpc_tag() {
let node = build_node().await;
assert!(
!node.test_capability_fold_has(node.node_id()),
"no self-announcement before serve_rpc",
);
let _serve = node
.serve_rpc("echo", Arc::new(EchoHandler))
.expect("serve_rpc");
assert!(
node.test_capability_fold_has(node.node_id()),
"serve_rpc must auto-self-index",
);
let self_caps = node.test_capability_fold_get(node.node_id());
assert!(
self_caps.has_tag("nrpc:echo"),
"auto-self-indexed announcement must carry nrpc:<service>",
);
}
#[tokio::test]
async fn serve_rpc_self_index_works_regardless_of_announce_order() {
let node = build_node().await;
node.announce_capabilities(CapabilitySet::new())
.await
.expect("pre-announce");
assert!(
node.test_capability_fold_has(node.node_id()),
"pre-serve_rpc self-ann present",
);
let pre = node.test_capability_fold_get(node.node_id());
assert!(
!pre.has_tag("nrpc:echo"),
"pre-serve_rpc self-ann must not carry nrpc:echo",
);
let _serve = node
.serve_rpc("echo", Arc::new(EchoHandler))
.expect("serve_rpc");
assert!(
node.test_capability_fold_has(node.node_id()),
"post-serve_rpc self-ann present",
);
let post = node.test_capability_fold_get(node.node_id());
assert!(
post.has_tag("nrpc:echo"),
"post-serve_rpc self-ann must carry the merged tag regardless of order",
);
}
#[tokio::test]
async fn call_service_caller_side_gate_denies_when_not_in_allow_list() {
use net::adapter::net::behavior::CapabilityAnnouncement;
let server = build_node().await;
let caller = build_node().await;
handshake_pair(&caller, &server).await;
let _serve = server
.serve_rpc("echo", Arc::new(EchoHandler))
.expect("serve_rpc");
let caps = CapabilitySet::new().add_tag("nrpc:echo");
let mut ann =
CapabilityAnnouncement::new(server.node_id(), server.entity_id().clone(), 100, caps);
ann.allowed_nodes = vec![0xDEAD_BEEF_BAAD_F00D];
caller.test_inject_capability_announcement(ann.clone());
server.test_inject_capability_announcement(ann);
let err = caller
.call_service(
"echo",
Bytes::from_static(b"should-be-denied"),
CallOptions::default(),
)
.await
.expect_err("restrictive allow-list must deny the caller");
match err {
RpcError::CapabilityDenied { target, capability } => {
assert_eq!(target, server.node_id());
assert_eq!(capability, "echo");
}
other => panic!("expected CapabilityDenied, got {other:?}"),
}
}
#[tokio::test]
async fn serve_rpc_spawned_reannounce_propagates_nrpc_tag_to_peers() {
use net::adapter::net::behavior::fold::capability_bridge::find_nodes_matching;
use net::adapter::net::behavior::CapabilityFilter;
let server = build_node().await;
let peer = build_node().await;
handshake_pair(&peer, &server).await;
let _serve = server
.serve_rpc("echo", Arc::new(EchoHandler))
.expect("serve_rpc");
let filter = CapabilityFilter::default().require_tag("nrpc:echo".to_string());
let server_id = server.node_id();
let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
loop {
if find_nodes_matching(peer.capability_fold(), &filter).contains(&server_id) {
return;
}
if tokio::time::Instant::now() > deadline {
panic!(
"spawned re-announce did not propagate `nrpc:echo` from server \
{server_id:#x} to peer's capability index within 3s; either \
the spawn was dropped or broadcast regressed",
);
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
}
#[tokio::test]
async fn callee_bridge_denial_bumps_capability_denied_metric() {
use net::adapter::net::behavior::CapabilityAnnouncement;
let server = build_node().await;
let caller = build_node().await;
handshake_pair(&caller, &server).await;
let _serve = server
.serve_rpc("echo", Arc::new(EchoHandler))
.expect("serve_rpc");
let caps_permissive = CapabilitySet::new().add_tag("nrpc:echo");
let permissive = CapabilityAnnouncement::new(
server.node_id(),
server.entity_id().clone(),
50,
caps_permissive,
);
caller.test_inject_capability_announcement(permissive);
let caps_restrictive = CapabilitySet::new().add_tag("nrpc:echo");
let mut restrictive = CapabilityAnnouncement::new(
server.node_id(),
server.entity_id().clone(),
100,
caps_restrictive,
);
restrictive.allowed_nodes = vec![0xDEAD_BEEF_BAAD_F00D];
server.test_inject_capability_announcement(restrictive);
let err = caller
.call(
server.node_id(),
"echo",
Bytes::from_static(b"bypass"),
CallOptions {
deadline: Some(std::time::Instant::now() + Duration::from_secs(2)),
..Default::default()
},
)
.await
.expect_err("callee-side gate must deny");
assert!(
matches!(err, RpcError::CapabilityDenied { .. }),
"expected CapabilityDenied, got {err:?}",
);
let snap = server.rpc_metrics_snapshot();
let echo = snap
.services
.iter()
.find(|s| s.service == "echo")
.expect("echo service tracked in registry");
assert!(
echo.capability_denied_total >= 1,
"bridge denial must bump capability_denied_total; got snapshot {echo:?}",
);
assert_eq!(
echo.handler_invocations_total, 0,
"handler must not run on denied calls; got {} invocations",
echo.handler_invocations_total,
);
}