use super::*;
use crate::node::session_wire::{FSP_FLAG_CP, build_fsp_header};
use crate::protocol::{SessionAck, SessionDatagram, SessionSetup, encode_coords};
use crate::tree::TreeCoordinate;
use spanning_tree::{
TestNode, cleanup_nodes, process_available_packets, run_tree_test, verify_tree_convergence,
};
/// Hands the session-datagram handler a 5-byte all-zero payload.
///
/// There are no assertions: the test passes as long as the handler returns
/// without panicking. (Presumably 5 bytes is too short to decode — confirm
/// against the datagram header size.)
#[tokio::test]
async fn test_forwarding_decode_error() {
    let mut node = make_node();
    let sender = make_node_addr(0xAA);
    let undecodable = [0x00; 5];
    node.handle_session_datagram(&sender, &undecodable, false)
        .await;
}
/// Feeds the handler a datagram between two foreign addresses whose TTL has
/// been set to 0.
///
/// There are no assertions: the test passes as long as the handler returns
/// without panicking.
#[tokio::test]
async fn test_forwarding_hop_limit_exhausted() {
    let mut node = make_node();
    let sender = make_node_addr(0xAA);
    let origin = make_node_addr(0x01);
    let target = make_node_addr(0x02);

    let datagram = SessionDatagram::new(origin, target, vec![0x10, 0x00, 0x00, 0x00]).with_ttl(0);
    let wire = datagram.encode();

    // The handler is given everything after the first encoded byte.
    let body = &wire[1..];
    node.handle_session_datagram(&sender, body, false).await;
}
/// Feeds the handler a TTL-1 datagram addressed to this node's own address.
///
/// There are no assertions: the test passes as long as the handler returns
/// without panicking.
///
/// NOTE(review): the test name mentions a transit drop, but the destination
/// used here is the local node — confirm this is the intended scenario.
#[tokio::test]
async fn test_forwarding_hop_limit_one_drops_at_transit() {
    let mut node = make_node();
    let sender = make_node_addr(0xAA);
    let local_addr = *node.node_addr();
    let origin = make_node_addr(0x01);

    let datagram =
        SessionDatagram::new(origin, local_addr, vec![0x10, 0x00, 0x00, 0x00]).with_ttl(1);
    let wire = datagram.encode();

    // The handler is given everything after the first encoded byte.
    let body = &wire[1..];
    node.handle_session_datagram(&sender, body, false).await;
}
/// Feeds the handler a default-TTL datagram whose destination is this node
/// and whose source is the same address it is reported as arriving from.
///
/// There are no assertions: the test passes as long as the handler returns
/// without panicking.
#[tokio::test]
async fn test_forwarding_local_delivery() {
    let mut node = make_node();
    let local_addr = *node.node_addr();
    let sender = make_node_addr(0xAA);

    let datagram = SessionDatagram::new(sender, local_addr, vec![0x10, 0x00, 0x00, 0x00]);
    let wire = datagram.encode();

    // The handler is given everything after the first encoded byte.
    let body = &wire[1..];
    node.handle_session_datagram(&sender, body, false).await;
}
/// On a two-node tree (edge 0–1), hands node 0 a datagram from an external
/// source addressed to node 1 and expects at least one packet to be
/// processed afterwards.
///
/// NOTE(review): node 0's own address is passed as the "from" argument —
/// confirm that is intentional.
#[tokio::test]
async fn test_forwarding_direct_peer() {
    let topology = vec![(0, 1)];
    let mut nodes = run_tree_test(2, &topology, false).await;

    let local_addr = *nodes[0].node.node_addr();
    let peer_addr = *nodes[1].node.node_addr();
    let outside_src = make_node_addr(0xEE);

    let datagram = SessionDatagram::new(outside_src, peer_addr, vec![0x10, 0x00, 0x00, 0x00]);
    let wire = datagram.encode();
    let body = &wire[1..];
    nodes[0]
        .node
        .handle_session_datagram(&local_addr, body, false)
        .await;

    // Give the packet time to cross the link before draining.
    tokio::time::sleep(Duration::from_millis(50)).await;
    let processed = process_available_packets(&mut nodes).await;
    assert!(
        processed > 0,
        "Expected forwarded packet to arrive at node 1"
    );

    cleanup_nodes(&mut nodes).await;
}
/// Handling a datagram that carries a `SessionSetup` must leave both the
/// source's and the destination's coordinates in the node's coord cache,
/// each rooted at the root address used to build them.
#[tokio::test]
async fn test_coord_cache_warming_session_setup() {
    let mut node = make_node();
    let sender = make_node_addr(0xAA);
    let origin = make_node_addr(0x01);
    let target = make_node_addr(0x02);
    let root = make_node_addr(0xF0);

    let origin_coords = TreeCoordinate::from_addrs(vec![origin, root]).unwrap();
    let target_coords = TreeCoordinate::from_addrs(vec![target, root]).unwrap();
    let payload = SessionSetup::new(origin_coords, target_coords).encode();
    let datagram = SessionDatagram::new(origin, target, payload);
    let wire = datagram.encode();

    let now_ms = std::time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;

    // Precondition: neither endpoint is cached yet.
    for addr in [&origin, &target] {
        assert!(node.coord_cache().get(addr, now_ms).is_none());
    }

    node.handle_session_datagram(&sender, &wire[1..], false)
        .await;

    let origin_hit = node.coord_cache().get(&origin, now_ms);
    let target_hit = node.coord_cache().get(&target, now_ms);
    assert!(origin_hit.is_some(), "src_addr coords not cached");
    assert!(target_hit.is_some(), "dest_addr coords not cached");
    assert_eq!(origin_hit.unwrap().root_id(), &root);
    assert_eq!(target_hit.unwrap().root_id(), &root);
}
/// Handling a datagram that carries a `SessionAck` must leave both the
/// source's and the destination's coordinates in the node's coord cache,
/// each rooted at the root address used to build them.
#[tokio::test]
async fn test_coord_cache_warming_session_ack() {
    let mut node = make_node();
    let sender = make_node_addr(0xAA);
    let origin = make_node_addr(0x01);
    let target = make_node_addr(0x02);
    let root = make_node_addr(0xF0);

    let origin_coords = TreeCoordinate::from_addrs(vec![origin, root]).unwrap();
    let target_coords = TreeCoordinate::from_addrs(vec![target, root]).unwrap();
    let payload = SessionAck::new(origin_coords, target_coords).encode();
    let datagram = SessionDatagram::new(origin, target, payload);
    let wire = datagram.encode();

    let now_ms = std::time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;

    // Precondition: neither endpoint is cached yet.
    assert!(node.coord_cache().get(&origin, now_ms).is_none());
    assert!(node.coord_cache().get(&target, now_ms).is_none());

    node.handle_session_datagram(&sender, &wire[1..], false)
        .await;

    // Source first, then destination, each with its own failure message.
    let expectations = [
        (&origin, "src_addr coords not cached from SessionAck"),
        (&target, "dest_addr coords not cached from SessionAck"),
    ];
    for (addr, failure_msg) in expectations {
        let hit = node.coord_cache().get(addr, now_ms);
        assert!(hit.is_some(), "{}", failure_msg);
        assert_eq!(hit.unwrap().root_id(), &root);
    }
}
/// Handling a payload built from an FSP header with `FSP_FLAG_CP` set,
/// followed by encoded source and destination coordinates and 36 filler
/// bytes, must populate the coord cache for both addresses.
#[tokio::test]
async fn test_coord_cache_warming_encrypted_msg_with_coords() {
    let mut node = make_node();
    let sender = make_node_addr(0xAA);
    let origin = make_node_addr(0x01);
    let target = make_node_addr(0x02);
    let root = make_node_addr(0xF0);
    let origin_coords = TreeCoordinate::from_addrs(vec![origin, root]).unwrap();
    let target_coords = TreeCoordinate::from_addrs(vec![target, root]).unwrap();

    // Layout: header | src coords | dest coords | 36 bytes of 0xCC.
    let fsp_header = build_fsp_header(0, FSP_FLAG_CP, 20);
    let mut payload = Vec::new();
    payload.extend_from_slice(&fsp_header);
    encode_coords(&origin_coords, &mut payload);
    encode_coords(&target_coords, &mut payload);
    payload.extend_from_slice(&[0xCC; 36]);

    let datagram = SessionDatagram::new(origin, target, payload);
    let wire = datagram.encode();

    let now_ms = std::time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;

    // Precondition: neither endpoint is cached yet.
    for addr in [&origin, &target] {
        assert!(node.coord_cache().get(addr, now_ms).is_none());
    }

    node.handle_session_datagram(&sender, &wire[1..], false)
        .await;

    let origin_cached = node.coord_cache().get(&origin, now_ms).is_some();
    assert!(
        origin_cached,
        "src coords not cached from encrypted message"
    );
    let target_cached = node.coord_cache().get(&target, now_ms).is_some();
    assert!(
        target_cached,
        "dest coords not cached from encrypted message"
    );
}
/// Handling a payload built from an FSP header with no flags set (and no
/// coordinates appended) must leave the coord cache empty for both the
/// source and the destination address.
#[tokio::test]
async fn test_coord_cache_warming_encrypted_msg_no_coords() {
    let mut node = make_node();
    let sender = make_node_addr(0xAA);
    let origin = make_node_addr(0x01);
    let target = make_node_addr(0x02);

    // Layout: header (flags = 0) | 36 bytes of 0xCC.
    let fsp_header = build_fsp_header(0, 0, 20);
    let mut payload = Vec::new();
    payload.extend_from_slice(&fsp_header);
    payload.extend_from_slice(&[0xCC; 36]);

    let datagram = SessionDatagram::new(origin, target, payload);
    let wire = datagram.encode();

    let now_ms = std::time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;

    node.handle_session_datagram(&sender, &wire[1..], false)
        .await;

    for addr in [&origin, &target] {
        assert!(
            node.coord_cache().get(addr, now_ms).is_none(),
            "Should not cache coords from message without CP flag"
        );
    }
}
/// Seeds every node's coord cache with the current tree coordinates of every
/// *other* node in `nodes`, all stamped with the same wall-clock millisecond
/// timestamp.
fn populate_all_coord_caches(nodes: &mut [TestNode]) {
    let now_ms = std::time::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;

    // Snapshot (address, coords) pairs first so the mutable pass below does
    // not need to borrow other nodes.
    let mut snapshot: Vec<(NodeAddr, TreeCoordinate)> = Vec::with_capacity(nodes.len());
    for tn in nodes.iter() {
        let addr = *tn.node.node_addr();
        let coords = tn.node.tree_state().my_coords().clone();
        snapshot.push((addr, coords));
    }

    for tn in nodes.iter_mut() {
        let own_addr = *tn.node.node_addr();
        let others = snapshot.iter().filter(|(addr, _)| *addr != own_addr);
        for (addr, coords) in others {
            tn.node
                .coord_cache_mut()
                .insert(*addr, coords.clone(), now_ms);
        }
    }
}
/// On a three-node chain (0–1–2) with fully populated coord caches, node 0
/// sends node 1 a datagram addressed to node 2. After a first drain pass,
/// the second drain pass must process at least one packet.
#[tokio::test]
async fn test_forwarding_single_hop() {
    let chain = vec![(0, 1), (1, 2)];
    let mut nodes = run_tree_test(3, &chain, false).await;
    verify_tree_convergence(&nodes);
    populate_all_coord_caches(&mut nodes);

    let origin = *nodes[0].node.node_addr();
    let relay = *nodes[1].node.node_addr();
    let target = *nodes[2].node.node_addr();

    let payload = vec![0x10, 0x00, 0x04, 0x00, 1, 2, 3, 4];
    let datagram = SessionDatagram::new(origin, target, payload);
    let wire = datagram.encode();
    nodes[0]
        .node
        .send_encrypted_link_message(&relay, &wire)
        .await
        .unwrap();

    // First pass: drain whatever has arrived after the initial send.
    tokio::time::sleep(Duration::from_millis(50)).await;
    process_available_packets(&mut nodes).await;

    // Second pass: this is the one the assertion is about.
    tokio::time::sleep(Duration::from_millis(50)).await;
    let processed = process_available_packets(&mut nodes).await;
    assert!(processed > 0, "Expected forwarded packet at node 2");

    cleanup_nodes(&mut nodes).await;
}
/// On a five-node chain (0–1–2–3–4) with fully populated coord caches, node 0
/// sends node 1 a datagram addressed to node 4, then the network is drained
/// for five 50 ms rounds.
///
/// There are no assertions: the test passes as long as nothing panics.
#[tokio::test]
async fn test_forwarding_multi_hop() {
    let chain = vec![(0, 1), (1, 2), (2, 3), (3, 4)];
    let mut nodes = run_tree_test(5, &chain, false).await;
    verify_tree_convergence(&nodes);
    populate_all_coord_caches(&mut nodes);

    let origin = *nodes[0].node.node_addr();
    let first_hop = *nodes[1].node.node_addr();
    let target = *nodes[4].node.node_addr();

    let payload = vec![0x10, 0x00, 0x04, 0x00, 1, 2, 3, 4];
    let datagram = SessionDatagram::new(origin, target, payload);
    let wire = datagram.encode();
    nodes[0]
        .node
        .send_encrypted_link_message(&first_hop, &wire)
        .await
        .unwrap();

    for _round in 0..5 {
        tokio::time::sleep(Duration::from_millis(50)).await;
        process_available_packets(&mut nodes).await;
    }

    cleanup_nodes(&mut nodes).await;
}
/// On a three-node chain (0–1–2) with fully populated coord caches, node 0
/// sends node 1 a TTL-2 datagram addressed to node 2, then the network is
/// drained for three 50 ms rounds.
///
/// There are no assertions: the test passes as long as nothing panics.
#[tokio::test]
async fn test_forwarding_hop_limit_prevents_infinite_loops() {
    let chain = vec![(0, 1), (1, 2)];
    let mut nodes = run_tree_test(3, &chain, false).await;
    verify_tree_convergence(&nodes);
    populate_all_coord_caches(&mut nodes);

    let origin = *nodes[0].node.node_addr();
    let relay = *nodes[1].node.node_addr();
    let target = *nodes[2].node.node_addr();

    let payload = vec![0x10, 0x00, 0x04, 0x00, 1, 2, 3, 4];
    let datagram = SessionDatagram::new(origin, target, payload).with_ttl(2);
    let wire = datagram.encode();
    nodes[0]
        .node
        .send_encrypted_link_message(&relay, &wire)
        .await
        .unwrap();

    for _round in 0..3 {
        tokio::time::sleep(Duration::from_millis(50)).await;
        process_available_packets(&mut nodes).await;
    }

    cleanup_nodes(&mut nodes).await;
}
/// On a two-node tree (edge 0–1) with no coord caches seeded, node 1 sends
/// node 0 a datagram for an address that belongs to neither node. After a
/// first drain pass, the second drain pass must process at least one packet
/// (per the assertion message, the error signal heading back to node 1).
#[tokio::test]
async fn test_forwarding_no_route_generates_error() {
    let topology = vec![(0, 1)];
    let mut nodes = run_tree_test(2, &topology, false).await;
    verify_tree_convergence(&nodes);

    let transit = *nodes[0].node.node_addr();
    let origin = *nodes[1].node.node_addr();
    let unreachable_dest = make_node_addr(0xFF);

    let datagram = SessionDatagram::new(origin, unreachable_dest, vec![0x10, 0x00, 0x00, 0x00]);
    let wire = datagram.encode();
    nodes[1]
        .node
        .send_encrypted_link_message(&transit, &wire)
        .await
        .unwrap();

    // First pass: drain whatever has arrived after the initial send.
    tokio::time::sleep(Duration::from_millis(50)).await;
    process_available_packets(&mut nodes).await;

    // Second pass: this is the one the assertion is about.
    tokio::time::sleep(Duration::from_millis(50)).await;
    let processed = process_available_packets(&mut nodes).await;
    assert!(processed > 0, "Expected error signal to arrive at node 1");

    cleanup_nodes(&mut nodes).await;
}
/// On a four-node chain (0–1–2–3), a `SessionSetup` sent from node 0 toward
/// node 3 must leave both endpoints' coordinates in the coord caches of the
/// intermediate nodes 1 and 2.
///
/// Cache seeding before the send is deliberately partial:
/// * node 0 is given the coordinates of every other node;
/// * every node is given the coordinates of each node it has a peer entry
///   for.
///
/// The final assertions therefore check that nodes 1 and 2 hold entries for
/// both node 0 and node 3 once the setup has been processed.
#[tokio::test]
async fn test_forwarding_with_cache_warming_enables_routing() {
    let edges = vec![(0, 1), (1, 2), (2, 3)];
    let mut nodes = run_tree_test(4, &edges, false).await;
    verify_tree_convergence(&nodes);

    let node0_addr = *nodes[0].node.node_addr();
    let node1_addr = *nodes[1].node.node_addr();
    let node3_addr = *nodes[3].node.node_addr();

    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64;

    // Snapshot every node's (address, coordinates) pair in node order.
    let all_coords: Vec<(NodeAddr, TreeCoordinate)> = nodes
        .iter()
        .map(|tn| {
            (
                *tn.node.node_addr(),
                tn.node.tree_state().my_coords().clone(),
            )
        })
        .collect();

    // The source node learns the coordinates of every other node.
    for (addr, coords) in &all_coords {
        if *addr != node0_addr {
            nodes[0]
                .node
                .coord_cache_mut()
                .insert(*addr, coords.clone(), now_ms);
        }
    }

    // Every node learns the coordinates of the nodes it has a peer entry for.
    // Iterating the snapshot directly avoids a hard-coded node count and a
    // linear lookup per pair.
    for tn in nodes.iter_mut() {
        let own_addr = *tn.node.node_addr();
        for (addr, coords) in &all_coords {
            if *addr != own_addr && tn.node.get_peer(addr).is_some() {
                tn.node
                    .coord_cache_mut()
                    .insert(*addr, coords.clone(), now_ms);
            }
        }
    }

    // Build and send the SessionSetup from node 0 via node 1 toward node 3.
    let src_coords = nodes[0].node.tree_state().my_coords().clone();
    let dest_coords = nodes[3].node.tree_state().my_coords().clone();
    let setup_payload = SessionSetup::new(src_coords, dest_coords).encode();
    let dg = SessionDatagram::new(node0_addr, node3_addr, setup_payload);
    let encoded = dg.encode();
    nodes[0]
        .node
        .send_encrypted_link_message(&node1_addr, &encoded)
        .await
        .unwrap();

    for _ in 0..5 {
        tokio::time::sleep(Duration::from_millis(50)).await;
        process_available_packets(&mut nodes).await;
    }

    // Intermediate node 1.
    let cached_0_at_1 = nodes[1].node.coord_cache().get(&node0_addr, now_ms);
    let cached_3_at_1 = nodes[1].node.coord_cache().get(&node3_addr, now_ms);
    assert!(
        cached_0_at_1.is_some(),
        "Node 1 should have cached node 0's coords from SessionSetup"
    );
    assert!(
        cached_3_at_1.is_some(),
        "Node 1 should have cached node 3's coords from SessionSetup"
    );

    // Intermediate node 2.
    let cached_0_at_2 = nodes[2].node.coord_cache().get(&node0_addr, now_ms);
    let cached_3_at_2 = nodes[2].node.coord_cache().get(&node3_addr, now_ms);
    assert!(
        cached_0_at_2.is_some(),
        "Node 2 should have cached node 0's coords from SessionSetup"
    );
    assert!(
        cached_3_at_2.is_some(),
        "Node 2 should have cached node 3's coords from SessionSetup"
    );

    cleanup_nodes(&mut nodes).await;
}
use crate::node::TransportDropState;
use crate::node::handlers::session::mark_ipv6_ecn_ce;
use crate::transport::TransportId;
/// Builds a zero-filled 40-byte buffer whose first two bytes carry the IPv6
/// version nibble (6) and `ecn` written as the whole traffic-class byte.
///
/// The traffic class straddles bytes 0 and 1: its high nibble shares byte 0
/// with the version, its low nibble is the high nibble of byte 1.
fn make_ipv6_packet_with_ecn(ecn: u8) -> Vec<u8> {
    let mut packet = vec![0u8; 40];
    let traffic_class = ecn;
    let version_and_tc_high = 0x60 | (traffic_class >> 4);
    let tc_low_and_flow_high = traffic_class << 4;
    packet[0] = version_and_tc_high;
    packet[1] = tc_low_and_flow_high;
    packet
}
/// Reassembles the traffic-class byte from the first two bytes of `pkt` and
/// returns its two least-significant bits (the ECN field).
///
/// Panics if `pkt` has fewer than two bytes.
fn read_ecn(pkt: &[u8]) -> u8 {
    let tc_high = (pkt[0] & 0x0F) << 4;
    let tc_low = pkt[1] >> 4;
    let traffic_class = tc_high | tc_low;
    traffic_class & 0x03
}
/// A packet whose ECN field is `0b10` (ECT(0)) must read `0b11` (CE) after
/// marking.
#[test]
fn test_mark_ecn_ce_on_ect0() {
    const ECT0: u8 = 0b10;
    const CE: u8 = 0b11;

    let mut packet = make_ipv6_packet_with_ecn(ECT0);
    assert_eq!(read_ecn(&packet), ECT0);

    mark_ipv6_ecn_ce(&mut packet);
    assert_eq!(read_ecn(&packet), CE);
}
/// A packet whose ECN field is `0b01` (ECT(1)) must read `0b11` (CE) after
/// marking.
#[test]
fn test_mark_ecn_ce_on_ect1() {
    const ECT1: u8 = 0b01;
    const CE: u8 = 0b11;

    let mut packet = make_ipv6_packet_with_ecn(ECT1);
    assert_eq!(read_ecn(&packet), ECT1);

    mark_ipv6_ecn_ce(&mut packet);
    assert_eq!(read_ecn(&packet), CE);
}
/// A packet whose ECN field is `0b00` (Not-ECT) must still read `0b00` after
/// the marking call.
#[test]
fn test_mark_ecn_ce_on_not_ect() {
    const NOT_ECT: u8 = 0b00;

    let mut packet = make_ipv6_packet_with_ecn(NOT_ECT);
    mark_ipv6_ecn_ce(&mut packet);

    assert_eq!(read_ecn(&packet), NOT_ECT);
}
/// A packet whose ECN field is already `0b11` (CE) must still read `0b11`
/// after the marking call.
#[test]
fn test_mark_ecn_ce_already_ce() {
    const CE: u8 = 0b11;

    let mut packet = make_ipv6_packet_with_ecn(CE);
    mark_ipv6_ecn_ce(&mut packet);

    assert_eq!(read_ecn(&packet), CE);
}
/// Marking CE must only touch the two ECN bits: with traffic class `0xB2`
/// and non-zero flow-label bytes, the traffic class must become `0xB3` while
/// the version nibble and all flow-label bits stay as written.
#[test]
fn test_mark_ecn_ce_preserves_dscp_and_flow_label() {
    let tc: u8 = 0xB2;
    let mut packet = vec![0u8; 40];
    // Bytes 0..4: version + TC high nibble, TC low nibble + flow-label high
    // nibble (0xA), then the two remaining flow-label bytes.
    packet[..4].copy_from_slice(&[0x60 | (tc >> 4), (tc << 4) | 0x0A, 0xBC, 0xDE]);

    mark_ipv6_ecn_ce(&mut packet);

    let tc_after = ((packet[0] & 0x0F) << 4) | (packet[1] >> 4);
    assert_eq!(tc_after, 0xB3, "TC should be 0xB3 (DSCP preserved, ECN=CE)");
    assert_eq!(packet[0] >> 4, 6, "Version nibble preserved");
    assert_eq!(packet[1] & 0x0F, 0x0A, "Flow label high nibble preserved");
    assert_eq!(packet[2], 0xBC, "Flow label byte 2 preserved");
    assert_eq!(packet[3], 0xDE, "Flow label byte 3 preserved");
}
/// The marking call must leave a one-byte buffer and an empty buffer
/// unchanged (and must not panic on either).
#[test]
fn test_mark_ecn_ce_short_packet() {
    let mut one_byte = vec![0x60];
    mark_ipv6_ecn_ce(&mut one_byte);
    assert_eq!(one_byte, vec![0x60]);

    let mut no_bytes: Vec<u8> = Vec::new();
    mark_ipv6_ecn_ce(&mut no_bytes);
    assert!(no_bytes.is_empty());
}
/// On a three-node chain (0–1–2) with fully populated coord caches, node 0
/// sends node 1 a datagram for node 2 with the CE argument set to `true`.
/// After three drain rounds, the ECN-CE counter that node 2 keeps for its
/// peer node 1 must have increased.
///
/// A missing peer or missing MMP state is read as a count of 0.
#[tokio::test]
async fn test_ce_relay_through_forwarding() {
    let chain = vec![(0, 1), (1, 2)];
    let mut nodes = run_tree_test(3, &chain, false).await;
    verify_tree_convergence(&nodes);
    populate_all_coord_caches(&mut nodes);

    let origin = *nodes[0].node.node_addr();
    let relay = *nodes[1].node.node_addr();
    let target = *nodes[2].node.node_addr();

    // Baseline CE count at node 2 for traffic received from node 1.
    let peer_before = nodes[2].node.get_peer(&relay);
    let ce_before = peer_before
        .and_then(|peer| peer.mmp())
        .map(|mmp| mmp.receiver.ecn_ce_count())
        .unwrap_or(0);

    let payload = vec![0x10, 0x00, 0x04, 0x00, 1, 2, 3, 4];
    let datagram = SessionDatagram::new(origin, target, payload);
    let wire = datagram.encode();
    nodes[0]
        .node
        .send_encrypted_link_message_with_ce(&relay, &wire, true)
        .await
        .unwrap();

    for _round in 0..3 {
        tokio::time::sleep(Duration::from_millis(50)).await;
        process_available_packets(&mut nodes).await;
    }

    let peer_after = nodes[2].node.get_peer(&relay);
    let ce_after = peer_after
        .and_then(|peer| peer.mmp())
        .map(|mmp| mmp.receiver.ecn_ce_count())
        .unwrap_or(0);

    assert!(
        ce_after > ce_before,
        "Node 2 should see CE flag relayed from node 1 (before={ce_before}, after={ce_after})"
    );

    cleanup_nodes(&mut nodes).await;
}
/// `detect_congestion` must follow the `dropping` flag of a registered
/// transport: false with no transports recorded, true while a transport is
/// marked as dropping, and false again once that flag is cleared.
#[test]
fn test_detect_congestion_with_transport_drops() {
    let mut node = make_node();
    let peer_addr = NodeAddr::from_bytes([1; 16]);

    // Nothing recorded yet.
    assert!(!node.detect_congestion(&peer_addr));

    let transport = TransportId::new(1);
    let dropping_state = TransportDropState {
        prev_drops: 100,
        dropping: true,
    };
    node.transport_drops.insert(transport, dropping_state);
    assert!(node.detect_congestion(&peer_addr));

    // Clear the flag in place and re-check.
    let state = node.transport_drops.get_mut(&transport).unwrap();
    state.dropping = false;
    assert!(!node.detect_congestion(&peer_addr));
}
/// With `config.node.ecn.enabled` set to false, `detect_congestion` must
/// return false even though a transport is recorded as dropping.
#[test]
fn test_detect_congestion_disabled_ecn() {
    let mut node = make_node();
    node.config.node.ecn.enabled = false;

    let transport = TransportId::new(1);
    let dropping_state = TransportDropState {
        prev_drops: 50,
        dropping: true,
    };
    node.transport_drops.insert(transport, dropping_state);

    let peer_addr = NodeAddr::from_bytes([1; 16]);
    let congested = node.detect_congestion(&peer_addr);
    assert!(!congested);
}
/// After `sample_transport_congestion` runs on a node whose only recorded
/// transport starts at zero drops and not dropping, that transport's
/// `dropping` flag must still be false.
#[test]
fn test_sample_transport_congestion() {
    let mut node = make_node();
    let transport = TransportId::new(1);
    let idle_state = TransportDropState {
        prev_drops: 0,
        dropping: false,
    };
    node.transport_drops.insert(transport, idle_state);

    node.sample_transport_congestion();

    let state = &node.transport_drops[&transport];
    assert!(!state.dropping);
}