#![cfg(all(feature = "net", feature = "meshdb"))]
#![allow(
clippy::disallowed_methods,
reason = "test code legitimately uses std::sync::Mutex for SUT setup; no real poison concern"
)]
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use net::adapter::net::behavior::meshdb::planner::{CostEstimate, ExecutionPlan, OperatorNode};
use net::adapter::net::behavior::meshdb::{
enable_meshdb_on_mesh, ChainReader, FederatedMeshQueryExecutor, LocalMeshQueryExecutor,
MeshDbServer, MeshQueryExecutor, OperatorPlan, SeqNum, MESHDB_SERVER_BATCH_ROWS,
};
use net::adapter::net::{EntityKeypair, MeshNode, MeshNodeConfig, SocketBufferConfig};
const TEST_BUFFER_SIZE: usize = 256 * 1024;
const PSK: [u8; 32] = [0x42u8; 32];
fn test_config() -> MeshNodeConfig {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut cfg = MeshNodeConfig::new(addr, PSK)
.with_heartbeat_interval(Duration::from_millis(200))
.with_session_timeout(Duration::from_secs(5))
.with_handshake(3, Duration::from_secs(2))
.with_capability_gc_interval(Duration::from_millis(250));
cfg.socket_buffers = SocketBufferConfig {
send_buffer_size: TEST_BUFFER_SIZE,
recv_buffer_size: TEST_BUFFER_SIZE,
};
cfg
}
async fn build_node() -> Arc<MeshNode> {
let keypair = EntityKeypair::generate();
let node = MeshNode::new(keypair, test_config())
.await
.expect("MeshNode::new");
Arc::new(node)
}
async fn handshake(a: &Arc<MeshNode>, b: &Arc<MeshNode>) {
let a_id = a.node_id();
let b_id = b.node_id();
let b_pub = *b.public_key();
let b_addr = b.local_addr();
let b_clone = b.clone();
let accept = tokio::spawn(async move { b_clone.accept(a_id).await });
a.connect(b_addr, &b_pub, b_id)
.await
.expect("connect failed");
accept
.await
.expect("accept task panicked")
.expect("accept failed");
a.start();
b.start();
}
#[derive(Default)]
struct InMemoryChainReader {
chains: Mutex<BTreeMap<u64, BTreeMap<SeqNum, Vec<u8>>>>,
}
impl InMemoryChainReader {
fn append(&self, origin: u64, seq: SeqNum, payload: Vec<u8>) {
self.chains
.lock()
.unwrap()
.entry(origin)
.or_default()
.insert(seq, payload);
}
}
impl ChainReader for InMemoryChainReader {
fn read_one(&self, origin: u64, seq: SeqNum) -> Option<Vec<u8>> {
self.chains.lock().unwrap().get(&origin)?.get(&seq).cloned()
}
fn read_range(&self, origin: u64, start: SeqNum, end: SeqNum) -> Vec<(SeqNum, Vec<u8>)> {
self.chains
.lock()
.unwrap()
.get(&origin)
.map(|c| c.range(start..end).map(|(s, p)| (*s, p.clone())).collect())
.unwrap_or_default()
}
fn latest_seq(&self, origin: u64) -> Option<SeqNum> {
self.chains
.lock()
.unwrap()
.get(&origin)?
.keys()
.next_back()
.copied()
}
}
fn atomic_plan(op: OperatorPlan, target: u64) -> ExecutionPlan {
ExecutionPlan {
root: OperatorNode {
operator: op,
target_nodes: vec![target],
cost: CostEstimate::default(),
},
total_cost: CostEstimate::default(),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn federated_latest_query_over_real_wire() {
let a = build_node().await;
let b = build_node().await;
handshake(&a, &b).await;
let reader = Arc::new(InMemoryChainReader::default());
reader.append(0xCAFE_BABE, SeqNum(7), b"hello-wire".to_vec());
let executor: Arc<dyn MeshQueryExecutor> = Arc::new(LocalMeshQueryExecutor::new(reader));
let server = MeshDbServer::new(executor);
let (_dispatcher_b, _transport_b) = enable_meshdb_on_mesh(&b, Some(server.clone()));
let (_dispatcher_a, transport_a) = enable_meshdb_on_mesh(&a, None);
let fed_a = FederatedMeshQueryExecutor::new(transport_a);
let plan = atomic_plan(
OperatorPlan::LatestRead {
origin: 0xCAFE_BABE,
},
b.node_id(),
);
let running = fed_a
.execute(plan)
.await
.expect("federated execute over the wire");
use futures::StreamExt;
let mut rows = Vec::new();
let mut stream = running.rows;
let drain = async {
while let Some(item) = stream.next().await {
rows.push(item.expect("row"));
}
};
tokio::time::timeout(Duration::from_secs(5), drain)
.await
.expect("drain timed out");
assert_eq!(rows.len(), 1, "expected exactly one row, got {rows:?}");
assert_eq!(rows[0].origin, 0xCAFE_BABE);
assert_eq!(rows[0].seq, SeqNum(7));
assert_eq!(rows[0].payload, b"hello-wire");
let deadline = std::time::Instant::now() + Duration::from_secs(2);
while server.inflight_calls() != 0 && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert_eq!(server.inflight_calls(), 0);
assert!(a.has_meshdb_inbound_router());
assert!(b.has_meshdb_inbound_router());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn federated_latest_query_over_wire_returns_empty_for_unknown_origin() {
let a = build_node().await;
let b = build_node().await;
handshake(&a, &b).await;
let reader = Arc::new(InMemoryChainReader::default()); let executor: Arc<dyn MeshQueryExecutor> = Arc::new(LocalMeshQueryExecutor::new(reader));
let server = MeshDbServer::new(executor);
let (_d, _t) = enable_meshdb_on_mesh(&b, Some(server));
let (_d, transport) = enable_meshdb_on_mesh(&a, None);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = atomic_plan(
OperatorPlan::LatestRead {
origin: 0xDEAD_BEEF,
},
b.node_id(),
);
let running = fed.execute(plan).await.expect("federated execute");
use futures::StreamExt;
let mut rows = Vec::new();
let mut stream = running.rows;
let drain = async {
while let Some(item) = stream.next().await {
rows.push(item.expect("row"));
}
};
tokio::time::timeout(Duration::from_secs(5), drain)
.await
.expect("drain timed out");
assert!(rows.is_empty(), "expected zero rows, got {rows:?}");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn federated_query_with_no_server_eventually_terminates() {
let a = build_node().await;
let b = build_node().await;
handshake(&a, &b).await;
let (_d_b, _t_b) = enable_meshdb_on_mesh(&b, None);
let (_d_a, transport) = enable_meshdb_on_mesh(&a, None);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = atomic_plan(OperatorPlan::LatestRead { origin: 0xCAFE }, b.node_id());
let running = fed.execute(plan).await.expect("federated execute");
use futures::StreamExt;
let mut stream = running.rows;
let timeout_result = tokio::time::timeout(Duration::from_millis(500), stream.next()).await;
match timeout_result {
Err(_) => {
}
Ok(None) => {
}
Ok(Some(Err(_))) => {
}
Ok(Some(Ok(row))) => panic!("unexpected row from a server-less peer: {row:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn federated_between_query_over_wire_streams_multiple_batches() {
let a = build_node().await;
let b = build_node().await;
handshake(&a, &b).await;
let reader = Arc::new(InMemoryChainReader::default());
let total: u64 = (MESHDB_SERVER_BATCH_ROWS as u64) * 2 + 17;
for seq in 1..=total {
reader.append(0xFEED, SeqNum(seq), format!("row-{seq}").into_bytes());
}
let executor: Arc<dyn MeshQueryExecutor> = Arc::new(LocalMeshQueryExecutor::new(reader));
let server = MeshDbServer::new(executor);
let (_db, _tb) = enable_meshdb_on_mesh(&b, Some(server.clone()));
let (_da, transport) = enable_meshdb_on_mesh(&a, None);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = atomic_plan(
OperatorPlan::BetweenRead {
origin: 0xFEED,
start: SeqNum(1),
end: SeqNum(total + 1),
},
b.node_id(),
);
let running = fed.execute(plan).await.expect("federated execute");
use futures::StreamExt;
let mut rows = Vec::new();
let mut stream = running.rows;
let drain = async {
while let Some(item) = stream.next().await {
rows.push(item.expect("row"));
}
};
tokio::time::timeout(Duration::from_secs(10), drain)
.await
.expect("drain timed out");
assert_eq!(
rows.len(),
total as usize,
"every row across the batch boundary must surface",
);
let seqs: Vec<u64> = rows.iter().map(|r| r.seq.0).collect();
let expected: Vec<u64> = (1..=total).collect();
assert_eq!(seqs, expected, "rows must stay in seq order across batches");
let deadline = std::time::Instant::now() + Duration::from_secs(2);
while server.inflight_calls() != 0 && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert_eq!(server.inflight_calls(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn federated_query_over_wire_propagates_executor_error() {
use async_trait::async_trait;
use net::adapter::net::behavior::meshdb::{
error::MeshError,
executor::{ExecuteOptions, RunningQuery},
};
struct AlwaysErrorExecutor;
#[async_trait]
impl MeshQueryExecutor for AlwaysErrorExecutor {
async fn execute(&self, _plan: ExecutionPlan) -> Result<RunningQuery, MeshError> {
Err(MeshError::ExecutorError {
node: 0xB,
detail: "synthetic-server-failure".to_string(),
})
}
async fn execute_with(
&self,
_plan: ExecutionPlan,
_options: ExecuteOptions,
) -> Result<RunningQuery, MeshError> {
Err(MeshError::ExecutorError {
node: 0xB,
detail: "synthetic-server-failure".to_string(),
})
}
}
let a = build_node().await;
let b = build_node().await;
handshake(&a, &b).await;
let executor: Arc<dyn MeshQueryExecutor> = Arc::new(AlwaysErrorExecutor);
let server = MeshDbServer::new(executor);
let (_db, _tb) = enable_meshdb_on_mesh(&b, Some(server.clone()));
let (_da, transport) = enable_meshdb_on_mesh(&a, None);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = atomic_plan(OperatorPlan::LatestRead { origin: 0xBADC0DE }, b.node_id());
let running = fed.execute(plan).await.expect("federated execute");
use futures::StreamExt;
let mut errors = Vec::new();
let mut rows = Vec::new();
let mut stream = running.rows;
let drain = async {
while let Some(item) = stream.next().await {
match item {
Ok(r) => rows.push(r),
Err(e) => errors.push(e),
}
}
};
tokio::time::timeout(Duration::from_secs(5), drain)
.await
.expect("drain timed out");
assert!(rows.is_empty(), "errored query must not emit rows");
assert_eq!(
errors.len(),
1,
"expected exactly one typed error; got {errors:?}",
);
let rendered = format!("{}", errors[0]);
assert!(
rendered.contains("synthetic-server-failure"),
"error must carry executor's detail; got {rendered:?}",
);
let deadline = std::time::Instant::now() + Duration::from_secs(2);
while server.inflight_calls() != 0 && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert_eq!(server.inflight_calls(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn federated_query_caller_cancels_via_handle_clears_both_sides() {
let a = build_node().await;
let b = build_node().await;
handshake(&a, &b).await;
let reader = Arc::new(InMemoryChainReader::default());
let total: u64 = (MESHDB_SERVER_BATCH_ROWS as u64) * 4;
for seq in 1..=total {
reader.append(0xC0DE, SeqNum(seq), b"x".to_vec());
}
let executor: Arc<dyn MeshQueryExecutor> = Arc::new(LocalMeshQueryExecutor::new(reader));
let server = MeshDbServer::new(executor);
let (_db, _tb) = enable_meshdb_on_mesh(&b, Some(server.clone()));
let (_da, transport) = enable_meshdb_on_mesh(&a, None);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = atomic_plan(
OperatorPlan::BetweenRead {
origin: 0xC0DE,
start: SeqNum(1),
end: SeqNum(total + 1),
},
b.node_id(),
);
let running = fed.execute(plan).await.expect("federated execute");
let handle = running.handle.clone();
use futures::StreamExt;
let mut stream = running.rows;
let mut got = 0usize;
while let Some(item) = stream.next().await {
if item.is_ok() {
got += 1;
}
if got >= 3 {
handle.cancel();
break;
}
}
let drain = async {
while let Some(item) = stream.next().await {
let _ = item;
}
};
tokio::time::timeout(Duration::from_secs(5), drain)
.await
.expect("post-cancel drain timed out");
drop(stream);
let deadline = std::time::Instant::now() + Duration::from_secs(2);
while server.inflight_calls() != 0 && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(20)).await;
}
assert_eq!(
server.inflight_calls(),
0,
"server-side inflight must clear after caller cancels",
);
}