use super::*;
use crate::config::RoutingMode;
use crate::node::RecentRequest;
use crate::protocol::{LookupRequest, LookupResponse};
use crate::tree::TreeCoordinate;
use spanning_tree::{
cleanup_nodes, generate_random_edges, lock_large_network_test, process_available_packets,
run_tree_test, verify_tree_convergence,
};
#[tokio::test]
async fn test_request_decode_error() {
let mut node = make_node();
let from = make_node_addr(0xAA);
node.handle_lookup_request(&from, &[0x00; 5]).await;
assert!(node.recent_requests.is_empty());
}
#[tokio::test]
async fn test_request_dedup() {
let mut node = make_node();
let from = make_node_addr(0xAA);
let target = make_node_addr(0xBB);
let origin = make_node_addr(0xCC);
let coords = TreeCoordinate::from_addrs(vec![origin, make_node_addr(0)]).unwrap();
let request = LookupRequest::new(999, target, origin, coords, 5, 0);
let payload = &request.encode()[1..];
node.handle_lookup_request(&from, payload).await;
assert_eq!(node.recent_requests.len(), 1);
node.handle_lookup_request(&from, payload).await;
assert_eq!(node.recent_requests.len(), 1);
}
#[tokio::test]
async fn test_request_target_is_self() {
let mut node = make_node();
let from = make_node_addr(0xAA);
let origin = make_node_addr(0xCC);
let my_addr = *node.node_addr();
let coords = TreeCoordinate::from_addrs(vec![origin, make_node_addr(0)]).unwrap();
let request = LookupRequest::new(777, my_addr, origin, coords, 5, 0);
let payload = &request.encode()[1..];
node.handle_lookup_request(&from, payload).await;
assert!(node.recent_requests.contains_key(&777));
}
#[tokio::test]
async fn test_request_ttl_zero_not_forwarded() {
let mut node = make_node();
let from = make_node_addr(0xAA);
let target = make_node_addr(0xBB);
let origin = make_node_addr(0xCC);
let coords = TreeCoordinate::from_addrs(vec![origin, make_node_addr(0)]).unwrap();
let request = LookupRequest::new(666, target, origin, coords, 0, 0);
let payload = &request.encode()[1..];
node.handle_lookup_request(&from, payload).await;
assert!(node.recent_requests.contains_key(&666));
}
#[tokio::test]
async fn test_response_decode_error() {
let mut node = make_node();
let from = make_node_addr(0xAA);
node.handle_lookup_response(&from, &[0x00; 10]).await;
assert!(node.coord_cache().is_empty());
}
#[tokio::test]
async fn test_response_originator_caches_route() {
let mut node = make_node();
let from = make_node_addr(0xAA);
let target_identity = Identity::generate();
let target = *target_identity.node_addr();
let root = make_node_addr(0xF0);
let coords = TreeCoordinate::from_addrs(vec![target, root]).unwrap();
node.register_identity(target, target_identity.pubkey_full());
let proof_data = LookupResponse::proof_bytes(555, &target, &coords);
let proof = target_identity.sign(&proof_data);
let response = LookupResponse::new(555, target, coords.clone(), proof);
let payload = &response.encode()[1..];
assert!(!node.recent_requests.contains_key(&555));
node.handle_lookup_response(&from, payload).await;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(node.coord_cache().contains(&target, now_ms));
assert_eq!(node.coord_cache().get(&target, now_ms).unwrap(), &coords);
}
#[tokio::test]
async fn test_response_transit_needs_recent_request() {
let mut node = make_node();
let from = make_node_addr(0xAA);
let target = make_node_addr(0xBB);
let root = make_node_addr(0xF0);
let coords = TreeCoordinate::from_addrs(vec![target, root]).unwrap();
let proof_data = LookupResponse::proof_bytes(444, &target, &coords);
let target_identity = Identity::generate();
let proof = target_identity.sign(&proof_data);
let response = LookupResponse::new(444, target, coords, proof);
let payload = &response.encode()[1..];
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
node.recent_requests
.insert(444, RecentRequest::new(make_node_addr(0xDD), now_ms));
node.handle_lookup_response(&from, payload).await;
let now_ms2 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(!node.coord_cache().contains(&target, now_ms2));
}
#[tokio::test]
async fn test_response_proof_verification_success() {
let mut node = make_node();
let from = make_node_addr(0xAA);
let target_identity = Identity::generate();
let target = *target_identity.node_addr();
let root = make_node_addr(0xF0);
let coords = TreeCoordinate::from_addrs(vec![target, root]).unwrap();
node.register_identity(target, target_identity.pubkey_full());
let proof_data = LookupResponse::proof_bytes(700, &target, &coords);
let proof = target_identity.sign(&proof_data);
let response = LookupResponse::new(700, target, coords.clone(), proof);
let payload = &response.encode()[1..];
node.handle_lookup_response(&from, payload).await;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(
node.coord_cache().contains(&target, now_ms),
"Valid proof should result in cached coords"
);
assert_eq!(node.coord_cache().get(&target, now_ms).unwrap(), &coords);
}
#[tokio::test]
async fn test_response_proof_verification_failure() {
let mut node = make_node();
let from = make_node_addr(0xAA);
let target_identity = Identity::generate();
let target = *target_identity.node_addr();
let root = make_node_addr(0xF0);
let coords = TreeCoordinate::from_addrs(vec![target, root]).unwrap();
node.register_identity(target, target_identity.pubkey_full());
let wrong_identity = Identity::generate();
let proof_data = LookupResponse::proof_bytes(701, &target, &coords);
let proof = wrong_identity.sign(&proof_data);
let response = LookupResponse::new(701, target, coords, proof);
let payload = &response.encode()[1..];
node.handle_lookup_response(&from, payload).await;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(
!node.coord_cache().contains(&target, now_ms),
"Bad signature should NOT result in cached coords"
);
}
#[tokio::test]
async fn test_response_identity_cache_miss() {
let mut node = make_node();
let from = make_node_addr(0xAA);
let target_identity = Identity::generate();
let target = *target_identity.node_addr();
let root = make_node_addr(0xF0);
let coords = TreeCoordinate::from_addrs(vec![target, root]).unwrap();
let proof_data = LookupResponse::proof_bytes(702, &target, &coords);
let proof = target_identity.sign(&proof_data);
let response = LookupResponse::new(702, target, coords, proof);
let payload = &response.encode()[1..];
node.handle_lookup_response(&from, payload).await;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(
!node.coord_cache().contains(&target, now_ms),
"identity_cache miss should discard the response"
);
}
#[tokio::test]
async fn test_response_coord_substitution_detected() {
let mut node = make_node();
let from = make_node_addr(0xAA);
let target_identity = Identity::generate();
let target = *target_identity.node_addr();
let root = make_node_addr(0xF0);
let real_coords = TreeCoordinate::from_addrs(vec![target, root]).unwrap();
let fake_coords = TreeCoordinate::from_addrs(vec![target, make_node_addr(0xEE), root]).unwrap();
node.register_identity(target, target_identity.pubkey_full());
let proof_data = LookupResponse::proof_bytes(703, &target, &real_coords);
let proof = target_identity.sign(&proof_data);
let response = LookupResponse::new(703, target, fake_coords, proof);
let payload = &response.encode()[1..];
node.handle_lookup_response(&from, payload).await;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(
!node.coord_cache().contains(&target, now_ms),
"Substituted coords should be detected and response discarded"
);
}
#[tokio::test]
async fn test_recent_request_expiry() {
let mut node = make_node();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
node.recent_requests
.insert(123, RecentRequest::new(make_node_addr(1), now_ms - 11_000));
node.recent_requests
.insert(456, RecentRequest::new(make_node_addr(2), now_ms));
assert_eq!(node.recent_requests.len(), 2);
let target = make_node_addr(0xBB);
let origin = make_node_addr(0xCC);
let coords = TreeCoordinate::from_addrs(vec![origin, make_node_addr(0)]).unwrap();
let request = LookupRequest::new(789, target, origin, coords, 3, 0);
let payload = &request.encode()[1..];
node.handle_lookup_request(&make_node_addr(0xAA), payload)
.await;
assert!(!node.recent_requests.contains_key(&123));
assert!(node.recent_requests.contains_key(&456));
assert!(node.recent_requests.contains_key(&789));
}
#[tokio::test]
async fn test_request_forwarding_two_node() {
let edges = vec![(0, 1)];
let mut nodes = run_tree_test(2, &edges, false).await;
let node0_addr = *nodes[0].node.node_addr();
let target = *nodes[1].node.node_addr(); let root = make_node_addr(0);
let coords = TreeCoordinate::from_addrs(vec![node0_addr, root]).unwrap();
let request = LookupRequest::new(42, target, node0_addr, coords, 5, 0);
let payload = &request.encode()[1..];
nodes[0]
.node
.handle_lookup_request(&node0_addr, payload)
.await;
tokio::time::sleep(Duration::from_millis(50)).await;
let count = process_available_packets(&mut nodes).await;
assert!(
count > 0,
"Expected forwarded LookupRequest to arrive at node 1"
);
assert!(
nodes[1].node.recent_requests.contains_key(&42),
"Node 1 should have recorded the forwarded request"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_request_target_found_generates_response() {
let edges = vec![(0, 1)];
let mut nodes = run_tree_test(2, &edges, false).await;
let node1_addr = *nodes[1].node.node_addr();
nodes[0].node.initiate_lookup(&node1_addr, 5).await;
for _ in 0..4 {
tokio::time::sleep(Duration::from_millis(50)).await;
process_available_packets(&mut nodes).await;
}
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(
nodes[0].node.coord_cache().contains(&node1_addr, now_ms),
"Node 0 should have cached node 1's route from LookupResponse"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_request_three_node_chain() {
let edges = vec![(0, 1), (1, 2)];
let mut nodes = run_tree_test(3, &edges, false).await;
let node2_addr = *nodes[2].node.node_addr();
let node2_pubkey = nodes[2].node.identity().pubkey_full();
nodes[0].node.register_identity(node2_addr, node2_pubkey);
nodes[0].node.initiate_lookup(&node2_addr, 8).await;
for _ in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
process_available_packets(&mut nodes).await;
}
assert!(
!nodes[1].node.recent_requests.is_empty(),
"Node 1 should have recorded the forwarded request"
);
assert!(
!nodes[2].node.recent_requests.is_empty(),
"Node 2 should have received the request"
);
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(
nodes[0].node.coord_cache().contains(&node2_addr, now_ms),
"Node 0 should have cached node 2's route through 3-node chain"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_reply_learned_forwards_lookup_to_direct_non_tree_target() {
let edges = vec![(0, 1), (1, 2)];
let mut nodes = run_tree_test(3, &edges, false).await;
verify_tree_convergence(&nodes);
let node0_addr = *nodes[0].node.node_addr();
let node1_addr = *nodes[1].node.node_addr();
let node2_addr = *nodes[2].node.node_addr();
nodes[1].node.config.node.routing.mode = RoutingMode::ReplyLearned;
nodes[1].node.tree_state_mut().remove_peer(&node2_addr);
nodes[1].node.tree_state_mut().become_root();
assert!(
nodes[1]
.node
.peers
.get(&node2_addr)
.is_some_and(|peer| peer.can_send()),
"node2 should remain a direct sendable peer"
);
assert!(
!nodes[1].node.is_tree_peer(&node2_addr),
"node2 should not be a tree peer in this regression fixture"
);
let origin_coords = TreeCoordinate::from_addrs(vec![node0_addr, node1_addr]).unwrap();
let request = LookupRequest::new(4242, node2_addr, node0_addr, origin_coords, 5, 0);
let payload = &request.encode()[1..];
nodes[1]
.node
.handle_lookup_request(&node0_addr, payload)
.await;
for _ in 0..4 {
tokio::time::sleep(Duration::from_millis(50)).await;
process_available_packets(&mut nodes).await;
}
assert!(
nodes[2].node.recent_requests.contains_key(&4242),
"direct non-tree target should receive the forwarded lookup"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_reply_learned_initiates_lookup_to_sendable_non_tree_peer() {
let edges = vec![(0, 1)];
let mut nodes = run_tree_test(2, &edges, false).await;
verify_tree_convergence(&nodes);
let node0_addr = *nodes[0].node.node_addr();
let node1_addr = *nodes[1].node.node_addr();
nodes[0].node.config.node.routing.mode = RoutingMode::ReplyLearned;
nodes[0].node.tree_state_mut().remove_peer(&node1_addr);
nodes[0].node.tree_state_mut().become_root();
assert!(
nodes[0]
.node
.peers
.get(&node1_addr)
.is_some_and(|peer| peer.can_send()),
"node1 should remain a direct sendable peer"
);
assert!(
!nodes[0].node.is_tree_peer(&node1_addr),
"node1 should not be a tree peer in this regression fixture"
);
let target = make_node_addr(0x55);
let sent = nodes[0].node.initiate_lookup(&target, 5).await;
assert_eq!(
sent, 1,
"non-tree sendable peer should receive fallback lookup"
);
for _ in 0..4 {
tokio::time::sleep(Duration::from_millis(50)).await;
process_available_packets(&mut nodes).await;
}
assert_eq!(
nodes[1].node.recent_requests.len(),
1,
"fallback lookup should arrive at the non-tree peer"
);
let recent = nodes[1].node.recent_requests.values().next().unwrap();
assert_eq!(recent.from_peer, node0_addr);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_reply_learned_initiates_lookup_fanout_despite_tree_match() {
let edges = vec![(0, 1), (1, 2), (0, 3)];
let mut nodes = run_tree_test(4, &edges, false).await;
verify_tree_convergence(&nodes);
let node0_addr = *nodes[0].node.node_addr();
let node1_addr = *nodes[1].node.node_addr();
let node2_addr = *nodes[2].node.node_addr();
let node3_addr = *nodes[3].node.node_addr();
nodes[0].node.config.node.routing.mode = RoutingMode::ReplyLearned;
assert!(
nodes[0]
.node
.peers
.get(&node1_addr)
.is_some_and(|peer| peer.may_reach(&node2_addr)),
"node1 should be the tree/bloom match for node2"
);
nodes[0].node.tree_state_mut().remove_peer(&node3_addr);
nodes[0].node.tree_state_mut().become_root();
assert!(
nodes[0]
.node
.peers
.get(&node3_addr)
.is_some_and(|peer| peer.can_send()),
"node3 should remain a direct sendable peer"
);
assert!(
!nodes[0].node.is_tree_peer(&node3_addr),
"node3 should not be a tree peer in this regression fixture"
);
let sent = nodes[0].node.initiate_lookup(&node2_addr, 5).await;
assert_eq!(
sent, 2,
"reply-learned lookup should include the tree match and live non-tree peer"
);
for _ in 0..4 {
tokio::time::sleep(Duration::from_millis(50)).await;
process_available_packets(&mut nodes).await;
}
assert!(
nodes[3]
.node
.recent_requests
.values()
.any(|request| request.from_peer == node0_addr),
"non-tree peer should receive reply-learned fanout despite tree match"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_reply_learned_forward_fallback_uses_non_tree_peer_without_origin_echo() {
let edges = vec![(0, 1), (1, 2)];
let mut nodes = run_tree_test(3, &edges, false).await;
verify_tree_convergence(&nodes);
let node0_addr = *nodes[0].node.node_addr();
let node1_addr = *nodes[1].node.node_addr();
let node2_addr = *nodes[2].node.node_addr();
nodes[1].node.config.node.routing.mode = RoutingMode::ReplyLearned;
nodes[1].node.tree_state_mut().remove_peer(&node2_addr);
nodes[1].node.tree_state_mut().become_root();
assert!(
nodes[1]
.node
.peers
.get(&node2_addr)
.is_some_and(|peer| peer.can_send()),
"node2 should remain a direct sendable peer"
);
assert!(
!nodes[1].node.is_tree_peer(&node2_addr),
"node2 should not be a tree peer in this regression fixture"
);
let target = make_node_addr(0x66);
let origin_coords = TreeCoordinate::from_addrs(vec![node0_addr, node1_addr]).unwrap();
let request = LookupRequest::new(4343, target, node0_addr, origin_coords, 5, 0);
let payload = &request.encode()[1..];
nodes[1]
.node
.handle_lookup_request(&node0_addr, payload)
.await;
for _ in 0..4 {
tokio::time::sleep(Duration::from_millis(50)).await;
process_available_packets(&mut nodes).await;
}
assert!(
nodes[2].node.recent_requests.contains_key(&4343),
"reply-learned fallback should fan out through the non-tree peer"
);
assert!(
!nodes[0].node.recent_requests.contains_key(&4343),
"transit fallback must not echo lookup requests to the originator"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_reply_learned_forwards_lookup_fanout_despite_tree_match() {
let edges = vec![(0, 1), (1, 2), (2, 4), (1, 3)];
let mut nodes = run_tree_test(5, &edges, false).await;
verify_tree_convergence(&nodes);
let node0_addr = *nodes[0].node.node_addr();
let node1_addr = *nodes[1].node.node_addr();
let node2_addr = *nodes[2].node.node_addr();
let node3_addr = *nodes[3].node.node_addr();
let node4_addr = *nodes[4].node.node_addr();
nodes[1].node.config.node.routing.mode = RoutingMode::ReplyLearned;
assert!(
nodes[1]
.node
.peers
.get(&node2_addr)
.is_some_and(|peer| peer.may_reach(&node4_addr)),
"node2 should be the tree/bloom match for node4"
);
nodes[1].node.tree_state_mut().remove_peer(&node3_addr);
nodes[1].node.tree_state_mut().become_root();
assert!(
nodes[1]
.node
.peers
.get(&node3_addr)
.is_some_and(|peer| peer.can_send()),
"node3 should remain a direct sendable peer"
);
assert!(
!nodes[1].node.is_tree_peer(&node3_addr),
"node3 should not be a tree peer in this regression fixture"
);
let origin_coords = TreeCoordinate::from_addrs(vec![node0_addr, node1_addr]).unwrap();
let request = LookupRequest::new(4444, node4_addr, node0_addr, origin_coords, 5, 0);
let payload = &request.encode()[1..];
nodes[1]
.node
.handle_lookup_request(&node0_addr, payload)
.await;
for _ in 0..4 {
tokio::time::sleep(Duration::from_millis(50)).await;
process_available_packets(&mut nodes).await;
}
assert!(
nodes[2].node.recent_requests.contains_key(&4444),
"tree/bloom match should receive the forwarded lookup"
);
assert!(
nodes[3].node.recent_requests.contains_key(&4444),
"non-tree peer should also receive reply-learned fanout"
);
assert!(
!nodes[0].node.recent_requests.contains_key(&4444),
"transit fanout must not echo lookup requests to the originator"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_request_dedup_convergent_paths() {
let edges = vec![(0, 1), (0, 2), (1, 2)];
let mut nodes = run_tree_test(3, &edges, false).await;
let node0_addr = *nodes[0].node.node_addr();
let target = *nodes[2].node.node_addr(); let root = make_node_addr(0);
let coords = TreeCoordinate::from_addrs(vec![node0_addr, root]).unwrap();
let request = LookupRequest::new(300, target, node0_addr, coords, 5, 0);
let payload = &request.encode()[1..];
nodes[0]
.node
.handle_lookup_request(&node0_addr, payload)
.await;
for _ in 0..5 {
tokio::time::sleep(Duration::from_millis(50)).await;
process_available_packets(&mut nodes).await;
}
assert!(
nodes[2].node.recent_requests.contains_key(&300),
"Node 2 (target) should have received the request"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
#[ignore] async fn test_discovery_100_nodes() {
let _guard = lock_large_network_test().await;
const NUM_NODES: usize = 100;
const TARGET_EDGES: usize = 250;
const SEED: u64 = 42;
const TTL: u8 = 20; let edges = generate_random_edges(NUM_NODES, TARGET_EDGES, SEED);
let mut nodes = run_tree_test(NUM_NODES, &edges, false).await;
verify_tree_convergence(&nodes);
for tn in nodes.iter_mut() {
tn.node.disable_discovery_forward_rate_limit();
}
let all_addrs: Vec<NodeAddr> = nodes.iter().map(|tn| *tn.node.node_addr()).collect();
let all_pubkeys: Vec<secp256k1::PublicKey> = nodes
.iter()
.map(|tn| tn.node.identity().pubkey_full())
.collect();
for (src, node) in nodes.iter_mut().enumerate() {
for dst in (0..NUM_NODES).step_by(10) {
if src == dst {
continue;
}
node.node
.register_identity(all_addrs[dst], all_pubkeys[dst]);
}
}
let mut lookup_pairs: Vec<(usize, usize)> = Vec::new();
for src in 0..NUM_NODES {
for dst in (0..NUM_NODES).step_by(10) {
if src == dst {
continue;
}
lookup_pairs.push((src, dst));
}
}
let total_lookups = lookup_pairs.len();
for src in 0..NUM_NODES {
let mut initiated = false;
for &(s, dst) in &lookup_pairs {
if s == src {
nodes[src].node.initiate_lookup(&all_addrs[dst], TTL).await;
initiated = true;
}
}
if !initiated {
continue;
}
let mut idle_rounds = 0;
for _ in 0..80 {
tokio::time::sleep(Duration::from_millis(5)).await;
let count = process_available_packets(&mut nodes).await;
if count == 0 {
idle_rounds += 1;
if idle_rounds >= 5 {
break;
}
} else {
idle_rounds = 0;
}
}
}
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let mut resolved = 0usize;
let mut failed = 0usize;
let mut failed_pairs: Vec<(usize, usize)> = Vec::new();
for &(src, dst) in &lookup_pairs {
if nodes[src]
.node
.coord_cache()
.contains(&all_addrs[dst], now_ms)
{
resolved += 1;
} else {
failed += 1;
if failed_pairs.len() < 20 {
failed_pairs.push((src, dst));
}
}
}
eprintln!("\n === Discovery 100-Node Test ===",);
eprintln!(
" Lookups: {} | Resolved: {} | Failed: {} | Success rate: {:.1}%",
total_lookups,
resolved,
failed,
resolved as f64 / total_lookups as f64 * 100.0
);
let total_cached: usize = nodes.iter().map(|tn| tn.node.coord_cache().len()).sum();
let min_cached = nodes
.iter()
.map(|tn| tn.node.coord_cache().len())
.min()
.unwrap();
let max_cached = nodes
.iter()
.map(|tn| tn.node.coord_cache().len())
.max()
.unwrap();
eprintln!(
" Coord cache entries: total={} min={} max={} avg={:.1}",
total_cached,
min_cached,
max_cached,
total_cached as f64 / NUM_NODES as f64
);
if !failed_pairs.is_empty() {
eprintln!(
" --- Failure Diagnostics ({} failures) ---",
failed_pairs.len()
);
for &(src, dst) in &failed_pairs {
let src_coords = nodes[src].node.tree_state().my_coords().clone();
let dst_coords = nodes[dst].node.tree_state().my_coords().clone();
let tree_dist = src_coords.distance_to(&dst_coords);
let reverse_cached = nodes[dst]
.node
.coord_cache()
.contains(&all_addrs[src], now_ms);
let src_peers = nodes[src].node.peers.len();
let dst_peers = nodes[dst].node.peers.len();
eprintln!(
" node {} -> node {}: tree_dist={} src_depth={} dst_depth={} \
src_peers={} dst_peers={} reverse_cached={}",
src,
dst,
tree_dist,
src_coords.depth(),
dst_coords.depth(),
src_peers,
dst_peers,
reverse_cached
);
}
}
assert_eq!(
failed, 0,
"All {} lookups should resolve, but {} failed",
total_lookups, failed
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_response_path_mtu_two_node() {
let edges = vec![(0, 1)];
let mut nodes = run_tree_test(2, &edges, false).await;
let node1_addr = *nodes[1].node.node_addr();
nodes[0].node.initiate_lookup(&node1_addr, 5).await;
for _ in 0..4 {
tokio::time::sleep(Duration::from_millis(50)).await;
process_available_packets(&mut nodes).await;
}
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(
nodes[0].node.coord_cache().contains(&node1_addr, now_ms),
"Node 0 should have cached node 1's route"
);
let entry = nodes[0].node.coord_cache().get_entry(&node1_addr).unwrap();
let path_mtu = entry
.path_mtu()
.expect("path_mtu should be set from discovery");
assert_eq!(
path_mtu, 1280,
"Two-node path_mtu should be the target-edge link MTU (1280 in tests)"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_apply_outgoing_link_mtu_to_response_unknown_peer_noop() {
let node = make_node();
let unknown = make_node_addr(0x99);
let coords = TreeCoordinate::from_addrs(vec![unknown, make_node_addr(0)]).unwrap();
let identity = Identity::generate();
let proof_data = LookupResponse::proof_bytes(1, &unknown, &coords);
let proof = identity.sign(&proof_data);
let mut response = LookupResponse::new(1, unknown, coords, proof);
response.path_mtu = 1500;
node.apply_outgoing_link_mtu_to_response(&mut response, &unknown);
assert_eq!(
response.path_mtu, 1500,
"Unknown next_hop must leave path_mtu untouched"
);
}
#[tokio::test]
async fn test_response_path_mtu_three_node_chain() {
let edges = vec![(0, 1), (1, 2)];
let mut nodes = run_tree_test(3, &edges, false).await;
let node2_addr = *nodes[2].node.node_addr();
let node2_pubkey = nodes[2].node.identity().pubkey_full();
nodes[0].node.register_identity(node2_addr, node2_pubkey);
nodes[0].node.initiate_lookup(&node2_addr, 8).await;
for _ in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
process_available_packets(&mut nodes).await;
}
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(
nodes[0].node.coord_cache().contains(&node2_addr, now_ms),
"Node 0 should have cached node 2's route"
);
let entry = nodes[0].node.coord_cache().get_entry(&node2_addr).unwrap();
let path_mtu = entry
.path_mtu()
.expect("path_mtu should be set from discovery");
assert_eq!(
path_mtu, 1280,
"Three-node chain path_mtu should reflect transit node's transport MTU (1280)"
);
cleanup_nodes(&mut nodes).await;
}
#[tokio::test]
async fn test_cache_entry_path_mtu_stored() {
let mut node = make_node();
let target = make_node_addr(0xBB);
let coords = TreeCoordinate::from_addrs(vec![target, make_node_addr(0)]).unwrap();
let now_ms = 1000u64;
node.coord_cache_mut()
.insert_with_path_mtu(target, coords, now_ms, 1280);
let entry = node.coord_cache().get_entry(&target).unwrap();
assert_eq!(entry.path_mtu(), Some(1280));
}
#[tokio::test]
async fn test_cache_entry_no_path_mtu_from_regular_insert() {
let mut node = make_node();
let target = make_node_addr(0xBB);
let coords = TreeCoordinate::from_addrs(vec![target, make_node_addr(0)]).unwrap();
let now_ms = 1000u64;
node.coord_cache_mut().insert(target, coords, now_ms);
let entry = node.coord_cache().get_entry(&target).unwrap();
assert_eq!(entry.path_mtu(), None);
}
#[tokio::test]
async fn test_request_min_mtu_preserved_through_encode_decode() {
let target = make_node_addr(0xBB);
let origin = make_node_addr(0xCC);
let coords = TreeCoordinate::from_addrs(vec![origin, make_node_addr(0)]).unwrap();
let request = LookupRequest::new(100, target, origin, coords, 5, 1386);
let encoded = request.encode();
let decoded = LookupRequest::decode(&encoded[1..]).unwrap();
assert_eq!(decoded.min_mtu, 1386);
}
#[tokio::test]
async fn test_originator_stores_path_mtu_in_cache() {
let mut node = make_node();
let from = make_node_addr(0xAA);
let target_identity = Identity::generate();
let target = *target_identity.node_addr();
let root = make_node_addr(0xF0);
let coords = TreeCoordinate::from_addrs(vec![target, root]).unwrap();
node.register_identity(target, target_identity.pubkey_full());
let proof_data = LookupResponse::proof_bytes(800, &target, &coords);
let proof = target_identity.sign(&proof_data);
let mut response = LookupResponse::new(800, target, coords.clone(), proof);
response.path_mtu = 1280;
let payload = &response.encode()[1..];
node.handle_lookup_response(&from, payload).await;
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
assert!(node.coord_cache().contains(&target, now_ms));
let entry = node.coord_cache().get_entry(&target).unwrap();
assert_eq!(
entry.path_mtu(),
Some(1280),
"Originator should store path_mtu from LookupResponse in cache"
);
}
#[tokio::test]
async fn test_open_discovery_sweep_queues_eligible_skips_filtered() {
use crate::config::NostrDiscoveryPolicy;
use crate::discovery::nostr::{NostrDiscovery, OverlayEndpointAdvert, OverlayTransportKind};
use crate::peer::ActivePeer;
use crate::transport::LinkId;
use std::sync::Arc;
let mut config = crate::Config::new();
config.node.discovery.nostr.enabled = true;
config.node.discovery.nostr.policy = NostrDiscoveryPolicy::Open;
let mut node = crate::Node::new(config).unwrap();
let connected_identity = crate::Identity::generate();
let connected_npub = crate::encode_npub(&connected_identity.pubkey());
let connected_node_addr = *connected_identity.node_addr();
let connected_peer_identity = crate::PeerIdentity::from_pubkey(connected_identity.pubkey());
node.peers.insert(
connected_node_addr,
ActivePeer::new(connected_peer_identity, LinkId::new(1), 1_000),
);
let eligible_identity = crate::Identity::generate();
let eligible_npub = crate::encode_npub(&eligible_identity.pubkey());
let eligible_node_addr = *eligible_identity.node_addr();
let self_npub = crate::encode_npub(&node.identity().pubkey());
let self_node_addr = *node.identity().node_addr();
let bootstrap = Arc::new(NostrDiscovery::new_for_test());
let endpoint = OverlayEndpointAdvert {
transport: OverlayTransportKind::Udp,
addr: "203.0.113.7:2121".to_string(),
};
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
for npub in [&eligible_npub, &connected_npub, &self_npub] {
let advert =
NostrDiscovery::cached_advert_for_test(npub.clone(), endpoint.clone(), now_secs);
bootstrap.insert_advert_for_test(npub.clone(), advert).await;
}
node.run_open_discovery_sweep(&bootstrap, Some(3_600), "test")
.await;
assert!(
node.retry_pending.contains_key(&eligible_node_addr),
"eligible advert should be queued for retry"
);
let queued = node.retry_pending.get(&eligible_node_addr).unwrap();
assert_eq!(queued.peer_config.npub, eligible_npub);
assert!(
!node.retry_pending.contains_key(&connected_node_addr),
"advert for already-connected peer must not be queued"
);
assert!(
!node.retry_pending.contains_key(&self_node_addr),
"advert authored by own node must not be queued"
);
assert_eq!(node.retry_pending.len(), 1);
}
#[tokio::test]
async fn test_open_discovery_sweep_expedites_configured_peer_retry() {
use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
use crate::discovery::nostr::{NostrDiscovery, OverlayEndpointAdvert, OverlayTransportKind};
use std::sync::Arc;
let configured_identity = crate::Identity::generate();
let configured_npub = crate::encode_npub(&configured_identity.pubkey());
let configured_node_addr = *configured_identity.node_addr();
let mut config = crate::Config::new();
config.node.discovery.nostr.enabled = true;
config.node.discovery.nostr.policy = NostrDiscoveryPolicy::Open;
config.peers.push(PeerConfig {
npub: configured_npub.clone(),
alias: Some("test-peer".to_string()),
addresses: vec![PeerAddress::new("udp", "203.0.113.99:51820")],
connect_policy: ConnectPolicy::AutoConnect,
auto_reconnect: true,
});
let mut node = crate::Node::new(config).unwrap();
let pc = node
.config
.peers()
.iter()
.find(|pc| pc.npub == configured_npub)
.cloned()
.unwrap();
let now_ms = crate::Node::now_ms();
let scheduled_at_ms = now_ms + 60_000;
let mut state = crate::node::retry::RetryState::new(pc);
state.retry_count = 3;
state.retry_after_ms = scheduled_at_ms;
node.retry_pending.insert(configured_node_addr, state);
let bootstrap = Arc::new(NostrDiscovery::new_for_test());
let endpoint = OverlayEndpointAdvert {
transport: OverlayTransportKind::Udp,
addr: "203.0.113.7:2121".to_string(),
};
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let advert =
NostrDiscovery::cached_advert_for_test(configured_npub.clone(), endpoint, now_secs);
bootstrap
.insert_advert_for_test(configured_npub.clone(), advert)
.await;
node.run_open_discovery_sweep(&bootstrap, Some(3_600), "test")
.await;
let state = node
.retry_pending
.get(&configured_node_addr)
.expect("retry entry must still exist; sweep should expedite, not remove");
assert!(
state.retry_after_ms <= crate::Node::now_ms(),
"expected retry_after_ms ≤ now (expedited); got {} (now ≈ {})",
state.retry_after_ms,
crate::Node::now_ms()
);
assert!(
state.retry_after_ms < scheduled_at_ms,
"expected retry_after_ms < original scheduled_at_ms; got {} >= {}",
state.retry_after_ms,
scheduled_at_ms
);
assert_eq!(state.peer_config.npub, configured_npub);
assert_eq!(state.peer_config.alias.as_deref(), Some("test-peer"));
}
#[tokio::test]
async fn test_check_pending_lookups_default_sequence_unreachable() {
use crate::bloom::BloomFilter;
use crate::node::handlers::discovery::PendingLookup;
use crate::peer::ActivePeer;
use crate::transport::LinkId;
use std::sync::mpsc;
let mut node = make_node();
assert_eq!(
node.config.node.discovery.attempt_timeouts_secs,
vec![1, 2, 4, 8],
"test pins the [1,2,4,8] default; update the test if the default changes"
);
let (tun_tx, tun_rx) = mpsc::channel::<Vec<u8>>();
node.tun_tx = Some(tun_tx);
let target_identity = Identity::generate();
let target_addr = *target_identity.node_addr();
let peer_identity_full = Identity::generate();
let peer_addr = *peer_identity_full.node_addr();
let peer_identity = crate::PeerIdentity::from_pubkey(peer_identity_full.pubkey());
let mut peer = ActivePeer::new(peer_identity, LinkId::new(1), 0);
let mut bloom = BloomFilter::new();
bloom.insert(&target_addr);
peer.update_filter(bloom, 1, 0);
node.peers.insert(peer_addr, peer);
let our_addr = *node.node_addr();
let peer_decl = crate::tree::ParentDeclaration::new(peer_addr, our_addr, 1, 0);
let peer_coords = TreeCoordinate::from_addrs(vec![peer_addr, our_addr]).unwrap();
node.tree_state_mut().update_peer(peer_decl, peer_coords);
assert!(node.is_tree_peer(&peer_addr), "peer must be a tree peer");
let mut ipv6_pkt = vec![0u8; 40];
ipv6_pkt[0] = 0x60; ipv6_pkt[6] = 17; ipv6_pkt[7] = 64; ipv6_pkt[8] = 0xfd;
ipv6_pkt[23] = 0x01;
let target_ipv6 = crate::FipsAddress::from_node_addr(&target_addr).to_ipv6();
ipv6_pkt[24..40].copy_from_slice(&target_ipv6.octets());
let mut queue = std::collections::VecDeque::new();
queue.push_back(ipv6_pkt);
node.pending_tun_packets.insert(target_addr, queue);
node.pending_lookups
.insert(target_addr, PendingLookup::new(0));
let baseline_initiated = node.stats().discovery.req_initiated;
let baseline_timed_out = node.stats().discovery.resp_timed_out;
node.check_pending_lookups(1100).await;
{
let entry = node
.pending_lookups
.get(&target_addr)
.expect("still pending");
assert_eq!(entry.attempt, 2, "after retry #1, attempt should be 2");
assert_eq!(entry.last_sent_ms, 1100);
}
assert_eq!(
node.stats().discovery.req_initiated,
baseline_initiated + 1,
"retry #1 must invoke initiate_lookup exactly once"
);
node.check_pending_lookups(3100).await;
{
let entry = node
.pending_lookups
.get(&target_addr)
.expect("still pending");
assert_eq!(entry.attempt, 3, "after retry #2, attempt should be 3");
assert_eq!(entry.last_sent_ms, 3100);
}
assert_eq!(
node.stats().discovery.req_initiated,
baseline_initiated + 2,
"retry #2 must invoke initiate_lookup exactly once more"
);
node.check_pending_lookups(7100).await;
{
let entry = node
.pending_lookups
.get(&target_addr)
.expect("still pending");
assert_eq!(entry.attempt, 4, "after retry #3, attempt should be 4");
assert_eq!(entry.last_sent_ms, 7100);
}
assert_eq!(
node.stats().discovery.req_initiated,
baseline_initiated + 3,
"retry #3 must invoke initiate_lookup exactly once more"
);
node.check_pending_lookups(15_099).await;
assert!(
node.pending_lookups.contains_key(&target_addr),
"8s window not yet expired: pending_lookup must persist"
);
assert_eq!(
node.stats().discovery.req_initiated,
baseline_initiated + 3,
"no new attempt before final deadline"
);
assert_eq!(
node.stats().discovery.resp_timed_out,
baseline_timed_out,
"no timeout before final deadline"
);
while tun_rx.try_recv().is_ok() {}
node.check_pending_lookups(15_100).await;
assert!(
!node.pending_lookups.contains_key(&target_addr),
"final timeout must remove the pending_lookups entry"
);
assert_eq!(
node.stats().discovery.resp_timed_out,
baseline_timed_out + 1,
"final timeout must increment discovery.resp_timed_out"
);
assert_eq!(
node.stats().discovery.req_initiated,
baseline_initiated + 3,
"the final-timeout step must NOT call initiate_lookup"
);
assert!(
!node.pending_tun_packets.contains_key(&target_addr),
"queued packets for the unreachable target must be drained"
);
let icmp_frame = tun_rx
.try_recv()
.expect("ICMPv6 Destination Unreachable must be emitted on final timeout");
assert!(
icmp_frame.len() >= 48,
"ICMPv6 frame must be at least IPv6 header (40) + ICMPv6 header (8)"
);
assert_eq!(icmp_frame[0] >> 4, 6, "must be IPv6");
assert_eq!(icmp_frame[6], 58, "next_header must be IPPROTO_ICMPV6 (58)");
assert_eq!(icmp_frame[40], 1, "ICMPv6 type 1 = Destination Unreachable");
}