use super::*;
/// Drives a full handshake between two nodes over loopback UDP by hand, then
/// exchanges one encrypted frame in each direction.
///
/// The test calls `handle_msg1`, `handle_msg2` and `handle_encrypted_frame`
/// directly on packets pulled from each node's packet channel instead of
/// running the nodes' receive loops.
#[tokio::test]
async fn test_two_node_handshake_udp() {
use crate::config::UdpConfig;
use crate::node::wire::{
build_encrypted, build_established_header, build_msg1, prepend_inner_header,
};
use crate::transport::udp::UdpTransport;
use tokio::time::{Duration, timeout};
let mut node_a = make_node();
let mut node_b = make_node();
// Both nodes use the same numeric transport id; each node keeps its own
// `transports` map, so the ids do not collide.
let transport_id_a = TransportId::new(1);
let transport_id_b = TransportId::new(1);
// Port 0 on loopback: presumably the OS assigns a free port; the real
// address is read back through `local_addr()` below.
let udp_config = UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
mtu: Some(1280),
..Default::default()
};
// The receiving halves stay in the test so packets can be fed to the
// handlers one at a time.
let (packet_tx_a, mut packet_rx_a) = packet_channel(64);
let (packet_tx_b, mut packet_rx_b) = packet_channel(64);
let mut transport_a = UdpTransport::new(transport_id_a, None, udp_config.clone(), packet_tx_a);
let mut transport_b = UdpTransport::new(transport_id_b, None, udp_config, packet_tx_b);
transport_a.start_async().await.unwrap();
transport_b.start_async().await.unwrap();
let addr_a = transport_a.local_addr().unwrap();
let addr_b = transport_b.local_addr().unwrap();
let remote_addr_b = TransportAddr::from_string(&addr_b.to_string());
let remote_addr_a = TransportAddr::from_string(&addr_a.to_string());
node_a
.transports
.insert(transport_id_a, TransportHandle::Udp(transport_a));
node_b
.transports
.insert(transport_id_b, TransportHandle::Udp(transport_b));
// --- Node A: build the outbound connection state manually ---
let peer_b_identity = PeerIdentity::from_pubkey_full(node_b.identity.pubkey_full());
let peer_b_node_addr = *peer_b_identity.node_addr();
let link_id_a = node_a.allocate_link_id();
let mut conn_a = PeerConnection::outbound(link_id_a, peer_b_identity, 1000);
let our_index_a = node_a.index_allocator.allocate().unwrap();
let our_keypair_a = node_a.identity.keypair();
let noise_msg1 = conn_a
.start_handshake(our_keypair_a, node_a.startup_epoch, 1000)
.unwrap();
conn_a.set_our_index(our_index_a);
conn_a.set_transport_id(transport_id_a);
conn_a.set_source_addr(remote_addr_b.clone());
let wire_msg1 = build_msg1(our_index_a, &noise_msg1);
let link_a = Link::connectionless(
link_id_a,
transport_id_a,
remote_addr_b.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
);
// Register the link, the connection and the pending-outbound entry keyed by
// (transport id, our session index) on node A.
node_a.links.insert(link_id_a, link_a);
node_a.connections.insert(link_id_a, conn_a);
node_a
.pending_outbound
.insert((transport_id_a, our_index_a.as_u32()), link_id_a);
// --- msg1: A -> B ---
let transport = node_a.transports.get(&transport_id_a).unwrap();
transport
.send(&remote_addr_b, &wire_msg1)
.await
.expect("Failed to send msg1");
let packet_b = timeout(Duration::from_secs(1), packet_rx_b.recv())
.await
.expect("Timeout waiting for msg1")
.expect("Channel closed");
node_b.handle_msg1(packet_b).await;
// After handling msg1, B is expected to hold A as a peer with a session and
// an index registered in `peers_by_index`.
let peer_a_node_addr =
*PeerIdentity::from_pubkey_full(node_a.identity.pubkey_full()).node_addr();
assert_eq!(
node_b.peer_count(),
1,
"Node B should have 1 peer after msg1"
);
let peer_a_on_b = node_b
.get_peer(&peer_a_node_addr)
.expect("Node B should have peer A");
assert!(
peer_a_on_b.has_session(),
"Peer A on B should have NoiseSession"
);
let our_index_b = peer_a_on_b.our_index().expect("B should have our_index");
assert!(
node_b
.peers_by_index
.contains_key(&(transport_id_b, our_index_b.as_u32())),
"Node B peers_by_index should be populated"
);
// --- msg2: B -> A ---
// The next packet on A's channel is expected to be B's msg2 reply.
let packet_a = timeout(Duration::from_secs(1), packet_rx_a.recv())
.await
.expect("Timeout waiting for msg2")
.expect("Channel closed");
node_a.handle_msg2(packet_a).await;
assert_eq!(
node_a.peer_count(),
1,
"Node A should have 1 peer after msg2"
);
let peer_b_on_a = node_a
.get_peer(&peer_b_node_addr)
.expect("Node A should have peer B");
assert!(
peer_b_on_a.has_session(),
"Peer B on A should have NoiseSession"
);
assert_eq!(
peer_b_on_a.our_index(),
Some(our_index_a),
"Peer B on A should have our_index matching what we allocated"
);
assert!(
node_a
.peers_by_index
.contains_key(&(transport_id_a, our_index_a.as_u32())),
"Node A peers_by_index should be populated"
);
// --- Encrypted frame: A -> B ---
// NOTE(review): the leading 0x10 byte is presumably a message-type tag; confirm
// against the wire format.
let msg_a = b"\x10test from A"; let inner_a = prepend_inner_header(0, msg_a);
let peer_b = node_a.get_peer_mut(&peer_b_node_addr).unwrap();
let their_index_b = peer_b.their_index().expect("A should know B's index");
let session_a = peer_b.noise_session_mut().unwrap();
// The header is built first because it is passed as the AAD for encryption.
let counter_a = session_a.current_send_counter();
let header_a = build_established_header(their_index_b, counter_a, 0, inner_a.len() as u16);
let ciphertext_a = session_a.encrypt_with_aad(&inner_a, &header_a).unwrap();
let wire_encrypted = build_encrypted(&header_a, &ciphertext_a);
let transport = node_a.transports.get(&transport_id_a).unwrap();
transport
.send(&remote_addr_b, &wire_encrypted)
.await
.expect("Failed to send encrypted frame");
let encrypted_packet_b = timeout(Duration::from_secs(1), packet_rx_b.recv())
.await
.expect("Timeout waiting for encrypted frame")
.expect("Channel closed");
node_b.handle_encrypted_frame(encrypted_packet_b).await;
let peer_a = node_b.get_peer(&peer_a_node_addr).unwrap();
assert!(
peer_a.is_healthy(),
"Peer A on B should still be healthy after receiving encrypted frame"
);
// --- Encrypted frame: B -> A (mirror of the section above) ---
let msg_b = b"\x10test from B"; let inner_b = prepend_inner_header(0, msg_b);
let peer_a = node_b.get_peer_mut(&peer_a_node_addr).unwrap();
let their_index_a = peer_a.their_index().expect("B should know A's index");
let session_b = peer_a.noise_session_mut().unwrap();
let counter_b = session_b.current_send_counter();
let header_b = build_established_header(their_index_a, counter_b, 0, inner_b.len() as u16);
let ciphertext_b = session_b.encrypt_with_aad(&inner_b, &header_b).unwrap();
let wire_encrypted_b = build_encrypted(&header_b, &ciphertext_b);
let transport = node_b.transports.get(&transport_id_b).unwrap();
transport
.send(&remote_addr_a, &wire_encrypted_b)
.await
.expect("Failed to send encrypted frame B→A");
let encrypted_packet_a = timeout(Duration::from_secs(1), packet_rx_a.recv())
.await
.expect("Timeout waiting for encrypted frame B→A")
.expect("Channel closed");
node_a.handle_encrypted_frame(encrypted_packet_a).await;
let peer_b = node_a.get_peer(&peer_b_node_addr).unwrap();
assert!(
peer_b.is_healthy(),
"Peer B on A should still be healthy after receiving encrypted frame"
);
// Best-effort shutdown of every transport; stop errors are ignored.
for (_, t) in node_a.transports.iter_mut() {
t.stop().await.ok();
}
for (_, t) in node_b.transports.iter_mut() {
t.stop().await.ok();
}
}
/// Same handshake as the manual UDP test, but each node processes its incoming
/// packets through `run_rx_loop` instead of direct handler calls.
///
/// Each loop runs inside `tokio::select!` against a 500 ms sleep. The loop is
/// expected to keep running, so the test panics if it returns before the sleep
/// finishes.
#[tokio::test]
async fn test_run_rx_loop_handshake() {
use crate::config::UdpConfig;
use crate::node::wire::build_msg1;
use crate::transport::udp::UdpTransport;
use tokio::time::Duration;
let mut node_a = make_node();
let mut node_b = make_node();
let transport_id_a = TransportId::new(1);
let transport_id_b = TransportId::new(1);
let udp_config = UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
mtu: Some(1280),
..Default::default()
};
let (packet_tx_a, packet_rx_a) = packet_channel(64);
let (packet_tx_b, packet_rx_b) = packet_channel(64);
let mut transport_a = UdpTransport::new(transport_id_a, None, udp_config.clone(), packet_tx_a);
let mut transport_b = UdpTransport::new(transport_id_b, None, udp_config, packet_tx_b);
transport_a.start_async().await.unwrap();
transport_b.start_async().await.unwrap();
let addr_b = transport_b.local_addr().unwrap();
let remote_addr_b = TransportAddr::from_string(&addr_b.to_string());
node_a
.transports
.insert(transport_id_a, TransportHandle::Udp(transport_a));
node_b
.transports
.insert(transport_id_b, TransportHandle::Udp(transport_b));
// Hand the receiving ends to the nodes and mark them Running.
// NOTE(review): presumably both are preconditions for `run_rx_loop`; confirm.
node_a.packet_rx = Some(packet_rx_a);
node_b.packet_rx = Some(packet_rx_b);
node_a.state = NodeState::Running;
node_b.state = NodeState::Running;
// --- Node A: build the outbound connection state manually and send msg1 ---
let peer_b_identity = PeerIdentity::from_pubkey_full(node_b.identity.pubkey_full());
let peer_b_node_addr = *peer_b_identity.node_addr();
let link_id_a = node_a.allocate_link_id();
let mut conn_a = PeerConnection::outbound(link_id_a, peer_b_identity, 1000);
let our_index_a = node_a.index_allocator.allocate().unwrap();
let our_keypair_a = node_a.identity.keypair();
let noise_msg1 = conn_a
.start_handshake(our_keypair_a, node_a.startup_epoch, 1000)
.unwrap();
conn_a.set_our_index(our_index_a);
conn_a.set_transport_id(transport_id_a);
conn_a.set_source_addr(remote_addr_b.clone());
let wire_msg1 = build_msg1(our_index_a, &noise_msg1);
let link_a = Link::connectionless(
link_id_a,
transport_id_a,
remote_addr_b.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
);
node_a.links.insert(link_id_a, link_a);
node_a.connections.insert(link_id_a, conn_a);
node_a
.pending_outbound
.insert((transport_id_a, our_index_a.as_u32()), link_id_a);
let transport = node_a.transports.get(&transport_id_a).unwrap();
transport
.send(&remote_addr_b, &wire_msg1)
.await
.expect("Failed to send msg1");
// NOTE(review): this pause presumably gives the UDP transport time to queue
// msg1 on B's packet channel before the loop starts; confirm.
tokio::time::sleep(Duration::from_millis(50)).await;
// Run B's rx loop for up to 500 ms; the sleep branch winning is the success path.
tokio::select! {
result = node_b.run_rx_loop() => {
panic!("Node B rx loop exited unexpectedly: {:?}", result);
}
_ = tokio::time::sleep(Duration::from_millis(500)) => {
}
}
let peer_a_node_addr =
*PeerIdentity::from_pubkey_full(node_a.identity.pubkey_full()).node_addr();
assert_eq!(
node_b.peer_count(),
1,
"Node B should have 1 peer after rx loop processed msg1"
);
let peer_a_on_b = node_b
.get_peer(&peer_a_node_addr)
.expect("Node B should have peer A");
assert!(
peer_a_on_b.has_session(),
"Peer A on B should have NoiseSession"
);
let our_index_b = peer_a_on_b.our_index().expect("B should have our_index");
assert!(
peer_a_on_b.their_index().is_some(),
"B should have their_index"
);
assert!(
node_b
.peers_by_index
.contains_key(&(transport_id_b, our_index_b.as_u32())),
"Node B peers_by_index should be populated"
);
// Run A's rx loop the same way so it can process B's msg2 reply.
tokio::select! {
result = node_a.run_rx_loop() => {
panic!("Node A rx loop exited unexpectedly: {:?}", result);
}
_ = tokio::time::sleep(Duration::from_millis(500)) => {
}
}
assert_eq!(
node_a.peer_count(),
1,
"Node A should have 1 peer after rx loop processed msg2"
);
let peer_b_on_a = node_a
.get_peer(&peer_b_node_addr)
.expect("Node A should have peer B");
assert!(
peer_b_on_a.has_session(),
"Peer B on A should have NoiseSession"
);
assert_eq!(
peer_b_on_a.our_index(),
Some(our_index_a),
"Peer B on A should have our_index matching what we allocated"
);
assert!(
peer_b_on_a.their_index().is_some(),
"A should know B's index"
);
assert!(
node_a
.peers_by_index
.contains_key(&(transport_id_a, our_index_a.as_u32())),
"Node A peers_by_index should be populated"
);
// Best-effort shutdown of every transport; stop errors are ignored.
for (_, t) in node_a.transports.iter_mut() {
t.stop().await.ok();
}
for (_, t) in node_b.transports.iter_mut() {
t.stop().await.ok();
}
}
/// With Nostr and LAN discovery disabled on both sides, node A must still
/// reach node B when B's UDP address is listed statically in A's peer config.
///
/// The handshake is started through `initiate_peer_connections` and both rx
/// loops are then run concurrently for at most 500 ms.
#[tokio::test]
async fn test_static_address_handshake_without_nostr_discovery() {
    use crate::Identity;
    use crate::config::{ConnectPolicy, PeerAddress, PeerConfig, UdpConfig};
    use crate::transport::udp::UdpTransport;
    use crate::transport::{TransportHandle, packet_channel};
    use tokio::time::Duration;

    // Configs for both nodes: discovery switched off, everything else default.
    let mut cfg_a = Config::new();
    cfg_a.node.discovery.nostr.enabled = false;
    cfg_a.node.discovery.lan.enabled = false;
    let mut cfg_b = Config::new();
    cfg_b.node.discovery.nostr.enabled = false;
    cfg_b.node.discovery.lan.enabled = false;

    let ident_a = Identity::generate();
    let ident_b = Identity::generate();

    let tid = TransportId::new(1);
    let loopback_udp = UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        mtu: Some(1280),
        ..Default::default()
    };

    // B's transport is started first so its bound address can be written
    // into A's static peer list.
    let (tx_b, rx_b) = packet_channel(64);
    let mut udp_b = UdpTransport::new(tid, None, loopback_udp.clone(), tx_b);
    udp_b.start_async().await.unwrap();
    let bound_b = udp_b.local_addr().unwrap();

    let static_peer_b = PeerConfig {
        npub: ident_b.npub(),
        alias: None,
        addresses: vec![PeerAddress::new("udp", bound_b.to_string())],
        connect_policy: ConnectPolicy::AutoConnect,
        auto_reconnect: true,
    };
    cfg_a.peers.push(static_peer_b);

    let mut node_a = Node::with_identity(ident_a, cfg_a).unwrap();
    let mut node_b = Node::with_identity(ident_b, cfg_b).unwrap();

    let (tx_a, rx_a) = packet_channel(64);
    let mut udp_a = UdpTransport::new(tid, None, loopback_udp, tx_a);
    udp_a.start_async().await.unwrap();

    node_a.transports.insert(tid, TransportHandle::Udp(udp_a));
    node_b.transports.insert(tid, TransportHandle::Udp(udp_b));
    node_a.packet_rx = Some(rx_a);
    node_b.packet_rx = Some(rx_b);
    node_a.state = NodeState::Running;
    node_b.state = NodeState::Running;

    node_a.initiate_peer_connections().await;

    // Drive both receive loops together; the timeout elapsing is the normal
    // way out, so its result is deliberately discarded.
    let pump_both = async {
        tokio::select! {
            _ = node_b.run_rx_loop() => {}
            _ = node_a.run_rx_loop() => {}
        }
    };
    let _ = tokio::time::timeout(Duration::from_millis(500), pump_both).await;

    let addr_of_a = *PeerIdentity::from_pubkey_full(node_a.identity.pubkey_full()).node_addr();
    let addr_of_b = *PeerIdentity::from_pubkey_full(node_b.identity.pubkey_full()).node_addr();

    assert_eq!(
        node_a.peer_count(),
        1,
        "node A should reach node B using only the cached static UDP address"
    );
    assert_eq!(
        node_b.peer_count(),
        1,
        "node B should authenticate node A's static-only handshake"
    );
    assert!(node_a.get_peer(&addr_of_b).is_some());
    assert!(node_b.get_peer(&addr_of_a).is_some());

    // Best-effort shutdown; failures while stopping are ignored.
    for (_id, handle) in node_a.transports.iter_mut() {
        let _ = handle.stop().await;
    }
    for (_id, handle) in node_b.transports.iter_mut() {
        let _ = handle.stop().await;
    }
}
/// Both nodes start an outbound handshake towards each other at the same time.
///
/// Each node first handles the other's msg1 and then the msg2 that follows.
/// At the end each side must hold exactly one peer entry with a session that
/// can send.
#[tokio::test]
async fn test_cross_connection_both_initiate() {
use crate::config::UdpConfig;
use crate::node::wire::build_msg1;
use crate::transport::udp::UdpTransport;
use tokio::time::{Duration, timeout};
let mut node_a = make_node();
let mut node_b = make_node();
let transport_id_a = TransportId::new(1);
let transport_id_b = TransportId::new(1);
let udp_config = UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
mtu: Some(1280),
..Default::default()
};
let (packet_tx_a, mut packet_rx_a) = packet_channel(64);
let (packet_tx_b, mut packet_rx_b) = packet_channel(64);
let mut transport_a = UdpTransport::new(transport_id_a, None, udp_config.clone(), packet_tx_a);
let mut transport_b = UdpTransport::new(transport_id_b, None, udp_config, packet_tx_b);
transport_a.start_async().await.unwrap();
transport_b.start_async().await.unwrap();
let addr_a = transport_a.local_addr().unwrap();
let addr_b = transport_b.local_addr().unwrap();
let remote_addr_b = TransportAddr::from_string(&addr_b.to_string());
let remote_addr_a = TransportAddr::from_string(&addr_a.to_string());
node_a
.transports
.insert(transport_id_a, TransportHandle::Udp(transport_a));
node_b
.transports
.insert(transport_id_b, TransportHandle::Udp(transport_b));
let peer_b_identity = PeerIdentity::from_pubkey_full(node_b.identity.pubkey_full());
let peer_b_node_addr = *peer_b_identity.node_addr();
let peer_a_identity = PeerIdentity::from_pubkey_full(node_a.identity.pubkey_full());
let peer_a_node_addr = *peer_a_identity.node_addr();
// --- Node A: outbound connection towards B ---
let link_id_a_out = node_a.allocate_link_id();
let mut conn_a = PeerConnection::outbound(link_id_a_out, peer_b_identity, 1000);
let our_index_a = node_a.index_allocator.allocate().unwrap();
let our_keypair_a = node_a.identity.keypair();
let noise_msg1_a = conn_a
.start_handshake(our_keypair_a, node_a.startup_epoch, 1000)
.unwrap();
conn_a.set_our_index(our_index_a);
conn_a.set_transport_id(transport_id_a);
conn_a.set_source_addr(remote_addr_b.clone());
let wire_msg1_a = build_msg1(our_index_a, &noise_msg1_a);
let link_a_out = Link::connectionless(
link_id_a_out,
transport_id_a,
remote_addr_b.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
);
// Besides links/connections/pending_outbound, the (transport, address) -> link
// mapping is registered here as well.
node_a.links.insert(link_id_a_out, link_a_out);
node_a
.addr_to_link
.insert((transport_id_a, remote_addr_b.clone()), link_id_a_out);
node_a.connections.insert(link_id_a_out, conn_a);
node_a
.pending_outbound
.insert((transport_id_a, our_index_a.as_u32()), link_id_a_out);
// --- Node B: outbound connection towards A (mirror of the section above) ---
let link_id_b_out = node_b.allocate_link_id();
let mut conn_b = PeerConnection::outbound(link_id_b_out, peer_a_identity, 1000);
let our_index_b = node_b.index_allocator.allocate().unwrap();
let our_keypair_b = node_b.identity.keypair();
let noise_msg1_b = conn_b
.start_handshake(our_keypair_b, node_b.startup_epoch, 1000)
.unwrap();
conn_b.set_our_index(our_index_b);
conn_b.set_transport_id(transport_id_b);
conn_b.set_source_addr(remote_addr_a.clone());
let wire_msg1_b = build_msg1(our_index_b, &noise_msg1_b);
let link_b_out = Link::connectionless(
link_id_b_out,
transport_id_b,
remote_addr_a.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
);
node_b.links.insert(link_id_b_out, link_b_out);
node_b
.addr_to_link
.insert((transport_id_b, remote_addr_a.clone()), link_id_b_out);
node_b.connections.insert(link_id_b_out, conn_b);
node_b
.pending_outbound
.insert((transport_id_b, our_index_b.as_u32()), link_id_b_out);
// Both msg1 packets are sent before either side processes anything.
let transport = node_a.transports.get(&transport_id_a).unwrap();
transport
.send(&remote_addr_b, &wire_msg1_a)
.await
.expect("A send msg1");
let transport = node_b.transports.get(&transport_id_b).unwrap();
transport
.send(&remote_addr_a, &wire_msg1_b)
.await
.expect("B send msg1");
// B handles A's msg1 first.
let packet_at_b = timeout(Duration::from_secs(1), packet_rx_b.recv())
.await
.expect("Timeout")
.expect("Channel closed");
node_b.handle_msg1(packet_at_b).await;
assert_eq!(
node_b.peer_count(),
1,
"Node B should have 1 peer after processing A's msg1"
);
assert!(
node_b.get_peer(&peer_a_node_addr).is_some(),
"Node B should have peer A"
);
// A then handles B's msg1, the first packet on A's channel.
let packet_at_a = timeout(Duration::from_secs(1), packet_rx_a.recv())
.await
.expect("Timeout")
.expect("Channel closed");
node_a.handle_msg1(packet_at_a).await;
assert_eq!(
node_a.peer_count(),
1,
"Node A should have 1 peer after processing B's msg1"
);
assert!(
node_a.get_peer(&peer_b_node_addr).is_some(),
"Node A should have peer B"
);
// The next packet on each channel is treated as the other side's msg2.
let msg2_at_a = timeout(Duration::from_secs(1), packet_rx_a.recv())
.await
.expect("Timeout waiting for msg2 at A")
.expect("Channel closed");
node_a.handle_msg2(msg2_at_a).await;
let msg2_at_b = timeout(Duration::from_secs(1), packet_rx_b.recv())
.await
.expect("Timeout waiting for msg2 at B")
.expect("Channel closed");
node_b.handle_msg2(msg2_at_b).await;
// The crossing handshakes must leave a single peer entry per node.
assert_eq!(
node_a.peer_count(),
1,
"Node A should have exactly 1 peer after cross-connection"
);
assert_eq!(
node_b.peer_count(),
1,
"Node B should have exactly 1 peer after cross-connection"
);
let peer_b_on_a = node_a
.get_peer(&peer_b_node_addr)
.expect("A should have peer B");
let peer_a_on_b = node_b
.get_peer(&peer_a_node_addr)
.expect("B should have peer A");
assert!(peer_b_on_a.has_session(), "Peer B on A should have session");
assert!(peer_a_on_b.has_session(), "Peer A on B should have session");
assert!(peer_b_on_a.can_send(), "Peer B on A should be sendable");
assert!(peer_a_on_b.can_send(), "Peer A on B should be sendable");
// Best-effort shutdown of every transport; stop errors are ignored.
for (_, t) in node_a.transports.iter_mut() {
t.stop().await.ok();
}
for (_, t) in node_b.transports.iter_mut() {
t.stop().await.ok();
}
}
/// A pending outbound connection whose handshake was started at a very old
/// timestamp must be purged by `check_timeouts`, together with its link,
/// pending-outbound entry, session index and address mapping.
#[tokio::test]
async fn test_stale_connection_cleanup() {
    let mut node = make_node();
    let tid = TransportId::new(1);
    let identity = make_peer_identity();
    let peer_addr = TransportAddr::from_string("10.0.0.2:2121");

    // NOTE(review): 1000 ms is presumably far enough in the past, relative to
    // the clock `check_timeouts` consults, to count as timed out; confirm.
    let started_at_ms = 1000;

    let link_id = node.allocate_link_id();
    let mut pending = PeerConnection::outbound(link_id, identity, started_at_ms);
    let session_index = node.index_allocator.allocate().unwrap();
    let keypair = node.identity.keypair();
    let _msg1 = pending
        .start_handshake(keypair, node.startup_epoch, started_at_ms)
        .unwrap();
    pending.set_our_index(session_index);
    pending.set_transport_id(tid);
    pending.set_source_addr(peer_addr.clone());

    // Keys under which the node tracks this connection.
    let index_key = (tid, session_index.as_u32());
    let addr_key = (tid, peer_addr.clone());

    node.links.insert(
        link_id,
        Link::connectionless(
            link_id,
            tid,
            peer_addr,
            LinkDirection::Outbound,
            Duration::from_millis(100),
        ),
    );
    node.addr_to_link.insert(addr_key.clone(), link_id);
    node.connections.insert(link_id, pending);
    node.pending_outbound.insert(index_key, link_id);

    // Sanity: everything is registered before the sweep.
    assert_eq!(node.connection_count(), 1);
    assert_eq!(node.link_count(), 1);
    assert!(node.pending_outbound.contains_key(&index_key));
    assert_eq!(node.index_allocator.count(), 1);

    node.check_timeouts();

    assert_eq!(
        node.connection_count(),
        0,
        "Stale connection should be removed"
    );
    assert_eq!(node.link_count(), 0, "Stale link should be removed");
    let still_pending = node.pending_outbound.contains_key(&index_key);
    assert!(!still_pending, "pending_outbound should be cleaned up");
    assert_eq!(
        node.index_allocator.count(),
        0,
        "Session index should be freed"
    );
    let still_mapped = node.addr_to_link.contains_key(&addr_key);
    assert!(!still_mapped, "addr_to_link should be cleaned up");
}
/// A connection explicitly marked as failed must be removed by
/// `check_timeouts` even though its handshake was started at the current
/// wall-clock time.
#[tokio::test]
async fn test_failed_connection_cleanup() {
    let mut node = make_node();
    let tid = TransportId::new(1);
    let identity = make_peer_identity();
    let peer_addr = TransportAddr::from_string("10.0.0.2:2121");

    // Current wall-clock time in milliseconds; falls back to 0 if the clock
    // reports a time before the Unix epoch.
    let wall_clock_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    };

    let link_id = node.allocate_link_id();
    let mut failed = PeerConnection::outbound(link_id, identity, wall_clock_ms);
    let session_index = node.index_allocator.allocate().unwrap();
    let keypair = node.identity.keypair();
    let _msg1 = failed
        .start_handshake(keypair, node.startup_epoch, wall_clock_ms)
        .unwrap();
    failed.set_our_index(session_index);
    failed.set_transport_id(tid);
    failed.set_source_addr(peer_addr.clone());
    failed.mark_failed();

    let outbound_link = Link::connectionless(
        link_id,
        tid,
        peer_addr.clone(),
        LinkDirection::Outbound,
        Duration::from_millis(100),
    );
    node.links.insert(link_id, outbound_link);
    node.addr_to_link.insert((tid, peer_addr), link_id);
    node.connections.insert(link_id, failed);
    node.pending_outbound
        .insert((tid, session_index.as_u32()), link_id);

    assert_eq!(node.connection_count(), 1);

    node.check_timeouts();

    assert_eq!(
        node.connection_count(),
        0,
        "Failed connection should be removed"
    );
    assert_eq!(node.link_count(), 0, "Failed link should be removed");
    assert_eq!(
        node.index_allocator.count(),
        0,
        "Session index should be freed"
    );
}
/// Storing the wire-format msg1 on a connection must make it retrievable,
/// leave the resend counter at zero, and schedule the next resend after now.
#[tokio::test]
async fn test_msg1_stored_for_resend() {
    use crate::node::wire::build_msg1;

    let mut node = make_node();
    let tid = TransportId::new(1);
    let identity = make_peer_identity();
    let peer_addr = TransportAddr::from_string("10.0.0.2:2121");

    // Wall-clock milliseconds, or 0 if the clock is before the Unix epoch.
    let wall_clock_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    };

    let link_id = node.allocate_link_id();
    let mut outbound = PeerConnection::outbound(link_id, identity, wall_clock_ms);
    let session_index = node.index_allocator.allocate().unwrap();
    let keypair = node.identity.keypair();
    let noise_payload = outbound
        .start_handshake(keypair, node.startup_epoch, wall_clock_ms)
        .unwrap();
    outbound.set_our_index(session_index);
    outbound.set_transport_id(tid);
    outbound.set_source_addr(peer_addr.clone());

    let framed_msg1 = build_msg1(session_index, &noise_payload);

    // First resend is scheduled one configured interval after "now".
    let interval_ms = node.config.node.rate_limit.handshake_resend_interval_ms;
    let first_resend_at = wall_clock_ms + interval_ms;
    outbound.set_handshake_msg1(framed_msg1.clone(), first_resend_at);

    assert_eq!(outbound.handshake_msg1().unwrap(), &framed_msg1);
    assert_eq!(outbound.resend_count(), 0);
    assert!(outbound.next_resend_at_ms() > wall_clock_ms);
}
/// Calls `resend_pending_handshakes` before and at the scheduled resend time
/// on a node that has no transports registered, and checks that the resend
/// counter stays at zero in both cases.
#[tokio::test]
async fn test_resend_scheduling() {
    let mut node = make_node();
    let tid = TransportId::new(1);
    let identity = make_peer_identity();
    let peer_addr = TransportAddr::from_string("10.0.0.2:2121");

    // Fixed synthetic clock so the schedule below is deterministic.
    let base_ms = 100_000u64;
    let scheduled_ms = base_ms + 1000;

    let link_id = node.allocate_link_id();
    let mut outbound = PeerConnection::outbound(link_id, identity, base_ms);
    let session_index = node.index_allocator.allocate().unwrap();
    let keypair = node.identity.keypair();
    let noise_payload = outbound
        .start_handshake(keypair, node.startup_epoch, base_ms)
        .unwrap();
    outbound.set_our_index(session_index);
    outbound.set_transport_id(tid);
    outbound.set_source_addr(peer_addr.clone());
    outbound.set_handshake_msg1(
        crate::node::wire::build_msg1(session_index, &noise_payload),
        scheduled_ms,
    );

    let outbound_link = Link::connectionless(
        link_id,
        tid,
        peer_addr.clone(),
        LinkDirection::Outbound,
        Duration::from_millis(100),
    );
    node.links.insert(link_id, outbound_link);
    node.addr_to_link.insert((tid, peer_addr), link_id);
    node.pending_outbound
        .insert((tid, session_index.as_u32()), link_id);
    node.connections.insert(link_id, outbound);

    // Halfway to the scheduled time: nothing is due yet.
    node.resend_pending_handshakes(base_ms + 500).await;
    let resends_before_due = node.connections.get(&link_id).unwrap().resend_count();
    assert_eq!(resends_before_due, 0, "No resend before scheduled time");

    // Exactly at the scheduled time, with no transport registered on the node.
    node.resend_pending_handshakes(scheduled_ms).await;
    let resends_when_due = node.connections.get(&link_id).unwrap().resend_count();
    assert_eq!(
        resends_when_due,
        0,
        "No transport means no resend recorded"
    );
}
/// An inbound connection starts without a stored msg2 and returns exactly the
/// bytes handed to `set_handshake_msg2` afterwards.
#[test]
fn test_msg2_stored_on_connection() {
    let mut inbound = PeerConnection::inbound(LinkId::new(1), 1000);
    let initially_stored = inbound.handshake_msg2();
    assert!(initially_stored.is_none());

    let payload = vec![0x01, 0x02, 0x03, 0x04];
    inbound.set_handshake_msg2(payload.clone());

    let stored = inbound.handshake_msg2().unwrap();
    assert_eq!(stored, &payload);
}
/// Checks the resend bookkeeping of an outbound connection: storing msg1 sets
/// the next resend time without counting a resend, and each `record_resend`
/// call increments the counter and replaces the next resend time.
#[test]
fn test_resend_count_tracking() {
    let identity = make_peer_identity();
    let mut outbound = PeerConnection::outbound(LinkId::new(1), identity, 1000);

    // Fresh connection: nothing sent, nothing scheduled.
    assert_eq!(outbound.resend_count(), 0);
    assert_eq!(outbound.next_resend_at_ms(), 0);

    // Storing msg1 schedules the first resend but is not itself a resend.
    outbound.set_handshake_msg1(vec![0x01], 2000);
    assert_eq!(outbound.resend_count(), 0);
    assert_eq!(outbound.next_resend_at_ms(), 2000);

    // Each recorded resend: (expected counter afterwards, new next-resend time).
    for (expected_count, next_at_ms) in [(1, 4000), (2, 8000)] {
        outbound.record_resend(next_at_ms);
        assert_eq!(outbound.resend_count(), expected_count);
        assert_eq!(outbound.next_resend_at_ms(), next_at_ms);
    }
}
/// Feeding a msg2 to a node that has no connections or peers must leave both
/// counts at zero.
#[tokio::test]
async fn test_duplicate_msg2_dropped() {
    use crate::node::wire::build_msg2;
    use crate::transport::ReceivedPacket;

    let mut node = make_node();

    // Arbitrary indices: the freshly created node has nothing registered.
    let receiver = SessionIndex::new(42);
    let sender = SessionIndex::new(99);

    // All-zero stand-in for the noise payload, not a real handshake message.
    let zeroed_noise = vec![0u8; 57];
    let stray_msg2 = ReceivedPacket {
        transport_id: TransportId::new(1),
        remote_addr: TransportAddr::from_string("10.0.0.2:2121"),
        data: build_msg2(sender, receiver, &zeroed_noise),
        timestamp_ms: 1000,
        trace_enqueued_at: None,
    };

    node.handle_msg2(stray_msg2).await;

    assert_eq!(node.connection_count(), 0);
    assert_eq!(node.peer_count(), 0);
}
/// When the node has no transport registered under the given id,
/// `should_admit_msg1` is expected to return true.
#[test]
fn test_should_admit_msg1_no_transport() {
    let node = make_node();
    let unregistered = TransportId::new(1);
    let source = TransportAddr::from_string("10.0.0.2:2121");

    let admitted = node.should_admit_msg1(unregistered, &source);
    assert!(admitted);
}
/// With a TCP transport configured without a bind address, a msg1 from an
/// address that has no existing link must not be admitted.
#[tokio::test]
async fn test_should_admit_msg1_rejects_fresh_when_accept_off() {
    use crate::config::TcpConfig;
    use crate::transport::tcp::TcpTransport;

    let mut node = make_node();
    let tid = TransportId::new(1);

    // NOTE(review): `bind_addr: None` is presumably what switches inbound
    // accept off for TCP; confirm against TcpTransport.
    let no_listen_cfg = TcpConfig {
        bind_addr: None,
        ..Default::default()
    };
    let (packet_tx, _packet_rx) = packet_channel(64);
    let handle = TransportHandle::Tcp(TcpTransport::new(tid, None, no_listen_cfg, packet_tx));
    node.transports.insert(tid, handle);

    let unknown_source = TransportAddr::from_string("10.0.0.2:2121");
    let admitted = node.should_admit_msg1(tid, &unknown_source);
    assert!(!admitted);
}
/// With a TCP transport configured without a bind address, a msg1 from an
/// address that already has an `addr_to_link` entry must be admitted.
#[tokio::test]
async fn test_should_admit_msg1_admits_rekey_when_accept_off() {
    use crate::config::TcpConfig;
    use crate::transport::tcp::TcpTransport;

    let mut node = make_node();
    let tid = TransportId::new(1);

    let no_listen_cfg = TcpConfig {
        bind_addr: None,
        ..Default::default()
    };
    let (packet_tx, _packet_rx) = packet_channel(64);
    let handle = TransportHandle::Tcp(TcpTransport::new(tid, None, no_listen_cfg, packet_tx));
    node.transports.insert(tid, handle);

    // Register an existing link for the source address before asking.
    let known_source = TransportAddr::from_string("10.0.0.2:2121");
    let existing_link = node.allocate_link_id();
    node.addr_to_link
        .insert((tid, known_source.clone()), existing_link);

    let admitted = node.should_admit_msg1(tid, &known_source);
    assert!(admitted);
}
/// With a UDP transport whose `accept_connections` is `Some(false)`, a msg1
/// from an address is refused until that address has an `addr_to_link` entry,
/// after which it is admitted.
#[tokio::test]
async fn test_should_admit_msg1_admits_rekey_when_udp_accept_off() {
    use crate::config::UdpConfig;
    use crate::transport::udp::UdpTransport;

    let mut node = make_node();
    let tid = TransportId::new(1);

    let accept_off_cfg = UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        accept_connections: Some(false),
        ..Default::default()
    };
    let (packet_tx, _packet_rx) = packet_channel(64);
    let handle = TransportHandle::Udp(UdpTransport::new(tid, None, accept_off_cfg, packet_tx));
    node.transports.insert(tid, handle);

    let source = TransportAddr::from_string("10.0.0.2:2121");

    // No link for this address yet: refused.
    let admitted_before_link = node.should_admit_msg1(tid, &source);
    assert!(!admitted_before_link);

    // Same address once a link exists: admitted.
    let existing_link = node.allocate_link_id();
    node.addr_to_link.insert((tid, source.clone()), existing_link);
    let admitted_after_link = node.should_admit_msg1(tid, &source);
    assert!(admitted_after_link);
}
/// On an outbound-only UDP transport, msg1 must be admitted both from the
/// hostname-form address stored in `addr_to_link` and from the numeric address
/// recorded as the peer's current address, while an address matching neither
/// must be rejected.
#[tokio::test]
async fn test_should_admit_msg1_admits_rekey_when_addr_form_differs() {
    use crate::config::UdpConfig;
    use crate::peer::ActivePeer;
    use crate::transport::udp::UdpTransport;

    let mut node = make_node();
    let tid = TransportId::new(1);

    let outbound_only_cfg = UdpConfig {
        outbound_only: Some(true),
        ..Default::default()
    };
    let (packet_tx, _packet_rx) = packet_channel(64);
    let handle = TransportHandle::Udp(UdpTransport::new(tid, None, outbound_only_cfg, packet_tx));
    node.transports.insert(tid, handle);

    // The link is keyed by the hostname form of the peer's address.
    let by_hostname = TransportAddr::from_string("core-vm.example:2121");
    let link_id = node.allocate_link_id();
    node.addr_to_link.insert((tid, by_hostname.clone()), link_id);

    // The peer entry on that link records the numeric form as its current address.
    let generated = crate::Identity::generate();
    let identity = PeerIdentity::from_pubkey(generated.pubkey());
    let node_addr = *identity.node_addr();
    let by_number = TransportAddr::from_string("100.64.0.5:2121");
    let mut established = ActivePeer::new(identity, link_id, 1000);
    established.set_current_addr(tid, &by_number);
    node.peers.insert(node_addr, established);

    let hostname_admitted = node.should_admit_msg1(tid, &by_hostname);
    assert!(hostname_admitted);

    let numeric_admitted = node.should_admit_msg1(tid, &by_number);
    assert!(
        numeric_admitted,
        "rekey msg1 from established peer must be admitted even when \
addr_to_link is keyed by a different addr-form (hostname vs \
numeric); the carve-out must consult peer current_addr"
    );

    // An address matching neither the link key nor any peer's current address.
    let stranger = TransportAddr::from_string("198.51.100.1:2121");
    let stranger_admitted = node.should_admit_msg1(tid, &stranger);
    assert!(
        !stranger_admitted,
        "fresh msg1 from unknown source must still be rejected"
    );
}