#![cfg(feature = "net")]
#![allow(
clippy::disallowed_methods,
reason = "test code legitimately uses std::sync::Mutex for SUT setup; no real poison concern"
)]
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use net::adapter::net::{NetAdapterConfig, StaticKeypair};
use net::adapter::Adapter;
use net::event::{batch_process_nonce, Batch, InternalEvent};
use tokio::net::UdpSocket;
use tokio::sync::Barrier;
/// Buffer size (256 * 1024) handed to both arguments of `with_socket_buffers`
/// by every adapter configuration built in this file.
const TEST_BUFFER_SIZE: usize = 256 * 1024;
/// One endpoint of a test link: a keypair plus the loopback address it binds.
struct NodeIdentity {
/// Keypair produced by `StaticKeypair::generate()` in `NodeIdentity::new`.
keypair: StaticKeypair,
/// Port this identity was created with; the same port is embedded in `addr`.
// Not read by the code visible in this file, hence the dead_code allowance.
#[allow(dead_code)]
port: u16,
/// `127.0.0.1:<port>` socket address of this endpoint.
addr: SocketAddr,
}
impl NodeIdentity {
    /// Creates an identity on `127.0.0.1:<port>` with a newly generated keypair.
    fn new(port: u16) -> Self {
        Self {
            keypair: StaticKeypair::generate(),
            port,
            // Built directly from octets; equivalent to parsing "127.0.0.1:<port>".
            addr: SocketAddr::from(([127, 0, 0, 1], port)),
        }
    }
}
/// The two adapters of one point-to-point connection, as returned by `connect_link`.
struct Link {
/// Adapter built from the initiator configuration.
initiator: net::adapter::net::NetAdapter,
/// Adapter built from the responder configuration.
responder: net::adapter::net::NetAdapter,
}
/// Asks the OS for `n` free UDP ports on the loopback interface.
///
/// All probe sockets stay open until every port number has been read, so the
/// returned ports are distinct. The sockets are closed before returning, which
/// means a port is only reserved until the caller re-binds it.
async fn find_ports(n: usize) -> Vec<u16> {
    let mut probes = Vec::with_capacity(n);
    for _ in 0..n {
        probes.push(UdpSocket::bind("127.0.0.1:0").await.unwrap());
    }
    probes
        .iter()
        .map(|probe| probe.local_addr().unwrap().port())
        .collect()
}
/// Builds the matching `(initiator, responder)` adapter configurations for one link.
///
/// Both sides share the pre-shared key `psk` and the same handshake, heartbeat,
/// session-timeout and socket-buffer settings. The initiator is given the
/// responder's public key; the responder is given its own full keypair.
fn create_link_configs(
    initiator: &NodeIdentity,
    responder: &NodeIdentity,
    psk: &[u8; 32],
) -> (NetAdapterConfig, NetAdapterConfig) {
    // Tuning values applied identically to both ends of the link.
    let handshake_arg = 3;
    let handshake_timeout = Duration::from_secs(3);
    let heartbeat = Duration::from_millis(200);
    let session_timeout = Duration::from_secs(5);

    let initiator_side = NetAdapterConfig::initiator(
        initiator.addr,
        responder.addr,
        *psk,
        responder.keypair.public,
    )
    .with_handshake(handshake_arg, handshake_timeout)
    .with_heartbeat_interval(heartbeat)
    .with_session_timeout(session_timeout)
    .with_socket_buffers(TEST_BUFFER_SIZE, TEST_BUFFER_SIZE);

    let responder_side = NetAdapterConfig::responder(
        responder.addr,
        initiator.addr,
        *psk,
        responder.keypair.clone(),
    )
    .with_handshake(handshake_arg, handshake_timeout)
    .with_heartbeat_interval(heartbeat)
    .with_session_timeout(session_timeout)
    .with_socket_buffers(TEST_BUFFER_SIZE, TEST_BUFFER_SIZE);

    (initiator_side, responder_side)
}
async fn connect_link(init_cfg: NetAdapterConfig, resp_cfg: NetAdapterConfig) -> Link {
let barrier = Arc::new(Barrier::new(2));
let rb = barrier.clone();
let resp_handle = tokio::spawn(async move {
let mut adapter = net::adapter::net::NetAdapter::new(resp_cfg).unwrap();
rb.wait().await;
adapter.init().await.expect("responder init failed");
adapter
});
let ib = barrier.clone();
let init_handle = tokio::spawn(async move {
let mut adapter = net::adapter::net::NetAdapter::new(init_cfg).unwrap();
ib.wait().await;
adapter.init().await.expect("initiator init failed");
adapter
});
let timeout = Duration::from_secs(10);
let (resp, init) =
tokio::time::timeout(timeout, futures::future::join(resp_handle, init_handle))
.await
.expect("handshake timed out");
Link {
initiator: init.expect("initiator task panicked"),
responder: resp.expect("responder task panicked"),
}
}
/// Three independent links (A-B, A-C, B-C) forming a triangle between nodes A, B and C.
struct Triangle {
/// Link whose initiator plays node A and whose responder plays node B.
link_ab: Link,
/// Link whose initiator plays node A and whose responder plays node C.
link_ac: Link,
/// Link whose initiator plays node B and whose responder plays node C.
link_bc: Link,
}
impl Triangle {
async fn setup() -> Self {
let ports = find_ports(6).await;
let psk = [0x42u8; 32];
let a_for_ab = NodeIdentity::new(ports[0]);
let b_for_ab = NodeIdentity::new(ports[1]);
let a_for_ac = NodeIdentity::new(ports[2]);
let c_for_ac = NodeIdentity::new(ports[3]);
let b_for_bc = NodeIdentity::new(ports[4]);
let c_for_bc = NodeIdentity::new(ports[5]);
let (ab_init, ab_resp) = create_link_configs(&a_for_ab, &b_for_ab, &psk);
let (ac_init, ac_resp) = create_link_configs(&a_for_ac, &c_for_ac, &psk);
let (bc_init, bc_resp) = create_link_configs(&b_for_bc, &c_for_bc, &psk);
let (link_ab, link_ac, link_bc) = tokio::join!(
connect_link(ab_init, ab_resp),
connect_link(ac_init, ac_resp),
connect_link(bc_init, bc_resp),
);
Triangle {
link_ab,
link_ac,
link_bc,
}
}
async fn shutdown(self) {
let futs = vec![
self.link_ab.initiator.shutdown(),
self.link_ab.responder.shutdown(),
self.link_ac.initiator.shutdown(),
self.link_ac.responder.shutdown(),
self.link_bc.initiator.shutdown(),
self.link_bc.responder.shutdown(),
];
for fut in futs {
let _ = fut.await;
}
}
}
/// Builds a batch of `count` events for `shard_id`.
///
/// Event `i` carries the JSON body `{"tag": tag, "index": i}` and is created
/// with `i` as its second `from_value` argument. The batch starts at sequence 0
/// and takes its nonce from `batch_process_nonce()`.
fn make_batch(shard_id: u16, count: usize, tag: &str) -> Batch {
    let mut events: Vec<InternalEvent> = Vec::with_capacity(count);
    for index in 0..count {
        let body = serde_json::json!({"tag": tag, "index": index});
        events.push(InternalEvent::from_value(body, index as u64, shard_id));
    }
    Batch {
        shard_id,
        events,
        sequence_start: 0,
        process_nonce: batch_process_nonce(),
    }
}
/// After setup, every adapter on every link must report healthy.
#[tokio::test]
async fn test_three_node_handshake() {
    let triangle = Triangle::setup().await;

    // Each adapter paired with the message reported if it is unhealthy.
    let endpoints = [
        (&triangle.link_ab.initiator, "A→B initiator unhealthy"),
        (&triangle.link_ab.responder, "A→B responder unhealthy"),
        (&triangle.link_ac.initiator, "A→C initiator unhealthy"),
        (&triangle.link_ac.responder, "A→C responder unhealthy"),
        (&triangle.link_bc.initiator, "B→C initiator unhealthy"),
        (&triangle.link_bc.responder, "B→C responder unhealthy"),
    ];
    for (adapter, complaint) in endpoints {
        assert!(adapter.is_healthy().await, "{}", complaint);
    }

    triangle.shutdown().await;
}
/// Shutting down both of node C's adapters must leave the A-B link healthy.
#[tokio::test]
async fn test_three_node_health_after_one_shutdown() {
    let triangle = Triangle::setup().await;

    // Node C is the responder on both the A-C and the B-C link.
    let _ = triangle.link_ac.responder.shutdown().await;
    let _ = triangle.link_bc.responder.shutdown().await;

    let grace = Duration::from_millis(300);
    tokio::time::sleep(grace).await;

    let a_side_ok = triangle.link_ab.initiator.is_healthy().await;
    assert!(a_side_ok, "A→B should remain healthy after C dies");
    let b_side_ok = triangle.link_ab.responder.is_healthy().await;
    assert!(b_side_ok, "B→A should remain healthy after C dies");

    // Tear down the four adapters that are still running.
    let _ = triangle.link_ab.initiator.shutdown().await;
    let _ = triangle.link_ab.responder.shutdown().await;
    let _ = triangle.link_ac.initiator.shutdown().await;
    let _ = triangle.link_bc.initiator.shutdown().await;
}
/// A batch sent by A on the A-B link must show up when B polls shard 0.
#[tokio::test]
async fn test_data_flow_a_to_b() {
    let triangle = Triangle::setup().await;
    tokio::time::sleep(Duration::from_millis(100)).await;

    let link = &triangle.link_ab;
    let outgoing = make_batch(0, 10, "a_to_b");
    let sent = link.initiator.on_batch(outgoing.into()).await;
    sent.expect("A→B send failed");

    // Give the datagrams time to arrive before polling.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let polled = link.responder.poll_shard(0, None, 100).await;
    let inbox = polled.expect("B poll failed");
    assert!(
        !inbox.events.is_empty(),
        "B should receive events from A, got {}",
        inbox.events.len()
    );

    triangle.shutdown().await;
}
/// A sends on both of its links at once; B and C must each receive something.
#[tokio::test]
async fn test_data_flow_a_to_b_and_c() {
    let triangle = Triangle::setup().await;
    tokio::time::sleep(Duration::from_millis(100)).await;

    let for_b = make_batch(0, 10, "to_b");
    let for_c = make_batch(0, 10, "to_c");
    let toward_b = &triangle.link_ab;
    let toward_c = &triangle.link_ac;

    // Both sends are driven concurrently.
    let (sent_b, sent_c) = tokio::join!(
        toward_b.initiator.on_batch(for_b.into()),
        toward_c.initiator.on_batch(for_c.into()),
    );
    sent_b.expect("A→B send failed");
    sent_c.expect("A→C send failed");

    tokio::time::sleep(Duration::from_millis(500)).await;

    let at_b = toward_b.responder.poll_shard(0, None, 100).await;
    let at_b = at_b.expect("B poll failed");
    let at_c = toward_c.responder.poll_shard(0, None, 100).await;
    let at_c = at_c.expect("C poll failed");

    assert!(!at_b.events.is_empty(), "B should receive events from A");
    assert!(!at_c.events.is_empty(), "C should receive events from A");

    triangle.shutdown().await;
}
/// Traffic in both directions over the A-B link: A sends, then B sends 100 ms
/// later, and each side must afterwards find events when polling shard 0.
#[tokio::test]
async fn test_bidirectional_simultaneous() {
    let triangle = Triangle::setup().await;
    let short_pause = Duration::from_millis(100);
    tokio::time::sleep(short_pause).await;

    let from_a = make_batch(0, 10, "a_sends");
    let from_b = make_batch(0, 10, "b_sends");
    let node_a = &triangle.link_ab.initiator;
    let node_b = &triangle.link_ab.responder;

    node_a
        .on_batch(from_a.into())
        .await
        .expect("A→B send failed");
    tokio::time::sleep(short_pause).await;
    node_b
        .on_batch(from_b.into())
        .await
        .expect("B→A send failed");

    tokio::time::sleep(Duration::from_millis(1000)).await;

    // B is polled first, then A.
    let at_b = node_b
        .poll_shard(0, None, 100)
        .await
        .expect("B poll shard 0 failed");
    let at_a = node_a
        .poll_shard(0, None, 100)
        .await
        .expect("A poll shard 0 failed");

    assert!(
        !at_b.events.is_empty(),
        "B should receive A's events, got {}",
        at_b.events.len()
    );
    assert!(
        !at_a.events.is_empty(),
        "A should receive B's events, got {}",
        at_a.events.len()
    );

    triangle.shutdown().await;
}
/// Streams sent on different links must not leak into each other: every event
/// B sees carries the S1 tag and every event C sees carries the S2 tag.
#[tokio::test]
async fn test_independent_streams_no_interference() {
    let triangle = Triangle::setup().await;
    tokio::time::sleep(Duration::from_millis(100)).await;

    let s1 = make_batch(0, 10, "stream_s1_for_b");
    let s1_sent = triangle.link_ab.initiator.on_batch(s1.into()).await;
    s1_sent.expect("S1 send failed");

    let s2 = make_batch(0, 10, "stream_s2_for_c");
    let s2_sent = triangle.link_ac.initiator.on_batch(s2.into()).await;
    s2_sent.expect("S2 send failed");

    tokio::time::sleep(Duration::from_millis(500)).await;

    let b_events = triangle
        .link_ab
        .responder
        .poll_shard(0, None, 100)
        .await
        .expect("B poll failed");
    let c_events = triangle
        .link_ac
        .responder
        .poll_shard(0, None, 100)
        .await
        .expect("C poll failed");

    // Tag checks run for B first, then for C, before the non-empty checks.
    let expectations = [
        (&b_events.events, "stream_s1_for_b", "B"),
        (&c_events.events, "stream_s2_for_c", "C"),
    ];
    for (events, expected_tag, receiver) in expectations {
        for event in events {
            let json: serde_json::Value = event.parse().unwrap();
            assert_eq!(
                json["tag"], expected_tag,
                "{} received event from wrong stream: {:?}",
                receiver, json
            );
        }
    }

    assert!(!b_events.events.is_empty(), "B should receive S1 events");
    assert!(!c_events.events.is_empty(), "C should receive S2 events");

    triangle.shutdown().await;
}
/// Sends one batch around the ring A→B, B→C, C→A and checks every hop delivers.
#[tokio::test]
async fn test_full_ring_traffic() {
    let triangle = Triangle::setup().await;
    tokio::time::sleep(Duration::from_millis(100)).await;

    let a_to_b = make_batch(0, 10, "a_to_b");
    let b_to_c = make_batch(0, 10, "b_to_c");
    let c_to_a = make_batch(0, 10, "c_to_a");

    let sent = triangle.link_ab.initiator.on_batch(a_to_b.into()).await;
    sent.expect("A→B failed");
    let sent = triangle.link_bc.initiator.on_batch(b_to_c.into()).await;
    sent.expect("B→C failed");
    // C is the responder of the A-C link, so C→A goes out through that responder.
    let sent = triangle.link_ac.responder.on_batch(c_to_a.into()).await;
    sent.expect("C→A failed");

    tokio::time::sleep(Duration::from_millis(1000)).await;

    let b_got = triangle.link_ab.responder.poll_shard(0, None, 100).await;
    let b_got = b_got.expect("B poll failed");
    let c_got = triangle.link_bc.responder.poll_shard(0, None, 100).await;
    let c_got = c_got.expect("C poll failed");
    let a_got = triangle.link_ac.initiator.poll_shard(0, None, 100).await;
    let a_got = a_got.expect("A poll failed");

    // (received events, receiver, sender) checked in the order B, C, A.
    let hops = [
        (&b_got.events, "B", "A"),
        (&c_got.events, "C", "B"),
        (&a_got.events, "A", "C"),
    ];
    for (events, receiver, sender) in hops {
        assert!(
            !events.is_empty(),
            "{} should receive from {}, got {}",
            receiver,
            sender,
            events.len()
        );
    }

    triangle.shutdown().await;
}
/// Pushes 100 events over each link concurrently and requires at least 50 to
/// arrive at each receiver.
#[tokio::test]
async fn test_sustained_throughput_all_links() {
    let triangle = Triangle::setup().await;
    tokio::time::sleep(Duration::from_millis(100)).await;

    let load_ab = make_batch(0, 100, "ab");
    let load_ac = make_batch(0, 100, "ac");
    let load_bc = make_batch(0, 100, "bc");

    let (sent_ab, sent_ac, sent_bc) = tokio::join!(
        triangle.link_ab.initiator.on_batch(load_ab.into()),
        triangle.link_ac.initiator.on_batch(load_ac.into()),
        triangle.link_bc.initiator.on_batch(load_bc.into()),
    );
    sent_ab.expect("A→B failed");
    sent_ac.expect("A→C failed");
    sent_bc.expect("B→C failed");

    tokio::time::sleep(Duration::from_millis(1000)).await;

    let polled = triangle.link_ab.responder.poll_shard(0, None, 1000).await;
    let b_count = polled.expect("B poll failed").events.len();
    let polled = triangle.link_ac.responder.poll_shard(0, None, 1000).await;
    let c_from_a = polled.expect("C poll A failed").events.len();
    let polled = triangle.link_bc.responder.poll_shard(0, None, 1000).await;
    let c_from_b = polled.expect("C poll B failed").events.len();

    // (receiver, sender, number of events the receiver got)
    let tallies = [
        ("B", "A", b_count),
        ("C", "A", c_from_a),
        ("C", "B", c_from_b),
    ];
    for (receiver, sender, got) in tallies {
        assert!(
            got >= 50,
            "{} should receive most of {}'s 100 events, got {}",
            receiver,
            sender,
            got
        );
    }

    triangle.shutdown().await;
}
/// After B's side of the A-B link is shut down, A must report unhealthy within
/// 10 seconds, but not sooner than 50 ms after the shutdown began.
#[tokio::test]
async fn test_failure_detection_on_node_shutdown() {
    let triangle = Triangle::setup().await;
    tokio::time::sleep(Duration::from_millis(200)).await;

    assert!(triangle.link_ab.initiator.is_healthy().await);
    assert!(triangle.link_ab.responder.is_healthy().await);

    // The clock starts before the shutdown call, so its duration is included.
    let shutdown_start = std::time::Instant::now();
    let _ = triangle.link_ab.responder.shutdown().await;

    let max_wait = Duration::from_secs(10);
    let poll_interval = Duration::from_millis(100);

    // Poll A's health until it flips or the deadline passes.
    let mut detected = false;
    while shutdown_start.elapsed() <= max_wait {
        if !triangle.link_ab.initiator.is_healthy().await {
            detected = true;
            break;
        }
        tokio::time::sleep(poll_interval).await;
    }
    let detection_time = shutdown_start.elapsed();

    assert!(
        detected,
        "A should detect B's failure within {:?}, gave up after {:?}",
        max_wait, detection_time
    );
    assert!(
        detection_time > Duration::from_millis(50),
        "Detection too fast ({:?}) — likely a bug, not real heartbeat detection",
        detection_time
    );

    // Shut down the remaining five adapters.
    let _ = triangle.link_ab.initiator.shutdown().await;
    let _ = triangle.link_ac.initiator.shutdown().await;
    let _ = triangle.link_ac.responder.shutdown().await;
    let _ = triangle.link_bc.initiator.shutdown().await;
    let _ = triangle.link_bc.responder.shutdown().await;
}
/// With both of node B's adapters shut down, A must still be able to deliver to C.
#[tokio::test]
async fn test_data_flow_survives_node_death() {
    let triangle = Triangle::setup().await;
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Node B is the responder on A-B and the initiator on B-C.
    let _ = triangle.link_ab.responder.shutdown().await;
    let _ = triangle.link_bc.initiator.shutdown().await;

    let half_second = Duration::from_millis(500);
    tokio::time::sleep(half_second).await;

    let payload = make_batch(0, 20, "after_b_death");
    let sent = triangle.link_ac.initiator.on_batch(payload.into()).await;
    sent.expect("A→C send should work after B dies");

    tokio::time::sleep(half_second).await;

    let polled = triangle.link_ac.responder.poll_shard(0, None, 100).await;
    let c_got = polled.expect("C poll failed");
    assert!(
        !c_got.events.is_empty(),
        "C should receive events from A after B dies, got {}",
        c_got.events.len()
    );

    // Shut down the four adapters that were left running.
    let _ = triangle.link_ab.initiator.shutdown().await;
    let _ = triangle.link_ac.initiator.shutdown().await;
    let _ = triangle.link_ac.responder.shutdown().await;
    let _ = triangle.link_bc.responder.shutdown().await;
}
use bytes::{BufMut, Bytes, BytesMut};
use net::adapter::net::{
NetRouter, RouteAction, RouterConfig, RouterError, RoutingHeader, ROUTING_HEADER_SIZE,
};
/// Serialises a routing header built from `dest_id`, `src_id` and `ttl`,
/// followed immediately by `payload`, into one immutable buffer.
fn build_routed_packet(dest_id: u64, src_id: u32, ttl: u8, payload: &[u8]) -> Bytes {
    let total_len = ROUTING_HEADER_SIZE + payload.len();
    let mut packet = BytesMut::with_capacity(total_len);
    RoutingHeader::new(dest_id, src_id, ttl).write_to(&mut packet);
    packet.put_slice(payload);
    packet.freeze()
}
/// A packet from A addressed to C, received by router B, must be routed as
/// `Forwarded` to C's address and counted as received and forwarded.
#[tokio::test]
async fn test_router_forwarding_through_middle_node() {
    let ports = find_ports(3).await;
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let (addr_a, addr_b, addr_c) = (loopback(ports[0]), loopback(ports[1]), loopback(ports[2]));
    let (node_a, node_b, node_c): (u64, u64, u64) = (0x1111, 0x2222, 0x3333);

    let router_b = NetRouter::new(RouterConfig::new(node_b, addr_b))
        .await
        .expect("router B failed to bind");
    router_b.add_route(node_c, addr_c);
    let send_handle = router_b.start();

    // A sends a raw routed datagram to B.
    let payload = b"hello from A to C";
    let packet = build_routed_packet(node_c, node_a as u32, 4, payload);
    let sock_a = UdpSocket::bind(addr_a).await.unwrap();
    sock_a.send_to(&packet, addr_b).await.unwrap();

    let mut scratch = vec![0u8; 8192];
    let incoming = router_b.recv_from(&mut scratch);
    let (len, from) = tokio::time::timeout(Duration::from_secs(2), incoming)
        .await
        .expect("recv timed out")
        .expect("recv failed");

    let action = router_b
        .route_packet(Bytes::copy_from_slice(&scratch[..len]), from)
        .expect("route failed");
    let RouteAction::Forwarded(dest) = action else {
        panic!("should not be local delivery")
    };
    assert_eq!(dest, addr_c, "should forward to C's address");

    let stats = router_b.stats();
    assert_eq!(stats.packets_received, 1);
    assert_eq!(stats.packets_forwarded, 1);
    assert_eq!(stats.packets_local, 0);

    router_b.stop();
    if let Some(task) = send_handle {
        let _ = task.await;
    }
}
/// A packet addressed to router B itself must be routed as `Local` with the
/// original payload, and counted as local rather than forwarded.
#[tokio::test]
async fn test_router_local_delivery() {
    let ports = find_ports(2).await;
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let (addr_a, addr_b) = (loopback(ports[0]), loopback(ports[1]));
    let (node_a, node_b): (u64, u64) = (0x1111, 0x2222);

    let router_b = NetRouter::new(RouterConfig::new(node_b, addr_b))
        .await
        .expect("router B failed to bind");

    let payload = b"for B directly";
    let packet = build_routed_packet(node_b, node_a as u32, 4, payload);
    let sock_a = UdpSocket::bind(addr_a).await.unwrap();
    sock_a.send_to(&packet, addr_b).await.unwrap();

    let mut scratch = vec![0u8; 8192];
    let incoming = router_b.recv_from(&mut scratch);
    let (len, from) = tokio::time::timeout(Duration::from_secs(2), incoming)
        .await
        .expect("recv timed out")
        .expect("recv failed");

    let action = router_b
        .route_packet(Bytes::copy_from_slice(&scratch[..len]), from)
        .expect("route failed");
    let RouteAction::Local(local_data) = action else {
        panic!("should be local delivery, not forwarded")
    };
    assert_eq!(&local_data[..], payload);

    let stats = router_b.stats();
    assert_eq!(stats.packets_local, 1);
    assert_eq!(stats.packets_forwarded, 0);
}
/// A packet with TTL 0 must be rejected with `TtlExpired`, counted as dropped,
/// and not counted as forwarded, even though a route to its destination exists.
#[tokio::test]
async fn test_router_ttl_expiry() {
    let ports = find_ports(3).await;
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let (addr_b, addr_c) = (loopback(ports[1]), loopback(ports[2]));
    let (node_a, node_b, node_c): (u64, u64, u64) = (0x1111, 0x2222, 0x3333);

    let router_b = NetRouter::new(RouterConfig::new(node_b, addr_b))
        .await
        .expect("router B failed to bind");
    router_b.add_route(node_c, addr_c);

    // The packet is handed to the router directly; no socket is involved.
    let expired = build_routed_packet(node_c, node_a as u32, 0, b"should expire");
    let origin: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let result = router_b.route_packet(expired, origin);
    assert!(
        matches!(result, Err(RouterError::TtlExpired)),
        "expected TtlExpired, got {:?}",
        result
    );

    let stats = router_b.stats();
    assert_eq!(
        stats.packets_dropped, 1,
        "TTL-expired packet should be counted as dropped"
    );
    assert_eq!(
        stats.packets_forwarded, 0,
        "TTL-expired packet should not be forwarded"
    );
}
/// Forwarding through router B must increment `hop_count` to 1, decrement the
/// TTL from 4 to 3, and leave `dest_id` untouched in the datagram C receives.
#[tokio::test]
async fn test_router_hop_count_incremented() {
    let ports = find_ports(3).await;
    let node_a: u64 = 0x1111;
    let node_b: u64 = 0x2222;
    let node_c: u64 = 0x3333;
    let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
    let addr_c: SocketAddr = format!("127.0.0.1:{}", ports[2]).parse().unwrap();

    // Bind C's socket BEFORE anything is routed. UDP does not queue datagrams
    // for an unbound port, so binding after `route_packet` would let the
    // forwarded packet be lost if the router's send path ran first.
    let sock_c = UdpSocket::bind(addr_c).await.unwrap();

    let router_b = NetRouter::new(RouterConfig::new(node_b, addr_b))
        .await
        .expect("router B failed to bind");
    router_b.add_route(node_c, addr_c);
    let send_handle = router_b.start();

    // Inject the packet directly into the router; no inbound socket is needed.
    let packet = build_routed_packet(node_c, node_a as u32, 4, b"hop test");
    let action = router_b
        .route_packet(packet, "127.0.0.1:0".parse().unwrap())
        .unwrap();
    assert!(matches!(action, RouteAction::Forwarded(_)));

    let mut recv_buf = vec![0u8; 8192];
    let (n, _) = tokio::time::timeout(Duration::from_secs(2), sock_c.recv_from(&mut recv_buf))
        .await
        .expect("C recv timed out")
        .expect("C recv failed");

    let fwd_header = RoutingHeader::from_bytes(&recv_buf[..n])
        .expect("invalid routing header in forwarded packet");
    assert_eq!(
        fwd_header.hop_count, 1,
        "hop_count should be 1 after one forward"
    );
    assert_eq!(fwd_header.ttl, 3, "TTL should be decremented from 4 to 3");
    assert_eq!(fwd_header.dest_id, node_c, "dest_id should be unchanged");

    router_b.stop();
    if let Some(h) = send_handle {
        let _ = h.await;
    }
}
/// Routing a packet whose destination has no route entry must fail with `NoRoute`.
#[tokio::test]
async fn test_router_no_route() {
    let ports = find_ports(1).await;
    let addr_b = SocketAddr::from(([127, 0, 0, 1], ports[0]));
    let node_b: u64 = 0x2222;
    let unknown_dest: u64 = 0x9999;

    // No routes are registered on this router.
    let router_b = NetRouter::new(RouterConfig::new(node_b, addr_b))
        .await
        .expect("router B failed to bind");

    let stray = build_routed_packet(unknown_dest, 0x1111, 4, b"no route");
    let origin: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let result = router_b.route_packet(stray, origin);
    assert!(
        matches!(result, Err(RouterError::NoRoute)),
        "expected NoRoute, got {:?}",
        result
    );
}
#[tokio::test]
async fn test_eventbus_over_net_full_stack() {
let ports = find_ports(2).await;
let psk = [0x42u8; 32];
let responder_keypair = StaticKeypair::generate();
let sender_addr: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
let receiver_addr: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
let sender_net =
NetAdapterConfig::initiator(sender_addr, receiver_addr, psk, responder_keypair.public)
.with_handshake(3, Duration::from_secs(3))
.with_heartbeat_interval(Duration::from_millis(500))
.with_session_timeout(Duration::from_secs(10))
.with_socket_buffers(TEST_BUFFER_SIZE, TEST_BUFFER_SIZE);
let sender_config = net::config::EventBusConfig::builder()
.num_shards(2)
.ring_buffer_capacity(1024)
.adapter(net::config::AdapterConfig::Net(Box::new(sender_net)))
.without_scaling()
.build()
.unwrap();
let receiver_net =
NetAdapterConfig::responder(receiver_addr, sender_addr, psk, responder_keypair)
.with_handshake(3, Duration::from_secs(3))
.with_heartbeat_interval(Duration::from_millis(500))
.with_session_timeout(Duration::from_secs(10))
.with_socket_buffers(TEST_BUFFER_SIZE, TEST_BUFFER_SIZE);
let barrier = Arc::new(Barrier::new(2));
let rb = barrier.clone();
let receiver_handle = tokio::spawn(async move {
let mut adapter = net::adapter::net::NetAdapter::new(receiver_net).unwrap();
rb.wait().await;
adapter.init().await.expect("receiver init failed");
tokio::time::sleep(Duration::from_millis(2000)).await;
let shard0 = adapter
.poll_shard(0, None, 1000)
.await
.expect("poll 0 failed");
let shard1 = adapter
.poll_shard(1, None, 1000)
.await
.expect("poll 1 failed");
adapter.shutdown().await.expect("receiver shutdown failed");
shard0.events.len() + shard1.events.len()
});
let sb = barrier.clone();
let sender_handle = tokio::spawn(async move {
sb.wait().await;
let bus = net::EventBus::new(sender_config)
.await
.expect("sender EventBus failed");
tokio::time::sleep(Duration::from_millis(500)).await;
for i in 0..50 {
let event =
net::event::Event::new(serde_json::json!({"index": i, "source": "eventbus"}));
bus.ingest(event).unwrap();
}
bus.flush().await.expect("flush failed");
tokio::time::sleep(Duration::from_millis(1000)).await;
bus.shutdown().await.expect("sender shutdown failed");
});
let timeout = Duration::from_secs(15);
let (recv_result, send_result) = tokio::time::timeout(
timeout,
futures::future::join(receiver_handle, sender_handle),
)
.await
.expect("full-stack test timed out");
send_result.expect("sender panicked");
let received = recv_result.expect("receiver panicked");
assert!(
received >= 25,
"receiver should get most of 50 events through the full stack, got {}",
received
);
}
/// Floods a bus (2 shards, ring capacity 1024, no net adapter configured) with
/// 10,000 events and checks that it accepts some, still accepts a new event
/// afterwards, and reports a non-zero ingested count.
#[tokio::test]
async fn test_backpressure_ring_buffer_survives_flood() {
    let config = net::config::EventBusConfig::builder()
        .num_shards(2)
        .ring_buffer_capacity(1024)
        .without_scaling()
        .build()
        .unwrap();
    let bus = net::EventBus::new(config).await.unwrap();

    // Rejections are tallied but deliberately not asserted on.
    let mut accepted = 0u64;
    let mut _rejected = 0u64;
    for i in 0..10_000 {
        let event = net::event::Event::new(serde_json::json!({"flood": i}));
        if bus.ingest(event).is_ok() {
            accepted += 1;
        } else {
            _rejected += 1;
        }
    }
    assert!(accepted > 0, "at least some events should be ingested");

    tokio::time::sleep(Duration::from_millis(100)).await;

    let straggler = net::event::Event::new(serde_json::json!({"after": "flood"}));
    let outcome = bus.ingest(straggler);
    assert!(
        outcome.is_ok(),
        "bus should accept events after flood subsides"
    );

    let total_ingested = bus
        .stats()
        .events_ingested
        .load(std::sync::atomic::Ordering::Relaxed);
    assert!(total_ingested > 0, "stats should reflect ingested events");

    bus.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_eventbus_bidirectional_over_net() {
let ports = find_ports(2).await;
let psk = [0x42u8; 32];
let keypair_b = StaticKeypair::generate();
let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
let net_a = NetAdapterConfig::initiator(addr_a, addr_b, psk, keypair_b.public)
.with_handshake(3, Duration::from_secs(3))
.with_heartbeat_interval(Duration::from_millis(500))
.with_session_timeout(Duration::from_secs(10))
.with_socket_buffers(TEST_BUFFER_SIZE, TEST_BUFFER_SIZE);
let net_b = NetAdapterConfig::responder(addr_b, addr_a, psk, keypair_b)
.with_handshake(3, Duration::from_secs(3))
.with_heartbeat_interval(Duration::from_millis(500))
.with_session_timeout(Duration::from_secs(10))
.with_socket_buffers(TEST_BUFFER_SIZE, TEST_BUFFER_SIZE);
let config_a = net::config::EventBusConfig::builder()
.num_shards(2)
.ring_buffer_capacity(1024)
.adapter(net::config::AdapterConfig::Net(Box::new(net_a)))
.without_scaling()
.build()
.unwrap();
let barrier = Arc::new(Barrier::new(2));
let bb = barrier.clone();
let b_handle = tokio::spawn(async move {
let mut adapter = net::adapter::net::NetAdapter::new(net_b).unwrap();
bb.wait().await;
adapter.init().await.expect("B init failed");
tokio::time::sleep(Duration::from_millis(500)).await;
let events: Vec<InternalEvent> = (0..20)
.map(|i| InternalEvent::from_value(serde_json::json!({"from": "B", "i": i}), i, 0))
.collect();
adapter
.on_batch(
Batch {
shard_id: 0,
events,
sequence_start: 0,
process_nonce: batch_process_nonce(),
}
.into(),
)
.await
.expect("B send failed");
tokio::time::sleep(Duration::from_millis(1500)).await;
let received = adapter
.poll_shard(0, None, 1000)
.await
.expect("B poll failed");
adapter.shutdown().await.expect("B shutdown failed");
received.events.len()
});
let ab = barrier.clone();
let a_handle = tokio::spawn(async move {
ab.wait().await;
let bus = net::EventBus::new(config_a)
.await
.expect("A EventBus failed");
tokio::time::sleep(Duration::from_millis(500)).await;
for i in 0..20 {
bus.ingest(net::event::Event::new(
serde_json::json!({"from": "A", "i": i}),
))
.unwrap();
}
bus.flush().await.expect("A flush failed");
tokio::time::sleep(Duration::from_millis(1500)).await;
let response = bus
.poll(net::ConsumeRequest::new(1000))
.await
.expect("A poll failed");
bus.shutdown().await.expect("A shutdown failed");
response.events.len()
});
let timeout = Duration::from_secs(15);
let (b_result, a_result) =
tokio::time::timeout(timeout, futures::future::join(b_handle, a_handle))
.await
.expect("bidirectional test timed out");
let b_received = b_result.expect("B panicked");
let _a_received = a_result.expect("A panicked");
assert!(
b_received > 0,
"B should receive A's events through EventBus pipeline, got {}",
b_received
);
}
/// Sends 10 routed datagrams A → B → C over real sockets and requires C to
/// receive at least 5 of them, each with a valid header showing one hop.
#[tokio::test]
async fn test_router_end_to_end_forwarding() {
    let ports = find_ports(3).await;
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let (addr_a, addr_b, addr_c) = (loopback(ports[0]), loopback(ports[1]), loopback(ports[2]));
    let (node_a, node_b, node_c): (u64, u64, u64) = (0x1111, 0x2222, 0x3333);

    let router_b = NetRouter::new(RouterConfig::new(node_b, addr_b))
        .await
        .expect("router B bind failed");
    router_b.add_route(node_c, addr_c);
    let send_handle = router_b.start();

    // C is bound before any traffic is generated.
    let sock_c = UdpSocket::bind(addr_c).await.unwrap();
    let sock_a = UdpSocket::bind(addr_a).await.unwrap();

    for i in 0..10u8 {
        let body = format!("packet-{}", i);
        let packet = build_routed_packet(node_c, node_a as u32, 4, body.as_bytes());
        sock_a.send_to(&packet, addr_b).await.unwrap();
    }

    // Drain the 10 datagrams at B and route each; routing results are ignored.
    let mut scratch = vec![0u8; 8192];
    for _ in 0..10 {
        let incoming = router_b.recv_from(&mut scratch);
        let (len, from) = tokio::time::timeout(Duration::from_secs(2), incoming)
            .await
            .expect("B recv timed out")
            .expect("B recv failed");
        let _ = router_b.route_packet(Bytes::copy_from_slice(&scratch[..len]), from);
    }

    tokio::time::sleep(Duration::from_millis(200)).await;

    // Read at C until a receive times out or fails.
    let mut received = 0;
    while let Ok(Ok((len, _))) =
        tokio::time::timeout(Duration::from_millis(500), sock_c.recv_from(&mut scratch)).await
    {
        let header = RoutingHeader::from_bytes(&scratch[..len])
            .expect("forwarded packet should have valid routing header");
        assert_eq!(header.dest_id, node_c);
        assert_eq!(header.hop_count, 1, "should have 1 hop from B");
        received += 1;
    }
    assert!(
        received >= 5,
        "C should receive most of 10 forwarded packets, got {}",
        received
    );

    router_b.stop();
    if let Some(task) = send_handle {
        let _ = task.await;
    }
}
/// A → router B → router C: B must forward the packet and C must deliver it
/// locally with the payload intact.
#[tokio::test]
async fn test_router_multi_hop_two_routers() {
    let ports = find_ports(3).await;
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let (addr_a, addr_b, addr_c) = (loopback(ports[0]), loopback(ports[1]), loopback(ports[2]));
    let (node_a, node_b, node_c): (u64, u64, u64) = (0x1111, 0x2222, 0x3333);

    let router_b = NetRouter::new(RouterConfig::new(node_b, addr_b))
        .await
        .expect("router B bind failed");
    router_b.add_route(node_c, addr_c);
    let send_b = router_b.start();

    // Router C is created before A sends; unlike B it is never started.
    let router_c = NetRouter::new(RouterConfig::new(node_c, addr_c))
        .await
        .expect("router C bind failed");

    let sock_a = UdpSocket::bind(addr_a).await.unwrap();
    let packet = build_routed_packet(node_c, node_a as u32, 4, b"multi-hop-test");
    sock_a.send_to(&packet, addr_b).await.unwrap();

    let mut scratch = vec![0u8; 8192];

    // First hop: B receives and forwards.
    let (len, from) = tokio::time::timeout(Duration::from_secs(2), router_b.recv_from(&mut scratch))
        .await
        .expect("B recv timed out")
        .expect("B recv failed");
    let at_b = router_b
        .route_packet(Bytes::copy_from_slice(&scratch[..len]), from)
        .expect("B route failed");
    assert!(matches!(at_b, RouteAction::Forwarded(_)));

    tokio::time::sleep(Duration::from_millis(200)).await;

    // Second hop: C receives and delivers locally.
    let (len, from) = tokio::time::timeout(Duration::from_secs(2), router_c.recv_from(&mut scratch))
        .await
        .expect("C recv timed out")
        .expect("C recv failed");
    let at_c = router_c
        .route_packet(Bytes::copy_from_slice(&scratch[..len]), from)
        .expect("C route failed");
    let RouteAction::Local(payload) = at_c else {
        panic!("C should deliver locally, not forward")
    };
    assert_eq!(
        &payload[..],
        b"multi-hop-test",
        "payload should survive 2 hops"
    );

    assert_eq!(router_b.stats().packets_forwarded, 1);
    assert_eq!(router_c.stats().packets_local, 1);

    router_b.stop();
    if let Some(task) = send_b {
        let _ = task.await;
    }
}
use net::adapter::net::{
ChannelConfig, ChannelConfigRegistry, ChannelId, DropReason, ForwardDecision, SubnetGateway,
SubnetId, Visibility,
};
/// Registers a channel called `name` with visibility `vis` in `registry` and
/// returns the channel's wire hash.
///
/// # Panics
/// Panics with "invalid channel name" if `name` does not parse as a `ChannelId`.
fn register_channel(registry: &ChannelConfigRegistry, name: &str, vis: Visibility) -> u16 {
    let channel = ChannelId::parse(name).expect("invalid channel name");
    // Read the hash before the id is moved into the config.
    let wire_hash = channel.wire_hash();
    registry.insert(ChannelConfig::new(channel).with_visibility(vis));
    wire_hash
}
/// A `SubnetLocal` channel must be dropped when the source and destination
/// subnets differ.
#[tokio::test]
async fn test_subnet_gateway_blocks_local_traffic() {
    let subnet_ab = SubnetId::new(&[3, 1]);
    let subnet_c = SubnetId::new(&[3, 2]);
    let registry = std::sync::Arc::new(ChannelConfigRegistry::new());
    // `&registry` restored: the source had it garbled into a "®" character.
    let local_hash = register_channel(&registry, "internal/metrics", Visibility::SubnetLocal);
    let gateway = SubnetGateway::new(subnet_ab, registry);

    let decision = gateway.should_forward(subnet_ab, subnet_c, local_hash, 8, 0);
    assert_eq!(
        decision,
        ForwardDecision::Drop(DropReason::SubnetLocal),
        "SubnetLocal traffic must not cross subnet boundary"
    );
}
/// A `Global` channel must be forwarded between different subnets.
#[tokio::test]
async fn test_subnet_gateway_forwards_global_traffic() {
    let subnet_ab = SubnetId::new(&[3, 1]);
    let subnet_c = SubnetId::new(&[3, 2]);
    let registry = std::sync::Arc::new(ChannelConfigRegistry::new());
    // `&registry` restored: the source had it garbled into a "®" character.
    let global_hash = register_channel(&registry, "events/global", Visibility::Global);
    let gateway = SubnetGateway::new(subnet_ab, registry);

    let decision = gateway.should_forward(subnet_ab, subnet_c, global_hash, 8, 0);
    assert_eq!(
        decision,
        ForwardDecision::Forward,
        "Global traffic must cross subnet boundary"
    );
}
/// An `Exported` channel must be forwarded only to the subnets it was exported
/// to: here C is in the export list and peer D is not.
#[tokio::test]
async fn test_subnet_gateway_exported_selective() {
    let subnet_ab = SubnetId::new(&[3, 1]);
    let subnet_c = SubnetId::new(&[3, 2]);
    let subnet_d = SubnetId::new(&[3, 3]);
    let registry = std::sync::Arc::new(ChannelConfigRegistry::new());
    // `&registry` restored: the source had it garbled into a "®" character.
    let export_hash = register_channel(&registry, "data/shared", Visibility::Exported);
    let gateway = SubnetGateway::new(subnet_ab, registry);
    gateway.add_peer(subnet_c);
    gateway.add_peer(subnet_d);
    // Both subnets are peers, but the channel is exported to C only.
    gateway.export_channel(export_hash, vec![subnet_c]);

    let to_c = gateway.should_forward(subnet_ab, subnet_c, export_hash, 8, 0);
    assert_eq!(
        to_c,
        ForwardDecision::Forward,
        "exported to C should forward"
    );

    let to_d = gateway.should_forward(subnet_ab, subnet_d, export_hash, 8, 0);
    assert_eq!(
        to_d,
        ForwardDecision::Drop(DropReason::NotExported),
        "not exported to D should drop"
    );
}
#[tokio::test]
async fn test_subnet_gateway_parent_visible() {
let child = SubnetId::new(&[3, 1, 2]);
let parent = SubnetId::new(&[3, 1]);
let sibling = SubnetId::new(&[3, 2]);
let registry = std::sync::Arc::new(ChannelConfigRegistry::new());
let hash = register_channel(®istry, "status/reports", Visibility::ParentVisible);
let gateway = SubnetGateway::new(child, registry);
let to_parent = gateway.should_forward(child, parent, hash, 8, 0);
assert_eq!(
to_parent,
ForwardDecision::Forward,
"child to parent should forward"
);
let to_sibling = gateway.should_forward(child, sibling, hash, 8, 0);
assert_eq!(
to_sibling,
ForwardDecision::Drop(DropReason::NotAncestor),
"child to sibling should drop"
);
}
#[tokio::test]
async fn test_subnet_gateway_stats() {
let subnet_a = SubnetId::new(&[1]);
let subnet_b = SubnetId::new(&[2]);
let registry = std::sync::Arc::new(ChannelConfigRegistry::new());
let local_hash = register_channel(®istry, "chan/local", Visibility::SubnetLocal);
let global_hash = register_channel(®istry, "chan/global", Visibility::Global);
let gateway = SubnetGateway::new(subnet_a, registry);
for _ in 0..5 {
gateway.should_forward(subnet_a, subnet_b, local_hash, 8, 0);
}
for _ in 0..3 {
gateway.should_forward(subnet_a, subnet_b, global_hash, 8, 0);
}
assert_eq!(gateway.forwarded_count(), 3, "3 global packets forwarded");
assert_eq!(gateway.dropped_count(), 5, "5 local packets dropped");
}
use net::adapter::net::{CorrelatedFailureConfig, CorrelatedFailureDetector, CorrelationVerdict};
/// With a 0.3 mass-failure threshold, reporting 2 failed nodes out of 10
/// must be classified as `CorrelationVerdict::Independent` and leave the
/// recovery budget at `usize::MAX`.
#[tokio::test]
async fn test_correlated_failure_independent() {
    let mut detector = CorrelatedFailureDetector::new(CorrelatedFailureConfig {
        correlation_window: Duration::from_secs(2),
        mass_failure_threshold: 0.3,
        subnet_correlation_threshold: 0.8,
        max_concurrent_migrations: 3,
    });

    // Ten nodes, assigned round-robin to subnet ids [0]..[3].
    for node in 0..10u64 {
        let subnet = SubnetId::new(&[(node as u8) % 4]);
        detector.register_node(node, subnet);
    }

    let verdict = detector.record_failures(&[0, 1], 10);
    assert!(
        matches!(verdict, CorrelationVerdict::Independent { .. }),
        "2/10 = 20% < 30% threshold = Independent, got {:?}",
        verdict
    );

    assert_eq!(
        detector.recovery_budget(),
        usize::MAX,
        "independent failures get unlimited budget"
    );
}
/// With a 0.3 mass-failure threshold, reporting 4 failed nodes out of 10
/// must be classified as `CorrelationVerdict::MassFailure`, cap the recovery
/// budget at `max_concurrent_migrations`, and set `in_mass_failure()`.
#[tokio::test]
async fn test_correlated_failure_mass() {
    let mut detector = CorrelatedFailureDetector::new(CorrelatedFailureConfig {
        correlation_window: Duration::from_secs(2),
        mass_failure_threshold: 0.3,
        subnet_correlation_threshold: 0.8,
        max_concurrent_migrations: 3,
    });

    // All ten nodes are registered under the same subnet id.
    for node in 0..10u64 {
        detector.register_node(node, SubnetId::new(&[1]));
    }

    let verdict = detector.record_failures(&[0, 1, 2, 3], 10);
    assert!(
        matches!(verdict, CorrelationVerdict::MassFailure { .. }),
        "4/10 = 40% > 30% threshold = MassFailure, got {:?}",
        verdict
    );

    assert_eq!(
        detector.recovery_budget(),
        3,
        "mass failure throttles to max_concurrent_migrations"
    );
    assert!(detector.in_mass_failure());
}
/// After a mass failure (5 of 10 nodes), calling `clear_window()` and then
/// reporting a single failure must bring the detector back to
/// `Independent`, clear `in_mass_failure()`, and restore the unlimited budget.
#[tokio::test]
async fn test_correlated_failure_recovery_resets() {
    let mut detector = CorrelatedFailureDetector::new(CorrelatedFailureConfig {
        correlation_window: Duration::from_secs(2),
        mass_failure_threshold: 0.3,
        subnet_correlation_threshold: 0.8,
        max_concurrent_migrations: 2,
    });
    for node in 0..10u64 {
        detector.register_node(node, SubnetId::new(&[1]));
    }

    // Phase 1: half of the nodes fail at once; the verdict itself is not inspected.
    detector.record_failures(&[0, 1, 2, 3, 4], 10);
    assert!(detector.in_mass_failure());
    assert_eq!(detector.recovery_budget(), 2);

    // Phase 2: clear the window, then report one unrelated failure.
    detector.clear_window();
    let verdict_after_clear = detector.record_failures(&[8], 10);
    assert!(matches!(
        verdict_after_clear,
        CorrelationVerdict::Independent { .. }
    ));
    assert!(!detector.in_mass_failure());
    assert_eq!(detector.recovery_budget(), usize::MAX);
}
use net::adapter::net::{
FailureDetector, FailureDetectorConfig, MeshNode, MeshNodeConfig, NodeStatus,
};
/// Walks one node (B) through the `FailureDetector` states while two others
/// (A and C) keep heartbeating: all healthy, then B suspected or failed after
/// it goes silent, then B failed, then B healthy again after a new heartbeat.
/// The failure and recovery callbacks are checked via shared vectors.
#[tokio::test]
async fn test_failure_detector_lifecycle() {
    // Node ids recorded by the detector's callbacks.
    let failed_nodes = Arc::new(std::sync::Mutex::new(Vec::<u64>::new()));
    let recovered_nodes = Arc::new(std::sync::Mutex::new(Vec::<u64>::new()));

    let detector = {
        let failed_sink = Arc::clone(&failed_nodes);
        let recovered_sink = Arc::clone(&recovered_nodes);
        FailureDetector::with_config(FailureDetectorConfig {
            timeout: Duration::from_millis(100),
            miss_threshold: 3,
            suspicion_threshold: 1,
            cleanup_interval: Duration::from_secs(60),
        })
        .on_failure(move |id| failed_sink.lock().unwrap().push(id))
        .on_recovery(move |id| recovered_sink.lock().unwrap().push(id))
    };

    let node_a: u64 = 0x1111;
    let node_b: u64 = 0x2222;
    let node_c: u64 = 0x3333;
    let addr: SocketAddr = "127.0.0.1:1234".parse().unwrap();

    // Everyone heartbeats once and is reported healthy.
    for node in [node_a, node_b, node_c] {
        detector.heartbeat(node, addr);
    }
    for node in [node_a, node_b, node_c] {
        assert_eq!(detector.status(node), NodeStatus::Healthy);
    }

    // B stays silent past the 100 ms timeout; A and C refresh.
    tokio::time::sleep(Duration::from_millis(150)).await;
    for node in [node_a, node_c] {
        detector.heartbeat(node, addr);
    }
    let newly_failed = detector.check_all();
    let b_status = detector.status(node_b);
    assert!(
        matches!(b_status, NodeStatus::Suspected | NodeStatus::Failed),
        "B should be suspected or failed after timeout, got {:?}",
        b_status
    );

    // B remains silent for another 300 ms and must now be failed.
    tokio::time::sleep(Duration::from_millis(300)).await;
    for node in [node_a, node_c] {
        detector.heartbeat(node, addr);
    }
    let _ = detector.check_all();
    assert_eq!(
        detector.status(node_b),
        NodeStatus::Failed,
        "B should be failed after sustained silence"
    );
    assert!(
        failed_nodes.lock().unwrap().contains(&node_b),
        "failure callback should have been called for B"
    );

    // A fresh heartbeat from B brings it back.
    detector.heartbeat(node_b, addr);
    assert_eq!(
        detector.status(node_b),
        NodeStatus::Healthy,
        "B should recover after heartbeat"
    );
    assert!(
        recovered_nodes.lock().unwrap().contains(&node_b),
        "recovery callback should have been called for B"
    );

    assert_eq!(detector.status(node_a), NodeStatus::Healthy);
    assert_eq!(detector.status(node_c), NodeStatus::Healthy);
    // The first `check_all` may or may not already have reported B as failed.
    assert!(newly_failed.is_empty() || newly_failed == vec![node_b]);
}
use net::adapter::net::{Pingwave, PINGWAVE_SIZE};
/// A `Pingwave` created with TTL 3 survives a byte round-trip and can be
/// forwarded exactly three times: each `forward()` lowers `ttl` by one and
/// raises `hop_count` by one, and a fourth attempt is refused.
#[tokio::test]
async fn test_pingwave_forwarding_chain() {
    let origin: u64 = 0x1111;

    // Freshly created at the origin.
    let fresh = Pingwave::new(origin, 1, 3);
    assert_eq!(fresh.origin_id, origin);
    assert_eq!(fresh.ttl, 3);
    assert_eq!(fresh.hop_count, 0);
    assert!(!fresh.is_expired());

    // Serialized form has the advertised fixed size.
    let wire = fresh.to_bytes();
    assert_eq!(wire.len(), PINGWAVE_SIZE);

    // First hop: decoded from bytes, then forwarded.
    let mut first_hop = Pingwave::from_bytes(&wire).expect("deserialization failed");
    assert_eq!(first_hop.origin_id, origin);
    assert!(first_hop.forward());
    assert_eq!(first_hop.ttl, 2);
    assert_eq!(first_hop.hop_count, 1);

    // Second hop.
    let mut second_hop = first_hop;
    assert!(second_hop.forward());
    assert_eq!(second_hop.ttl, 1);
    assert_eq!(second_hop.hop_count, 2);

    // Third hop exhausts the TTL.
    let mut third_hop = second_hop;
    assert!(third_hop.forward());
    assert_eq!(third_hop.ttl, 0);
    assert_eq!(third_hop.hop_count, 3);

    assert!(!third_hop.forward(), "TTL=0 should refuse to forward");
    assert!(third_hop.is_expired());
}
#[tokio::test]
async fn test_pingwave_over_udp() {
let ports = find_ports(3).await;
let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
let addr_c: SocketAddr = format!("127.0.0.1:{}", ports[2]).parse().unwrap();
let sock_a = UdpSocket::bind(addr_a).await.unwrap();
let sock_b = UdpSocket::bind(addr_b).await.unwrap();
let sock_c = UdpSocket::bind(addr_c).await.unwrap();
let pw = Pingwave::new(0x1111, 42, 3);
let bytes = pw.to_bytes();
sock_a.send_to(&bytes, addr_b).await.unwrap();
sock_a.send_to(&bytes, addr_c).await.unwrap();
let mut buf = vec![0u8; 64];
let (n, from) = tokio::time::timeout(Duration::from_secs(2), sock_b.recv_from(&mut buf))
.await
.expect("B recv timed out")
.expect("B recv failed");
let pw_b = Pingwave::from_bytes(&buf[..n]).expect("B: invalid pingwave");
assert_eq!(pw_b.origin_id, 0x1111);
assert_eq!(pw_b.seq, 42);
assert_eq!(from, addr_a);
let (n, from) = tokio::time::timeout(Duration::from_secs(2), sock_c.recv_from(&mut buf))
.await
.expect("C recv timed out")
.expect("C recv failed");
let pw_c = Pingwave::from_bytes(&buf[..n]).expect("C: invalid pingwave");
assert_eq!(pw_c.origin_id, 0x1111);
assert_eq!(from, addr_a);
}
use net::adapter::net::identity::EntityKeypair;
/// Two `MeshNode`s handshake over loopback, A sends the batch built by
/// `make_batch(0, 20, "mesh_test")` to B, and B must find at least one event
/// on shard 0, all of them tagged `mesh_test`.
#[tokio::test]
async fn test_mesh_node_two_node_data_exchange() {
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let ports = find_ports(2).await;
    let psk: [u8; 32] = [0x42; 32];

    let keys_a = EntityKeypair::generate();
    let keys_b = EntityKeypair::generate();
    let node_id_a = keys_a.node_id();
    let node_id_b = keys_b.node_id();
    let addr_a = loopback(ports[0]);
    let addr_b = loopback(ports[1]);

    // Both nodes share the same settings apart from their bind address.
    let mesh_config = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(10))
    };
    let node_a = MeshNode::new(keys_a, mesh_config(addr_a)).await.unwrap();
    let node_b = MeshNode::new(keys_b, mesh_config(addr_b)).await.unwrap();
    let pubkey_b = *node_b.public_key();

    // B accepts while A connects 50 ms later; both futures run concurrently.
    let (accepted, connected) = tokio::join!(node_b.accept(node_id_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_a.connect(addr_b, &pubkey_b, node_id_b).await
    },);
    accepted.expect("B accept failed");
    connected.expect("A connect failed");

    node_a.start();
    node_b.start();
    assert_eq!(node_a.peer_count(), 1);
    assert_eq!(node_b.peer_count(), 1);
    tokio::time::sleep(Duration::from_millis(100)).await;

    let outgoing = make_batch(0, 20, "mesh_test");
    node_a.send_to_peer(addr_b, &outgoing).await.unwrap();
    // Fixed wait for delivery before polling B.
    tokio::time::sleep(Duration::from_secs(1)).await;

    let received = node_b.poll_shard(0, None, 100).await.unwrap();
    assert!(
        !received.events.is_empty(),
        "B should receive events from A via MeshNode, got {}",
        received.events.len()
    );
    for event in &received.events {
        let json: serde_json::Value = event.parse().unwrap();
        assert_eq!(json["tag"], "mesh_test");
    }

    node_a.shutdown().await.unwrap();
    node_b.shutdown().await.unwrap();
}
/// Three `MeshNode`s where A connects to both B and C (no B–C session is
/// set up in this test). A sends one batch to each peer and both must
/// receive at least one event on shard 0.
#[tokio::test]
async fn test_mesh_node_triangle() {
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let ports = find_ports(3).await;
    let psk: [u8; 32] = [0x42; 32];

    let keys_a = EntityKeypair::generate();
    let keys_b = EntityKeypair::generate();
    let keys_c = EntityKeypair::generate();
    let nid_a = keys_a.node_id();
    let nid_b = keys_b.node_id();
    let nid_c = keys_c.node_id();
    let addr_a = loopback(ports[0]);
    let addr_b = loopback(ports[1]);
    let addr_c = loopback(ports[2]);

    let mesh_config = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(10))
    };
    let node_a = MeshNode::new(keys_a, mesh_config(addr_a)).await.unwrap();
    let node_b = MeshNode::new(keys_b, mesh_config(addr_b)).await.unwrap();
    let node_c = MeshNode::new(keys_c, mesh_config(addr_c)).await.unwrap();
    let pub_b = *node_b.public_key();
    let pub_c = *node_c.public_key();

    // A <-> B: B accepts while A connects 50 ms later.
    let (accepted, connected) = tokio::join!(node_b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_a.connect(addr_b, &pub_b, nid_b).await
    },);
    accepted.expect("B accept A failed");
    connected.expect("A connect B failed");

    // A <-> C: same pattern.
    let (accepted, connected) = tokio::join!(node_c.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_a.connect(addr_c, &pub_c, nid_c).await
    },);
    accepted.expect("C accept A failed");
    connected.expect("A connect C failed");

    node_a.start();
    node_b.start();
    node_c.start();
    assert_eq!(node_a.peer_count(), 2, "A should have 2 peers");
    assert_eq!(node_b.peer_count(), 1, "B should have 1 peer (A)");
    assert_eq!(node_c.peer_count(), 1, "C should have 1 peer (A)");
    tokio::time::sleep(Duration::from_millis(100)).await;

    let batch_for_b = make_batch(0, 10, "to_b");
    node_a.send_to_peer(addr_b, &batch_for_b).await.unwrap();
    let batch_for_c = make_batch(0, 10, "to_c");
    node_a.send_to_peer(addr_c, &batch_for_c).await.unwrap();
    // Fixed wait for delivery before polling.
    tokio::time::sleep(Duration::from_secs(1)).await;

    let b_events = node_b.poll_shard(0, None, 100).await.unwrap();
    let c_events = node_c.poll_shard(0, None, 100).await.unwrap();
    assert!(
        !b_events.events.is_empty(),
        "B should receive from A, got {}",
        b_events.events.len()
    );
    assert!(
        !c_events.events.is_empty(),
        "C should receive from A, got {}",
        c_events.events.len()
    );

    node_a.shutdown().await.unwrap();
    node_b.shutdown().await.unwrap();
    node_c.shutdown().await.unwrap();
}
/// A sends a routed batch addressed to C with B configured as the next hop.
/// C must see the events (tagged `relayed_from_a`) on shard 0, while B's
/// shard 0 must stay empty.
#[tokio::test]
async fn test_mesh_node_relay_through_middle() {
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let ports = find_ports(3).await;
    let psk: [u8; 32] = [0x42; 32];

    let keys_a = EntityKeypair::generate();
    let keys_b = EntityKeypair::generate();
    let keys_c = EntityKeypair::generate();
    let nid_a = keys_a.node_id();
    let nid_b = keys_b.node_id();
    let nid_c = keys_c.node_id();
    let addr_a = loopback(ports[0]);
    let addr_b = loopback(ports[1]);
    let addr_c = loopback(ports[2]);

    let mesh_config = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(10))
    };
    // Roles: A = sender, B = relay, C = receiver.
    let sender = MeshNode::new(keys_a, mesh_config(addr_a)).await.unwrap();
    let relay = MeshNode::new(keys_b, mesh_config(addr_b)).await.unwrap();
    let receiver = MeshNode::new(keys_c, mesh_config(addr_c)).await.unwrap();
    let relay_pub = *relay.public_key();
    let receiver_pub = *receiver.public_key();

    // Sessions: A-B, A-C, B-C. The connecting side always starts 50 ms late.
    let (accepted, connected) = tokio::join!(relay.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_b, &relay_pub, nid_b).await
    },);
    accepted.expect("B accept A failed");
    connected.expect("A connect B failed");

    let (accepted, connected) = tokio::join!(receiver.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_c, &receiver_pub, nid_c).await
    },);
    accepted.expect("C accept A failed");
    connected.expect("A connect C failed");

    let (accepted, connected) = tokio::join!(receiver.accept(nid_b), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        relay.connect(addr_c, &receiver_pub, nid_c).await
    },);
    accepted.expect("C accept B failed");
    connected.expect("B connect C failed");

    // Routes towards C: A goes via B's address, B goes to C's address.
    sender.router().add_route(nid_c, addr_b);
    relay.router().add_route(nid_c, addr_c);

    sender.start();
    relay.start();
    receiver.start();
    tokio::time::sleep(Duration::from_millis(200)).await;

    let outgoing = make_batch(0, 10, "relayed_from_a");
    sender.send_routed(nid_c, &outgoing).await.unwrap();
    // Fixed wait for the two-hop delivery.
    tokio::time::sleep(Duration::from_millis(1500)).await;

    let at_receiver = receiver.poll_shard(0, None, 100).await.unwrap();
    assert!(
        !at_receiver.events.is_empty(),
        "C should receive relayed events from A through B, got {}",
        at_receiver.events.len()
    );
    for event in &at_receiver.events {
        let json: serde_json::Value = event.parse().unwrap();
        assert_eq!(json["tag"], "relayed_from_a");
    }

    let at_relay = relay.poll_shard(0, None, 100).await.unwrap();
    assert_eq!(
        at_relay.events.len(),
        0,
        "B should NOT have decrypted events — it's only a relay"
    );

    sender.shutdown().await.unwrap();
    relay.shutdown().await.unwrap();
    receiver.shutdown().await.unwrap();
}
/// Sends the batch built by `make_batch(0, 100, "integrity_check")` from A to
/// C via B and requires that C receives at least 50 events on shard 0, every
/// one of them still carrying the `integrity_check` tag.
#[tokio::test]
async fn test_mesh_relay_preserves_payload_integrity() {
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let ports = find_ports(3).await;
    let psk: [u8; 32] = [0x42; 32];

    let keys_a = EntityKeypair::generate();
    let keys_b = EntityKeypair::generate();
    let keys_c = EntityKeypair::generate();
    let nid_a = keys_a.node_id();
    let nid_b = keys_b.node_id();
    let nid_c = keys_c.node_id();
    let addr_a = loopback(ports[0]);
    let addr_b = loopback(ports[1]);
    let addr_c = loopback(ports[2]);

    let mesh_config = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(10))
    };
    // Roles: A = sender, B = relay, C = receiver.
    let sender = MeshNode::new(keys_a, mesh_config(addr_a)).await.unwrap();
    let relay = MeshNode::new(keys_b, mesh_config(addr_b)).await.unwrap();
    let receiver = MeshNode::new(keys_c, mesh_config(addr_c)).await.unwrap();
    let relay_pub = *relay.public_key();
    let receiver_pub = *receiver.public_key();

    // Sessions: A-B, A-C, B-C. The connecting side always starts 50 ms late.
    let (accepted, connected) = tokio::join!(relay.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_b, &relay_pub, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    let (accepted, connected) = tokio::join!(receiver.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_c, &receiver_pub, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    let (accepted, connected) = tokio::join!(receiver.accept(nid_b), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        relay.connect(addr_c, &receiver_pub, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    // Routes towards C: A goes via B's address, B goes to C's address.
    sender.router().add_route(nid_c, addr_b);
    relay.router().add_route(nid_c, addr_c);

    sender.start();
    relay.start();
    receiver.start();
    tokio::time::sleep(Duration::from_millis(200)).await;

    let outgoing = make_batch(0, 100, "integrity_check");
    sender.send_routed(nid_c, &outgoing).await.unwrap();
    // Fixed wait for the two-hop delivery.
    tokio::time::sleep(Duration::from_secs(2)).await;

    let delivered = receiver.poll_shard(0, None, 1000).await.unwrap();
    assert!(
        delivered.events.len() >= 50,
        "C should receive most of 100 relayed events, got {}",
        delivered.events.len()
    );
    for event in &delivered.events {
        let json: serde_json::Value = event.parse().unwrap();
        assert_eq!(
            json["tag"], "integrity_check",
            "payload corrupted during relay"
        );
    }

    sender.shutdown().await.unwrap();
    relay.shutdown().await.unwrap();
    receiver.shutdown().await.unwrap();
}
/// A routes traffic for C through a plain UDP socket ("evil B") that flips
/// one byte of the captured packet before passing it on to C. C must end up
/// with no events on shard 0.
///
/// The test asserts that the captured packet is long enough to be tampered
/// with. Without that check a short packet would be forwarded unmodified and
/// the test would no longer exercise tamper detection.
#[tokio::test]
async fn test_mesh_relay_tamper_detected() {
    let ports = find_ports(3).await;
    let psk = [0x42u8; 32];
    let id_a = EntityKeypair::generate();
    let id_c = EntityKeypair::generate();
    let nid_a = id_a.node_id();
    let nid_c = id_c.node_id();
    let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
    let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
    let addr_c: SocketAddr = format!("127.0.0.1:{}", ports[2]).parse().unwrap();
    let mk = |addr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(10))
    };
    let a = MeshNode::new(id_a, mk(addr_a)).await.unwrap();
    let c = MeshNode::new(id_c, mk(addr_c)).await.unwrap();
    let pub_c = *c.public_key();

    // Only A and C are real mesh nodes and share a session.
    let (r1, r2) = tokio::join!(c.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        a.connect(addr_c, &pub_c, nid_c).await
    });
    r1.unwrap();
    r2.unwrap();

    // A's route to C points at B's address, where the raw socket will listen.
    a.router().add_route(nid_c, addr_b);
    a.start();
    c.start();
    tokio::time::sleep(Duration::from_millis(200)).await;

    let evil_b = UdpSocket::bind(addr_b).await.unwrap();
    let batch = make_batch(0, 5, "tamper_test");
    a.send_routed(nid_c, &batch).await.unwrap();

    let mut buf = vec![0u8; 8192];
    let (n, _) = tokio::time::timeout(Duration::from_secs(2), evil_b.recv_from(&mut buf))
        .await
        .expect("B recv timed out")
        .expect("B recv failed");

    use net::adapter::net::HEADER_SIZE;
    // Flip a byte 10 bytes past the routing header and packet header.
    let tamper_offset = ROUTING_HEADER_SIZE + HEADER_SIZE + 10;
    assert!(
        n > tamper_offset,
        "captured packet is too short to tamper with: {} bytes, need more than {}",
        n,
        tamper_offset
    );
    buf[tamper_offset] ^= 0xFF;

    evil_b.send_to(&buf[..n], addr_c).await.unwrap();
    tokio::time::sleep(Duration::from_millis(1000)).await;

    let result = c.poll_shard(0, None, 100).await.unwrap();
    assert_eq!(
        result.events.len(),
        0,
        "C should reject tampered packets — AEAD tag mismatch. Got {} events",
        result.events.len()
    );

    a.shutdown().await.unwrap();
    c.shutdown().await.unwrap();
}
/// After B is shut down, A's failure detector must report B as `Failed` or
/// `Suspected`. Both nodes use a 200 ms heartbeat interval and a 500 ms
/// session timeout.
#[tokio::test]
async fn test_mesh_node_failure_detection() {
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let ports = find_ports(2).await;
    let psk: [u8; 32] = [0x42; 32];

    let keys_a = EntityKeypair::generate();
    let keys_b = EntityKeypair::generate();
    let nid_a = keys_a.node_id();
    let nid_b = keys_b.node_id();
    let addr_a = loopback(ports[0]);
    let addr_b = loopback(ports[1]);

    let mesh_config = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(200))
            .with_session_timeout(Duration::from_millis(500))
    };
    let survivor = MeshNode::new(keys_a, mesh_config(addr_a)).await.unwrap();
    let victim = MeshNode::new(keys_b, mesh_config(addr_b)).await.unwrap();
    let victim_pub = *victim.public_key();

    // B accepts while A connects 50 ms later.
    let (accepted, connected) = tokio::join!(victim.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        survivor.connect(addr_b, &victim_pub, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    survivor.start();
    victim.start();
    tokio::time::sleep(Duration::from_millis(300)).await;
    assert_eq!(
        survivor.failure_detector().status(nid_b),
        NodeStatus::Healthy
    );

    // Take B down, wait, then force a detector sweep on A.
    victim.shutdown().await.unwrap();
    tokio::time::sleep(Duration::from_millis(1500)).await;
    survivor.failure_detector().check_all();

    let status = survivor.failure_detector().status(nid_b);
    assert!(
        matches!(status, NodeStatus::Failed | NodeStatus::Suspected),
        "A should detect B's failure, got {:?}",
        status
    );

    survivor.shutdown().await.unwrap();
}
/// A first reaches C through B; then B is shut down, A's route to C is
/// replaced with C's own address, and a second batch must still arrive at C
/// (identified by the `after_reroute` tag).
#[tokio::test]
async fn test_mesh_node_reroute_on_failure() {
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let ports = find_ports(3).await;
    let psk: [u8; 32] = [0x42; 32];

    let keys_a = EntityKeypair::generate();
    let keys_b = EntityKeypair::generate();
    let keys_c = EntityKeypair::generate();
    let nid_a = keys_a.node_id();
    let nid_b = keys_b.node_id();
    let nid_c = keys_c.node_id();
    let addr_a = loopback(ports[0]);
    let addr_b = loopback(ports[1]);
    let addr_c = loopback(ports[2]);

    let mesh_config = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(200))
            .with_session_timeout(Duration::from_secs(5))
    };
    // Roles: A = sender, B = relay (will be shut down), C = receiver.
    let sender = MeshNode::new(keys_a, mesh_config(addr_a)).await.unwrap();
    let relay = MeshNode::new(keys_b, mesh_config(addr_b)).await.unwrap();
    let receiver = MeshNode::new(keys_c, mesh_config(addr_c)).await.unwrap();
    let relay_pub = *relay.public_key();
    let receiver_pub = *receiver.public_key();

    // Sessions: A-B, A-C, B-C. The connecting side always starts 50 ms late.
    let (accepted, connected) = tokio::join!(relay.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_b, &relay_pub, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    let (accepted, connected) = tokio::join!(receiver.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_c, &receiver_pub, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    let (accepted, connected) = tokio::join!(receiver.accept(nid_b), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        relay.connect(addr_c, &receiver_pub, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    // Initial routes towards C: A goes via B's address, B goes to C's address.
    sender.router().add_route(nid_c, addr_b);
    relay.router().add_route(nid_c, addr_c);

    sender.start();
    relay.start();
    receiver.start();
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Phase 1: delivery through the relay.
    let first_batch = make_batch(0, 5, "before_failure");
    sender.send_routed(nid_c, &first_batch).await.unwrap();
    tokio::time::sleep(Duration::from_secs(1)).await;
    let c_before = receiver.poll_shard(0, None, 100).await.unwrap();
    assert!(
        !c_before.events.is_empty(),
        "C should receive events via B before failure, got {}",
        c_before.events.len()
    );

    // Phase 2: relay goes away; A's route is swapped to C's own address.
    relay.shutdown().await.unwrap();
    tokio::time::sleep(Duration::from_millis(200)).await;
    sender.router().remove_route(nid_c);
    sender.router().add_route(nid_c, addr_c);

    let second_batch = make_batch(0, 5, "after_reroute");
    sender.send_routed(nid_c, &second_batch).await.unwrap();
    tokio::time::sleep(Duration::from_secs(1)).await;
    let c_after = receiver.poll_shard(0, None, 100).await.unwrap();
    assert!(
        !c_after.events.is_empty(),
        "C should receive events after reroute, got {}",
        c_after.events.len()
    );

    // Events that fail to parse simply do not count as a match.
    let has_rerouted = c_after.events.iter().any(|event| {
        event
            .parse()
            .map(|json: serde_json::Value| json["tag"] == "after_reroute")
            .unwrap_or(false)
    });
    assert!(has_rerouted, "C should have events tagged 'after_reroute'");

    sender.shutdown().await.unwrap();
    receiver.shutdown().await.unwrap();
}
/// Sends one batch from A to C through B, shuts B down, swaps A's route to
/// C's own address, and sends a second batch. A single poll of C's shard 0
/// afterwards must contain events from both phases.
#[tokio::test]
async fn test_mesh_node_reroute_no_data_loss() {
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let ports = find_ports(3).await;
    let psk: [u8; 32] = [0x42; 32];

    let keys_a = EntityKeypair::generate();
    let keys_b = EntityKeypair::generate();
    let keys_c = EntityKeypair::generate();
    let nid_a = keys_a.node_id();
    let nid_b = keys_b.node_id();
    let nid_c = keys_c.node_id();
    let addr_a = loopback(ports[0]);
    let addr_b = loopback(ports[1]);
    let addr_c = loopback(ports[2]);

    let mesh_config = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(10))
    };
    // Roles: A = sender, B = relay (will be shut down), C = receiver.
    let sender = MeshNode::new(keys_a, mesh_config(addr_a)).await.unwrap();
    let relay = MeshNode::new(keys_b, mesh_config(addr_b)).await.unwrap();
    let receiver = MeshNode::new(keys_c, mesh_config(addr_c)).await.unwrap();
    let relay_pub = *relay.public_key();
    let receiver_pub = *receiver.public_key();

    // Sessions: A-B, A-C, B-C. The connecting side always starts 50 ms late.
    let (accepted, connected) = tokio::join!(relay.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_b, &relay_pub, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    let (accepted, connected) = tokio::join!(receiver.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_c, &receiver_pub, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    let (accepted, connected) = tokio::join!(receiver.accept(nid_b), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        relay.connect(addr_c, &receiver_pub, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    // Initial routes towards C: A goes via B's address, B goes to C's address.
    sender.router().add_route(nid_c, addr_b);
    relay.router().add_route(nid_c, addr_c);

    sender.start();
    relay.start();
    receiver.start();
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Phase 1: through the relay.
    let relayed_batch = make_batch(0, 10, "phase1_via_relay");
    sender.send_routed(nid_c, &relayed_batch).await.unwrap();
    tokio::time::sleep(Duration::from_secs(1)).await;

    // Phase 2: relay is gone; A's route is swapped to C's own address.
    relay.shutdown().await.unwrap();
    sender.router().remove_route(nid_c);
    sender.router().add_route(nid_c, addr_c);
    let direct_batch = make_batch(0, 10, "phase2_direct");
    sender.send_routed(nid_c, &direct_batch).await.unwrap();
    tokio::time::sleep(Duration::from_secs(1)).await;

    let result = receiver.poll_shard(0, None, 1000).await.unwrap();
    // Counts polled events whose JSON `tag` equals `tag`; unparsable events count as non-matching.
    let count_tagged = |tag: &str| {
        result
            .events
            .iter()
            .filter(|event| {
                event
                    .parse()
                    .map(|json: serde_json::Value| json["tag"] == tag)
                    .unwrap_or(false)
            })
            .count()
    };
    let phase1_count = count_tagged("phase1_via_relay");
    let phase2_count = count_tagged("phase2_direct");

    assert!(
        phase1_count > 0,
        "C should have received phase 1 (relayed) events, got {}",
        phase1_count
    );
    assert!(
        phase2_count > 0,
        "C should have received phase 2 (direct) events, got {}",
        phase2_count
    );

    sender.shutdown().await.unwrap();
    receiver.shutdown().await.unwrap();
}
use net::adapter::net::behavior::capability::CapabilityFilter;
use net::adapter::net::compute::orchestrator::wire as migration_wire;
use net::adapter::net::state::causal::CausalEvent;
use net::adapter::net::{
DaemonError, DaemonFactoryRegistry, DaemonHost, DaemonHostConfig, DaemonRegistry, MeshDaemon,
MigrationMessage, MigrationOrchestrator, MigrationPhase, MigrationSourceHandler,
MigrationSubprotocolHandler, MigrationTargetHandler, SUBPROTOCOL_MIGRATION,
};
/// Minimal test daemon whose entire state is a single `u64` counter.
struct CounterDaemon {
    /// Current counter value.
    count: u64,
}

impl CounterDaemon {
    /// Creates a daemon whose counter starts at `count`.
    fn with_count(count: u64) -> Self {
        CounterDaemon { count }
    }
}
/// `MeshDaemon` implementation for the counter: every processed event bumps
/// the counter, and the state is exchanged as the counter's 8 little-endian bytes.
impl MeshDaemon for CounterDaemon {
    /// Fixed daemon name, `"counter"`.
    fn name(&self) -> &str {
        "counter"
    }

    /// Uses the default `CapabilityFilter`.
    fn requirements(&self) -> CapabilityFilter {
        CapabilityFilter::default()
    }

    /// Increments the counter (the event content is ignored) and emits the
    /// new value as a single 8-byte little-endian output.
    fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
        self.count += 1;
        let encoded = self.count.to_le_bytes().to_vec();
        Ok(vec![Bytes::from(encoded)])
    }

    /// Always returns `Some` with the counter as 8 little-endian bytes.
    fn snapshot(&self) -> Option<Bytes> {
        let encoded = self.count.to_le_bytes().to_vec();
        Some(Bytes::from(encoded))
    }

    /// Restores the counter from exactly 8 little-endian bytes.
    ///
    /// # Errors
    ///
    /// Returns `DaemonError::RestoreFailed("bad state size")` when `state`
    /// is not exactly 8 bytes long.
    fn restore(&mut self, state: Bytes) -> Result<(), DaemonError> {
        // Converting the whole slice into `[u8; 8]` fails for any length other than 8.
        match <[u8; 8]>::try_from(&state[..]) {
            Ok(raw) => {
                self.count = u64::from_le_bytes(raw);
                Ok(())
            }
            Err(_) => Err(DaemonError::RestoreFailed("bad state size".into())),
        }
    }
}
/// Node A sends an encoded `MigrationMessage::TakeSnapshot` to node B over
/// the migration subprotocol. B hosts a `CounterDaemon`; the test then checks
/// that a further `start_snapshot` on B's source handler for the same daemon
/// is rejected, which the assertion treats as proof that B handled the message.
#[tokio::test]
async fn test_migration_snapshot_over_wire() {
    let loopback = |port: u16| SocketAddr::from(([127, 0, 0, 1], port));
    let ports = find_ports(2).await;
    let psk: [u8; 32] = [0x42; 32];

    let keys_a = EntityKeypair::generate();
    let keys_b = EntityKeypair::generate();
    let nid_a = keys_a.node_id();
    let nid_b = keys_b.node_id();
    let addr_a = loopback(ports[0]);
    let addr_b = loopback(ports[1]);

    let mesh_config = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(10))
    };

    // --- B side: registry that hosts the daemon, plus its migration handlers.
    let registry_b = Arc::new(DaemonRegistry::new());
    let daemon_keys = EntityKeypair::generate();
    let daemon_origin = daemon_keys.origin_hash();
    registry_b
        .register(DaemonHost::new(
            Box::new(CounterDaemon::with_count(42)),
            daemon_keys.clone(),
            DaemonHostConfig::default(),
        ))
        .unwrap();

    let orchestrator_b = Arc::new(MigrationOrchestrator::new(Arc::clone(&registry_b), nid_b));
    // Kept outside the handler so the test can probe it at the end.
    let source_b = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry_b)));
    let target_b = Arc::new(MigrationTargetHandler::new(Arc::clone(&registry_b)));
    let handler_b = Arc::new(MigrationSubprotocolHandler::new(
        orchestrator_b,
        Arc::clone(&source_b),
        target_b,
        nid_b,
    ));

    let node_a = MeshNode::new(keys_a, mesh_config(addr_a)).await.unwrap();
    let node_b = MeshNode::new(keys_b, mesh_config(addr_b)).await.unwrap();
    let pub_b = *node_b.public_key();
    node_b.set_migration_handler(handler_b);

    // --- A side: empty registry, with a factory registered for the daemon's keypair.
    let registry_a = Arc::new(DaemonRegistry::new());
    let factories_a = Arc::new(DaemonFactoryRegistry::new());
    factories_a
        .register(daemon_keys.clone(), DaemonHostConfig::default(), || {
            Box::new(CounterDaemon::with_count(0))
        })
        .unwrap();

    let orchestrator_a = Arc::new(MigrationOrchestrator::new(Arc::clone(&registry_a), nid_a));
    let source_a = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry_a)));
    let target_a = Arc::new(MigrationTargetHandler::new_with_factories(
        Arc::clone(&registry_a),
        factories_a,
    ));
    let handler_a = Arc::new(MigrationSubprotocolHandler::new(
        orchestrator_a,
        source_a,
        target_a,
        nid_a,
    ));
    node_a.set_migration_handler(handler_a);

    // B accepts while A connects 50 ms later.
    let (accepted, connected) = tokio::join!(node_b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_a.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    node_a.start();
    node_b.start();
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Ask B, over the wire, to snapshot the daemon with A as the target node.
    let request = MigrationMessage::TakeSnapshot {
        daemon_origin,
        target_node: nid_a,
    };
    let encoded = migration_wire::encode(&request).unwrap();
    node_a
        .send_subprotocol(addr_b, SUBPROTOCOL_MIGRATION, &encoded)
        .await
        .unwrap();
    // Fixed wait for B to handle the message.
    tokio::time::sleep(Duration::from_secs(2)).await;

    let second_try = source_b.start_snapshot(daemon_origin, 0x9999, 0x1111);
    assert!(
        second_try.is_err(),
        "B should have processed TakeSnapshot over wire — source handler in snapshot state"
    );

    node_a.shutdown().await.unwrap();
    node_b.shutdown().await.unwrap();
}
/// Full daemon-migration lifecycle across three mesh nodes.
///
/// Roles set up by this test: A runs the orchestrator that starts the
/// migration, B's registry initially hosts the daemon, and C's target handler
/// is given a factory for the same daemon keypair.
///
/// Only the first `TakeSnapshot` message is sent by hand; the final assertions
/// expect the remaining phases to have completed within the 3 s wait
/// (presumably driven by the installed migration handlers — confirm against
/// `MigrationSubprotocolHandler`).
#[tokio::test]
async fn test_migration_full_lifecycle_over_wire() {
let ports = find_ports(3).await;
let psk = [0x42u8; 32];
let id_a = EntityKeypair::generate();
let id_b = EntityKeypair::generate();
let id_c = EntityKeypair::generate();
let nid_a = id_a.node_id();
let nid_b = id_b.node_id();
let nid_c = id_c.node_id();
let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
let addr_c: SocketAddr = format!("127.0.0.1:{}", ports[2]).parse().unwrap();
// Shared node configuration; only the bind address differs per node.
let mk = |addr| {
MeshNodeConfig::new(addr, psk)
.with_num_shards(2)
.with_handshake(3, Duration::from_secs(3))
.with_heartbeat_interval(Duration::from_millis(500))
.with_session_timeout(Duration::from_secs(10))
};
// B: registry that hosts the daemon before migration (counter starts at 42).
let registry_b = Arc::new(DaemonRegistry::new());
let daemon_kp = EntityKeypair::generate();
let daemon_origin = daemon_kp.origin_hash();
let host = DaemonHost::new(
Box::new(CounterDaemon::with_count(42)),
daemon_kp.clone(),
DaemonHostConfig::default(),
);
registry_b.register(host).unwrap();
// A: empty registry plus the orchestrator that is queried in the assertions below.
let registry_a = Arc::new(DaemonRegistry::new());
let orch_a = Arc::new(MigrationOrchestrator::new(registry_a.clone(), nid_a));
let handler_a = Arc::new(MigrationSubprotocolHandler::new(
orch_a.clone(),
Arc::new(MigrationSourceHandler::new(registry_a.clone())),
Arc::new(MigrationTargetHandler::new(registry_a.clone())),
nid_a,
));
// B: keep a handle on the source handler so its snapshot state can be probed later.
let orch_b = Arc::new(MigrationOrchestrator::new(registry_b.clone(), nid_b));
let source_b = Arc::new(MigrationSourceHandler::new(registry_b.clone()));
let handler_b = Arc::new(MigrationSubprotocolHandler::new(
orch_b,
source_b.clone(),
Arc::new(MigrationTargetHandler::new(registry_b.clone())),
nid_b,
));
// C: empty registry; the target handler gets a factory registered under the
// daemon's keypair that builds a CounterDaemon starting at 0.
let registry_c = Arc::new(DaemonRegistry::new());
let factories_c = Arc::new(DaemonFactoryRegistry::new());
factories_c
.register(daemon_kp.clone(), DaemonHostConfig::default(), || {
Box::new(CounterDaemon::with_count(0))
})
.unwrap();
let handler_c = Arc::new(MigrationSubprotocolHandler::new(
Arc::new(MigrationOrchestrator::new(registry_c.clone(), nid_c)),
Arc::new(MigrationSourceHandler::new(registry_c.clone())),
Arc::new(MigrationTargetHandler::new_with_factories(
registry_c.clone(),
factories_c.clone(),
)),
nid_c,
));
let node_a = MeshNode::new(id_a, mk(addr_a)).await.unwrap();
let node_b = MeshNode::new(id_b, mk(addr_b)).await.unwrap();
let node_c = MeshNode::new(id_c, mk(addr_c)).await.unwrap();
let pub_b = *node_b.public_key();
let pub_c = *node_c.public_key();
node_a.set_migration_handler(handler_a);
node_b.set_migration_handler(handler_b);
node_c.set_migration_handler(handler_c);
// Handshake A<->B: the connect side is delayed 50 ms while accept is already pending.
let (r1, r2) = tokio::join!(node_b.accept(nid_a), async {
tokio::time::sleep(Duration::from_millis(50)).await;
node_a.connect(addr_b, &pub_b, nid_b).await
});
r1.unwrap();
r2.unwrap();
// Handshake A<->C. B and C are never connected directly in this test.
let (r1, r2) = tokio::join!(node_c.accept(nid_a), async {
tokio::time::sleep(Duration::from_millis(50)).await;
node_a.connect(addr_c, &pub_c, nid_c).await
});
r1.unwrap();
r2.unwrap();
node_a.start();
node_b.start();
node_c.start();
tokio::time::sleep(Duration::from_millis(200)).await;
// Start the migration on A's orchestrator (arguments look like origin, source
// node, target node — confirm) and check it enters the Snapshot phase.
orch_a.start_migration(daemon_origin, nid_b, nid_c).unwrap();
assert_eq!(orch_a.status(daemon_origin), Some(MigrationPhase::Snapshot));
// Manually send the first wire message to B.
let msg = MigrationMessage::TakeSnapshot {
daemon_origin,
target_node: nid_c,
};
let encoded = migration_wire::encode(&msg).unwrap();
node_a
.send_subprotocol(addr_b, SUBPROTOCOL_MIGRATION, &encoded)
.await
.unwrap();
// Fixed wait for the lifecycle to finish before inspecting state.
tokio::time::sleep(Duration::from_millis(3000)).await;
// A second start_snapshot on B's source handler is expected to fail, which the
// test takes as evidence that B handled TakeSnapshot.
let source_check = source_b.start_snapshot(daemon_origin, 0x9999, 0x1111);
assert!(
source_check.is_err(),
"B should have processed TakeSnapshot"
);
assert!(
!orch_a.is_migrating(daemon_origin),
"orchestrator record should be removed after ActivateAck"
);
assert!(
registry_c.contains(daemon_origin),
"daemon should be registered on target C after full lifecycle"
);
assert!(
!registry_b.contains(daemon_origin),
"daemon should be unregistered from source B after cleanup"
);
node_a.shutdown().await.unwrap();
node_b.shutdown().await.unwrap();
node_c.shutdown().await.unwrap();
}
/// Partition detection: once A blocks B's address, A's failure detector must
/// report B as `Failed` or `Suspected`, and a batch A sends while blocked must
/// not show up on B's shard 0.
#[tokio::test]
async fn test_partition_detection_via_filter() {
    let ports = find_ports(2).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let nid_a = key_a.node_id();
    let nid_b = key_b.node_id();
    let addr_a = SocketAddr::from(([127, 0, 0, 1], ports[0]));
    let addr_b = SocketAddr::from(([127, 0, 0, 1], ports[1]));

    // 200 ms heartbeats with a 500 ms session timeout.
    let node_config = |bind: SocketAddr| {
        MeshNodeConfig::new(bind, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(200))
            .with_session_timeout(Duration::from_millis(500))
    };
    let node_a = MeshNode::new(key_a, node_config(addr_a)).await.unwrap();
    let node_b = MeshNode::new(key_b, node_config(addr_b)).await.unwrap();
    let pub_b = *node_b.public_key();

    // B accepts while A connects after a 50 ms delay.
    let (accepted, connected) = tokio::join!(node_b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_a.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    node_a.start();
    node_b.start();
    tokio::time::sleep(Duration::from_millis(300)).await;
    assert_eq!(node_a.failure_detector().status(nid_b), NodeStatus::Healthy);

    // Partition: A filters B, then the detector is checked after 1.5 s.
    node_a.block_peer(addr_b);
    assert!(node_a.is_blocked(&addr_b));
    tokio::time::sleep(Duration::from_millis(1500)).await;
    node_a.failure_detector().check_all();
    let observed = node_a.failure_detector().status(nid_b);
    assert!(
        [NodeStatus::Failed, NodeStatus::Suspected].contains(&observed),
        "A should detect B as failed during partition, got {:?}",
        observed
    );

    // The send itself must succeed; the batch just must not arrive at B.
    let batch = make_batch(0, 5, "during_partition");
    node_a.send_to_peer(addr_b, &batch).await.unwrap();
    tokio::time::sleep(Duration::from_millis(500)).await;

    let polled = node_b.poll_shard(0, None, 100).await.unwrap();
    let leaked = polled
        .events
        .iter()
        .filter(|e| {
            e.parse()
                .map(|v: serde_json::Value| v["tag"] == "during_partition")
                .unwrap_or(false)
        })
        .count();
    assert_eq!(
        leaked, 0,
        "B should not receive events sent during partition"
    );

    node_a.shutdown().await.unwrap();
    node_b.shutdown().await.unwrap();
}
/// Partition healing: A blocks B for one second, unblocks it, and after a
/// further second B must be `Healthy` again on A's failure detector and able
/// to receive a freshly sent batch.
#[tokio::test]
async fn test_partition_healing() {
    let ports = find_ports(2).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let nid_a = key_a.node_id();
    let nid_b = key_b.node_id();
    let addr_a = SocketAddr::from(([127, 0, 0, 1], ports[0]));
    let addr_b = SocketAddr::from(([127, 0, 0, 1], ports[1]));

    let node_config = |bind: SocketAddr| {
        MeshNodeConfig::new(bind, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(200))
            .with_session_timeout(Duration::from_millis(500))
    };
    let node_a = MeshNode::new(key_a, node_config(addr_a)).await.unwrap();
    let node_b = MeshNode::new(key_b, node_config(addr_b)).await.unwrap();
    let pub_b = *node_b.public_key();

    let (accepted, connected) = tokio::join!(node_b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_a.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    node_a.start();
    node_b.start();
    tokio::time::sleep(Duration::from_millis(300)).await;

    // Partition for one second, then lift the block and wait another second.
    node_a.block_peer(addr_b);
    tokio::time::sleep(Duration::from_millis(1000)).await;
    node_a.unblock_peer(&addr_b);
    assert!(!node_a.is_blocked(&addr_b));
    tokio::time::sleep(Duration::from_millis(1000)).await;

    let status = node_a.failure_detector().status(nid_b);
    assert_eq!(
        status,
        NodeStatus::Healthy,
        "B should recover to Healthy after partition heals, got {:?}",
        status
    );

    // Traffic sent after healing must reach B's shard 0.
    let batch = make_batch(0, 10, "after_healing");
    node_a.send_to_peer(addr_b, &batch).await.unwrap();
    tokio::time::sleep(Duration::from_millis(1000)).await;

    let polled = node_b.poll_shard(0, None, 100).await.unwrap();
    let delivered = polled.events.iter().any(|e| {
        e.parse()
            .map(|v: serde_json::Value| v["tag"] == "after_healing")
            .unwrap_or(false)
    });
    assert!(delivered, "B should receive events after partition heals");

    node_a.shutdown().await.unwrap();
    node_b.shutdown().await.unwrap();
}
/// Three fully connected nodes where A is cut off from both B and C.
///
/// Verifies that traffic between the two unaffected nodes (B -> C) still
/// arrives while a batch from A to B does not show up on B.
#[tokio::test]
async fn test_partition_asymmetric_three_node() {
let ports = find_ports(3).await;
let psk = [0x42u8; 32];
let id_a = EntityKeypair::generate();
let id_b = EntityKeypair::generate();
let id_c = EntityKeypair::generate();
let nid_a = id_a.node_id();
let nid_b = id_b.node_id();
let nid_c = id_c.node_id();
let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
let addr_c: SocketAddr = format!("127.0.0.1:{}", ports[2]).parse().unwrap();
// The 5 s session timeout is longer than the ~2 s this test spends partitioned.
let mk = |addr| {
MeshNodeConfig::new(addr, psk)
.with_num_shards(2)
.with_handshake(3, Duration::from_secs(3))
.with_heartbeat_interval(Duration::from_millis(200))
.with_session_timeout(Duration::from_secs(5))
};
let node_a = MeshNode::new(id_a, mk(addr_a)).await.unwrap();
let node_b = MeshNode::new(id_b, mk(addr_b)).await.unwrap();
let node_c = MeshNode::new(id_c, mk(addr_c)).await.unwrap();
let pub_b = *node_b.public_key();
let pub_c = *node_c.public_key();
// Pairwise handshakes: A<->B, A<->C, B<->C (connect side delayed 50 ms each time).
let (r1, r2) = tokio::join!(node_b.accept(nid_a), async {
tokio::time::sleep(Duration::from_millis(50)).await;
node_a.connect(addr_b, &pub_b, nid_b).await
});
r1.unwrap();
r2.unwrap();
let (r1, r2) = tokio::join!(node_c.accept(nid_a), async {
tokio::time::sleep(Duration::from_millis(50)).await;
node_a.connect(addr_c, &pub_c, nid_c).await
});
r1.unwrap();
r2.unwrap();
let (r1, r2) = tokio::join!(node_c.accept(nid_b), async {
tokio::time::sleep(Duration::from_millis(50)).await;
node_b.connect(addr_c, &pub_c, nid_c).await
});
r1.unwrap();
r2.unwrap();
node_a.start();
node_b.start();
node_c.start();
tokio::time::sleep(Duration::from_millis(200)).await;
// Isolate A: blocks are installed on both sides of the A-B and A-C links.
node_a.block_peer(addr_b);
node_a.block_peer(addr_c);
node_b.block_peer(addr_a);
node_c.block_peer(addr_a);
tokio::time::sleep(Duration::from_millis(500)).await;
// B -> C is not blocked and must still deliver.
let batch = make_batch(0, 10, "b_to_c_during_partition");
node_b.send_to_peer(addr_c, &batch).await.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
let c_events = node_c.poll_shard(0, None, 100).await.unwrap();
// Keep only events whose parsed JSON carries the expected tag.
let bc_events: Vec<_> = c_events
.events
.iter()
.filter(|e| {
e.parse()
.map(|v: serde_json::Value| v["tag"] == "b_to_c_during_partition")
.unwrap_or(false)
})
.collect();
assert!(
!bc_events.is_empty(),
"B→C should work during A's partition, got {} events",
bc_events.len()
);
// A -> B: the send call is expected to succeed, but nothing may arrive on B.
let batch = make_batch(0, 5, "a_to_b_blocked");
node_a.send_to_peer(addr_b, &batch).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
let b_events = node_b.poll_shard(0, None, 100).await.unwrap();
let blocked_events: Vec<_> = b_events
.events
.iter()
.filter(|e| {
e.parse()
.map(|v: serde_json::Value| v["tag"] == "a_to_b_blocked")
.unwrap_or(false)
})
.collect();
assert_eq!(
blocked_events.len(),
0,
"A→B should be blocked during partition"
);
node_a.shutdown().await.unwrap();
node_b.shutdown().await.unwrap();
node_c.shutdown().await.unwrap();
}
/// Automatic rerouting after a relay node goes away.
///
/// A is given a route to C via B. After B is shut down, the test expects A's
/// reroute policy to report at least one active reroute and a new batch
/// addressed to C to still arrive, without the test touching A's routes again.
#[tokio::test]
async fn test_mesh_node_auto_reroute() {
let ports = find_ports(3).await;
let psk = [0x42u8; 32];
let id_a = EntityKeypair::generate();
let id_b = EntityKeypair::generate();
let id_c = EntityKeypair::generate();
let nid_a = id_a.node_id();
let nid_b = id_b.node_id();
let nid_c = id_c.node_id();
let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
let addr_c: SocketAddr = format!("127.0.0.1:{}", ports[2]).parse().unwrap();
// Short 600 ms session timeout; the test later waits 2.5 s after B's shutdown.
let mk = |addr| {
MeshNodeConfig::new(addr, psk)
.with_num_shards(2)
.with_handshake(3, Duration::from_secs(3))
.with_heartbeat_interval(Duration::from_millis(200))
.with_session_timeout(Duration::from_millis(600))
};
let node_a = MeshNode::new(id_a, mk(addr_a)).await.unwrap();
let node_b = MeshNode::new(id_b, mk(addr_b)).await.unwrap();
let node_c = MeshNode::new(id_c, mk(addr_c)).await.unwrap();
let pub_b = *node_b.public_key();
let pub_c = *node_c.public_key();
// Pairwise handshakes: A<->B, A<->C, B<->C.
let (r1, r2) = tokio::join!(node_b.accept(nid_a), async {
tokio::time::sleep(Duration::from_millis(50)).await;
node_a.connect(addr_b, &pub_b, nid_b).await
});
r1.unwrap();
r2.unwrap();
let (r1, r2) = tokio::join!(node_c.accept(nid_a), async {
tokio::time::sleep(Duration::from_millis(50)).await;
node_a.connect(addr_c, &pub_c, nid_c).await
});
r1.unwrap();
r2.unwrap();
let (r1, r2) = tokio::join!(node_c.accept(nid_b), async {
tokio::time::sleep(Duration::from_millis(50)).await;
node_b.connect(addr_c, &pub_c, nid_c).await
});
r1.unwrap();
r2.unwrap();
// Static routes: A reaches C via B's address, B reaches C at C's own address.
node_a.router().add_route(nid_c, addr_b);
node_b.router().add_route(nid_c, addr_c);
node_a.start();
node_b.start();
node_c.start();
tokio::time::sleep(Duration::from_millis(200)).await;
// Baseline: no reroutes yet, and routed traffic reaches C.
assert_eq!(node_a.reroute_policy().active_reroutes(), 0);
let batch1 = make_batch(0, 5, "before_auto_reroute");
node_a.send_routed(nid_c, &batch1).await.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
let c_before = node_c.poll_shard(0, None, 100).await.unwrap();
assert!(!c_before.events.is_empty(), "C should receive via B");
// Take the relay down, wait, then run a failure-detector pass explicitly
// before inspecting the reroute policy.
node_b.shutdown().await.unwrap();
tokio::time::sleep(Duration::from_millis(2500)).await;
node_a.failure_detector().check_all();
assert!(
node_a.reroute_policy().active_reroutes() > 0,
"reroute policy should have rerouted after B's failure"
);
let batch2 = make_batch(0, 5, "after_auto_reroute");
node_a.send_routed(nid_c, &batch2).await.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
let c_after = node_c.poll_shard(0, None, 100).await.unwrap();
// Only events tagged with the post-reroute batch count here.
let auto_events: Vec<_> = c_after
.events
.iter()
.filter(|e| {
e.parse()
.map(|v: serde_json::Value| v["tag"] == "after_auto_reroute")
.unwrap_or(false)
})
.collect();
assert!(
!auto_events.is_empty(),
"C should receive events after automatic reroute (no manual route update)"
);
// B was already shut down above, so only A and C remain.
node_a.shutdown().await.unwrap();
node_c.shutdown().await.unwrap();
}
/// Reroute recovery: A routes to C via B, the A-B link is blocked on both
/// sides until A reports an active reroute, and after unblocking the test
/// expects zero active reroutes and the route to C pointing at B again.
#[tokio::test]
async fn test_mesh_node_auto_reroute_recovery() {
    let ports = find_ports(3).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let key_c = EntityKeypair::generate();
    let nid_a = key_a.node_id();
    let nid_b = key_b.node_id();
    let nid_c = key_c.node_id();
    let addr_a = SocketAddr::from(([127, 0, 0, 1], ports[0]));
    let addr_b = SocketAddr::from(([127, 0, 0, 1], ports[1]));
    let addr_c = SocketAddr::from(([127, 0, 0, 1], ports[2]));

    let node_config = |bind: SocketAddr| {
        MeshNodeConfig::new(bind, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(200))
            .with_session_timeout(Duration::from_millis(600))
    };
    let node_a = MeshNode::new(key_a, node_config(addr_a)).await.unwrap();
    let node_b = MeshNode::new(key_b, node_config(addr_b)).await.unwrap();
    let node_c = MeshNode::new(key_c, node_config(addr_c)).await.unwrap();
    let pub_b = *node_b.public_key();
    let pub_c = *node_c.public_key();

    // A handshakes with B, then with C.
    let (accepted, connected) = tokio::join!(node_b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_a.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();
    let (accepted, connected) = tokio::join!(node_c.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_a.connect(addr_c, &pub_c, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    // Original route: C is reached through B.
    node_a.router().add_route(nid_c, addr_b);
    node_a.start();
    node_b.start();
    node_c.start();
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Cut the A-B link in both directions and let the failure surface.
    node_a.block_peer(addr_b);
    node_b.block_peer(addr_a);
    tokio::time::sleep(Duration::from_millis(2500)).await;
    node_a.failure_detector().check_all();
    let during_outage = node_a.reroute_policy().active_reroutes();
    assert!(during_outage > 0, "should auto-reroute");

    // Heal the link and give the nodes one second to recover.
    node_a.unblock_peer(&addr_b);
    node_b.unblock_peer(&addr_a);
    tokio::time::sleep(Duration::from_millis(1000)).await;
    let after_recovery = node_a.reroute_policy().active_reroutes();
    assert_eq!(
        after_recovery, 0,
        "original route should be restored after B recovers"
    );

    let next_hop = node_a.router().routing_table().lookup(nid_c);
    assert_eq!(
        next_hop,
        Some(addr_b),
        "route to C should be restored through B"
    );

    node_a.shutdown().await.unwrap();
    node_b.shutdown().await.unwrap();
    node_c.shutdown().await.unwrap();
}
/// Chain topology A - B - C: A's proximity graph holds one node right after
/// start and is expected to grow to at least two within 1.5 s.
#[tokio::test]
async fn test_proximity_graph_pingwave_discovery() {
    let ports = find_ports(3).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let key_c = EntityKeypair::generate();
    let nid_a = key_a.node_id();
    let nid_b = key_b.node_id();
    let nid_c = key_c.node_id();
    let addr_a = SocketAddr::from(([127, 0, 0, 1], ports[0]));
    let addr_b = SocketAddr::from(([127, 0, 0, 1], ports[1]));
    let addr_c = SocketAddr::from(([127, 0, 0, 1], ports[2]));

    let node_config = |bind: SocketAddr| {
        MeshNodeConfig::new(bind, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(200))
            .with_session_timeout(Duration::from_secs(10))
    };
    let node_a = MeshNode::new(key_a, node_config(addr_a)).await.unwrap();
    let node_b = MeshNode::new(key_b, node_config(addr_b)).await.unwrap();
    let node_c = MeshNode::new(key_c, node_config(addr_c)).await.unwrap();
    let pub_b = *node_b.public_key();
    let pub_c = *node_c.public_key();

    // Only A<->B and B<->C are connected; A has no direct session with C.
    let (accepted, connected) = tokio::join!(node_b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_a.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();
    let (accepted, connected) = tokio::join!(node_c.accept(nid_b), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_b.connect(addr_c, &pub_c, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    node_a.start();
    node_b.start();
    node_c.start();

    // Checked immediately after start, before any wait.
    assert_eq!(node_a.proximity_graph().node_count(), 1);

    tokio::time::sleep(Duration::from_millis(1500)).await;
    let known = node_a.proximity_graph().node_count();
    assert!(
        known >= 2,
        "A should discover C via B's pingwave relay, knows {} nodes",
        known
    );

    node_a.shutdown().await.unwrap();
    node_b.shutdown().await.unwrap();
    node_c.shutdown().await.unwrap();
}
/// Handshake through a relay: A has a direct session only with B, B has one
/// with C, and A establishes a session with C via `connect_via` through B.
///
/// Afterwards a routed batch from A must arrive on C with the expected tag,
/// and B's shard 0 must stay empty.
#[tokio::test]
async fn test_mesh_handshake_via_relay() {
let ports = find_ports(3).await;
let psk = [0x42u8; 32];
let id_a = EntityKeypair::generate();
let id_b = EntityKeypair::generate();
let id_c = EntityKeypair::generate();
let nid_a = id_a.node_id();
let nid_b = id_b.node_id();
let nid_c = id_c.node_id();
let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
let addr_c: SocketAddr = format!("127.0.0.1:{}", ports[2]).parse().unwrap();
let mk = |addr| {
MeshNodeConfig::new(addr, psk)
.with_num_shards(2)
.with_handshake(3, Duration::from_secs(3))
.with_heartbeat_interval(Duration::from_millis(500))
.with_session_timeout(Duration::from_secs(30))
};
let a = MeshNode::new(id_a, mk(addr_a)).await.unwrap();
let b = MeshNode::new(id_b, mk(addr_b)).await.unwrap();
let c = MeshNode::new(id_c, mk(addr_c)).await.unwrap();
let pub_b = *b.public_key();
let pub_c = *c.public_key();
// Direct handshakes: A<->B, then B<->C. There is no direct A<->C handshake.
let (r1, r2) = tokio::join!(b.accept(nid_a), async {
tokio::time::sleep(Duration::from_millis(50)).await;
a.connect(addr_b, &pub_b, nid_b).await
});
r1.expect("B accept A failed");
r2.expect("A connect B failed");
let (r1, r2) = tokio::join!(c.accept(nid_b), async {
tokio::time::sleep(Duration::from_millis(50)).await;
b.connect(addr_c, &pub_c, nid_c).await
});
r1.expect("C accept B failed");
r2.expect("B connect C failed");
// Nodes are started before connect_via is attempted.
a.start();
b.start();
c.start();
// A connects to C using B's address as the relay and C's public key.
let result = a.connect_via(addr_b, &pub_c, nid_c).await;
result.expect("A connect_via B to C failed");
// B gets an explicit route back to A (presumably for the return path — confirm).
b.router().add_route(nid_a, addr_a);
tokio::time::sleep(Duration::from_millis(200)).await;
let batch = make_batch(0, 10, "via_relay_handshake");
a.send_routed(nid_c, &batch)
.await
.expect("A send_routed to C failed");
tokio::time::sleep(Duration::from_millis(1500)).await;
let c_result = c.poll_shard(0, None, 100).await.unwrap();
assert!(
!c_result.events.is_empty(),
"C should receive events from A via relayed-handshake session, got {}",
c_result.events.len()
);
// Every event C received must carry the tag of the batch A sent.
for event in &c_result.events {
let json: serde_json::Value = event.parse().unwrap();
assert_eq!(json["tag"], "via_relay_handshake");
}
// The relay itself must not surface any of the A -> C events.
let b_result = b.poll_shard(0, None, 100).await.unwrap();
assert_eq!(
b_result.events.len(),
0,
"B should not decrypt A→C data — it only relays"
);
a.shutdown().await.unwrap();
b.shutdown().await.unwrap();
c.shutdown().await.unwrap();
}
/// Relayed session in both directions: after A connects to C via B, a routed
/// batch from A must arrive on C and a routed batch from C must arrive on A,
/// each carrying only its own tag.
#[tokio::test]
async fn test_mesh_handshake_relay_bidirectional() {
    let ports = find_ports(3).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let key_c = EntityKeypair::generate();
    let nid_a = key_a.node_id();
    let nid_b = key_b.node_id();
    let nid_c = key_c.node_id();
    let addr_a = SocketAddr::from(([127, 0, 0, 1], ports[0]));
    let addr_b = SocketAddr::from(([127, 0, 0, 1], ports[1]));
    let addr_c = SocketAddr::from(([127, 0, 0, 1], ports[2]));

    let node_config = |bind: SocketAddr| {
        MeshNodeConfig::new(bind, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(30))
    };
    let a = MeshNode::new(key_a, node_config(addr_a)).await.unwrap();
    let b = MeshNode::new(key_b, node_config(addr_b)).await.unwrap();
    let c = MeshNode::new(key_c, node_config(addr_c)).await.unwrap();
    let pub_b = *b.public_key();
    let pub_c = *c.public_key();

    // Direct sessions exist only for A<->B and B<->C.
    let (accepted, connected) = tokio::join!(b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        a.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();
    let (accepted, connected) = tokio::join!(c.accept(nid_b), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        b.connect(addr_c, &pub_c, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    a.start();
    b.start();
    c.start();

    // A reaches C through B; B is also given a route back to A.
    let relayed = a.connect_via(addr_b, &pub_c, nid_c).await;
    relayed.expect("connect_via failed");
    b.router().add_route(nid_a, addr_a);
    tokio::time::sleep(Duration::from_millis(200)).await;

    let forward = make_batch(0, 5, "a_to_c");
    a.send_routed(nid_c, &forward).await.unwrap();
    let reverse = make_batch(0, 5, "c_to_a");
    c.send_routed(nid_a, &reverse).await.unwrap();
    tokio::time::sleep(Duration::from_millis(1500)).await;

    let at_c = c.poll_shard(0, None, 100).await.unwrap();
    let at_a = a.poll_shard(0, None, 100).await.unwrap();
    assert!(
        !at_c.events.is_empty(),
        "C should receive events from A via B"
    );
    assert!(
        !at_a.events.is_empty(),
        "A should receive events from C via B"
    );

    // Each side must have received only the other side's tag.
    for received in at_c.events.iter() {
        let json: serde_json::Value = received.parse().unwrap();
        assert_eq!(json["tag"], "a_to_c");
    }
    for received in at_a.events.iter() {
        let json: serde_json::Value = received.parse().unwrap();
        assert_eq!(json["tag"], "c_to_a");
    }

    a.shutdown().await.unwrap();
    b.shutdown().await.unwrap();
    c.shutdown().await.unwrap();
}
/// Two streams (ids 111 and 222) with different reliability settings are
/// opened from A to the same peer; each must report its own `tx_seq` of 1 and
/// be active after a single send.
#[tokio::test]
async fn test_stream_multiplex_two_streams_same_peer() {
    use net::adapter::net::{Reliability, StreamConfig};

    let ports = find_ports(2).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let nid_a = key_a.node_id();
    let nid_b = key_b.node_id();
    let addr_a = SocketAddr::from(([127, 0, 0, 1], ports[0]));
    let addr_b = SocketAddr::from(([127, 0, 0, 1], ports[1]));

    let node_config = |bind: SocketAddr| {
        MeshNodeConfig::new(bind, psk)
            .with_num_shards(4)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(30))
    };
    let a = MeshNode::new(key_a, node_config(addr_a)).await.unwrap();
    let b = MeshNode::new(key_b, node_config(addr_b)).await.unwrap();
    let pub_b = *b.public_key();

    let (accepted, connected) = tokio::join!(b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        a.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();
    a.start();
    b.start();

    // One fire-and-forget stream and one reliable stream to the same peer.
    let fire_cfg = StreamConfig::new().with_reliability(Reliability::FireAndForget);
    let s_fire = a.open_stream(nid_b, 111, fire_cfg).unwrap();
    let rel_cfg = StreamConfig::new().with_reliability(Reliability::Reliable);
    let s_rel = a.open_stream(nid_b, 222, rel_cfg).unwrap();

    // Three small JSON payloads for the first stream, five for the second.
    let mut events_fire: Vec<Bytes> = Vec::with_capacity(3);
    for i in 0..3 {
        events_fire.push(Bytes::from(format!(r#"{{"stream":"fire","i":{}}}"#, i)));
    }
    let mut events_rel: Vec<Bytes> = Vec::with_capacity(5);
    for i in 0..5 {
        events_rel.push(Bytes::from(format!(r#"{{"stream":"rel","i":{}}}"#, i)));
    }

    a.send_on_stream(&s_fire, &events_fire).await.unwrap();
    a.send_on_stream(&s_rel, &events_rel).await.unwrap();
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Look each stream up by id in the aggregate per-peer listing.
    let all = a.all_stream_stats(nid_b);
    let fire_stats = all
        .iter()
        .find_map(|(sid, stats)| (*sid == 111).then_some(*stats))
        .expect("stream 111 stats");
    let rel_stats = all
        .iter()
        .find_map(|(sid, stats)| (*sid == 222).then_some(*stats))
        .expect("stream 222 stats");

    assert_eq!(
        fire_stats.tx_seq, 1,
        "fire stream sent 1 packet (3 events fit in one)"
    );
    assert_eq!(
        rel_stats.tx_seq, 1,
        "rel stream sent 1 packet (5 events fit in one)"
    );
    assert!(fire_stats.active);
    assert!(rel_stats.active);

    // The single-stream accessor must agree with the aggregate listing.
    let fire_solo = a.stream_stats(nid_b, 111).unwrap();
    assert_eq!(fire_solo.tx_seq, 1);

    a.shutdown().await.unwrap();
    b.shutdown().await.unwrap();
}
/// Open/close semantics for a single stream id (77) on one peer.
///
/// Asserted sequence: re-opening an already open stream keeps `tx_seq`
/// counting (1 then 2), closing removes the stream's stats, and opening again
/// after close starts `tx_seq` from 1.
///
/// Note: the nodes are handshaken but `start()` is never called in this test.
#[tokio::test]
async fn test_stream_open_close_idempotency() {
use net::adapter::net::{Reliability, StreamConfig};
let ports = find_ports(2).await;
let psk = [0x42u8; 32];
let id_a = EntityKeypair::generate();
let id_b = EntityKeypair::generate();
let nid_a = id_a.node_id();
let nid_b = id_b.node_id();
let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
// Heartbeat interval and session timeout are left at their defaults here.
let mk = |addr| {
MeshNodeConfig::new(addr, psk)
.with_num_shards(2)
.with_handshake(3, Duration::from_secs(3))
};
let a = MeshNode::new(id_a, mk(addr_a)).await.unwrap();
let b = MeshNode::new(id_b, mk(addr_b)).await.unwrap();
let pub_b = *b.public_key();
let (r1, r2) = tokio::join!(b.accept(nid_a), async {
tokio::time::sleep(Duration::from_millis(50)).await;
a.connect(addr_b, &pub_b, nid_b).await
});
r1.unwrap();
r2.unwrap();
// First open + send: tx_seq becomes 1.
let first = a
.open_stream(
nid_b,
77,
StreamConfig::new().with_reliability(Reliability::Reliable),
)
.unwrap();
assert_eq!(first.stream_id(), 77);
a.send_on_stream(&first, &[Bytes::from_static(b"{}")])
.await
.unwrap();
let stats1 = a.stream_stats(nid_b, 77).unwrap();
assert_eq!(stats1.tx_seq, 1);
// Opening the same id again without closing must not reset the counter.
let second = a
.open_stream(
nid_b,
77,
StreamConfig::new().with_reliability(Reliability::Reliable),
)
.unwrap();
a.send_on_stream(&second, &[Bytes::from_static(b"{}")])
.await
.unwrap();
let stats2 = a.stream_stats(nid_b, 77).unwrap();
assert_eq!(
stats2.tx_seq, 2,
"second send on re-opened stream continues tx_seq"
);
// Close drops the stream's stats entirely.
a.close_stream(nid_b, 77);
assert!(a.stream_stats(nid_b, 77).is_none());
// Re-open after close (this time with the default StreamConfig): counter restarts.
let third = a.open_stream(nid_b, 77, StreamConfig::new()).unwrap();
a.send_on_stream(&third, &[Bytes::from_static(b"{}")])
.await
.unwrap();
let stats3 = a.stream_stats(nid_b, 77).unwrap();
assert_eq!(stats3.tx_seq, 1, "after close+reopen tx_seq resets");
a.shutdown().await.unwrap();
b.shutdown().await.unwrap();
}
/// Regression test: sending on a handle whose stream has been closed must fail
/// with `StreamError::NotConnected` and must not bring the stream back.
///
/// Also checks that the old handle stays rejected after the same stream id is
/// re-opened, while the handle returned by the re-open works.
///
/// Note: the nodes are handshaken but `start()` is never called in this test.
#[tokio::test]
async fn test_regression_send_on_stream_rejects_closed_stream() {
use net::adapter::net::{Reliability, StreamConfig, StreamError};
let ports = find_ports(2).await;
let psk = [0x42u8; 32];
let id_a = EntityKeypair::generate();
let id_b = EntityKeypair::generate();
let nid_a = id_a.node_id();
let nid_b = id_b.node_id();
let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
let mk = |addr| {
MeshNodeConfig::new(addr, psk)
.with_num_shards(2)
.with_handshake(3, Duration::from_secs(3))
};
let a = MeshNode::new(id_a, mk(addr_a)).await.unwrap();
let b = MeshNode::new(id_b, mk(addr_b)).await.unwrap();
let pub_b = *b.public_key();
let (r1, r2) = tokio::join!(b.accept(nid_a), async {
tokio::time::sleep(Duration::from_millis(50)).await;
a.connect(addr_b, &pub_b, nid_b).await
});
r1.unwrap();
r2.unwrap();
// Open stream 123 and confirm a send works while it is open.
let stream = a
.open_stream(
nid_b,
123,
StreamConfig::new().with_reliability(Reliability::Reliable),
)
.unwrap();
a.send_on_stream(&stream, &[Bytes::from_static(b"{}")])
.await
.unwrap();
// Close it; the original `stream` handle is kept around on purpose.
a.close_stream(nid_b, 123);
assert!(a.stream_stats(nid_b, 123).is_none());
let result = a
.send_on_stream(&stream, &[Bytes::from_static(b"{}")])
.await;
assert!(
matches!(result, Err(StreamError::NotConnected)),
"expected NotConnected for send on closed stream; got {:?}",
result
);
// The failed send must not have re-created any state for stream 123.
assert!(
a.stream_stats(nid_b, 123).is_none(),
"send on closed stream must NOT revive the stream"
);
// Re-open the same id with a different reliability mode.
let fresh = a
.open_stream(
nid_b,
123,
StreamConfig::new().with_reliability(Reliability::FireAndForget),
)
.unwrap();
// The pre-close handle must still be refused even though id 123 exists again.
let stale_result = a
.send_on_stream(&stream, &[Bytes::from_static(b"{}")])
.await;
assert!(
matches!(stale_result, Err(StreamError::NotConnected)),
"stale handle from pre-reopen lifetime must still refuse; got {:?}",
stale_result
);
a.send_on_stream(&fresh, &[Bytes::from_static(b"{}")])
.await
.expect("fresh handle after reopen must work");
a.shutdown().await.unwrap();
b.shutdown().await.unwrap();
}
/// Four-node chain A - B - C - D with only neighbouring nodes handshaken.
///
/// After 1.5 s of running, A's routing table is expected to resolve D to B's
/// address (the test attributes this to pingwave propagation) while the
/// direct route to B itself still resolves to B's address.
#[tokio::test]
async fn test_multi_hop_routing_pingwave_installs_indirect_route() {
    let ports = find_ports(4).await;
    let psk = [0x42u8; 32];
    let id_a = EntityKeypair::generate();
    let id_b = EntityKeypair::generate();
    let id_c = EntityKeypair::generate();
    let id_d = EntityKeypair::generate();
    let nid_a = id_a.node_id();
    let nid_b = id_b.node_id();
    let nid_c = id_c.node_id();
    let nid_d = id_d.node_id();
    let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
    let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
    let addr_c: SocketAddr = format!("127.0.0.1:{}", ports[2]).parse().unwrap();
    let addr_d: SocketAddr = format!("127.0.0.1:{}", ports[3]).parse().unwrap();
    let mk = |addr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(200))
            .with_session_timeout(Duration::from_secs(30))
    };
    let a = MeshNode::new(id_a, mk(addr_a)).await.unwrap();
    let b = MeshNode::new(id_b, mk(addr_b)).await.unwrap();
    let c = MeshNode::new(id_c, mk(addr_c)).await.unwrap();
    let d = MeshNode::new(id_d, mk(addr_d)).await.unwrap();
    let pub_b = *b.public_key();
    let pub_c = *c.public_key();
    let pub_d = *d.public_key();

    // Neighbour handshakes only: A<->B, B<->C, C<->D. A never talks to C or D
    // directly, so any route A holds for D has to be learned.
    let (r1, r2) = tokio::join!(b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        a.connect(addr_b, &pub_b, nid_b).await
    });
    r1.unwrap();
    r2.unwrap();
    let (r1, r2) = tokio::join!(c.accept(nid_b), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        b.connect(addr_c, &pub_c, nid_c).await
    });
    r1.unwrap();
    r2.unwrap();
    let (r1, r2) = tokio::join!(d.accept(nid_c), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        c.connect(addr_d, &pub_d, nid_d).await
    });
    r1.unwrap();
    r2.unwrap();

    a.start();
    b.start();
    c.start();
    d.start();
    // Fixed wait for route information to propagate along the chain.
    tokio::time::sleep(Duration::from_millis(1500)).await;

    // D must be reachable from A with B as the next hop.
    let lookup = a.router().routing_table().lookup(nid_d);
    assert_eq!(
        lookup,
        Some(addr_b),
        "A should have learned D via B from pingwave propagation; got {:?}",
        lookup
    );
    // Learning the indirect route must not disturb the direct route to B.
    let b_lookup = a.router().routing_table().lookup(nid_b);
    assert_eq!(b_lookup, Some(addr_b), "direct B route preserved");

    a.shutdown().await.unwrap();
    b.shutdown().await.unwrap();
    c.shutdown().await.unwrap();
    d.shutdown().await.unwrap();
}
/// Regression test: in a chain A - B - C, `path_to` on A's proximity graph
/// must return a path to C containing at least two nodes after 1.5 s.
#[tokio::test]
async fn test_regression_dv_path_to_returns_multi_hop() {
    let ports = find_ports(3).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let key_c = EntityKeypair::generate();
    let nid_a = key_a.node_id();
    let nid_b = key_b.node_id();
    let nid_c = key_c.node_id();
    let addr_a = SocketAddr::from(([127, 0, 0, 1], ports[0]));
    let addr_b = SocketAddr::from(([127, 0, 0, 1], ports[1]));
    let addr_c = SocketAddr::from(([127, 0, 0, 1], ports[2]));

    let node_config = |bind: SocketAddr| {
        MeshNodeConfig::new(bind, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(200))
            .with_session_timeout(Duration::from_secs(30))
    };
    let a = MeshNode::new(key_a, node_config(addr_a)).await.unwrap();
    let b = MeshNode::new(key_b, node_config(addr_b)).await.unwrap();
    let c = MeshNode::new(key_c, node_config(addr_c)).await.unwrap();
    let pub_b = *b.public_key();
    let pub_c = *c.public_key();

    // Chain topology: A<->B and B<->C only.
    let (accepted, connected) = tokio::join!(b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        a.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();
    let (accepted, connected) = tokio::join!(c.accept(nid_b), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        b.connect(addr_c, &pub_c, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    a.start();
    b.start();
    c.start();
    tokio::time::sleep(Duration::from_millis(1500)).await;

    // Build the 32-byte graph id for C: node id as little-endian bytes in the
    // first 8 bytes, the rest zero.
    let mut c_graph_id = [0u8; 32];
    c_graph_id[..8].copy_from_slice(&nid_c.to_le_bytes());

    let Some(path) = a.proximity_graph().path_to(&c_graph_id) else {
        panic!("path_to(C) must be Some now that edges populate on pingwave receipt");
    };
    let hops = path.len();
    assert!(
        hops >= 2,
        "path_to(C) should have at least 2 nodes (self + next hop); got {:?}",
        hops
    );

    a.shutdown().await.unwrap();
    b.shutdown().await.unwrap();
    c.shutdown().await.unwrap();
}
/// Regression test: a pingwave arriving from an address A has no session with
/// must not install any routing or proximity-graph state.
///
/// A plain UDP socket plays the attacker and sends a hand-built
/// `EnhancedPingwave` carrying a made-up origin id straight to A's address.
#[tokio::test]
async fn test_regression_pingwave_from_unregistered_source_is_dropped() {
use net::adapter::net::behavior::EnhancedPingwave;
use tokio::net::UdpSocket;
let ports = find_ports(2).await;
let psk = [0x42u8; 32];
let id_a = EntityKeypair::generate();
let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
let attacker_addr: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
// Only node A is a real mesh node; it never handshakes with anyone in this test.
let a = MeshNode::new(
id_a,
MeshNodeConfig::new(addr_a, psk)
.with_num_shards(2)
.with_handshake(3, Duration::from_secs(3))
.with_heartbeat_interval(Duration::from_millis(500))
.with_session_timeout(Duration::from_secs(30)),
)
.await
.unwrap();
a.start();
// The attacker is a raw UDP socket bound to the second reserved port.
let attacker = UdpSocket::bind(attacker_addr).await.unwrap();
// Forged origin: a fixed node id written little-endian into the first 8 bytes
// of an otherwise zeroed 32-byte graph id.
let mut forged_origin_graph_id = [0u8; 32];
let forged_origin_nid: u64 = 0xDEAD_BEEF_CAFE_F00D;
forged_origin_graph_id[0..8].copy_from_slice(&forged_origin_nid.to_le_bytes());
// NOTE(review): the meaning of the `1, 3` constructor arguments is not visible
// here — confirm against `EnhancedPingwave::new`.
let mut pw = EnhancedPingwave::new(forged_origin_graph_id, 1, 3);
pw.hop_count = 1;
let pw_bytes = pw.to_bytes();
attacker.send_to(&pw_bytes, addr_a).await.unwrap();
// Give A time to receive and process (or drop) the datagram.
tokio::time::sleep(Duration::from_millis(200)).await;
// None of A's routing or graph state may mention the forged origin.
let lookup = a.router().routing_table().lookup(forged_origin_nid);
assert!(
lookup.is_none(),
"routing table must NOT install a route for a forged origin \
advertised from an unregistered source; got next_hop={:?}",
lookup
);
assert!(
a.proximity_graph()
.get_node(&forged_origin_graph_id)
.is_none(),
"proximity graph must NOT contain a node for a forged origin \
advertised from an unregistered source"
);
assert!(
a.proximity_graph()
.path_to(&forged_origin_graph_id)
.is_none(),
"path_to(forged_origin) must be None — no edge should have \
been installed by the dropped pingwave"
);
drop(attacker);
a.shutdown().await.unwrap();
}
/// Regression test: the origin id stamped on a locally created pingwave
/// must be the node id in little-endian order followed by zero padding up
/// to 32 bytes.
///
/// The node is constructed but never started; only `create_pingwave` is
/// exercised.
#[tokio::test]
async fn test_regression_proximity_graph_local_id_matches_peer_encoding() {
    use net::adapter::net::behavior::loadbalance::HealthStatus;

    let ports = find_ports(1).await;
    let psk = [0x42u8; 32];
    let keypair = EntityKeypair::generate();
    let local_nid = keypair.node_id();
    let bind_addr = SocketAddr::from(([127, 0, 0, 1], ports[0]));

    let cfg = MeshNodeConfig::new(bind_addr, psk);
    let node = MeshNode::new(keypair, cfg).await.unwrap();

    let pingwave = node
        .proximity_graph()
        .create_pingwave(HealthStatus::Healthy);

    // Build the encoding the pingwave origin is expected to carry.
    let mut expected = [0u8; 32];
    expected[..8].copy_from_slice(&local_nid.to_le_bytes());

    assert_eq!(
        pingwave.origin_id, expected,
        "proximity graph's local id must use the zero-padded node_id \
         encoding so it matches what peers see when they seed this node \
         into their own graph"
    );

    node.shutdown().await.unwrap();
}
/// Regression test: a relayed handshake from A to D must succeed across the
/// chain A–B–C–D when the intermediate nodes hold routes for the endpoints.
///
/// Direct links A–B, B–C and C–D are established first. B is then given a
/// route for D's node id pointing at C's address, and C a route for A's
/// node id pointing at B's address. Finally A calls `connect_via` with B's
/// address, D's public key and D's node id; afterwards both A and D must
/// report at least two peers.
#[tokio::test]
async fn test_regression_handshake_relay_multi_hop_via_routing_table() {
    let ports = find_ports(4).await;
    let psk = [0x42u8; 32];
    let id_a = EntityKeypair::generate();
    let id_b = EntityKeypair::generate();
    let id_c = EntityKeypair::generate();
    let id_d = EntityKeypair::generate();
    let nid_a = id_a.node_id();
    let nid_b = id_b.node_id();
    let nid_c = id_c.node_id();
    let nid_d = id_d.node_id();
    let addr_a: SocketAddr = format!("127.0.0.1:{}", ports[0]).parse().unwrap();
    let addr_b: SocketAddr = format!("127.0.0.1:{}", ports[1]).parse().unwrap();
    let addr_c: SocketAddr = format!("127.0.0.1:{}", ports[2]).parse().unwrap();
    let addr_d: SocketAddr = format!("127.0.0.1:{}", ports[3]).parse().unwrap();

    // All four nodes share the same tuning; only the address differs.
    let mk = |addr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(30))
    };
    let a = MeshNode::new(id_a, mk(addr_a)).await.unwrap();
    let b = MeshNode::new(id_b, mk(addr_b)).await.unwrap();
    let c = MeshNode::new(id_c, mk(addr_c)).await.unwrap();
    let d = MeshNode::new(id_d, mk(addr_d)).await.unwrap();
    let pub_b = *b.public_key();
    let pub_c = *c.public_key();
    let pub_d = *d.public_key();

    // Direct link A -> B. The connect side starts 50 ms late, while `join!`
    // is already driving the accept future.
    let (r1, r2) = tokio::join!(b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        a.connect(addr_b, &pub_b, nid_b).await
    });
    r1.unwrap();
    r2.unwrap();

    // Direct link B -> C.
    let (r1, r2) = tokio::join!(c.accept(nid_b), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        b.connect(addr_c, &pub_c, nid_c).await
    });
    r1.unwrap();
    r2.unwrap();

    // Direct link C -> D.
    let (r1, r2) = tokio::join!(d.accept(nid_c), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        c.connect(addr_d, &pub_d, nid_d).await
    });
    r1.unwrap();
    r2.unwrap();

    // Routes on the intermediate nodes: B reaches D through C's address,
    // and C reaches A through B's address.
    b.router().add_route(nid_d, addr_c);
    c.router().add_route(nid_a, addr_b);

    a.start();
    b.start();
    c.start();
    d.start();

    a.connect_via(addr_b, &pub_d, nid_d)
        .await
        .expect("A connect_via D (through B→C) failed");

    // Poll for both endpoints to register the new peer instead of sleeping
    // for a fixed interval: this returns as soon as the condition holds and
    // tolerates slow machines. A timeout is deliberately ignored here so the
    // assertions below can report the observed peer counts.
    let _ = tokio::time::timeout(Duration::from_secs(2), async {
        while a.peer_count() < 2 || d.peer_count() < 2 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await;

    assert!(
        a.peer_count() >= 2,
        "A must have at least B + D after multi-hop connect_via; got {}",
        a.peer_count()
    );
    assert!(
        d.peer_count() >= 2,
        "D must have at least C + A after the relayed handshake completes; got {}",
        d.peer_count()
    );

    a.shutdown().await.unwrap();
    b.shutdown().await.unwrap();
    c.shutdown().await.unwrap();
    d.shutdown().await.unwrap();
}
/// Regression test: after A performs `connect_via` to C through B, C must
/// end up with two peers (B and A) and A with two peers (B and C).
///
/// Before the relayed handshake, A and C each have exactly one peer. C's
/// peer count is polled for up to two seconds because, per the assertion
/// message below, the entry for A is inserted only after a spawned send
/// completes.
#[tokio::test]
async fn test_regression_handshake_relay_registers_peer_after_msg2_sent() {
    const LOCALHOST: [u8; 4] = [127, 0, 0, 1];

    let ports = find_ports(3).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let key_c = EntityKeypair::generate();
    let (nid_a, nid_b, nid_c) = (key_a.node_id(), key_b.node_id(), key_c.node_id());

    let addr_a = SocketAddr::from((LOCALHOST, ports[0]));
    let addr_b = SocketAddr::from((LOCALHOST, ports[1]));
    let addr_c = SocketAddr::from((LOCALHOST, ports[2]));

    let config_for = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(30))
    };
    let node_a = MeshNode::new(key_a, config_for(addr_a)).await.unwrap();
    let node_b = MeshNode::new(key_b, config_for(addr_b)).await.unwrap();
    let node_c = MeshNode::new(key_c, config_for(addr_c)).await.unwrap();
    let pub_b = *node_b.public_key();
    let pub_c = *node_c.public_key();

    // Direct link A -> B.
    let (accepted, connected) = tokio::join!(node_b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_a.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    // Direct link B -> C.
    let (accepted, connected) = tokio::join!(node_c.accept(nid_b), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        node_b.connect(addr_c, &pub_c, nid_c).await
    });
    accepted.unwrap();
    connected.unwrap();

    // Baseline: each endpoint only knows B so far.
    assert_eq!(node_a.peer_count(), 1);
    assert_eq!(node_c.peer_count(), 1);

    node_a.start();
    node_b.start();
    node_c.start();

    node_a.connect_via(addr_b, &pub_c, nid_c).await.unwrap();

    // Wait (bounded) until C has picked up its second peer.
    let wait_for_c = async {
        loop {
            if node_c.peer_count() >= 2 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    };
    tokio::time::timeout(Duration::from_secs(2), wait_for_c)
        .await
        .expect("timed out waiting for C to register A after connect_via");

    assert_eq!(node_a.peer_count(), 2, "A must have B and C");
    assert_eq!(
        node_c.peer_count(),
        2,
        "C must have B and A — the A entry is only inserted after the \
         spawned msg2-send succeeds"
    );

    node_a.shutdown().await.unwrap();
    node_b.shutdown().await.unwrap();
    node_c.shutdown().await.unwrap();
}
/// Regression test: a 72-byte datagram whose first two bytes are
/// `0x45 0x4E` must not bump the `pingwaves_received` counter, while
/// pingwaves from a handshaked peer must still be counted afterwards.
///
/// The test runs in two legs: first the crafted packet is sent from an
/// unrelated socket and the counter is expected to stay put; then the test
/// waits a further 1.2 s and expects the counter to have grown.
#[tokio::test]
async fn test_regression_pingwave_not_dispatched_on_net_magic_packet() {
    const LOCALHOST: [u8; 4] = [127, 0, 0, 1];

    let ports = find_ports(2).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let (nid_a, nid_b) = (key_a.node_id(), key_b.node_id());
    let addr_a = SocketAddr::from((LOCALHOST, ports[0]));
    let addr_b = SocketAddr::from((LOCALHOST, ports[1]));

    let config_for = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(10))
    };
    let a = MeshNode::new(key_a, config_for(addr_a)).await.unwrap();
    let b = MeshNode::new(key_b, config_for(addr_b)).await.unwrap();
    let pub_b = *b.public_key();

    // Handshake A -> B so that B counts as a legitimate peer of A.
    let (accepted, connected) = tokio::join!(b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        a.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    a.start();
    b.start();

    let baseline = a.proximity_graph().stats().pingwaves_received;

    // Leg 1: crafted packet from a socket bound to an ephemeral port.
    // Bytes 0x45 0x4E are ASCII "EN"; the remaining 70 bytes are zero.
    let attacker = UdpSocket::bind("127.0.0.1:0").await.unwrap();
    let mut crafted = [0u8; 72];
    crafted[..2].copy_from_slice(&[0x45, 0x4E]);
    attacker.send_to(&crafted, addr_a).await.unwrap();

    tokio::time::sleep(Duration::from_millis(200)).await;
    let after_crafted = a.proximity_graph().stats().pingwaves_received;
    assert_eq!(
        baseline, after_crafted,
        "a 72-byte packet starting with the Net magic must not be \
         dispatched as a pingwave"
    );

    // Leg 2: give the handshaked peer time to deliver real pingwaves.
    tokio::time::sleep(Duration::from_millis(1200)).await;
    let after_legit = a.proximity_graph().stats().pingwaves_received;
    assert!(
        after_legit > after_crafted,
        "legitimate pingwaves from a handshaked peer must be accepted; \
         before_valid_leg={}, after_valid_leg={}",
        after_crafted,
        after_legit
    );

    a.shutdown().await.unwrap();
    b.shutdown().await.unwrap();
}
use net::adapter::net::{ChannelName, OnFailure, PublishConfig, Reliability};
/// Builds a three-node fixture in which node A holds direct sessions with
/// both B and C, then starts all three nodes.
///
/// A initiates both handshakes (B and C each accept A). After starting the
/// nodes the helper sleeps for 100 ms before returning.
///
/// Returns `(node_a, node_b, node_c, nid_a, nid_b, nid_c)`.
///
/// # Panics
///
/// Panics if node construction or either handshake fails.
async fn setup_publisher_with_two_subscribers() -> (MeshNode, MeshNode, MeshNode, u64, u64, u64) {
    const LOCALHOST: [u8; 4] = [127, 0, 0, 1];

    let ports = find_ports(3).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let key_c = EntityKeypair::generate();
    let (nid_a, nid_b, nid_c) = (key_a.node_id(), key_b.node_id(), key_c.node_id());

    let addr_a = SocketAddr::from((LOCALHOST, ports[0]));
    let addr_b = SocketAddr::from((LOCALHOST, ports[1]));
    let addr_c = SocketAddr::from((LOCALHOST, ports[2]));

    let config_for = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(2)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(10))
    };
    let hub = MeshNode::new(key_a, config_for(addr_a)).await.unwrap();
    let sub_b = MeshNode::new(key_b, config_for(addr_b)).await.unwrap();
    let sub_c = MeshNode::new(key_c, config_for(addr_c)).await.unwrap();
    let pub_b = *sub_b.public_key();
    let pub_c = *sub_c.public_key();

    // Session A -> B.
    let (b_accepted, a_connected) = tokio::join!(sub_b.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        hub.connect(addr_b, &pub_b, nid_b).await
    });
    b_accepted.expect("B accept A failed");
    a_connected.expect("A connect B failed");

    // Session A -> C.
    let (c_accepted, a_connected) = tokio::join!(sub_c.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        hub.connect(addr_c, &pub_c, nid_c).await
    });
    c_accepted.expect("C accept A failed");
    a_connected.expect("A connect C failed");

    hub.start();
    sub_b.start();
    sub_c.start();

    // Brief pause after start before handing the fixture to the caller.
    tokio::time::sleep(Duration::from_millis(100)).await;

    (hub, sub_b, sub_c, nid_a, nid_b, nid_c)
}
/// With B and C both subscribed to a channel on A, a single publish from A
/// must attempt and deliver to both subscribers and collect no errors.
#[tokio::test]
async fn test_channel_publisher_fanout_reaches_all_subscribers() {
    let (hub, sub_b, sub_c, nid_a, _nid_b, _nid_c) =
        setup_publisher_with_two_subscribers().await;

    let channel = ChannelName::new("sensors/lidar").unwrap();

    sub_b
        .subscribe_channel(nid_a, channel.clone())
        .await
        .expect("B subscribe failed");
    sub_c
        .subscribe_channel(nid_a, channel.clone())
        .await
        .expect("C subscribe failed");

    // Both subscriptions must be visible in A's roster before publishing.
    let ch_id = net::adapter::net::ChannelId::new(channel.clone());
    let subscriber_total = hub.roster().members(&ch_id).len();
    assert_eq!(subscriber_total, 2, "A's roster should have 2 subscribers");

    let publish_cfg = PublishConfig::new()
        .with_reliability(Reliability::FireAndForget)
        .with_on_failure(OnFailure::Collect);
    let publisher = hub.channel_publisher(channel.clone(), publish_cfg);

    let payload = bytes::Bytes::from_static(b"lidar-scan-0");
    let report = hub.publish(&publisher, payload).await.unwrap();

    assert_eq!(
        report.attempted, 2,
        "should have attempted both subscribers"
    );
    assert_eq!(report.delivered, 2, "both per-peer sends should succeed");
    assert!(report.errors.is_empty(), "no errors expected");

    hub.shutdown().await.unwrap();
    sub_b.shutdown().await.unwrap();
    sub_c.shutdown().await.unwrap();
}
/// After B unsubscribes from a channel that B and C had both joined, A's
/// roster must shrink to C alone and a publish must target exactly one
/// peer.
///
/// The roster is re-read up to 20 times, 50 ms apart, until it reports a
/// single member.
#[tokio::test]
async fn test_channel_publisher_unsubscribe_evicts_from_roster() {
    let (hub, sub_b, sub_c, nid_a, _nid_b, nid_c) =
        setup_publisher_with_two_subscribers().await;

    let channel = ChannelName::new("alerts").unwrap();

    sub_b
        .subscribe_channel(nid_a, channel.clone())
        .await
        .unwrap();
    sub_c
        .subscribe_channel(nid_a, channel.clone())
        .await
        .unwrap();

    let ch_id = net::adapter::net::ChannelId::new(channel.clone());
    assert_eq!(hub.roster().members(&ch_id).len(), 2);

    sub_b
        .unsubscribe_channel(nid_a, channel.clone())
        .await
        .unwrap();

    // Bounded poll for the roster to reflect the unsubscribe.
    let mut remaining = Vec::new();
    for _attempt in 0..20 {
        remaining = hub.roster().members(&ch_id);
        if remaining.len() == 1 {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    assert_eq!(
        remaining.len(),
        1,
        "after B unsubscribes, only C should remain"
    );
    assert_eq!(remaining[0], nid_c, "surviving subscriber should be C");

    let publish_cfg = PublishConfig::new().with_on_failure(OnFailure::Collect);
    let publisher = hub.channel_publisher(channel.clone(), publish_cfg);
    let report = hub
        .publish(&publisher, bytes::Bytes::from_static(b"only-c"))
        .await
        .unwrap();
    assert_eq!(report.attempted, 1);
    assert_eq!(report.delivered, 1);

    hub.shutdown().await.unwrap();
    sub_b.shutdown().await.unwrap();
    sub_c.shutdown().await.unwrap();
}
/// Publishing on a channel nobody subscribed to must succeed and return a
/// report with zero attempts, zero deliveries, no errors, and
/// `is_empty() == true`.
#[tokio::test]
async fn test_channel_publisher_empty_roster_is_ok() {
    let (hub, sub_b, sub_c, _nid_a, _nid_b, _nid_c) =
        setup_publisher_with_two_subscribers().await;

    // No subscribe calls are made for this channel.
    let channel = ChannelName::new("nobody/listens").unwrap();
    let publish_cfg = PublishConfig::new().with_on_failure(OnFailure::Collect);
    let publisher = hub.channel_publisher(channel, publish_cfg);

    let payload = bytes::Bytes::from_static(b"x");
    let report = hub.publish(&publisher, payload).await.unwrap();

    assert_eq!(report.attempted, 0);
    assert_eq!(report.delivered, 0);
    assert!(report.errors.is_empty());
    assert!(report.is_empty());

    hub.shutdown().await.unwrap();
    sub_b.shutdown().await.unwrap();
    sub_c.shutdown().await.unwrap();
}
/// Unsubscribing from a channel that was never subscribed to must return
/// `Ok` rather than an error.
#[tokio::test]
async fn test_channel_publisher_unsubscribe_idempotent() {
    let (hub, sub_b, sub_c, nid_a, _nid_b, _nid_c) =
        setup_publisher_with_two_subscribers().await;

    // B never subscribed to this channel.
    let channel = ChannelName::new("ghosts").unwrap();
    let outcome = sub_b.unsubscribe_channel(nid_a, channel.clone()).await;
    outcome.expect("unsubscribe must be idempotent");

    hub.shutdown().await.unwrap();
    sub_b.shutdown().await.unwrap();
    sub_c.shutdown().await.unwrap();
}
use net::adapter::net::{StreamConfig, StreamError};
/// Sixteen tasks send concurrently on one stream opened with a 128-byte
/// window; at least one send must succeed and at least one must be
/// rejected with `StreamError::Backpressure`.
///
/// The stream stats must afterwards report at least as many backpressure
/// events as the test observed, and a `tx_window` of 128.
/// `StreamError::NotConnected` is treated as a test failure; transport
/// errors are only tallied for the diagnostic message.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_send_on_stream_backpressure_when_concurrent() {
    const LOCALHOST: [u8; 4] = [127, 0, 0, 1];

    let ports = find_ports(2).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let (nid_a, nid_b) = (key_a.node_id(), key_b.node_id());
    let addr_a = SocketAddr::from((LOCALHOST, ports[0]));
    let addr_b = SocketAddr::from((LOCALHOST, ports[1]));

    let config_for = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(4)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(30))
    };
    // The sender is shared across spawned tasks, hence the `Arc`.
    let sender = Arc::new(MeshNode::new(key_a, config_for(addr_a)).await.unwrap());
    let receiver = MeshNode::new(key_b, config_for(addr_b)).await.unwrap();
    let pub_b = *receiver.public_key();

    let (accepted, connected) = tokio::join!(receiver.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    sender.start();
    receiver.start();

    let stream = sender
        .open_stream(nid_b, 7777, StreamConfig::new().with_window_bytes(128))
        .unwrap();

    let n_tasks: usize = 16;
    let event = Bytes::from_static(b"{\"t\":\"bp\"}");

    // Launch every sender task up front; results are gathered afterwards.
    let handles: Vec<_> = (0..n_tasks)
        .map(|_| {
            let mesh = Arc::clone(&sender);
            let stream = stream.clone();
            let payload = event.clone();
            tokio::spawn(async move { mesh.send_on_stream(&stream, &[payload]).await })
        })
        .collect();

    let mut sent = 0usize;
    let mut throttled = 0usize;
    let mut transport_errs = 0usize;
    for handle in handles {
        let outcome = handle.await.unwrap();
        match outcome {
            Ok(()) => sent += 1,
            Err(StreamError::Backpressure) => throttled += 1,
            Err(StreamError::Transport(_)) => transport_errs += 1,
            Err(StreamError::NotConnected) => panic!("unexpected NotConnected"),
        }
    }

    assert!(
        throttled > 0,
        "expected at least one Backpressure in {} concurrent sends; got ok={}, bp={}, transport={}",
        n_tasks,
        sent,
        throttled,
        transport_errs
    );
    assert!(sent > 0, "expected at least one successful send");

    let stats = sender.stream_stats(nid_b, 7777).expect("stream stats");
    assert!(
        stats.backpressure_events >= throttled as u64,
        "stats.backpressure_events ({}) must be >= observed ({})",
        stats.backpressure_events,
        throttled
    );
    assert_eq!(stats.tx_window, 128);

    // `shutdown` needs the node by value, so reclaim it from the `Arc`.
    Arc::try_unwrap(sender)
        .ok()
        .unwrap()
        .shutdown()
        .await
        .unwrap();
    receiver.shutdown().await.unwrap();
}
/// Thirty-two tasks call `send_with_retry` concurrently on one stream
/// opened with a 512-byte window; every call must eventually return `Ok`.
///
/// Each call passes `64` as the final argument (presumably the retry
/// budget — confirm against `send_with_retry`). Afterwards the stream must
/// report a positive `tx_credit_remaining`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_send_with_retry_eventually_succeeds_through_backpressure() {
    const LOCALHOST: [u8; 4] = [127, 0, 0, 1];

    let ports = find_ports(2).await;
    let psk = [0x42u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let (nid_a, nid_b) = (key_a.node_id(), key_b.node_id());
    let addr_a = SocketAddr::from((LOCALHOST, ports[0]));
    let addr_b = SocketAddr::from((LOCALHOST, ports[1]));

    let config_for = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(4)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(30))
    };
    // The sender is shared across spawned tasks, hence the `Arc`.
    let sender = Arc::new(MeshNode::new(key_a, config_for(addr_a)).await.unwrap());
    let receiver = MeshNode::new(key_b, config_for(addr_b)).await.unwrap();
    let pub_b = *receiver.public_key();

    let (accepted, connected) = tokio::join!(receiver.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    sender.start();
    receiver.start();

    let stream = sender
        .open_stream(nid_b, 8888, StreamConfig::new().with_window_bytes(512))
        .unwrap();

    let task_total: usize = 32;

    // Each task carries a distinct payload tagged with its index.
    let handles: Vec<_> = (0..task_total)
        .map(|i| {
            let mesh = Arc::clone(&sender);
            let stream = stream.clone();
            let payload = Bytes::from(format!(r#"{{"i":{}}}"#, i));
            tokio::spawn(async move { mesh.send_with_retry(&stream, &[payload], 64).await })
        })
        .collect();

    for handle in handles {
        let outcome = handle.await.unwrap();
        outcome.expect("send_with_retry must eventually succeed");
    }

    let stats = sender.stream_stats(nid_b, 8888).expect("stream stats");
    assert!(stats.tx_credit_remaining > 0);

    // `shutdown` needs the node by value, so reclaim it from the `Arc`.
    Arc::try_unwrap(sender)
        .ok()
        .unwrap()
        .shutdown()
        .await
        .unwrap();
    receiver.shutdown().await.unwrap();
}
/// A single task issues 64 back-to-back sends on a stream opened with a
/// 256-byte window; at least one must succeed and at least one must be
/// rejected with `StreamError::Backpressure`.
///
/// `Transport` and `NotConnected` errors fail the test immediately. After
/// a 200 ms pause, the stream stats must show a `tx_window` of 256, at
/// least as many backpressure events as were observed, and at least one
/// received credit grant.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_v2_serial_sender_sees_backpressure_on_slow_receiver() {
    const LOCALHOST: [u8; 4] = [127, 0, 0, 1];

    let ports = find_ports(2).await;
    let psk = [0x13u8; 32];

    let key_a = EntityKeypair::generate();
    let key_b = EntityKeypair::generate();
    let (nid_a, nid_b) = (key_a.node_id(), key_b.node_id());
    let addr_a = SocketAddr::from((LOCALHOST, ports[0]));
    let addr_b = SocketAddr::from((LOCALHOST, ports[1]));

    let config_for = |addr: SocketAddr| {
        MeshNodeConfig::new(addr, psk)
            .with_num_shards(4)
            .with_handshake(3, Duration::from_secs(3))
            .with_heartbeat_interval(Duration::from_millis(500))
            .with_session_timeout(Duration::from_secs(30))
    };
    let sender = Arc::new(MeshNode::new(key_a, config_for(addr_a)).await.unwrap());
    let receiver = MeshNode::new(key_b, config_for(addr_b)).await.unwrap();
    let pub_b = *receiver.public_key();

    let (accepted, connected) = tokio::join!(receiver.accept(nid_a), async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        sender.connect(addr_b, &pub_b, nid_b).await
    });
    accepted.unwrap();
    connected.unwrap();

    sender.start();
    receiver.start();

    let stream = sender
        .open_stream(nid_b, 9999, StreamConfig::new().with_window_bytes(256))
        .unwrap();

    let event = Bytes::from_static(b"{\"k\":\"v\"}");
    let mut sent = 0usize;
    let mut throttled = 0usize;

    // Serial burst: each send is awaited before the next one is issued.
    for _ in 0..64 {
        let outcome = sender
            .send_on_stream(&stream, std::slice::from_ref(&event))
            .await;
        match outcome {
            Ok(()) => sent += 1,
            Err(StreamError::Backpressure) => throttled += 1,
            Err(StreamError::Transport(_)) => panic!(
                "v2 must not surface kernel-buffer-full as Transport; \
                 credit exhaustion should always present as Backpressure"
            ),
            Err(StreamError::NotConnected) => panic!("unexpected NotConnected"),
        }
    }

    assert!(
        throttled > 0,
        "serial sender must hit Backpressure once credit drains faster \
         than grants arrive; got ok={}, bp={}",
        sent,
        throttled,
    );
    assert!(sent > 0, "at least the initial handful must succeed");

    // Pause before sampling the stats.
    tokio::time::sleep(Duration::from_millis(200)).await;

    let stats = sender.stream_stats(nid_b, 9999).expect("stats");
    assert_eq!(stats.tx_window, 256);
    assert!(
        stats.backpressure_events >= throttled as u64,
        "stats.backpressure_events ({}) should be >= observed ({})",
        stats.backpressure_events,
        throttled,
    );
    assert!(
        stats.credit_grants_received > 0,
        "at least one StreamWindow grant must have flowed back from B — \
         otherwise the v2 loop is not actually active"
    );

    // `shutdown` needs the node by value, so reclaim it from the `Arc`.
    Arc::try_unwrap(sender)
        .ok()
        .unwrap()
        .shutdown()
        .await
        .unwrap();
    receiver.shutdown().await.unwrap();
}