use super::*;
async fn make_started_udp_transport(id: u32) -> TransportHandle {
let (packet_tx, _packet_rx) = packet_channel(64);
let transport_id = TransportId::new(id);
let mut udp = UdpTransport::new(
transport_id,
Some(format!("udp{}", id)),
crate::config::UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
},
packet_tx,
);
udp.start_async().await.unwrap();
TransportHandle::Udp(udp)
}
#[tokio::test]
async fn test_decrypt_failure_threshold_removes_peer_when_recovery_unavailable() {
const THRESHOLD: u32 = 20;
let mut node = make_node();
let transport_id = TransportId::new(1);
let link_id = LinkId::new(1);
let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1_000);
let node_addr = *identity.node_addr();
node.add_connection(conn).unwrap();
node.promote_connection(link_id, identity, 2_000).unwrap();
assert_eq!(node.peer_count(), 1, "peer should be present after promote");
let our_index = node
.get_peer(&node_addr)
.and_then(|p| p.our_index())
.expect("promoted peer must have our_index");
assert_eq!(
node.peers_by_index.get(&(transport_id, our_index.as_u32())),
Some(&node_addr),
"peers_by_index must be populated after promote"
);
assert_eq!(
node.get_peer(&node_addr)
.unwrap()
.consecutive_decrypt_failures(),
0,
"fresh peer's failure counter must start at zero"
);
for expected in 1..THRESHOLD {
node.handle_decrypt_failure(&node_addr).await;
let count = node
.get_peer(&node_addr)
.expect("peer must still be present below threshold")
.consecutive_decrypt_failures();
assert_eq!(
count, expected,
"counter should track failures pre-threshold"
);
}
assert_eq!(
node.peer_count(),
1,
"peer must remain registered until threshold is reached"
);
node.handle_decrypt_failure(&node_addr).await;
assert!(
node.get_peer(&node_addr).is_none(),
"peer must be removed from peers table at threshold"
);
assert_eq!(
node.peer_count(),
0,
"peer_count must be zero after eviction"
);
assert!(
!node
.peers_by_index
.contains_key(&(transport_id, our_index.as_u32())),
"peers_by_index entry must be cleaned up at threshold"
);
}
#[tokio::test]
async fn test_decrypt_failure_threshold_starts_recovery_rekey_when_transport_available() {
const THRESHOLD: u32 = 20;
let mut node = make_node();
let transport_id = TransportId::new(1);
let link_id = LinkId::new(1);
node.transports
.insert(transport_id, make_started_udp_transport(1).await);
let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1_000);
let node_addr = *identity.node_addr();
node.add_connection(conn).unwrap();
node.promote_connection(link_id, identity, 2_000).unwrap();
for expected in 1..THRESHOLD {
node.handle_decrypt_failure(&node_addr).await;
let count = node
.get_peer(&node_addr)
.expect("peer must still be present below threshold")
.consecutive_decrypt_failures();
assert_eq!(count, expected);
}
node.handle_decrypt_failure(&node_addr).await;
let peer = node
.get_peer(&node_addr)
.expect("recovery rekey should keep peer alive");
assert!(
peer.rekey_in_progress(),
"threshold should start an in-place recovery rekey"
);
assert_eq!(
peer.consecutive_decrypt_failures(),
0,
"starting recovery should reset the local failure streak"
);
let rekey_index = peer
.rekey_our_index()
.expect("recovery rekey must allocate a new local index");
assert!(
node.pending_outbound
.contains_key(&(transport_id, rekey_index.as_u32())),
"rekey msg2 dispatch must be registered"
);
assert!(
node.retry_pending.is_empty(),
"recovery rekey should not schedule a reconnect retry"
);
let mut transport = node.transports.remove(&transport_id).unwrap();
transport.stop().await.unwrap();
}