#![cfg(all(feature = "net", feature = "cortex"))]
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use net::adapter::net::behavior::predicate::{
predicate_from_rpc_headers, predicate_to_rpc_header, EvalContext, RPC_WHERE_HEADER,
};
use net::adapter::net::behavior::tag::{Tag, TaxonomyAxis};
use net::adapter::net::cortex::{
RpcContext, RpcHandler, RpcHandlerError, RpcResponsePayload, RpcStatus,
};
use net::adapter::net::mesh_rpc::{CallOptions, RpcError};
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
use net::pred;
const TEST_BUFFER_SIZE: usize = 256 * 1024;
const PSK: [u8; 32] = [0x42u8; 32];
fn test_config() -> MeshNodeConfig {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut cfg = MeshNodeConfig::new(addr, PSK)
.with_heartbeat_interval(Duration::from_millis(200))
.with_session_timeout(Duration::from_secs(5))
.with_handshake(3, Duration::from_secs(2))
.with_capability_gc_interval(Duration::from_millis(250));
cfg.socket_buffers = SocketBufferConfig {
send_buffer_size: TEST_BUFFER_SIZE,
recv_buffer_size: TEST_BUFFER_SIZE,
};
cfg
}
async fn build_node() -> Arc<MeshNode> {
let cfg = test_config();
let keypair = EntityKeypair::generate();
Arc::new(MeshNode::new(keypair, cfg).await.expect("MeshNode::new"))
}
async fn handshake_pair(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
let a_id = a.node_id();
let b_id = b.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id)
.await
.expect("connect failed");
accept
.await
.expect("accept task panicked")
.expect("accept failed");
a.start();
b.start();
}
fn synthetic_corpus() -> Vec<(Vec<Tag>, BTreeMap<String, String>)> {
fn tag_present(axis: TaxonomyAxis, key: &str) -> Tag {
Tag::AxisPresent {
axis,
key: key.to_string(),
}
}
fn tag_value(axis: TaxonomyAxis, key: &str, value: &str) -> Tag {
Tag::AxisValue {
axis,
key: key.to_string(),
value: value.to_string(),
separator: net::adapter::net::behavior::tag::AxisSeparator::Eq,
}
}
fn meta(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
pairs
.iter()
.map(|(k, v)| ((*k).to_string(), (*v).to_string()))
.collect()
}
vec![
(
vec![
tag_present(TaxonomyAxis::Hardware, "gpu"),
tag_value(TaxonomyAxis::Hardware, "memory_gb", "64"),
],
meta(&[("intent", "ml-training")]),
),
(
vec![
tag_present(TaxonomyAxis::Hardware, "gpu"),
tag_value(TaxonomyAxis::Hardware, "memory_gb", "16"),
],
meta(&[("intent", "ml-training")]),
),
(
vec![tag_value(TaxonomyAxis::Hardware, "memory_gb", "64")],
meta(&[("intent", "ml-training")]),
),
(
vec![
tag_present(TaxonomyAxis::Hardware, "gpu"),
tag_value(TaxonomyAxis::Hardware, "memory_gb", "64"),
],
meta(&[("intent", "billing")]),
),
]
}
struct WhereFilterHandler;
#[async_trait::async_trait]
impl RpcHandler for WhereFilterHandler {
async fn call(&self, ctx: RpcContext) -> Result<RpcResponsePayload, RpcHandlerError> {
let pred = match predicate_from_rpc_headers(&ctx.payload.headers) {
Some(Ok(p)) => p,
Some(Err(e)) => {
return Err(RpcHandlerError::Application {
code: 0xC000,
message: format!("predicate header decode failed: {e}"),
});
}
None => {
return Ok(RpcResponsePayload {
status: RpcStatus::Ok,
headers: vec![],
body: serde_json::to_vec::<Vec<u32>>(&vec![])
.expect("empty json")
.into(),
});
}
};
let corpus = synthetic_corpus();
let mut matches: Vec<u32> = Vec::new();
for (idx, (tags, metadata)) in corpus.iter().enumerate() {
let ctx = EvalContext::new(tags, metadata);
if pred.evaluate(&ctx) {
matches.push(idx as u32);
}
}
Ok(RpcResponsePayload {
status: RpcStatus::Ok,
headers: vec![],
body: serde_json::to_vec(&matches).expect("matches json").into(),
})
}
}
#[tokio::test]
async fn predicate_header_round_trip_filters_corpus() {
let server = build_node().await;
let caller = build_node().await;
handshake_pair(&caller, &server).await;
let _serve = server
.serve_rpc("where", Arc::new(WhereFilterHandler))
.expect("serve_rpc");
let p = pred!(and [
pred!(exists "hardware.gpu"),
pred!(num_at_least "hardware.memory_gb", 32.0),
pred!(metadata_equals "intent", "ml-training"),
]);
let header = predicate_to_rpc_header(&p).expect("encode");
assert_eq!(header.0, RPC_WHERE_HEADER);
let opts = CallOptions {
request_headers: vec![header],
..CallOptions::default()
};
let reply = caller
.call(server.node_id(), "where", Bytes::new(), opts)
.await
.expect("call must succeed (Ok status surfaces as Ok(RpcReply))");
let matches: Vec<u32> = serde_json::from_slice(&reply.body).expect("body decodes as Vec<u32>");
assert_eq!(
matches,
vec![0],
"only candidate 0 (GPU + 64 GB + ml-training) should match the AND predicate",
);
}
#[tokio::test]
async fn predicate_header_admits_multiple_candidates() {
let server = build_node().await;
let caller = build_node().await;
handshake_pair(&caller, &server).await;
let _serve = server
.serve_rpc("where", Arc::new(WhereFilterHandler))
.expect("serve_rpc");
let p = pred!(exists "hardware.gpu");
let header = predicate_to_rpc_header(&p).expect("encode");
let opts = CallOptions {
request_headers: vec![header],
..CallOptions::default()
};
let reply = caller
.call(server.node_id(), "where", Bytes::new(), opts)
.await
.expect("call");
let matches: Vec<u32> = serde_json::from_slice(&reply.body).expect("body");
assert_eq!(
matches,
vec![0, 1, 3],
"every GPU-bearing candidate matches"
);
}
#[tokio::test]
async fn predicate_header_absent_returns_empty_matches() {
let server = build_node().await;
let caller = build_node().await;
handshake_pair(&caller, &server).await;
let _serve = server
.serve_rpc("where", Arc::new(WhereFilterHandler))
.expect("serve_rpc");
let reply = caller
.call(
server.node_id(),
"where",
Bytes::new(),
CallOptions::default(),
)
.await
.expect("call");
let matches: Vec<u32> = serde_json::from_slice(&reply.body).expect("body");
assert!(matches.is_empty(), "no header → empty matches");
}
#[tokio::test]
async fn predicate_header_malformed_surfaces_typed_error() {
let server = build_node().await;
let caller = build_node().await;
handshake_pair(&caller, &server).await;
let _serve = server
.serve_rpc("where", Arc::new(WhereFilterHandler))
.expect("serve_rpc");
let opts = CallOptions {
request_headers: vec![(RPC_WHERE_HEADER.to_string(), b"{not-json".to_vec())],
..CallOptions::default()
};
let err = caller
.call(server.node_id(), "where", Bytes::new(), opts)
.await
.expect_err("malformed header must surface as a server error");
match err {
RpcError::ServerError { status, message } => {
assert_ne!(status, 0, "non-Ok wire status");
assert!(
message.contains("predicate header decode failed"),
"diagnostic must carry the decode-failure message, got: {message}",
);
}
other => panic!("expected ServerError, got: {other:?}"),
}
}