use super::*;
use crate::bloom::BloomFilter;
use crate::config::RoutingMode;
use crate::tree::{ParentDeclaration, TreeCoordinate};
use spanning_tree::{
TestNode, cleanup_nodes, drain_all_packets, generate_random_edges, initiate_handshake,
lock_large_network_test, make_test_node, run_tree_test, verify_tree_convergence,
};
use std::collections::HashSet;
#[test]
fn test_routing_local_delivery() {
let mut node = make_node();
let my_addr = *node.node_addr();
assert!(node.find_next_hop(&my_addr).is_none());
}
#[test]
fn test_routing_direct_peer() {
let mut node = make_node();
let transport_id = TransportId::new(1);
let link_id = LinkId::new(1);
let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
let peer_addr = *identity.node_addr();
node.add_connection(conn).unwrap();
node.promote_connection(link_id, identity, 2000).unwrap();
let result = node.find_next_hop(&peer_addr);
assert!(result.is_some());
assert_eq!(result.unwrap().node_addr(), &peer_addr);
}
#[test]
fn test_routing_unknown_destination() {
let mut node = make_node();
let unknown = make_node_addr(99);
assert!(node.find_next_hop(&unknown).is_none());
}
#[test]
fn test_routing_bloom_filter_hit() {
let mut node = make_node();
let transport_id = TransportId::new(1);
let my_addr = *node.node_addr();
let link_id1 = LinkId::new(1);
let (conn1, id1) = make_completed_connection(&mut node, link_id1, transport_id, 1000);
let peer1_addr = *id1.node_addr();
node.add_connection(conn1).unwrap();
node.promote_connection(link_id1, id1, 2000).unwrap();
let link_id2 = LinkId::new(2);
let (conn2, id2) = make_completed_connection(&mut node, link_id2, transport_id, 1000);
let peer2_addr = *id2.node_addr();
node.add_connection(conn2).unwrap();
node.promote_connection(link_id2, id2, 2000).unwrap();
let peer1_coords = TreeCoordinate::from_addrs(vec![peer1_addr, my_addr]).unwrap();
node.tree_state_mut().update_peer(
ParentDeclaration::new(peer1_addr, my_addr, 1, 1000),
peer1_coords,
);
let peer2_coords = TreeCoordinate::from_addrs(vec![peer2_addr, my_addr]).unwrap();
node.tree_state_mut().update_peer(
ParentDeclaration::new(peer2_addr, my_addr, 1, 1000),
peer2_coords,
);
let dest = make_node_addr(99);
let dest_coords = TreeCoordinate::from_addrs(vec![dest, peer1_addr, my_addr]).unwrap();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
node.coord_cache_mut().insert(dest, dest_coords, now_ms);
let peer1 = node.get_peer_mut(&peer1_addr).unwrap();
let mut filter = BloomFilter::new();
filter.insert(&dest);
peer1.update_filter(filter, 1, 3000);
let result = node.find_next_hop(&dest);
assert!(result.is_some());
assert_eq!(result.unwrap().node_addr(), &peer1_addr);
assert_ne!(result.unwrap().node_addr(), &peer2_addr);
}
#[test]
fn test_routing_bloom_filter_multiple_hits_tiebreak() {
let mut node = make_node();
let transport_id = TransportId::new(1);
let my_addr = *node.node_addr();
let mut peer_addrs = Vec::new();
for i in 1..=3 {
let link_id = LinkId::new(i);
let (conn, id) = make_completed_connection(&mut node, link_id, transport_id, 1000);
let addr = *id.node_addr();
peer_addrs.push(addr);
node.add_connection(conn).unwrap();
node.promote_connection(link_id, id, 2000).unwrap();
}
for &addr in &peer_addrs {
let coords = TreeCoordinate::from_addrs(vec![addr, my_addr]).unwrap();
node.tree_state_mut()
.update_peer(ParentDeclaration::new(addr, my_addr, 1, 1000), coords);
}
let dest = make_node_addr(99);
let dest_coords = TreeCoordinate::from_addrs(vec![dest, peer_addrs[0], my_addr]).unwrap();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
node.coord_cache_mut().insert(dest, dest_coords, now_ms);
for &addr in &peer_addrs {
let peer = node.get_peer_mut(&addr).unwrap();
let mut filter = BloomFilter::new();
filter.insert(&dest);
peer.update_filter(filter, 1, 3000);
}
let result = node.find_next_hop(&dest);
assert!(result.is_some());
assert_eq!(result.unwrap().node_addr(), &peer_addrs[0]);
}
#[test]
fn test_routing_tree_fallback() {
let mut node = make_node();
let transport_id = TransportId::new(1);
let my_addr = *node.node_addr();
let link_id = LinkId::new(1);
let (conn, id) = make_completed_connection(&mut node, link_id, transport_id, 1000);
let peer_addr = *id.node_addr();
node.add_connection(conn).unwrap();
node.promote_connection(link_id, id, 2000).unwrap();
let peer_coords = TreeCoordinate::from_addrs(vec![peer_addr, my_addr]).unwrap();
node.tree_state_mut().update_peer(
ParentDeclaration::new(peer_addr, my_addr, 1, 1000),
peer_coords,
);
let dest = make_node_addr(99);
let dest_coords = TreeCoordinate::from_addrs(vec![dest, peer_addr, my_addr]).unwrap();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
node.coord_cache_mut().insert(dest, dest_coords, now_ms);
let result = node.find_next_hop(&dest);
assert!(result.is_some());
assert_eq!(result.unwrap().node_addr(), &peer_addr);
}
#[test]
fn test_routing_bloom_hit_not_closer_falls_through_to_tree() {
let mut node = make_node();
let transport_id = TransportId::new(1);
let my_addr = *node.node_addr();
let tree_link = LinkId::new(1);
let (tree_conn, tree_id) = make_completed_connection(&mut node, tree_link, transport_id, 1000);
let tree_peer_addr = *tree_id.node_addr();
node.add_connection(tree_conn).unwrap();
node.promote_connection(tree_link, tree_id, 2000).unwrap();
let bloom_link = LinkId::new(2);
let (bloom_conn, bloom_id) =
make_completed_connection(&mut node, bloom_link, transport_id, 1000);
let bloom_peer_addr = *bloom_id.node_addr();
node.add_connection(bloom_conn).unwrap();
node.promote_connection(bloom_link, bloom_id, 2000).unwrap();
let tree_peer_coords = TreeCoordinate::from_addrs(vec![tree_peer_addr, my_addr]).unwrap();
node.tree_state_mut().update_peer(
ParentDeclaration::new(tree_peer_addr, my_addr, 1, 1000),
tree_peer_coords,
);
let bloom_peer_coords = TreeCoordinate::from_addrs(vec![bloom_peer_addr, my_addr]).unwrap();
node.tree_state_mut().update_peer(
ParentDeclaration::new(bloom_peer_addr, my_addr, 1, 1000),
bloom_peer_coords,
);
let dest = make_node_addr(99);
let dest_coords = TreeCoordinate::from_addrs(vec![dest, tree_peer_addr, my_addr]).unwrap();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
node.coord_cache_mut().insert(dest, dest_coords, now_ms);
let bloom_peer = node.get_peer_mut(&bloom_peer_addr).unwrap();
let mut filter = BloomFilter::new();
filter.insert(&dest);
bloom_peer.update_filter(filter, 1, 3000);
let result = node.find_next_hop(&dest);
assert!(
result.is_some(),
"find_next_hop must fall through to tree routing when bloom \
candidates exist but none are strictly closer than self"
);
let next_hop = result.unwrap().node_addr();
assert_eq!(
next_hop, &tree_peer_addr,
"tree-routing winner expected (tree_peer), got {:?}",
next_hop,
);
assert_ne!(
next_hop, &bloom_peer_addr,
"bloom_peer must be excluded by the self-distance check",
);
}
#[test]
fn test_routing_tree_no_coords_in_cache() {
let mut node = make_node();
let transport_id = TransportId::new(1);
let link_id = LinkId::new(1);
let (conn, id) = make_completed_connection(&mut node, link_id, transport_id, 1000);
node.add_connection(conn).unwrap();
node.promote_connection(link_id, id, 2000).unwrap();
let dest = make_node_addr(99);
assert!(node.find_next_hop(&dest).is_none());
}
#[test]
fn test_reply_learned_mode_uses_observed_route_without_coords() {
let mut config = Config::new();
config.node.routing.mode = RoutingMode::ReplyLearned;
let mut node = Node::new(config).unwrap();
let transport_id = TransportId::new(1);
let link_id1 = LinkId::new(1);
let (conn1, id1) = make_completed_connection(&mut node, link_id1, transport_id, 1000);
let peer1_addr = *id1.node_addr();
node.add_connection(conn1).unwrap();
node.promote_connection(link_id1, id1, 2000).unwrap();
let link_id2 = LinkId::new(2);
let (conn2, id2) = make_completed_connection(&mut node, link_id2, transport_id, 1000);
let peer2_addr = *id2.node_addr();
node.add_connection(conn2).unwrap();
node.promote_connection(link_id2, id2, 2000).unwrap();
let dest = make_node_addr(99);
node.learn_reverse_route(dest, peer2_addr);
let result = node.find_next_hop(&dest);
assert!(result.is_some(), "learned route should not require coords");
assert_eq!(result.unwrap().node_addr(), &peer2_addr);
assert_ne!(peer1_addr, peer2_addr);
}
#[test]
fn test_reply_learned_mode_multipaths_observed_routes() {
let mut config = Config::new();
config.node.routing.mode = RoutingMode::ReplyLearned;
let mut node = Node::new(config).unwrap();
let transport_id = TransportId::new(1);
let link_id1 = LinkId::new(1);
let (conn1, id1) = make_completed_connection(&mut node, link_id1, transport_id, 1000);
let peer1_addr = *id1.node_addr();
node.add_connection(conn1).unwrap();
node.promote_connection(link_id1, id1, 2000).unwrap();
let link_id2 = LinkId::new(2);
let (conn2, id2) = make_completed_connection(&mut node, link_id2, transport_id, 1000);
let peer2_addr = *id2.node_addr();
node.add_connection(conn2).unwrap();
node.promote_connection(link_id2, id2, 2000).unwrap();
let dest = make_node_addr(99);
node.learn_reverse_route(dest, peer1_addr);
for _ in 0..4 {
node.learn_reverse_route(dest, peer2_addr);
}
let mut selected = Vec::new();
for _ in 0..20 {
selected.push(
*node
.find_next_hop(&dest)
.expect("learned route")
.node_addr(),
);
}
let peer1_count = selected.iter().filter(|addr| **addr == peer1_addr).count();
let peer2_count = selected.iter().filter(|addr| **addr == peer2_addr).count();
assert!(
peer1_count > 0,
"lower-score learned route should remain in exploratory rotation"
);
assert!(
peer2_count > peer1_count,
"higher-score learned route should carry most packets"
);
}
#[test]
fn test_reply_learned_mode_periodically_explores_coordinate_route() {
let mut config = Config::new();
config.node.routing.mode = RoutingMode::ReplyLearned;
config.node.routing.learned_fallback_explore_interval = 2;
let mut node = Node::new(config).unwrap();
let transport_id = TransportId::new(1);
let my_addr = *node.node_addr();
let tree_link = LinkId::new(1);
let (tree_conn, tree_id) = make_completed_connection(&mut node, tree_link, transport_id, 1000);
let tree_peer_addr = *tree_id.node_addr();
node.add_connection(tree_conn).unwrap();
node.promote_connection(tree_link, tree_id, 2000).unwrap();
let learned_link = LinkId::new(2);
let (learned_conn, learned_id) =
make_completed_connection(&mut node, learned_link, transport_id, 1000);
let learned_peer_addr = *learned_id.node_addr();
node.add_connection(learned_conn).unwrap();
node.promote_connection(learned_link, learned_id, 2000)
.unwrap();
let tree_peer_coords = TreeCoordinate::from_addrs(vec![tree_peer_addr, my_addr]).unwrap();
node.tree_state_mut().update_peer(
ParentDeclaration::new(tree_peer_addr, my_addr, 1, 1000),
tree_peer_coords,
);
let learned_peer_coords = TreeCoordinate::from_addrs(vec![learned_peer_addr, my_addr]).unwrap();
node.tree_state_mut().update_peer(
ParentDeclaration::new(learned_peer_addr, my_addr, 1, 1000),
learned_peer_coords,
);
let dest = make_node_addr(99);
let dest_coords = TreeCoordinate::from_addrs(vec![dest, tree_peer_addr, my_addr]).unwrap();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
node.coord_cache_mut().insert(dest, dest_coords, now_ms);
node.learn_reverse_route(dest, learned_peer_addr);
let first = *node
.find_next_hop(&dest)
.expect("learned route")
.node_addr();
let second = *node
.find_next_hop(&dest)
.expect("learned route")
.node_addr();
let third = *node
.find_next_hop(&dest)
.expect("coordinate exploration route")
.node_addr();
assert_eq!(first, learned_peer_addr);
assert_eq!(second, learned_peer_addr);
assert_eq!(
third, tree_peer_addr,
"fallback exploration should periodically try the coordinate route"
);
}
#[test]
fn test_tree_mode_ignores_learned_route_without_coords() {
let mut node = make_node();
let transport_id = TransportId::new(1);
let link_id = LinkId::new(1);
let (conn, id) = make_completed_connection(&mut node, link_id, transport_id, 1000);
let peer_addr = *id.node_addr();
node.add_connection(conn).unwrap();
node.promote_connection(link_id, id, 2000).unwrap();
let dest = make_node_addr(99);
node.learn_reverse_route(dest, peer_addr);
assert!(
node.find_next_hop(&dest).is_none(),
"default tree mode must preserve current no-coords behavior"
);
}
#[test]
fn test_routing_refreshes_coord_cache_ttl() {
let mut node = make_node();
let transport_id = TransportId::new(1);
let my_addr = *node.node_addr();
let link_id = LinkId::new(1);
let (conn, id) = make_completed_connection(&mut node, link_id, transport_id, 1000);
let peer_addr = *id.node_addr();
node.add_connection(conn).unwrap();
node.promote_connection(link_id, id, 2000).unwrap();
let dest = make_node_addr(99);
let dest_coords = TreeCoordinate::from_addrs(vec![dest, peer_addr, my_addr]).unwrap();
node.tree_state_mut().update_peer(
ParentDeclaration::new(peer_addr, my_addr, 1, 1000),
TreeCoordinate::from_addrs(vec![peer_addr, my_addr]).unwrap(),
);
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let short_ttl = 10_000; node.coord_cache_mut()
.insert_with_ttl(dest, dest_coords, now_ms, short_ttl);
let original_expiry = node.coord_cache().get_entry(&dest).unwrap().expires_at();
assert!(node.find_next_hop(&dest).is_some());
let new_expiry = node.coord_cache().get_entry(&dest).unwrap().expires_at();
assert!(
new_expiry > original_expiry,
"find_next_hop should refresh the coord_cache TTL: original={}, new={}",
original_expiry,
new_expiry,
);
}
#[test]
fn test_routing_bloom_hit_without_coords_returns_none() {
let mut node = make_node();
let transport_id = TransportId::new(1);
let link_id1 = LinkId::new(1);
let (conn1, id1) = make_completed_connection(&mut node, link_id1, transport_id, 1000);
let peer1_addr = *id1.node_addr();
node.add_connection(conn1).unwrap();
node.promote_connection(link_id1, id1, 2000).unwrap();
let link_id2 = LinkId::new(2);
let (conn2, id2) = make_completed_connection(&mut node, link_id2, transport_id, 1000);
let peer2_addr = *id2.node_addr();
node.add_connection(conn2).unwrap();
node.promote_connection(link_id2, id2, 2000).unwrap();
let dest = make_node_addr(99);
for &addr in &[peer1_addr, peer2_addr] {
let peer = node.get_peer_mut(&addr).unwrap();
let mut filter = BloomFilter::new();
filter.insert(&dest);
peer.update_filter(filter, 1, 3000);
}
assert!(node.find_next_hop(&dest).is_none());
}
#[test]
fn test_routing_discovery_coord_cache() {
let mut node = make_node();
let transport_id = TransportId::new(1);
let my_addr = *node.node_addr();
let link_id = LinkId::new(1);
let (conn, id) = make_completed_connection(&mut node, link_id, transport_id, 1000);
let peer_addr = *id.node_addr();
node.add_connection(conn).unwrap();
node.promote_connection(link_id, id, 2000).unwrap();
let peer_coords = TreeCoordinate::from_addrs(vec![peer_addr, my_addr]).unwrap();
node.tree_state_mut().update_peer(
ParentDeclaration::new(peer_addr, my_addr, 1, 1000),
peer_coords,
);
let dest = make_node_addr(99);
let dest_coords = TreeCoordinate::from_addrs(vec![dest, peer_addr, my_addr]).unwrap();
let peer = node.get_peer_mut(&peer_addr).unwrap();
let mut filter = BloomFilter::new();
filter.insert(&dest);
peer.update_filter(filter, 1, 3000);
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(node.coord_cache().get(&dest, now_ms).is_none());
assert!(node.find_next_hop(&dest).is_none());
node.coord_cache_mut().insert(dest, dest_coords, now_ms);
let result = node.find_next_hop(&dest);
assert!(result.is_some(), "Should route via coord_cache");
assert_eq!(
result.unwrap().node_addr(),
&peer_addr,
"Should pick peer with bloom filter hit"
);
}
#[tokio::test]
async fn test_routing_chain_topology() {
let mut nodes = vec![
make_test_node().await,
make_test_node().await,
make_test_node().await,
make_test_node().await,
];
initiate_handshake(&mut nodes, 0, 1).await;
initiate_handshake(&mut nodes, 1, 2).await;
initiate_handshake(&mut nodes, 2, 3).await;
drain_all_packets(&mut nodes, false).await;
let root = nodes.iter().map(|n| *n.node.node_addr()).min().unwrap();
for tn in &nodes {
assert_eq!(*tn.node.tree_state().root(), root, "Tree not converged");
}
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let node3_addr = *nodes[3].node.node_addr();
let node3_coords = nodes[3].node.tree_state().my_coords().clone();
nodes[0]
.node
.coord_cache_mut()
.insert(node3_addr, node3_coords, now_ms);
let node0_addr = *nodes[0].node.node_addr();
let node0_coords = nodes[0].node.tree_state().my_coords().clone();
nodes[3]
.node
.coord_cache_mut()
.insert(node0_addr, node0_coords, now_ms);
let node1_addr = *nodes[1].node.node_addr();
let node2_addr = *nodes[2].node.node_addr();
let hop = nodes[0].node.find_next_hop(&node3_addr);
assert!(hop.is_some(), "Node 0 should find route to node 3");
assert_eq!(
hop.unwrap().node_addr(),
&node1_addr,
"Node 0's next hop to node 3 should be node 1"
);
let hop = nodes[3].node.find_next_hop(&node0_addr);
assert!(hop.is_some(), "Node 3 should find route to node 0");
assert_eq!(
hop.unwrap().node_addr(),
&node2_addr,
"Node 3's next hop to node 0 should be node 2"
);
}
#[tokio::test]
async fn test_routing_bloom_preferred_over_tree() {
let mut nodes = vec![
make_test_node().await,
make_test_node().await,
make_test_node().await,
];
initiate_handshake(&mut nodes, 0, 1).await;
initiate_handshake(&mut nodes, 0, 2).await;
initiate_handshake(&mut nodes, 1, 2).await;
drain_all_packets(&mut nodes, false).await;
let dest = make_node_addr(99);
let peer2_addr = *nodes[2].node.node_addr();
let mut dest_path: Vec<NodeAddr> = nodes[2]
.node
.tree_state()
.my_coords()
.node_addrs()
.copied()
.collect();
dest_path.insert(0, dest);
let dest_coords = TreeCoordinate::from_addrs(dest_path).unwrap();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
nodes[0]
.node
.coord_cache_mut()
.insert(dest, dest_coords, now_ms);
let peer2 = nodes[0].node.get_peer_mut(&peer2_addr).unwrap();
let mut filter = BloomFilter::new();
filter.insert(&dest);
peer2.update_filter(filter, 100, 50000);
let hop = nodes[0].node.find_next_hop(&dest);
assert!(hop.is_some(), "Should route via bloom filter");
assert_eq!(
hop.unwrap().node_addr(),
&peer2_addr,
"Should pick peer with bloom filter hit"
);
}
#[derive(Debug)]
enum ForwardResult {
Delivered(usize),
NoRoute { at_node: usize, hops: usize },
Loop { at_node: usize, hops: usize },
}
fn build_addr_index(nodes: &[TestNode]) -> std::collections::HashMap<NodeAddr, usize> {
nodes
.iter()
.enumerate()
.map(|(i, tn)| (*tn.node.node_addr(), i))
.collect()
}
fn simulate_forwarding(
nodes: &mut [TestNode],
addr_index: &std::collections::HashMap<NodeAddr, usize>,
src: usize,
dst: usize,
) -> ForwardResult {
let dest_addr = *nodes[dst].node.node_addr();
let max_hops = nodes.len();
let mut current = src;
let mut visited = HashSet::new();
visited.insert(current);
for hop in 0..max_hops {
let next = nodes[current].node.find_next_hop(&dest_addr);
match next {
None => {
if *nodes[current].node.node_addr() == dest_addr {
return ForwardResult::Delivered(hop);
}
return ForwardResult::NoRoute {
at_node: current,
hops: hop,
};
}
Some(peer) => {
let next_addr = *peer.node_addr();
if next_addr == dest_addr {
return ForwardResult::Delivered(hop + 1);
}
let next_idx = match addr_index.get(&next_addr) {
Some(&idx) => idx,
None => {
return ForwardResult::NoRoute {
at_node: current,
hops: hop,
};
}
};
if visited.contains(&next_idx) {
return ForwardResult::Loop {
at_node: next_idx,
hops: hop + 1,
};
}
visited.insert(next_idx);
current = next_idx;
}
}
}
ForwardResult::NoRoute {
at_node: current,
hops: max_hops,
}
}
#[tokio::test]
async fn test_routing_reachability_100_nodes() {
let _guard = lock_large_network_test().await;
const NUM_NODES: usize = 100;
const TARGET_EDGES: usize = 250;
const SEED: u64 = 42;
let edges = generate_random_edges(NUM_NODES, TARGET_EDGES, SEED);
let mut nodes = run_tree_test(NUM_NODES, &edges, false).await;
verify_tree_convergence(&nodes);
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let all_coords: Vec<(NodeAddr, TreeCoordinate)> = nodes
.iter()
.map(|tn| {
(
*tn.node.node_addr(),
tn.node.tree_state().my_coords().clone(),
)
})
.collect();
for node in &mut nodes {
for (addr, coords) in &all_coords {
if addr != node.node.node_addr() {
node.node
.coord_cache_mut()
.insert(*addr, coords.clone(), now_ms);
}
}
}
let addr_index = build_addr_index(&nodes);
let mut total_pairs = 0;
let mut total_hops = 0usize;
let mut max_hops = 0usize;
let mut failures = Vec::new();
let mut loops = Vec::new();
for src in 0..NUM_NODES {
for dst in 0..NUM_NODES {
if src == dst {
continue;
}
total_pairs += 1;
match simulate_forwarding(&mut nodes, &addr_index, src, dst) {
ForwardResult::Delivered(hops) => {
total_hops += hops;
if hops > max_hops {
max_hops = hops;
}
}
ForwardResult::NoRoute { at_node, hops } => {
failures.push((src, dst, at_node, hops));
}
ForwardResult::Loop { at_node, hops } => {
loops.push((src, dst, at_node, hops));
}
}
}
}
let delivered = total_pairs - failures.len() - loops.len();
let avg_hops = if delivered > 0 {
total_hops as f64 / delivered as f64
} else {
0.0
};
eprintln!("\n === Routing Reachability ({} nodes) ===", NUM_NODES);
eprintln!(
" Pairs tested: {} | Delivered: {} | Failed: {} | Loops: {}",
total_pairs,
delivered,
failures.len(),
loops.len()
);
eprintln!(" Hops: avg={:.1} max={}", avg_hops, max_hops);
if !failures.is_empty() {
let show = failures.len().min(10);
eprintln!(" First {} failures:", show);
for &(src, dst, at_node, hops) in &failures[..show] {
eprintln!(
" {} -> {}: stuck at node {} after {} hops",
src, dst, at_node, hops
);
}
}
if !loops.is_empty() {
let show = loops.len().min(10);
eprintln!(" First {} loops:", show);
for &(src, dst, at_node, hops) in &loops[..show] {
eprintln!(
" {} -> {}: loop at node {} after {} hops",
src, dst, at_node, hops
);
}
}
assert!(
loops.is_empty(),
"Detected {} routing loops out of {} pairs",
loops.len(),
total_pairs
);
assert!(
failures.is_empty(),
"Detected {} routing failures out of {} pairs",
failures.len(),
total_pairs
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_routing_stops_after_peer_removal() {
use crate::protocol::{Disconnect, DisconnectReason};
let edges = vec![(0, 1), (1, 2), (2, 3)];
let mut nodes = run_tree_test(4, &edges, false).await;
verify_tree_convergence(&nodes);
let _node0_addr = *nodes[0].node.node_addr();
let node1_addr = *nodes[1].node.node_addr();
let node2_addr = *nodes[2].node.node_addr();
let node3_addr = *nodes[3].node.node_addr();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let all_coords: Vec<(NodeAddr, crate::tree::TreeCoordinate)> = nodes
.iter()
.map(|tn| {
(
*tn.node.node_addr(),
tn.node.tree_state().my_coords().clone(),
)
})
.collect();
for node in &mut nodes {
for (addr, coords) in &all_coords {
if addr != node.node.node_addr() {
node.node
.coord_cache_mut()
.insert(*addr, coords.clone(), now_ms);
}
}
}
let addr_index = build_addr_index(&nodes);
match simulate_forwarding(&mut nodes, &addr_index, 0, 3) {
ForwardResult::Delivered(_) => {}
other => panic!("Expected delivery before removal, got {:?}", other),
}
let disconnect = Disconnect::new(DisconnectReason::Shutdown);
let plaintext = disconnect.encode();
nodes[2]
.node
.send_encrypted_link_message(&node1_addr, &plaintext)
.await
.expect("Failed to send disconnect");
drain_all_packets(&mut nodes, false).await;
assert!(
nodes[1].node.get_peer(&node2_addr).is_none(),
"Node 1 should have removed node 2"
);
let node0_reaches_node3 = nodes[0]
.node
.peers()
.any(|peer| peer.may_reach(&node3_addr));
assert!(
!node0_reaches_node3,
"Node 0 should not see node 3 as reachable after partition"
);
match simulate_forwarding(&mut nodes, &addr_index, 0, 3) {
ForwardResult::NoRoute { .. } => {} ForwardResult::Loop { .. } => {} ForwardResult::Delivered(hops) => {
panic!(
"Should NOT deliver after partition, but got delivery in {} hops",
hops
);
}
}
match simulate_forwarding(&mut nodes, &addr_index, 2, 3) {
ForwardResult::Delivered(_) => {}
other => panic!("Expected delivery within component, got {:?}", other),
}
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_routing_bloom_only_transit() {
let edges = vec![(0, 1), (1, 2), (2, 3)];
let mut nodes = run_tree_test(4, &edges, false).await;
verify_tree_convergence(&nodes);
let node3_addr = *nodes[3].node.node_addr();
let node3_coords = nodes[3].node.tree_state().my_coords().clone();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
nodes[0]
.node
.coord_cache_mut()
.insert(node3_addr, node3_coords, now_ms);
let hop = nodes[0].node.find_next_hop(&node3_addr);
assert!(hop.is_some(), "Node 0 should route to node 3 (has coords)");
let hop_at_1 = nodes[1].node.find_next_hop(&node3_addr);
assert!(
hop_at_1.is_none(),
"Node 1 should NOT route without cached coords (loop prevention)"
);
let hop_at_2 = nodes[2].node.find_next_hop(&node3_addr);
assert!(
hop_at_2.is_some(),
"Node 2 should route to node 3 (direct peer)"
);
assert_eq!(
hop_at_2.unwrap().node_addr(),
&node3_addr,
"Node 2's next hop to node 3 should be node 3 itself"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_routing_source_only_coords_100_nodes() {
let _guard = lock_large_network_test().await;
const NUM_NODES: usize = 100;
const TARGET_EDGES: usize = 250;
const SEED: u64 = 42;
let edges = generate_random_edges(NUM_NODES, TARGET_EDGES, SEED);
let mut nodes = run_tree_test(NUM_NODES, &edges, false).await;
verify_tree_convergence(&nodes);
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let all_coords: Vec<(NodeAddr, crate::tree::TreeCoordinate)> = nodes
.iter()
.map(|tn| {
(
*tn.node.node_addr(),
tn.node.tree_state().my_coords().clone(),
)
})
.collect();
let addr_index = build_addr_index(&nodes);
let mut source_only_delivered = 0usize;
let mut source_only_failed = 0usize;
let mut total_pairs = 0usize;
let sample_pairs: Vec<(usize, usize)> = (0..NUM_NODES)
.step_by(10)
.flat_map(|s| {
(0..NUM_NODES)
.step_by(10)
.filter(move |&d| d != s)
.map(move |d| (s, d))
})
.collect();
for &(src, dst) in &sample_pairs {
total_pairs += 1;
for node in &mut nodes {
node.node.coord_cache_mut().clear();
}
let (dest_addr, dest_coords) = &all_coords[dst];
nodes[src]
.node
.coord_cache_mut()
.insert(*dest_addr, dest_coords.clone(), now_ms);
match simulate_forwarding(&mut nodes, &addr_index, src, dst) {
ForwardResult::Delivered(_) => source_only_delivered += 1,
ForwardResult::NoRoute { .. } => source_only_failed += 1,
ForwardResult::Loop { .. } => {
panic!(
"Routing loop detected with source-only coords: {} -> {}",
src, dst
);
}
}
}
eprintln!(
"\n === Source-Only Coords Routing ({} nodes) ===",
NUM_NODES
);
eprintln!(
" Pairs: {} | Delivered: {} | Failed: {} | Delivery rate: {:.1}%",
total_pairs,
source_only_delivered,
source_only_failed,
source_only_delivered as f64 / total_pairs as f64 * 100.0
);
assert!(
source_only_delivered > 0,
"At least some direct-peer pairs should be delivered"
);
for node in &mut nodes {
for (addr, coords) in &all_coords {
if addr != node.node.node_addr() {
node.node
.coord_cache_mut()
.insert(*addr, coords.clone(), now_ms);
}
}
}
let mut full_cache_failures = 0usize;
for &(src, dst) in &sample_pairs {
match simulate_forwarding(&mut nodes, &addr_index, src, dst) {
ForwardResult::Delivered(_) => {}
_ => full_cache_failures += 1,
}
}
assert_eq!(
full_cache_failures, 0,
"With full coord caches, all pairs should be delivered"
);
cleanup_nodes(&mut nodes).await;
}