use super::spanning_tree::*;
use super::*;
/// Collects the spanning-tree edges as `(child_index, parent_index)` pairs.
///
/// Indices refer to positions in `nodes`. A node that reports itself as root
/// contributes no edge, and a node whose declared parent address matches no
/// entry in `nodes` is skipped without error.
fn get_tree_edges(nodes: &[TestNode]) -> Vec<(usize, usize)> {
    nodes
        .iter()
        .enumerate()
        .filter_map(|(child, tn)| {
            let state = tn.node.tree_state();
            if state.is_root() {
                return None;
            }
            // Resolve the declared parent's address back to an index in `nodes`.
            let parent_addr = state.my_declaration().parent_id();
            let parent = nodes
                .iter()
                .position(|candidate| candidate.node.node_addr() == parent_addr)?;
            Some((child, parent))
        })
        .collect()
}
/// Asserts that filters were exchanged in both directions across every edge.
///
/// For each `(i, j)` in `edges`, node `i` must know `j` as a peer, hold an
/// inbound filter from it, and that filter must contain `j`'s own address;
/// the same is then required of `j` with respect to `i`.
///
/// # Panics
///
/// Panics on the first missing peer, missing inbound filter, or filter that
/// does not contain the sender's address.
fn verify_filter_exchange(nodes: &[TestNode], edges: &[(usize, usize)]) {
    for &(i, j) in edges {
        // Check i's view of j first, then j's view of i.
        for (observer, sender) in [(i, j), (j, i)] {
            let sender_addr = *nodes[sender].node.node_addr();
            let Some(peer) = nodes[observer].node.get_peer(&sender_addr) else {
                panic!("Node {} should have peer {}", observer, sender);
            };
            let Some(inbound) = peer.inbound_filter() else {
                panic!(
                    "Node {} should have inbound filter from node {} (addr={})",
                    observer, sender, sender_addr
                );
            };
            assert!(
                inbound.contains(&sender_addr),
                "Node {}'s filter from node {} should contain node {}'s addr",
                observer,
                sender,
                sender
            );
        }
    }
}
/// Asserts that filters carry one-hop tree neighbors across each tree edge.
///
/// For every `(i, j)` in `tree_edges`, the filter node `i` received from node
/// `j` must contain the address of each of `j`'s tree neighbors other than
/// `i` itself. Only this direction of each edge is checked.
///
/// # Panics
///
/// Panics if a peer or inbound filter is missing, or if a neighbor address is
/// absent from the filter.
fn verify_tree_propagation(nodes: &[TestNode], tree_edges: &[(usize, usize)]) {
    // Undirected adjacency list of the tree, indexed by node position.
    let mut adjacency: Vec<Vec<usize>> = vec![Vec::new(); nodes.len()];
    for &(a, b) in tree_edges {
        adjacency[a].push(b);
        adjacency[b].push(a);
    }

    for &(observer, sender) in tree_edges {
        let sender_addr = *nodes[sender].node.node_addr();
        let inbound = nodes[observer]
            .node
            .get_peer(&sender_addr)
            .unwrap()
            .inbound_filter()
            .unwrap();

        // The observer itself is excluded from the sender's neighbor list.
        let other_neighbors = adjacency[sender]
            .iter()
            .copied()
            .filter(|&neighbor| neighbor != observer);
        for neighbor in other_neighbors {
            let neighbor_addr = *nodes[neighbor].node.node_addr();
            assert!(
                inbound.contains(&neighbor_addr),
                "Node {}'s filter from node {} should contain node {}'s tree neighbor {} (addr={})",
                observer,
                sender,
                sender,
                neighbor,
                neighbor_addr
            );
        }
    }
}
/// Runs the filter-exchange and tree-propagation checks on a 10-node network
/// built from randomly generated edges (edge target 20, seed 123), then prints
/// per-peer filter statistics.
#[tokio::test]
async fn test_bloom_filter_10_nodes() {
    const NODE_COUNT: usize = 10;
    const EDGE_TARGET: usize = 20;
    const RNG_SEED: u64 = 123;

    let edges = generate_random_edges(NODE_COUNT, EDGE_TARGET, RNG_SEED);
    let mut nodes = run_tree_test(NODE_COUNT, &edges, false).await;

    verify_tree_convergence(&nodes);
    verify_filter_exchange(&nodes, &edges);

    let tree_edges = get_tree_edges(&nodes);
    verify_tree_propagation(&nodes, &tree_edges);

    print_filter_cardinality(&nodes);
    cleanup_nodes(&mut nodes).await;
}
/// Star topology: node 0 is the hub, nodes 1..=4 are spokes.
///
/// After the shared exchange/propagation checks, every spoke's inbound filter
/// from the hub must contain the address of every other spoke.
#[tokio::test]
async fn test_bloom_filter_star() {
    // One edge from the hub (index 0) to each spoke.
    let edges: Vec<(usize, usize)> = (1..5).map(|spoke| (0, spoke)).collect();
    let mut nodes = run_tree_test(5, &edges, false).await;

    verify_tree_convergence(&nodes);
    verify_filter_exchange(&nodes, &edges);
    let tree_edges = get_tree_edges(&nodes);
    verify_tree_propagation(&nodes, &tree_edges);

    let hub_addr = *nodes[0].node.node_addr();
    for spoke in 1..5 {
        let hub_filter = nodes[spoke]
            .node
            .get_peer(&hub_addr)
            .unwrap()
            .inbound_filter()
            .unwrap();
        // Every spoke except the one under inspection.
        for other in (1..5).filter(|&candidate| candidate != spoke) {
            let other_addr = *nodes[other].node.node_addr();
            assert!(
                hub_filter.contains(&other_addr),
                "Spoke {}'s filter from hub should contain spoke {} (addr={})",
                spoke,
                other,
                other_addr
            );
        }
    }

    cleanup_nodes(&mut nodes).await;
}
/// Chain topology 0-1-2-...-7: filters must propagate along the whole chain.
///
/// After the shared exchange/propagation checks this verifies that
/// - node 0's inbound filter from node 1 contains nodes 1 through 7, and
/// - node 7's inbound filter from node 6 contains nodes 0 through 5
///   (node 6's own address is already covered by `verify_filter_exchange`).
#[tokio::test]
async fn test_bloom_filter_chain_propagation() {
    let edges: Vec<(usize, usize)> = vec![(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7)];
    let mut nodes = run_tree_test(8, &edges, false).await;
    verify_tree_convergence(&nodes);
    verify_filter_exchange(&nodes, &edges);
    let tree_edges = get_tree_edges(&nodes);
    verify_tree_propagation(&nodes, &tree_edges);

    let addrs: Vec<NodeAddr> = nodes.iter().map(|tn| *tn.node.node_addr()).collect();

    // Head of the chain: what node 0 learned from node 1.
    let peer_1 = nodes[0].node.get_peer(&addrs[1]).unwrap();
    let filter = peer_1.inbound_filter().unwrap();
    assert!(filter.contains(&addrs[1]), "Should contain node 1 (self)");
    assert!(
        filter.contains(&addrs[2]),
        "Should contain node 2 (1-hop neighbor of node 1)"
    );
    for (offset, addr) in addrs[2..8].iter().enumerate() {
        assert!(
            filter.contains(addr),
            "Node 0's filter from node 1 should contain node {} \
             (chain merge propagation)",
            offset + 2
        );
    }

    // Tail of the chain: what node 7 learned from node 6. The lookup does not
    // depend on the loop variable, so it is performed once up front.
    let peer_6 = nodes[7].node.get_peer(&addrs[6]).unwrap();
    let filter_6 = peer_6.inbound_filter().unwrap();
    for (i, addr) in addrs[..6].iter().enumerate() {
        assert!(
            filter_6.contains(addr),
            "Node 7's filter from node 6 should contain node {} \
             (chain merge propagation)",
            i
        );
    }

    cleanup_nodes(&mut nodes).await;
}
/// Ring topology over five nodes (0-1-2-3-4-0).
///
/// After the shared exchange/propagation checks, every node must report every
/// other node as possibly reachable through at least one of its peers.
#[tokio::test]
async fn test_bloom_filter_ring() {
    // Each node links to its successor, wrapping around at the end.
    let edges: Vec<(usize, usize)> = (0..5).map(|i| (i, (i + 1) % 5)).collect();
    let mut nodes = run_tree_test(5, &edges, false).await;

    verify_tree_convergence(&nodes);
    verify_filter_exchange(&nodes, &edges);
    let tree_edges = get_tree_edges(&nodes);
    verify_tree_propagation(&nodes, &tree_edges);

    let addrs: Vec<_> = nodes.iter().map(|tn| *tn.node.node_addr()).collect();
    for i in 0..5 {
        for j in (0..5).filter(|&j| j != i) {
            let target_addr = addrs[j];
            assert!(
                nodes[i]
                    .node
                    .peers()
                    .any(|peer| peer.may_reach(&target_addr)),
                "Node {} should see node {} as reachable via at least one peer's filter",
                i, j
            );
        }
    }

    cleanup_nodes(&mut nodes).await;
}
fn print_filter_cardinality(nodes: &[TestNode]) {
println!("\n === Filter Cardinality ===");
for (i, tn) in nodes.iter().enumerate() {
for (j, other) in nodes.iter().enumerate() {
if i == j {
continue;
}
let addr = *other.node.node_addr();
if let Some(peer) = tn.node.get_peer(&addr)
&& let Some(filter) = peer.inbound_filter()
{
let is_tree = tn.node.is_tree_peer(&addr);
println!(
" n{} <- n{}: est={} set_bits={} fill={:.1}% tree={}",
i,
j,
match filter.estimated_count(f64::INFINITY) {
Some(n) => format!("{:.1}", n),
None => "saturated".to_string(),
},
filter.count_ones(),
filter.fill_ratio() * 100.0,
is_tree,
);
}
}
}
}
/// Returns the nodes of the subtree rooted at `subtree_root`, in depth-first
/// pre-order, never stepping back across the edge towards `parent`.
///
/// `tree_adj` is an undirected adjacency list indexed by node. Neighbors are
/// visited in the order they appear in each adjacency entry. The input is
/// expected to be acyclic; the traversal tracks only the node it arrived
/// from, not a full visited set.
fn collect_subtree(
    subtree_root: usize,
    parent: Option<usize>,
    tree_adj: &[Vec<usize>],
) -> Vec<usize> {
    let mut members = Vec::new();
    // Each entry is (node to visit, node it was reached from).
    let mut pending = vec![(subtree_root, parent)];
    while let Some((node, came_from)) = pending.pop() {
        members.push(node);
        // Push in reverse so the first-listed neighbor is popped first,
        // which keeps the pre-order of a recursive walk.
        pending.extend(
            tree_adj[node]
                .iter()
                .rev()
                .filter(|&&next| Some(next) != came_from)
                .map(|&next| (next, Some(node))),
        );
    }
    members
}
/// Split-horizon check on a 7-node tree-shaped topology.
///
/// For every tree edge `(child, parent)`:
/// - the filter the parent received from the child ("upward") must contain
///   every node of the child's subtree and none of the remaining nodes;
/// - the filter the child received from the parent ("downward") must contain
///   every remaining node and none of the child's subtree;
/// - each filter's estimated entry count must be within 1.5 of the expected
///   set size.
#[tokio::test]
async fn test_bloom_filter_split_horizon() {
    let edges: Vec<(usize, usize)> = vec![(0, 1), (0, 2), (1, 3), (1, 4), (2, 5), (5, 6)];
    let mut nodes = run_tree_test(7, &edges, false).await;
    verify_tree_convergence(&nodes);
    verify_filter_exchange(&nodes, &edges);
    let tree_edges = get_tree_edges(&nodes);
    verify_tree_propagation(&nodes, &tree_edges);

    let addrs: Vec<NodeAddr> = nodes.iter().map(|tn| *tn.node.node_addr()).collect();
    let n = nodes.len();

    // Undirected adjacency list of the converged tree.
    let mut tree_adj: Vec<Vec<usize>> = vec![Vec::new(); n];
    for &(child, parent) in &tree_edges {
        tree_adj[child].push(parent);
        tree_adj[parent].push(child);
    }
    print_filter_cardinality(&nodes);

    for &(child_idx, parent_idx) in &tree_edges {
        // Partition all nodes into the child's subtree and everything else.
        let subtree = collect_subtree(child_idx, Some(parent_idx), &tree_adj);
        let mut in_subtree = vec![false; n];
        for &member in &subtree {
            in_subtree[member] = true;
        }
        let complement: Vec<usize> = (0..n).filter(|&idx| !in_subtree[idx]).collect();

        // Upward: what the parent learned from the child.
        let parent_view = nodes[parent_idx].node.get_peer(&addrs[child_idx]).unwrap();
        let filter_up = parent_view.inbound_filter().unwrap();
        for &idx in &subtree {
            assert!(
                filter_up.contains(&addrs[idx]),
                "Upward filter (n{}→n{}): should contain subtree member n{} but doesn't",
                child_idx,
                parent_idx,
                idx
            );
        }
        for &idx in &complement {
            assert!(
                !filter_up.contains(&addrs[idx]),
                "Upward filter (n{}→n{}): should NOT contain complement member n{} but does",
                child_idx,
                parent_idx,
                idx
            );
        }
        let up_est = filter_up
            .estimated_count(f64::INFINITY)
            .expect("upward filter should not be saturated in tree convergence test");
        let up_expected = subtree.len();
        assert!(
            (up_est - up_expected as f64).abs() < 1.5,
            "Upward filter (n{}→n{}): expected ~{} entries, got {:.1}",
            child_idx,
            parent_idx,
            up_expected,
            up_est
        );

        // Downward: what the child learned from the parent.
        let child_view = nodes[child_idx].node.get_peer(&addrs[parent_idx]).unwrap();
        let filter_down = child_view.inbound_filter().unwrap();
        for &idx in &complement {
            assert!(
                filter_down.contains(&addrs[idx]),
                "Downward filter (n{}→n{}): should contain complement member n{} but doesn't",
                parent_idx,
                child_idx,
                idx
            );
        }
        for &idx in &subtree {
            assert!(
                !filter_down.contains(&addrs[idx]),
                "Downward filter (n{}→n{}): should NOT contain subtree member n{} but does",
                parent_idx,
                child_idx,
                idx
            );
        }
        let down_est = filter_down
            .estimated_count(f64::INFINITY)
            .expect("downward filter should not be saturated in tree convergence test");
        let down_expected = complement.len();
        assert!(
            (down_est - down_expected as f64).abs() < 1.5,
            "Downward filter (n{}→n{}): expected ~{} entries, got {:.1}",
            parent_idx,
            child_idx,
            down_expected,
            down_est
        );

        // Sanity check on the partition itself.
        assert_eq!(
            subtree.len() + complement.len(),
            n,
            "Subtree + complement should cover all {} nodes",
            n
        );
    }

    cleanup_nodes(&mut nodes).await;
}
/// Runs the filter-exchange and tree-propagation checks on a 100-node network
/// built from randomly generated edges (edge target 250, seed 42), then prints
/// per-peer filter statistics.
///
/// The guard returned by `lock_large_network_test` is held until the test
/// returns (presumably to keep large-network tests from running concurrently;
/// confirm against its definition).
#[tokio::test]
async fn test_bloom_filter_convergence_100_nodes() {
    const NODE_COUNT: usize = 100;
    const EDGE_TARGET: usize = 250;
    const RNG_SEED: u64 = 42;

    // Bound to a named variable so the guard lives for the whole test body.
    let _large_network_guard = lock_large_network_test().await;

    let edges = generate_random_edges(NODE_COUNT, EDGE_TARGET, RNG_SEED);
    let mut nodes = run_tree_test(NODE_COUNT, &edges, false).await;

    verify_tree_convergence(&nodes);
    verify_filter_exchange(&nodes, &edges);

    let tree_edges = get_tree_edges(&nodes);
    verify_tree_propagation(&nodes, &tree_edges);

    print_filter_cardinality(&nodes);
    cleanup_nodes(&mut nodes).await;
}