use super::*;
use crate::discovery::nostr::{BootstrapEvent, NostrDiscovery};
use crate::node::wire::{Msg1Header, build_msg2};
use crate::peer::{ActivePeer, PromotionResult};
use crate::transport::ReceivedPacket;
use crate::transport::udp::UdpTransport;
use crate::transport::{TransportHandle, packet_channel};
use std::sync::Arc;
#[test]
fn test_node_creation() {
    // A freshly built node sits in the Created state with no peers,
    // connections, or links, and is not leaf-only.
    let n = make_node();
    assert!(!n.is_leaf_only());
    assert_eq!(n.state(), NodeState::Created);
    assert_eq!(n.link_count(), 0);
    assert_eq!(n.connection_count(), 0);
    assert_eq!(n.peer_count(), 0);
}
#[test]
fn test_node_with_identity() {
    // Constructing a node from an explicit identity must preserve that
    // identity's node address.
    let identity = Identity::generate();
    let want = *identity.node_addr();
    let node = Node::with_identity(identity, Config::new()).unwrap();
    assert_eq!(node.node_addr(), &want);
}
#[test]
fn test_node_with_identity_validates_config() {
    // A configured peer with nostr discovery disabled must be rejected as a
    // configuration error at construction time.
    let mut config = Config::new();
    config.node.discovery.nostr.enabled = false;
    let peer = crate::config::PeerConfig {
        npub: "npub1peer".to_string(),
        ..Default::default()
    };
    config.peers = vec![peer];
    let err = Node::with_identity(Identity::generate(), config)
        .expect_err("expected config validation error");
    assert!(matches!(err, NodeError::Config(_)));
}
#[test]
fn test_node_leaf_only() {
    // Leaf-only construction is reflected both on the node itself and in
    // its bloom state.
    let leaf = Node::leaf_only(Config::new()).unwrap();
    assert!(leaf.is_leaf_only());
    assert!(leaf.bloom_state().is_leaf_only());
}
#[tokio::test]
async fn test_nat_bootstrap_failure_falls_back_to_direct_udp_address() {
    // Peer config lists a "nat" placeholder address (priority 1) ahead of a
    // concrete UDP address (priority 2); dialing must still start exactly one
    // handshake toward the concrete candidate.
    let peer_identity = Identity::generate();
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);
    let transport_id = TransportId::new(1);
    // Real UDP socket on an ephemeral loopback port.
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));
    let peer_config = crate::config::PeerConfig {
        npub: peer_identity.npub(),
        alias: None,
        addresses: vec![
            crate::config::PeerAddress::with_priority("udp", "nat", 1),
            crate::config::PeerAddress::with_priority("udp", "127.0.0.1:9", 2),
        ],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let peer_identity = PeerIdentity::from_npub(&peer_config.npub).unwrap();
    node.try_peer_addresses(&peer_config, peer_identity, false)
        .await
        .unwrap();
    // One outbound handshake: only the direct UDP candidate was dialed.
    assert_eq!(node.connection_count(), 1);
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[tokio::test]
async fn test_try_peer_addresses_races_all_concrete_udp_candidates() {
    // Two concrete UDP candidates at different priorities: the dialer must
    // race both (one connection per address), not just the highest priority.
    let peer_identity = Identity::generate();
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);
    let transport_id = TransportId::new(1);
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));
    let peer_config = crate::config::PeerConfig {
        npub: peer_identity.npub(),
        alias: None,
        addresses: vec![
            crate::config::PeerAddress::with_priority("udp", "127.0.0.1:9", 1),
            crate::config::PeerAddress::with_priority("udp", "127.0.0.1:10", 2),
        ],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let peer_identity = PeerIdentity::from_npub(&peer_config.npub).unwrap();
    node.try_peer_addresses(&peer_config, peer_identity, false)
        .await
        .unwrap();
    // Collect the source addresses of all started handshakes; both
    // candidates must be present.
    let mut addrs = node
        .connections
        .values()
        .filter_map(|conn| conn.source_addr().and_then(|addr| addr.as_str()))
        .collect::<Vec<_>>();
    addrs.sort();
    assert_eq!(addrs, vec!["127.0.0.1:10", "127.0.0.1:9"]);
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[tokio::test]
async fn test_try_peer_addresses_skips_incompatible_udp_address_family() {
    // The transport is bound IPv4-only; an IPv6 candidate must be skipped
    // entirely (no connection and no link), while the IPv4 candidate dials.
    let peer_identity = Identity::generate();
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);
    let transport_id = TransportId::new(1);
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));
    let peer_config = crate::config::PeerConfig {
        npub: peer_identity.npub(),
        alias: None,
        addresses: vec![
            // IPv6 candidate at higher priority than the IPv4 one.
            crate::config::PeerAddress::with_priority("udp", "[fd00::1]:9", 1),
            crate::config::PeerAddress::with_priority("udp", "127.0.0.1:9", 2),
        ],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let peer_identity = PeerIdentity::from_npub(&peer_config.npub).unwrap();
    node.try_peer_addresses(&peer_config, peer_identity, false)
        .await
        .unwrap();
    assert_eq!(node.connection_count(), 1);
    assert_eq!(
        node.connections
            .values()
            .next()
            .and_then(|conn| conn.source_addr())
            .and_then(|addr| addr.as_str()),
        Some("127.0.0.1:9")
    );
    assert!(
        node.find_link_by_addr(
            transport_id,
            &crate::transport::TransportAddr::from_string("[fd00::1]:9"),
        )
        .is_none(),
        "IPv6 candidate must not allocate a failed link on an IPv4-only socket"
    );
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[tokio::test]
async fn test_transport_discovery_skips_incompatible_udp_address_family() {
    // transport_discovery_candidate must reject an IPv6 address when the only
    // available UDP transport is bound IPv4-only.
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);
    let transport_id = TransportId::new(1);
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));
    let candidate = node.transport_discovery_candidate(
        transport_id,
        crate::transport::TransportAddr::from_string("[fd00::1]:9"),
    );
    assert!(
        candidate.is_none(),
        "transport discovery must not feed IPv6 candidates to an IPv4 UDP socket"
    );
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[tokio::test]
async fn test_transport_discovery_avoids_bootstrap_udp_transport() {
    // When a candidate arrives on a transport marked as bootstrap, discovery
    // should re-home it onto a non-bootstrap (primary) UDP transport.
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);
    let bootstrap_id = TransportId::new(1);
    let primary_id = TransportId::new(2);
    // Start two independent loopback UDP transports.
    for (transport_id, name) in [(bootstrap_id, "bootstrap"), (primary_id, "main")] {
        let mut udp = UdpTransport::new(
            transport_id,
            Some(name.to_string()),
            crate::config::UdpConfig {
                bind_addr: Some("127.0.0.1:0".to_string()),
                ..Default::default()
            },
            packet_tx.clone(),
        );
        udp.start_async().await.unwrap();
        node.transports
            .insert(transport_id, TransportHandle::Udp(udp));
    }
    node.bootstrap_transports.insert(bootstrap_id);
    let candidate = node
        .transport_discovery_candidate(
            bootstrap_id,
            crate::transport::TransportAddr::from_string("127.0.0.1:9"),
        )
        .expect("primary UDP transport should be eligible");
    // The returned candidate is re-targeted to the primary transport.
    assert_eq!(candidate.0, primary_id);
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[tokio::test]
async fn test_udp_transport_picker_ignores_bootstrap_transports() {
    // find_transport_for_type("udp") must skip transports in the bootstrap
    // set and pick among the remaining primaries.
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);
    let bootstrap_id = TransportId::new(1);
    let primary_id = TransportId::new(2);
    let other_primary_id = TransportId::new(3);
    for (transport_id, name) in [
        (bootstrap_id, "bootstrap"),
        (other_primary_id, "other-primary"),
        (primary_id, "primary"),
    ] {
        let mut udp = UdpTransport::new(
            transport_id,
            Some(name.to_string()),
            crate::config::UdpConfig {
                bind_addr: Some("127.0.0.1:0".to_string()),
                ..Default::default()
            },
            packet_tx.clone(),
        );
        udp.start_async().await.unwrap();
        node.transports
            .insert(transport_id, TransportHandle::Udp(udp));
    }
    node.bootstrap_transports.insert(bootstrap_id);
    assert_eq!(node.find_transport_for_type("udp"), Some(primary_id));
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[tokio::test]
async fn test_node_state_transitions() {
    // Created -> running -> Stopped, with can_start()/is_running() agreeing
    // at every step.
    let mut node = make_node();
    assert!(node.state().can_start());
    assert!(!node.is_running());
    node.start().await.unwrap();
    assert!(!node.state().can_start());
    assert!(node.is_running());
    node.stop().await.unwrap();
    assert_eq!(node.state(), NodeState::Stopped);
    assert!(!node.is_running());
}
#[tokio::test]
async fn test_node_start_does_not_wait_for_nostr_relay_startup() {
    // Nostr advertising is pointed at an unreachable relay; start() must
    // still return within the timeout instead of blocking on relay I/O.
    let mut config = Config::new();
    config.node.control.enabled = false;
    config.node.discovery.nostr.enabled = true;
    config.node.discovery.nostr.advertise = true;
    config.node.discovery.nostr.policy = crate::config::NostrDiscoveryPolicy::Open;
    // Port 9 (discard) on loopback: connection attempts will not succeed.
    config.node.discovery.nostr.advert_relays = vec!["wss://127.0.0.1:9".to_string()];
    config.node.discovery.nostr.dm_relays = vec!["wss://127.0.0.1:9".to_string()];
    config.transports.udp = crate::config::TransportInstances::Single(crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        advertise_on_nostr: Some(true),
        public: Some(false),
        accept_connections: Some(true),
        ..Default::default()
    });
    let mut node = Node::new(config).unwrap();
    // 500ms budget: ample for local startup, far below any network timeout.
    tokio::time::timeout(std::time::Duration::from_millis(500), node.start())
        .await
        .expect("node start should not wait for relay I/O")
        .unwrap();
    assert!(node.is_running());
    assert!(node.nostr_discovery_handle().is_some());
    node.stop().await.unwrap();
}
#[tokio::test]
async fn test_node_double_start() {
    // Starting an already-running node must fail with AlreadyStarted.
    let mut node = make_node();
    node.start().await.unwrap();
    assert!(matches!(node.start().await, Err(NodeError::AlreadyStarted)));
    node.stop().await.unwrap();
}
#[tokio::test]
async fn test_node_stop_not_started() {
    // Stopping a node that was never started must fail with NotStarted.
    let mut node = make_node();
    assert!(matches!(node.stop().await, Err(NodeError::NotStarted)));
}
#[test]
fn test_node_link_management() {
    // An added link is visible by id and by (transport, addr); removing it
    // clears both lookups.
    let mut node = make_node();
    let tid = TransportId::new(1);
    let addr = TransportAddr::from_string("test");
    let id = node.allocate_link_id();
    let link = Link::connectionless(
        id,
        tid,
        addr.clone(),
        LinkDirection::Outbound,
        Duration::from_millis(50),
    );
    node.add_link(link).unwrap();
    assert_eq!(node.link_count(), 1);
    assert!(node.get_link(&id).is_some());
    assert_eq!(node.find_link_by_addr(tid, &addr), Some(id));
    node.remove_link(&id);
    assert_eq!(node.link_count(), 0);
    assert!(node.find_link_by_addr(tid, &addr).is_none());
}
#[test]
fn test_node_link_limit() {
    // With max_links=2 the first two links are accepted and the third is
    // rejected with MaxLinksExceeded.
    let mut node = make_node();
    node.set_max_links(2);
    for label in ["test0", "test1"] {
        let id = node.allocate_link_id();
        let link = Link::connectionless(
            id,
            TransportId::new(1),
            TransportAddr::from_string(label),
            LinkDirection::Outbound,
            Duration::from_millis(50),
        );
        node.add_link(link).unwrap();
    }
    let extra_id = node.allocate_link_id();
    let extra = Link::connectionless(
        extra_id,
        TransportId::new(1),
        TransportAddr::from_string("test_extra"),
        LinkDirection::Outbound,
        Duration::from_millis(50),
    );
    assert!(matches!(
        node.add_link(extra),
        Err(NodeError::MaxLinksExceeded { .. })
    ));
}
#[test]
fn test_node_connection_management() {
    // A connection can be added, looked up by its link id, and removed.
    let mut node = make_node();
    let link = LinkId::new(1);
    let conn = PeerConnection::outbound(link, make_peer_identity(), 1000);
    node.add_connection(conn).unwrap();
    assert_eq!(node.connection_count(), 1);
    assert!(node.get_connection(&link).is_some());
    node.remove_connection(&link);
    assert_eq!(node.connection_count(), 0);
}
#[test]
fn test_node_connection_duplicate() {
    // A second connection on the same link id must be rejected.
    let mut node = make_node();
    let identity = make_peer_identity();
    let link = LinkId::new(1);
    node.add_connection(PeerConnection::outbound(link, identity, 1000))
        .unwrap();
    let dup = PeerConnection::outbound(link, identity, 2000);
    assert!(matches!(
        node.add_connection(dup),
        Err(NodeError::ConnectionAlreadyExists(_))
    ));
}
#[test]
fn test_node_promote_connection() {
    // Promoting a completed handshake moves it from `connections` to the peer
    // table, carries over the Noise session and both session indices, and
    // registers the (transport, our_index) -> node_addr mapping.
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();
    node.add_connection(conn).unwrap();
    assert_eq!(node.connection_count(), 1);
    assert_eq!(node.peer_count(), 0);
    let result = node.promote_connection(link_id, identity, 2000).unwrap();
    assert!(matches!(result, PromotionResult::Promoted(_)));
    assert_eq!(node.connection_count(), 0);
    assert_eq!(node.peer_count(), 1);
    let peer = node.get_peer(&node_addr).unwrap();
    // The promotion timestamp (2000), not the connection-creation time (1000),
    // is recorded as authenticated_at.
    assert_eq!(peer.authenticated_at(), 2000);
    assert!(peer.has_session(), "Promoted peer should have NoiseSession");
    assert!(
        peer.our_index().is_some(),
        "Promoted peer should have our_index"
    );
    assert!(
        peer.their_index().is_some(),
        "Promoted peer should have their_index"
    );
    let our_index = peer.our_index().unwrap();
    assert_eq!(
        node.peers_by_index.get(&(transport_id, our_index.as_u32())),
        Some(&node_addr)
    );
}
#[test]
fn test_promote_open_discovery_retry_blocks_fallback_transit() {
    // A peer with a pending retry entry whose config sets
    // discovery_fallback_transit=false must land in the blocked set on
    // promotion, so it cannot be used as ambient lookup transit.
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();
    let retry = crate::node::retry::RetryState::new(crate::config::PeerConfig {
        npub: identity.npub(),
        alias: None,
        addresses: Vec::new(),
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: false,
    });
    node.retry_pending.insert(node_addr, retry);
    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();
    assert!(
        node.discovery_fallback_transit_blocked_peers
            .contains(&node_addr),
        "open-discovery retry peers should not become ambient lookup transit"
    );
}
#[test]
fn test_promote_nonconfigured_open_discovery_peer_blocks_fallback_transit() {
    // Under the Open discovery policy, a peer that is not in the config and
    // gets promoted must also be blocked from fallback transit.
    let mut node = make_node();
    node.config.node.discovery.nostr.policy = crate::config::NostrDiscoveryPolicy::Open;
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();
    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();
    assert!(
        node.discovery_fallback_transit_blocked_peers
            .contains(&node_addr),
        "nonconfigured peers accepted under open discovery should not be fallback transit"
    );
}
#[test]
fn test_promote_registers_decrypt_worker() {
    // When a decrypt worker pool is present, promotion must register the new
    // session's (transport, our_index) with it.
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    node.decrypt_workers = Some(crate::node::decrypt_worker::DecryptWorkerPool::spawn(1));
    let link_id = LinkId::new(1);
    let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();
    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();
    let peer = node.get_peer(&node_addr).unwrap();
    let our_index = peer.our_index().unwrap();
    assert!(
        node.decrypt_registered_sessions
            .contains(&(transport_id, our_index.as_u32())),
        "decrypt_registered_sessions must contain the new session after promote"
    );
}
#[cfg(target_os = "linux")]
#[test]
fn test_deregister_session_index_preserves_connected_udp_on_rekey_drain() {
    // Rekey scenario: the peer already has a new session index registered.
    // Deregistering the old index must evict only the old entry — the new
    // index mapping and the peer itself must survive.
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();
    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();
    let index_old = node
        .get_peer(&node_addr)
        .unwrap()
        .our_index()
        .unwrap()
        .as_u32();
    // Simulate the post-rekey state: a second index points at the same peer.
    let index_new: u32 = 9999;
    node.peers_by_index
        .insert((transport_id, index_new), node_addr);
    node.deregister_session_index((transport_id, index_old));
    assert!(
        !node.peers_by_index.contains_key(&(transport_id, index_old)),
        "old index must be evicted"
    );
    assert!(
        node.peers_by_index.contains_key(&(transport_id, index_new)),
        "new index must survive the deregister"
    );
    assert!(
        node.get_peer(&node_addr).is_some(),
        "peer must still be present after rekey-drain deregistration"
    );
    assert!(
        !node
            .decrypt_registered_sessions
            .contains(&(transport_id, index_old)),
        "old session must be evicted from decrypt_registered_sessions"
    );
}
#[test]
fn test_node_cross_connection_resolution() {
    // After promotion the peer is resolvable both by node address and via the
    // peers_by_index (transport, our_index) mapping, with exactly one peer.
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id1 = LinkId::new(1);
    let (conn1, identity) = make_completed_connection(&mut node, link_id1, transport_id, 1000);
    let node_addr = *identity.node_addr();
    node.add_connection(conn1).unwrap();
    node.promote_connection(link_id1, identity, 1500).unwrap();
    assert_eq!(node.peer_count(), 1);
    assert_eq!(node.get_peer(&node_addr).unwrap().link_id(), link_id1);
    let peer = node.get_peer(&node_addr).unwrap();
    let our_idx = peer.our_index().unwrap();
    assert_eq!(
        node.peers_by_index.get(&(transport_id, our_idx.as_u32())),
        Some(&node_addr)
    );
    assert_eq!(node.peer_count(), 1);
}
#[test]
fn test_node_peer_limit() {
    // With max_peers=2 the third promotion must fail with MaxPeersExceeded.
    let mut node = make_node();
    let tid = TransportId::new(1);
    node.set_max_peers(2);
    for n in 1..=2u64 {
        let link = LinkId::new(n);
        let (conn, identity) = make_completed_connection(&mut node, link, tid, 1000);
        node.add_connection(conn).unwrap();
        node.promote_connection(link, identity, 2000).unwrap();
    }
    assert_eq!(node.peer_count(), 2);
    let link = LinkId::new(3);
    let (conn, identity) = make_completed_connection(&mut node, link, tid, 3000);
    node.add_connection(conn).unwrap();
    assert!(matches!(
        node.promote_connection(link, identity, 4000),
        Err(NodeError::MaxPeersExceeded { .. })
    ));
}
#[test]
fn test_node_link_id_allocation() {
    // Link ids are allocated sequentially starting at 1 and never repeat.
    let mut node = make_node();
    let ids = [
        node.allocate_link_id(),
        node.allocate_link_id(),
        node.allocate_link_id(),
    ];
    assert_ne!(ids[0], ids[1]);
    assert_ne!(ids[1], ids[2]);
    for (n, id) in ids.iter().enumerate() {
        assert_eq!(id.as_u64(), n as u64 + 1);
    }
}
#[test]
fn test_node_transport_management() {
    // Allocating transport ids yields distinct ids but registers nothing.
    let mut node = make_node();
    assert_eq!(node.transport_count(), 0);
    let first = node.allocate_transport_id();
    let second = node.allocate_transport_id();
    assert_ne!(first, second);
    assert!(node.get_transport(&first).is_none());
    assert!(node.get_transport(&second).is_none());
    assert_eq!(node.transport_ids().count(), 0);
}
#[test]
fn test_node_sendable_peers() {
    // Three promoted peers, one of which is then marked disconnected: the
    // sendable set must contain only the two still-connected peers.
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id1 = LinkId::new(1);
    let (conn1, identity1) = make_completed_connection(&mut node, link_id1, transport_id, 1000);
    let node_addr1 = *identity1.node_addr();
    node.add_connection(conn1).unwrap();
    node.promote_connection(link_id1, identity1, 2000).unwrap();
    let link_id2 = LinkId::new(2);
    let (conn2, identity2) = make_completed_connection(&mut node, link_id2, transport_id, 1000);
    node.add_connection(conn2).unwrap();
    node.promote_connection(link_id2, identity2, 2000).unwrap();
    let link_id3 = LinkId::new(3);
    let (conn3, identity3) = make_completed_connection(&mut node, link_id3, transport_id, 1000);
    let node_addr3 = *identity3.node_addr();
    node.add_connection(conn3).unwrap();
    node.promote_connection(link_id3, identity3, 2000).unwrap();
    // Peer 3 stays in the peer table but becomes non-sendable.
    node.get_peer_mut(&node_addr3).unwrap().mark_disconnected();
    assert_eq!(node.peer_count(), 3);
    assert_eq!(node.sendable_peer_count(), 2);
    let sendable: Vec<_> = node.sendable_peers().collect();
    assert_eq!(sendable.len(), 2);
    assert!(sendable.iter().any(|p| p.node_addr() == &node_addr1));
}
#[test]
fn test_node_index_allocator_initialized() {
    // A new node starts with zero allocated session indices.
    assert_eq!(make_node().index_allocator.count(), 0);
}
#[test]
fn test_node_pending_outbound_tracking() {
    // pending_outbound maps (transport, session index) -> link id; the entry
    // and its index can be released cleanly.
    let mut node = make_node();
    let tid = TransportId::new(1);
    let link = LinkId::new(1);
    let index = node.index_allocator.allocate().unwrap();
    let key = (tid, index.as_u32());
    node.pending_outbound.insert(key, link);
    assert_eq!(node.pending_outbound.get(&key), Some(&link));
    node.pending_outbound.remove(&key);
    let _ = node.index_allocator.free(index);
    assert_eq!(node.index_allocator.count(), 0);
    assert!(node.pending_outbound.is_empty());
}
#[test]
fn test_node_peers_by_index_tracking() {
    // peers_by_index maps (transport, session index) -> node address; the
    // entry and its index can be released cleanly.
    let mut node = make_node();
    let tid = TransportId::new(1);
    let node_addr = make_node_addr(42);
    let index = node.index_allocator.allocate().unwrap();
    let key = (tid, index.as_u32());
    node.peers_by_index.insert(key, node_addr);
    assert_eq!(node.peers_by_index.get(&key), Some(&node_addr));
    node.peers_by_index.remove(&key);
    let _ = node.index_allocator.free(index);
    assert!(node.peers_by_index.is_empty());
}
#[tokio::test]
async fn test_node_rx_loop_requires_start() {
    // The rx loop refuses to run before the node has been started.
    let mut node = make_node();
    assert!(matches!(node.run_rx_loop().await, Err(NodeError::NotStarted)));
}
#[tokio::test]
async fn test_node_rx_loop_takes_channel() {
    // start() installs a packet receiver; taking it leaves None behind.
    let mut node = make_node();
    node.start().await.unwrap();
    assert!(node.packet_rx.is_some());
    let taken = node.packet_rx.take();
    assert!(taken.is_some());
    assert!(node.packet_rx.is_none());
    node.stop().await.unwrap();
}
#[test]
fn test_rate_limiter_initialized() {
    // The msg1 rate limiter starts open, counts one pending handshake after
    // start, and drops back to zero on completion.
    let mut node = make_node();
    let limiter = &mut node.msg1_rate_limiter;
    assert!(limiter.can_start_handshake());
    assert!(limiter.start_handshake());
    assert_eq!(limiter.pending_count(), 1);
    limiter.complete_handshake();
    assert_eq!(limiter.pending_count(), 0);
}
#[test]
fn test_promote_cleans_up_pending_outbound_to_same_peer() {
    // Two concurrent outbound handshakes to the same peer B: one still pending
    // (msg1 sent, awaiting msg2) and one completed. Promoting the completed
    // handshake must NOT tear down the pending one — its link, connection,
    // pending_outbound entry, and session index are all kept until the msg2
    // path cleans them up (deferred cleanup).
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let peer_b_full = Identity::generate();
    let peer_b_identity = PeerIdentity::from_pubkey_full(peer_b_full.pubkey_full());
    let peer_b_node_addr = *peer_b_identity.node_addr();

    // --- Handshake 1: pending (msg1 sent, no reply yet) ---
    let pending_link_id = LinkId::new(1);
    let pending_time_ms = 1000;
    let mut pending_conn =
        PeerConnection::outbound(pending_link_id, peer_b_identity, pending_time_ms);
    let our_keypair = node.identity.keypair();
    let _msg1 = pending_conn
        .start_handshake(our_keypair, node.startup_epoch, pending_time_ms)
        .unwrap();
    let pending_index = node.index_allocator.allocate().unwrap();
    pending_conn.set_our_index(pending_index);
    pending_conn.set_transport_id(transport_id);
    let pending_addr = TransportAddr::from_string("10.0.0.2:2121");
    pending_conn.set_source_addr(pending_addr.clone());
    // Wire up the link/addr/connection/pending_outbound state by hand, as the
    // dial path would have.
    let pending_link = Link::connectionless(
        pending_link_id,
        transport_id,
        pending_addr.clone(),
        LinkDirection::Outbound,
        Duration::from_millis(100),
    );
    node.links.insert(pending_link_id, pending_link);
    node.addr_to_link
        .insert((transport_id, pending_addr.clone()), pending_link_id);
    node.connections.insert(pending_link_id, pending_conn);
    node.pending_outbound
        .insert((transport_id, pending_index.as_u32()), pending_link_id);
    assert_eq!(node.connection_count(), 1);
    assert_eq!(node.link_count(), 1);
    assert_eq!(node.index_allocator.count(), 1);

    // --- Handshake 2: driven to completion via a simulated responder ---
    let completing_link_id = LinkId::new(2);
    let completing_time_ms = 2000;
    let mut completing_conn =
        PeerConnection::outbound(completing_link_id, peer_b_identity, completing_time_ms);
    let our_keypair = node.identity.keypair();
    let msg1 = completing_conn
        .start_handshake(our_keypair, node.startup_epoch, completing_time_ms)
        .unwrap();
    // Peer B's side: consume msg1, produce msg2.
    let mut resp_conn = PeerConnection::inbound(LinkId::new(999), completing_time_ms);
    let peer_keypair = peer_b_full.keypair();
    let mut resp_epoch = [0u8; 8];
    rand::Rng::fill_bytes(&mut rand::rng(), &mut resp_epoch);
    let msg2 = resp_conn
        .receive_handshake_init(peer_keypair, resp_epoch, &msg1, completing_time_ms)
        .unwrap();
    completing_conn
        .complete_handshake(&msg2, completing_time_ms)
        .unwrap();
    let completing_index = node.index_allocator.allocate().unwrap();
    completing_conn.set_our_index(completing_index);
    completing_conn.set_their_index(SessionIndex::new(99));
    completing_conn.set_transport_id(transport_id);
    completing_conn.set_source_addr(TransportAddr::from_string("10.0.0.2:4001"));
    node.add_connection(completing_conn).unwrap();
    assert_eq!(node.connection_count(), 2);
    assert_eq!(node.index_allocator.count(), 2);

    // --- Promote the completed handshake and verify deferred cleanup ---
    let result = node
        .promote_connection(completing_link_id, peer_b_identity, completing_time_ms)
        .unwrap();
    assert!(matches!(result, PromotionResult::Promoted(_)));
    assert_eq!(
        node.connection_count(),
        1,
        "Pending outbound should be preserved (deferred cleanup)"
    );
    assert_eq!(node.peer_count(), 1, "Promoted peer should exist");
    assert!(
        node.pending_outbound
            .contains_key(&(transport_id, pending_index.as_u32())),
        "pending_outbound entry should still exist (awaiting msg2)"
    );
    assert_eq!(
        node.index_allocator.count(),
        2,
        "Both indices should remain until msg2 cleanup"
    );
    let peer = node.get_peer(&peer_b_node_addr).unwrap();
    assert_eq!(peer.link_id(), completing_link_id);
}
#[test]
fn test_schedule_retry_creates_entry() {
    // First schedule_retry for a configured auto-connect peer creates an
    // entry with retry_count=1, reconnect=true, and a 10s backoff from now.
    let peer_identity = Identity::generate();
    let peer_npub = peer_identity.npub();
    let peer_node_addr = *PeerIdentity::from_npub(&peer_npub).unwrap().node_addr();
    let mut config = Config::new();
    config.peers.push(crate::config::PeerConfig::new(
        peer_npub,
        "udp",
        "10.0.0.2:2121",
    ));
    let mut node = Node::new(config).unwrap();
    assert!(node.retry_pending.is_empty());
    node.schedule_retry(peer_node_addr, 1000);
    assert_eq!(node.retry_pending.len(), 1);
    let state = node.retry_pending.get(&peer_node_addr).unwrap();
    assert_eq!(state.retry_count, 1);
    assert!(
        state.reconnect,
        "Auto-connect peers always get reconnect=true"
    );
    // now (1000) + first-retry backoff (10s).
    assert_eq!(state.retry_after_ms, 1000 + 10_000);
}
#[test]
fn test_schedule_retry_increments() {
    // A second schedule_retry increments retry_count and doubles the backoff
    // (10s -> 20s), anchored at the new "now".
    let peer_identity = Identity::generate();
    let peer_npub = peer_identity.npub();
    let peer_node_addr = *PeerIdentity::from_npub(&peer_npub).unwrap().node_addr();
    let mut config = Config::new();
    config.peers.push(crate::config::PeerConfig::new(
        peer_npub,
        "udp",
        "10.0.0.2:2121",
    ));
    let mut node = Node::new(config).unwrap();
    node.schedule_retry(peer_node_addr, 1000);
    assert_eq!(
        node.retry_pending.get(&peer_node_addr).unwrap().retry_count,
        1
    );
    node.schedule_retry(peer_node_addr, 11_000);
    let state = node.retry_pending.get(&peer_node_addr).unwrap();
    assert_eq!(state.retry_count, 2);
    assert_eq!(state.retry_after_ms, 11_000 + 20_000);
}
#[tokio::test]
async fn test_process_pending_retries_is_budgeted_per_tick() {
    // With 20 due retry entries, a single tick processes only the per-tick
    // budget (16) and defers the rest; no entries are dropped.
    let mut node = make_node();
    let mut addrs = Vec::new();
    for _ in 0..20 {
        let identity = Identity::generate();
        let npub = identity.npub();
        let peer_identity = PeerIdentity::from_npub(&npub).unwrap();
        let node_addr = *peer_identity.node_addr();
        // retry_after_ms=0 makes every entry immediately due at tick time 1.
        node.retry_pending.insert(
            node_addr,
            crate::node::retry::RetryState {
                peer_config: crate::config::PeerConfig::new(npub, "udp", "10.0.0.2:2121"),
                retry_count: 0,
                retry_after_ms: 0,
                reconnect: true,
                expires_at_ms: None,
            },
        );
        addrs.push(node_addr);
    }
    node.process_pending_retries(1).await;
    // Entries the tick touched have retry_count bumped above 0.
    let processed = addrs
        .iter()
        .filter(|addr| {
            node.retry_pending
                .get(addr)
                .is_some_and(|state| state.retry_count > 0)
        })
        .count();
    let deferred = addrs.len().saturating_sub(processed);
    assert_eq!(processed, 16);
    assert_eq!(deferred, 4);
    assert_eq!(node.retry_pending.len(), 20);
}
#[test]
fn test_schedule_retry_auto_connect_never_exhausts() {
    // Even with max_retries=2, an auto-connect peer keeps its retry entry
    // past the limit — auto-connect peers never exhaust.
    let peer_identity = Identity::generate();
    let peer_npub = peer_identity.npub();
    let peer_node_addr = *PeerIdentity::from_npub(&peer_npub).unwrap().node_addr();
    let mut config = Config::new();
    config.node.retry.max_retries = 2;
    config.peers.push(crate::config::PeerConfig::new(
        peer_npub,
        "udp",
        "10.0.0.2:2121",
    ));
    let mut node = Node::new(config).unwrap();
    node.schedule_retry(peer_node_addr, 1000);
    assert!(node.retry_pending.contains_key(&peer_node_addr));
    node.schedule_retry(peer_node_addr, 2000);
    assert!(node.retry_pending.contains_key(&peer_node_addr));
    // Third attempt exceeds max_retries but must still be kept.
    node.schedule_retry(peer_node_addr, 3000);
    assert!(
        node.retry_pending.contains_key(&peer_node_addr),
        "Auto-connect peers should never exhaust retries"
    );
    assert_eq!(
        node.retry_pending.get(&peer_node_addr).unwrap().retry_count,
        3
    );
}
#[test]
fn test_schedule_retry_disabled() {
    // max_retries=0 disables retry scheduling even for configured peers.
    let peer_npub = Identity::generate().npub();
    let peer_node_addr = *PeerIdentity::from_npub(&peer_npub).unwrap().node_addr();
    let mut config = Config::new();
    config.node.retry.max_retries = 0;
    config
        .peers
        .push(crate::config::PeerConfig::new(peer_npub, "udp", "10.0.0.2:2121"));
    let mut node = Node::new(config).unwrap();
    node.schedule_retry(peer_node_addr, 1000);
    assert!(
        node.retry_pending.is_empty(),
        "No retry should be scheduled when max_retries=0"
    );
}
#[test]
fn test_schedule_retry_ignores_non_autoconnect() {
    // A peer that is absent from the config must not get a retry entry.
    let mut node = make_node();
    let unknown_peer = *Identity::generate().node_addr();
    node.schedule_retry(unknown_peer, 1000);
    assert!(
        node.retry_pending.is_empty(),
        "No retry for unconfigured peer"
    );
}
#[test]
fn test_schedule_retry_skips_connected_peer() {
    // schedule_retry for a peer that is already promoted/connected must be a
    // no-op — no retry entry is created.
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();
    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();
    assert_eq!(node.peer_count(), 1);
    node.schedule_retry(node_addr, 3000);
    assert!(
        node.retry_pending.is_empty(),
        "No retry for already-connected peer"
    );
}
#[tokio::test]
async fn test_try_peer_addresses_skips_connected_peer() {
    // A stale retry/traversal fallback (is_fallback=true) targeting a peer
    // that is already connected must not create any new links or handshakes.
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (conn, peer_identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let peer_config = crate::config::PeerConfig::new(peer_identity.npub(), "udp", "127.0.0.1:9");
    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, peer_identity, 2000)
        .unwrap();
    // Snapshot counts before the (should-be no-op) dial attempt.
    let link_count = node.link_count();
    let connection_count = node.connection_count();
    node.try_peer_addresses(&peer_config, peer_identity, true)
        .await
        .unwrap();
    assert_eq!(
        node.link_count(),
        link_count,
        "stale retry/traversal fallback must not create a duplicate link"
    );
    assert_eq!(
        node.connection_count(),
        connection_count,
        "stale retry/traversal fallback must not create a duplicate handshake"
    );
}
#[tokio::test]
async fn test_try_peer_addresses_skips_connecting_peer() {
    // A fallback dial must also be a no-op while an outbound handshake to the
    // same peer is still in flight: no second connection, no new link.
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);
    let transport_id = TransportId::new(1);
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));
    let peer_identity = make_peer_identity();
    let peer_config = crate::config::PeerConfig::new(peer_identity.npub(), "udp", "127.0.0.1:9");
    // Pre-existing in-flight outbound connection to the same address.
    let mut pending = PeerConnection::outbound(LinkId::new(1), peer_identity, 1000);
    pending.set_transport_id(transport_id);
    pending.set_source_addr(TransportAddr::from_string("127.0.0.1:9"));
    node.add_connection(pending).unwrap();
    node.try_peer_addresses(&peer_config, peer_identity, true)
        .await
        .unwrap();
    assert_eq!(
        node.connection_count(),
        1,
        "stale retry/traversal fallback must not start a second handshake"
    );
    assert_eq!(
        node.link_count(),
        0,
        "stale retry/traversal fallback must not allocate a link for the duplicate path"
    );
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[tokio::test]
async fn test_nostr_traversal_failure_skips_connected_peer() {
    // A stale BootstrapEvent::Failed for a peer that is already connected
    // must neither record traversal-failure cooldown state nor schedule a
    // reconnect.
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (conn, peer_identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, peer_identity, 2000)
        .unwrap();
    // Inject a stale failure event through the test discovery backend.
    let bootstrap = Arc::new(NostrDiscovery::new_for_test());
    bootstrap.push_event_for_test(BootstrapEvent::Failed {
        peer_config: crate::config::PeerConfig::new(peer_identity.npub(), "udp", "127.0.0.1:9"),
        reason: "stale traversal failure".to_string(),
    });
    node.nostr_discovery = Some(bootstrap.clone());
    node.poll_nostr_discovery().await;
    assert!(
        bootstrap.failure_state_snapshot().is_empty(),
        "stale failures for connected peers must not affect traversal cooldown"
    );
    assert!(
        node.retry_pending.is_empty(),
        "stale failures for connected peers must not enqueue reconnect attempts"
    );
}
#[tokio::test]
async fn test_process_pending_retries_drops_expired_entries() {
    // A retry entry whose expiry deadline has arrived must be evicted by
    // process_pending_retries rather than being dialed.
    let mut node = make_node();
    let identity = Identity::generate();
    let npub = identity.npub();
    let node_addr = *PeerIdentity::from_npub(&npub).unwrap().node_addr();
    let peer_config = crate::config::PeerConfig::new(npub, "udp", "127.0.0.1:9");
    let mut retry_state = super::super::retry::RetryState::new(peer_config);
    // Eligible immediately, but already expired at t=1000ms.
    retry_state.retry_after_ms = 0;
    retry_state.expires_at_ms = Some(1_000);
    retry_state.reconnect = true;
    node.retry_pending.insert(node_addr, retry_state);
    node.process_pending_retries(1_000).await;
    assert!(
        !node.retry_pending.contains_key(&node_addr),
        "expired retry entries should be dropped before retry processing"
    );
}
#[test]
fn test_schedule_reconnect_preserves_backoff() {
    // Two prior failures followed by a reconnect: the reconnect must build on
    // the existing backoff count instead of resetting it.
    let identity = Identity::generate();
    let npub = identity.npub();
    let node_addr = *PeerIdentity::from_npub(&npub).unwrap().node_addr();
    let mut config = Config::new();
    config
        .peers
        .push(crate::config::PeerConfig::new(npub, "udp", "10.0.0.2:2121"));
    let mut node = Node::new(config).unwrap();
    node.schedule_retry(node_addr, 1_000);
    node.schedule_retry(node_addr, 11_000);
    {
        let state = node.retry_pending.get(&node_addr).unwrap();
        assert_eq!(state.retry_count, 2, "Two failures should yield count=2");
    }
    node.schedule_reconnect(node_addr, 31_000);
    let state = node.retry_pending.get(&node_addr).unwrap();
    assert!(state.reconnect, "Entry should be marked as reconnect");
    assert_eq!(
        state.retry_count, 3,
        "schedule_reconnect should increment existing count (was 2), not reset to 0 (regression: issue #5)"
    );
    // Recompute the expected delay from the configured backoff parameters.
    let base_ms = node.config.node.retry.base_interval_secs * 1000;
    let max_ms = node.config.node.retry.max_backoff_secs * 1000;
    let delay = state.backoff_ms(base_ms, max_ms);
    assert_eq!(
        state.retry_after_ms,
        31_000 + delay,
        "retry_after_ms should reflect count=3 backoff"
    );
}
#[test]
fn test_schedule_reconnect_fresh_state() {
    // A reconnect scheduled with no prior retry history starts from a clean
    // backoff count of zero.
    let identity = Identity::generate();
    let npub = identity.npub();
    let node_addr = *PeerIdentity::from_npub(&npub).unwrap().node_addr();
    let mut config = Config::new();
    config
        .peers
        .push(crate::config::PeerConfig::new(npub, "udp", "10.0.0.2:2121"));
    let mut node = Node::new(config).unwrap();
    node.schedule_reconnect(node_addr, 1_000);
    let state = node.retry_pending.get(&node_addr).unwrap();
    assert!(state.reconnect, "Entry should be marked as reconnect");
    assert_eq!(
        state.retry_count, 0,
        "Fresh reconnect should start at count=0"
    );
    let base_ms = node.config.node.retry.base_interval_secs * 1000;
    let max_ms = node.config.node.retry.max_backoff_secs * 1000;
    let delay = state.backoff_ms(base_ms, max_ms);
    assert_eq!(state.retry_after_ms, 1_000 + delay);
}
#[test]
fn test_disconnect_schedules_reconnect() {
    use crate::protocol::{Disconnect, DisconnectReason};
    // A peer-initiated shutdown disconnect should queue a fresh reconnect
    // attempt for an auto-connect peer.
    let identity = Identity::generate();
    let npub = identity.npub();
    let node_addr = *PeerIdentity::from_npub(&npub).unwrap().node_addr();
    let mut config = Config::new();
    config
        .peers
        .push(crate::config::PeerConfig::new(npub, "udp", "10.0.0.2:2121"));
    let mut node = Node::new(config).unwrap();
    let payload = Disconnect::new(DisconnectReason::Shutdown).encode();
    node.handle_disconnect(&node_addr, &payload);
    let state = node
        .retry_pending
        .get(&node_addr)
        .expect("handle_disconnect should schedule reconnect for auto-connect peer");
    assert!(state.reconnect, "Entry should be marked as reconnect");
    assert_eq!(
        state.retry_count, 0,
        "Fresh reconnect after disconnect should start at count=0"
    );
}
#[test]
fn test_promote_clears_retry_pending() {
    // Promoting a completed handshake must erase any pending retry entry
    // recorded for the same peer.
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();
    let stale_entry = super::super::retry::RetryState::new(crate::config::PeerConfig::default());
    node.retry_pending.insert(node_addr, stale_entry);
    assert_eq!(node.retry_pending.len(), 1);
    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();
    assert!(
        !node.retry_pending.contains_key(&node_addr),
        "retry_pending should be cleared on successful promotion"
    );
}
#[tokio::test]
async fn test_initiate_peer_connections_schedules_retry_on_no_transport() {
    // With no transports registered the startup dial cannot succeed; the
    // failure must leave a retry entry so the peer can recover later.
    let identity = Identity::generate();
    let npub = identity.npub();
    let node_addr = *PeerIdentity::from_npub(&npub).unwrap().node_addr();
    let mut config = Config::new();
    config
        .peers
        .push(crate::config::PeerConfig::new(npub, "udp", "10.0.0.2:2121"));
    let mut node = Node::new(config).unwrap();
    assert!(node.retry_pending.is_empty());
    node.initiate_peer_connections().await;
    assert!(
        node.retry_pending.contains_key(&node_addr),
        "startup peer-init failure must enqueue a retry so the peer can recover \
         without a daemon restart"
    );
}
// Test helper: build and start a localhost UDP transport with an explicit MTU.
// The receiver half of the packet channel is dropped immediately, so inbound
// packets are discarded — callers only care about the transport's MTU and
// operational state.
async fn make_udp_transport_with_mtu(id: u32, mtu: u16) -> TransportHandle {
let (packet_tx, _packet_rx) = packet_channel(64);
let transport_id = TransportId::new(id);
let mut udp = UdpTransport::new(
transport_id,
Some(format!("udp{}", id)),
crate::config::UdpConfig {
// Port 0 => kernel picks a free ephemeral port.
bind_addr: Some("127.0.0.1:0".to_string()),
mtu: Some(mtu),
..Default::default()
},
packet_tx,
);
udp.start_async().await.unwrap();
TransportHandle::Udp(udp)
}
// transport_mtu() must report the minimum MTU across all operational
// transports; effective_ipv6_mtu() is derived from that minimum.
#[tokio::test]
async fn test_transport_mtu_returns_min_across_operational() {
let mut node = make_node();
let (packet_tx, packet_rx) = packet_channel(64);
node.packet_tx = Some(packet_tx);
node.packet_rx = Some(packet_rx);
// Three transports with distinct MTUs; 1280 is the smallest.
let udp1 = make_udp_transport_with_mtu(1, 1497).await;
let udp2 = make_udp_transport_with_mtu(2, 1280).await;
let udp3 = make_udp_transport_with_mtu(3, 1400).await;
node.transports.insert(TransportId::new(1), udp1);
node.transports.insert(TransportId::new(2), udp2);
node.transports.insert(TransportId::new(3), udp3);
assert_eq!(node.transport_mtu(), 1280);
// 1203 = 1280 minus a fixed 77-byte overhead — presumably framing/crypto
// headers; see effective_ipv6_mtu for the exact accounting.
assert_eq!(node.effective_ipv6_mtu(), 1203);
for transport in node.transports.values_mut() {
transport.stop().await.ok();
}
}
#[tokio::test]
async fn test_transport_mtu_fallback_when_no_operational_transports() {
    // No transports registered: transport_mtu() falls back to 1280 (the
    // IPv6 minimum MTU).
    let node = make_node();
    let fallback_mtu = node.transport_mtu();
    assert_eq!(fallback_mtu, 1280);
}
#[tokio::test]
async fn test_transport_mtu_min_with_single_operational() {
    // A single operational transport: its MTU is the minimum by definition.
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx);
    node.packet_rx = Some(packet_rx);
    let transport = make_udp_transport_with_mtu(1, 1452).await;
    node.transports.insert(TransportId::new(1), transport);
    assert_eq!(node.transport_mtu(), 1452);
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
// Seeding the path-MTU lookup for a link peer with no prior entry should
// insert the link transport's MTU.
#[tokio::test]
async fn test_seed_path_mtu_inserts_when_empty() {
let mut node = make_node();
let (packet_tx, packet_rx) = packet_channel(64);
node.packet_tx = Some(packet_tx);
node.packet_rx = Some(packet_rx);
let udp = make_udp_transport_with_mtu(1, 1452).await;
node.transports.insert(TransportId::new(1), udp);
let peer_addr = make_node_addr(0xAA);
let fips_addr = crate::FipsAddress::from_node_addr(&peer_addr);
let transport_addr = TransportAddr::from_string("10.0.0.2:2121");
node.seed_path_mtu_for_link_peer(&peer_addr, TransportId::new(1), &transport_addr);
// Copy the value out so the read guard drops before the assertion.
let stored = node
.path_mtu_lookup
.read()
.unwrap()
.get(&fips_addr)
.copied();
assert_eq!(
stored,
Some(1452),
"Empty lookup should be seeded with the link MTU"
);
for transport in node.transports.values_mut() {
transport.stop().await.ok();
}
}
// A previously learned tighter path MTU must win over a looser direct-link
// seed: seeding must never loosen an existing entry.
#[tokio::test]
async fn test_seed_path_mtu_keeps_tighter_existing_value() {
let mut node = make_node();
let (packet_tx, packet_rx) = packet_channel(64);
node.packet_tx = Some(packet_tx);
node.packet_rx = Some(packet_rx);
let udp = make_udp_transport_with_mtu(1, 1452).await;
node.transports.insert(TransportId::new(1), udp);
let peer_addr = make_node_addr(0xBB);
let fips_addr = crate::FipsAddress::from_node_addr(&peer_addr);
let transport_addr = TransportAddr::from_string("10.0.0.3:2121");
// Pre-populate a value tighter than the link MTU (1280 < 1452).
node.path_mtu_lookup
.write()
.unwrap()
.insert(fips_addr, 1280);
node.seed_path_mtu_for_link_peer(&peer_addr, TransportId::new(1), &transport_addr);
let stored = node
.path_mtu_lookup
.read()
.unwrap()
.get(&fips_addr)
.copied();
assert_eq!(
stored,
Some(1280),
"Existing tighter value (1280) must not be loosened by direct-link seed (1452)"
);
for transport in node.transports.values_mut() {
transport.stop().await.ok();
}
}
// Conversely to the keep-tighter case: a direct-link seed that is tighter than
// the stored value must overwrite the looser entry.
#[tokio::test]
async fn test_seed_path_mtu_tightens_looser_existing_value() {
let mut node = make_node();
let (packet_tx, packet_rx) = packet_channel(64);
node.packet_tx = Some(packet_tx);
node.packet_rx = Some(packet_rx);
let udp = make_udp_transport_with_mtu(1, 1280).await;
node.transports.insert(TransportId::new(1), udp);
let peer_addr = make_node_addr(0xCC);
let fips_addr = crate::FipsAddress::from_node_addr(&peer_addr);
let transport_addr = TransportAddr::from_string("10.0.0.4:2121");
// Pre-populate a value looser than the link MTU (1452 > 1280).
node.path_mtu_lookup
.write()
.unwrap()
.insert(fips_addr, 1452);
node.seed_path_mtu_for_link_peer(&peer_addr, TransportId::new(1), &transport_addr);
let stored = node
.path_mtu_lookup
.read()
.unwrap()
.get(&fips_addr)
.copied();
assert_eq!(
stored,
Some(1280),
"Direct-link seed (1280) must overwrite looser existing value (1452)"
);
for transport in node.transports.values_mut() {
transport.stop().await.ok();
}
}
// Retry path-ordering: a freshly observed overlay advert from Nostr discovery
// must be dialed before a stale statically configured UDP address, while the
// static address stays in the bounded multi-path race.
#[tokio::test]
async fn test_retry_dials_overlay_advert_before_stale_static_udp() {
use crate::config::NostrDiscoveryPolicy;
use crate::discovery::nostr::{NostrDiscovery, OverlayEndpointAdvert, OverlayTransportKind};
let mut config = Config::new();
config.node.discovery.nostr.enabled = true;
config.node.discovery.nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
let mut node = Node::new(config).unwrap();
let (packet_tx, packet_rx) = packet_channel(64);
node.packet_tx = Some(packet_tx.clone());
node.packet_rx = Some(packet_rx);
let transport_id = TransportId::new(1);
let mut udp = UdpTransport::new(
transport_id,
Some("main".to_string()),
crate::config::UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
},
packet_tx,
);
udp.start_async().await.unwrap();
node.transports
.insert(transport_id, TransportHandle::Udp(udp));
let peer_identity = Identity::generate();
let peer_npub = peer_identity.npub();
let stale_static_addr = "127.0.0.1:9";
let fresh_overlay_addr = "127.0.0.1:55180";
// Seed the discovery cache with an advert stamped "now", so it ranks as
// fresher than the unstamped static config address.
let bootstrap = Arc::new(NostrDiscovery::new_for_test());
let endpoint = OverlayEndpointAdvert {
transport: OverlayTransportKind::Udp,
addr: fresh_overlay_addr.to_string(),
};
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let advert = NostrDiscovery::cached_advert_for_test(peer_npub.clone(), endpoint, now_secs);
bootstrap
.insert_advert_for_test(peer_npub.clone(), advert)
.await;
node.nostr_discovery = Some(bootstrap);
let peer_config = crate::config::PeerConfig {
npub: peer_npub.clone(),
alias: None,
addresses: vec![crate::config::PeerAddress::new("udp", stale_static_addr)],
connect_policy: crate::config::ConnectPolicy::AutoConnect,
auto_reconnect: true,
discovery_fallback_transit: true,
};
node.initiate_peer_retry_connection(&peer_config)
.await
.unwrap();
let fresh = TransportAddr::from_string(fresh_overlay_addr);
let stale = TransportAddr::from_string(stale_static_addr);
let fresh_link = node.find_link_by_addr(transport_id, &fresh);
let stale_link = node.find_link_by_addr(transport_id, &stale);
assert!(
fresh_link.is_some(),
"retry should dial the just-refreshed overlay advert {fresh_overlay_addr}"
);
assert!(
stale_link.is_some(),
"retry should keep stale static {stale_static_addr} in the bounded path race"
);
// The ID comparison assumes link IDs are allocated in dial order — the
// smaller ID means that path was attempted first (TODO: confirm allocator).
assert!(
fresh_link.unwrap().as_u64() < stale_link.unwrap().as_u64(),
"fresh overlay advert should still be attempted before stale static"
);
assert_eq!(node.connection_count(), 2);
for transport in node.transports.values_mut() {
transport.stop().await.ok();
}
}
// Cold-start counterpart of the retry-ordering test: on the very first dial,
// a freshly observed overlay advert must be attempted before the unstamped
// static address, and both stay in the bounded path race.
#[tokio::test]
async fn test_bootstrap_dials_freshest_address_first() {
use crate::config::NostrDiscoveryPolicy;
use crate::discovery::nostr::{NostrDiscovery, OverlayEndpointAdvert, OverlayTransportKind};
let mut config = Config::new();
config.node.discovery.nostr.enabled = true;
config.node.discovery.nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
let mut node = Node::new(config).unwrap();
let (packet_tx, packet_rx) = packet_channel(64);
node.packet_tx = Some(packet_tx.clone());
node.packet_rx = Some(packet_rx);
let transport_id = TransportId::new(1);
let mut udp = UdpTransport::new(
transport_id,
Some("main".to_string()),
crate::config::UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
},
packet_tx,
);
udp.start_async().await.unwrap();
node.transports
.insert(transport_id, TransportHandle::Udp(udp));
let peer_identity = Identity::generate();
let peer_npub = peer_identity.npub();
let static_addr = "127.0.0.1:9";
let overlay_addr = "127.0.0.1:55181";
// Advert stamped with the current wall-clock time so it counts as fresh.
let bootstrap = Arc::new(NostrDiscovery::new_for_test());
let endpoint = OverlayEndpointAdvert {
transport: OverlayTransportKind::Udp,
addr: overlay_addr.to_string(),
};
let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let advert = NostrDiscovery::cached_advert_for_test(peer_npub.clone(), endpoint, now_secs);
bootstrap
.insert_advert_for_test(peer_npub.clone(), advert)
.await;
node.nostr_discovery = Some(bootstrap);
let peer_config = crate::config::PeerConfig {
npub: peer_npub.clone(),
alias: None,
addresses: vec![crate::config::PeerAddress::new("udp", static_addr)],
connect_policy: crate::config::ConnectPolicy::AutoConnect,
auto_reconnect: true,
discovery_fallback_transit: true,
};
node.initiate_peer_connection(&peer_config).await.unwrap();
let stat = TransportAddr::from_string(static_addr);
let overlay = TransportAddr::from_string(overlay_addr);
let overlay_link = node.find_link_by_addr(transport_id, &overlay);
let static_link = node.find_link_by_addr(transport_id, &stat);
assert!(
overlay_link.is_some(),
"cold-start must dial the freshly observed overlay address first"
);
assert!(
static_link.is_some(),
"cold-start should keep the unstamped static address in the bounded path race"
);
// Assumes link IDs are allocated in dial order (smaller ID = dialed first).
assert!(
overlay_link.unwrap().as_u64() < static_link.unwrap().as_u64(),
"fresh overlay advert should still be attempted before static address"
);
for transport in node.transports.values_mut() {
transport.stop().await.ok();
}
}
#[tokio::test]
async fn test_seed_path_mtu_noop_for_unknown_transport() {
    // TransportId 99 was never registered, so seeding must leave the
    // path-MTU lookup untouched.
    let node = make_node();
    let peer_addr = make_node_addr(0xDD);
    let fips_addr = crate::FipsAddress::from_node_addr(&peer_addr);
    let addr = TransportAddr::from_string("10.0.0.5:2121");
    node.seed_path_mtu_for_link_peer(&peer_addr, TransportId::new(99), &addr);
    let lookup = node.path_mtu_lookup.read().unwrap();
    assert!(
        lookup.get(&fips_addr).is_none(),
        "Seed must be a no-op when transport_id is not registered"
    );
}
/// Convenience: the npub of a freshly generated identity, for test peers.
fn npub_for_test() -> String {
    let identity = Identity::generate();
    identity.npub()
}
/// Generate a peer identity whose node address sorts above this node's,
/// retrying until the ordering holds.
fn peer_identity_for_outbound_refresh_owner(node: &Node) -> (Identity, PeerIdentity) {
    loop {
        let candidate = Identity::generate();
        let peer = PeerIdentity::from_pubkey_full(candidate.pubkey_full());
        if peer.node_addr() > node.identity.node_addr() {
            return (candidate, peer);
        }
    }
}
/// Generate a peer identity whose node address sorts below this node's,
/// retrying until the ordering holds.
fn peer_identity_for_outbound_refresh_loser(node: &Node) -> (Identity, PeerIdentity) {
    loop {
        let candidate = Identity::generate();
        let peer = PeerIdentity::from_pubkey_full(candidate.pubkey_full());
        if peer.node_addr() < node.identity.node_addr() {
            return (candidate, peer);
        }
    }
}
/// Build an auto-connect, auto-reconnect peer entry with a single UDP address.
fn auto_connect_peer(npub: String, addr: &str) -> crate::config::PeerConfig {
    let addresses = vec![crate::config::PeerAddress::new("udp", addr)];
    crate::config::PeerConfig {
        npub,
        alias: None,
        addresses,
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    }
}
// Even when our outbound handshake would lose the cross-connection tie-break
// (the peer's node address sorts below ours), a config refresh must still race
// the alternate path as a pending handshake without disturbing the live peer.
#[tokio::test]
async fn update_peers_races_alternate_path_even_when_outbound_would_lose() {
let mut node = make_node();
let (packet_tx, packet_rx) = packet_channel(64);
node.packet_tx = Some(packet_tx.clone());
node.packet_rx = Some(packet_rx);
let transport_id = TransportId::new(1);
let mut udp = UdpTransport::new(
transport_id,
Some("main".to_string()),
crate::config::UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
},
packet_tx,
);
udp.start_async().await.unwrap();
node.transports
.insert(transport_id, TransportHandle::Udp(udp));
// "loser" = a peer identity for which our outbound would lose the tie-break.
let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_loser(&node);
let peer_node_addr = *peer_identity.node_addr();
let old_addr = TransportAddr::from_string("127.0.0.1:7");
let old_link_id = LinkId::new(7);
let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
active_peer.set_current_addr(transport_id, &old_addr);
node.peers.insert(peer_node_addr, active_peer);
// Register a live link on the old address so the peer counts as connected.
node.links.insert(
old_link_id,
Link::connectionless(
old_link_id,
transport_id,
old_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
),
);
// Config refresh offers a different address (:9) than the active path (:7).
let peer = auto_connect_peer(peer_full.npub(), "127.0.0.1:9");
node.config.peers = vec![peer.clone()];
let outcome = node.update_peers(vec![peer]).await.unwrap();
assert_eq!(outcome.unchanged, 1);
assert_eq!(node.peer_count(), 1, "current active peer must remain live");
assert_eq!(
node.connection_count(),
1,
"alternate path should be attempted even when our outbound would lose cross-connection"
);
// The established link and address must be untouched by the race.
let active = node.get_peer(&peer_node_addr).unwrap();
assert_eq!(active.link_id(), old_link_id);
assert_eq!(active.current_addr(), Some(&old_addr));
for transport in node.transports.values_mut() {
transport.stop().await.ok();
}
}
#[tokio::test]
async fn update_peers_returns_zero_on_empty_diff() {
    // An empty desired-peer list against an empty config is a no-op diff.
    let mut node = make_node();
    let outcome = node.update_peers(Vec::new()).await.unwrap();
    let counts = (
        outcome.added,
        outcome.removed,
        outcome.updated,
        outcome.unchanged,
    );
    assert_eq!(counts, (0, 0, 0, 0));
}
#[tokio::test]
async fn update_peers_adds_new_peer_and_registers_alias() {
    // A brand-new aliased peer: the diff counts one addition and the alias
    // map learns the peer's node address.
    let mut node = make_node();
    let npub = npub_for_test();
    let mut peer = auto_connect_peer(npub.clone(), "127.0.0.1:9");
    peer.alias = Some("alice".to_string());
    let outcome = node.update_peers(vec![peer.clone()]).await.unwrap();
    assert_eq!(outcome.added, 1);
    assert_eq!(outcome.removed, 0);
    assert_eq!(outcome.updated, 0);
    assert_eq!(outcome.unchanged, 0);
    assert_eq!(node.config.peers.len(), 1);
    let identity = PeerIdentity::from_npub(&peer.npub).unwrap();
    let alias = node.peer_aliases.get(identity.node_addr());
    assert_eq!(alias, Some(&"alice".to_string()));
}
// Removing a peer from the desired set must also clear its retry state and
// its alias registration.
#[tokio::test]
async fn update_peers_removes_dropped_peer_and_clears_retry_state() {
let mut node = make_node();
let npub = npub_for_test();
let peer = auto_connect_peer(npub.clone(), "127.0.0.1:9");
let _ = node.update_peers(vec![peer.clone()]).await.unwrap();
let identity = PeerIdentity::from_npub(&peer.npub).unwrap();
let node_addr = *identity.node_addr();
// The add attempt leaves a retry entry behind — presumably because make_node
// has no transports registered, so the dial fails (verify against make_node).
assert!(node.retry_pending.contains_key(&node_addr));
let outcome = node.update_peers(Vec::new()).await.unwrap();
assert_eq!(outcome.added, 0);
assert_eq!(outcome.removed, 1);
assert!(node.config.peers.is_empty());
assert!(!node.retry_pending.contains_key(&node_addr));
assert!(!node.peer_aliases.contains_key(&node_addr));
}
#[tokio::test]
async fn update_peers_reports_updated_when_addresses_change() {
    // Same npub, different address list: the diff should count one update
    // and the stored config should carry the new address.
    let mut node = make_node();
    let npub = npub_for_test();
    let first = auto_connect_peer(npub.clone(), "127.0.0.1:9");
    let _ = node.update_peers(vec![first]).await.unwrap();
    let second = auto_connect_peer(npub.clone(), "127.0.0.1:55180");
    let outcome = node.update_peers(vec![second.clone()]).await.unwrap();
    assert_eq!(outcome.added, 0);
    assert_eq!(outcome.removed, 0);
    assert_eq!(outcome.updated, 1);
    assert_eq!(outcome.unchanged, 0);
    assert_eq!(node.config.peers.len(), 1);
    assert_eq!(node.config.peers[0].addresses[0].addr, "127.0.0.1:55180");
}
#[tokio::test]
async fn update_peers_reports_unchanged_for_identical_entry() {
    // Re-applying the exact same peer entry is reported as unchanged.
    let mut node = make_node();
    let peer = auto_connect_peer(npub_for_test(), "127.0.0.1:9");
    let _ = node.update_peers(vec![peer.clone()]).await.unwrap();
    let outcome = node.update_peers(vec![peer]).await.unwrap();
    assert_eq!(outcome.added, 0);
    assert_eq!(outcome.removed, 0);
    assert_eq!(outcome.updated, 0);
    assert_eq!(outcome.unchanged, 1);
}
// A known auto-connect peer that previously had no concrete addresses must be
// dialed when a refresh supplies its first direct address hint.
#[tokio::test]
async fn update_peers_redials_existing_auto_peer_with_direct_hint() {
let mut node = make_node();
let (packet_tx, packet_rx) = packet_channel(64);
node.packet_tx = Some(packet_tx.clone());
node.packet_rx = Some(packet_rx);
let transport_id = TransportId::new(1);
let mut udp = UdpTransport::new(
transport_id,
Some("main".to_string()),
crate::config::UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
},
packet_tx,
);
udp.start_async().await.unwrap();
node.transports
.insert(transport_id, TransportHandle::Udp(udp));
let npub = npub_for_test();
// Existing config entry with an empty address list — nothing dialable yet.
let original = crate::config::PeerConfig {
npub: npub.clone(),
alias: None,
addresses: Vec::new(),
connect_policy: crate::config::ConnectPolicy::AutoConnect,
auto_reconnect: true,
discovery_fallback_transit: true,
};
node.config.peers = vec![original];
// The refresh supplies a concrete UDP address for the same npub.
let refreshed = auto_connect_peer(npub, "127.0.0.1:9");
let outcome = node.update_peers(vec![refreshed]).await.unwrap();
assert_eq!(outcome.added, 0);
assert_eq!(outcome.removed, 0);
assert_eq!(outcome.updated, 1);
// One pending handshake proves the new hint was dialed.
assert_eq!(node.connection_count(), 1);
for transport in node.transports.values_mut() {
transport.stop().await.ok();
}
}
// An unchanged auto-connect peer that currently has no link must still be
// redialed on refresh (the entry is "unchanged" but the peer is offline).
#[tokio::test]
async fn update_peers_redials_unchanged_auto_peer_without_link() {
let mut node = make_node();
let (packet_tx, packet_rx) = packet_channel(64);
node.packet_tx = Some(packet_tx.clone());
node.packet_rx = Some(packet_rx);
let transport_id = TransportId::new(1);
let mut udp = UdpTransport::new(
transport_id,
Some("main".to_string()),
crate::config::UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
},
packet_tx,
);
udp.start_async().await.unwrap();
node.transports
.insert(transport_id, TransportHandle::Udp(udp));
// Same entry in config and in the refresh => diff reports "unchanged".
let peer = auto_connect_peer(npub_for_test(), "127.0.0.1:9");
node.config.peers = vec![peer.clone()];
let outcome = node.update_peers(vec![peer]).await.unwrap();
assert_eq!(outcome.added, 0);
assert_eq!(outcome.removed, 0);
assert_eq!(outcome.updated, 0);
assert_eq!(outcome.unchanged, 1);
// But a handshake is still started because no link/peer exists yet.
assert_eq!(node.connection_count(), 1);
for transport in node.transports.values_mut() {
transport.stop().await.ok();
}
}
// A refresh that offers a different address for an already-active peer must
// race that alternate path as a pending handshake, while the current link and
// address stay untouched.
#[tokio::test]
async fn update_peers_races_alternate_path_for_active_peer_without_dropping_current_link() {
let mut node = make_node();
let (packet_tx, packet_rx) = packet_channel(64);
node.packet_tx = Some(packet_tx.clone());
node.packet_rx = Some(packet_rx);
let transport_id = TransportId::new(1);
let mut udp = UdpTransport::new(
transport_id,
Some("main".to_string()),
crate::config::UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
},
packet_tx,
);
udp.start_async().await.unwrap();
node.transports
.insert(transport_id, TransportHandle::Udp(udp));
// "owner" = a peer identity for which our outbound wins the tie-break.
let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
let peer_node_addr = *peer_identity.node_addr();
let old_addr = TransportAddr::from_string("127.0.0.1:7");
let old_link_id = LinkId::new(7);
let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
active_peer.set_current_addr(transport_id, &old_addr);
node.peers.insert(peer_node_addr, active_peer);
// Live link on the old address so the peer counts as connected.
node.links.insert(
old_link_id,
Link::connectionless(
old_link_id,
transport_id,
old_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
),
);
// Refresh offers :9 while the active path is :7.
let peer = auto_connect_peer(peer_full.npub(), "127.0.0.1:9");
node.config.peers = vec![peer.clone()];
let outcome = node.update_peers(vec![peer]).await.unwrap();
assert_eq!(outcome.unchanged, 1);
assert_eq!(node.peer_count(), 1, "current active peer must remain live");
assert_eq!(
node.connection_count(),
1,
"alternate path should be a pending handshake, not a peer replacement"
);
let active = node.get_peer(&peer_node_addr).unwrap();
assert_eq!(active.link_id(), old_link_id);
assert_eq!(active.current_addr(), Some(&old_addr));
for transport in node.transports.values_mut() {
transport.stop().await.ok();
}
}
#[tokio::test]
async fn update_peers_does_not_churn_active_peer_already_on_known_candidate() {
    // If the active peer is already using the only configured candidate
    // address, a config refresh must not redial that same path.
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);
    let transport_id = TransportId::new(1);
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));
    let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let current_addr = TransportAddr::from_string("127.0.0.1:9");
    let old_link_id = LinkId::new(7);
    let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
    // Fix: restored `&current_addr` — the borrow was mojibake-corrupted in the
    // original source ("¤t_addr"), which is not a valid Rust token.
    active_peer.set_current_addr(transport_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);
    // The refresh lists the exact address the peer is already active on.
    let peer = auto_connect_peer(peer_full.npub(), "127.0.0.1:9");
    node.config.peers = vec![peer.clone()];
    let outcome = node.update_peers(vec![peer]).await.unwrap();
    assert_eq!(outcome.unchanged, 1);
    assert_eq!(node.peer_count(), 1);
    assert_eq!(
        node.connection_count(),
        0,
        "known-good active concrete path should not be redialed every refresh"
    );
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[test]
fn active_peer_same_path_discovery_skips_fresh_peer() {
    // A peer that became active "now" on the same candidate path counts as
    // fresh, so discovery may skip redialing it.
    let mut node = make_node();
    let (_peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let transport_id = TransportId::new(1);
    let current_addr = TransportAddr::from_string("127.0.0.1:9");
    let mut active_peer = ActivePeer::new(peer_identity, LinkId::new(7), Node::now_ms());
    // Fix: restored `&current_addr` — the borrow was mojibake-corrupted in the
    // original source ("¤t_addr"), which is not a valid Rust token.
    active_peer.set_current_addr(transport_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);
    let candidate = crate::config::PeerAddress::new("udp", "127.0.0.1:9");
    assert!(node.active_peer_candidate_is_fresh_enough_to_skip(
        &peer_node_addr,
        std::slice::from_ref(&candidate),
    ));
}
#[test]
fn active_peer_same_path_discovery_refreshes_stale_peer() {
    // A peer whose timestamp predates the heartbeat window is stale, so the
    // same-path freshness check must NOT allow skipping it.
    let mut node = make_node();
    let (_peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let transport_id = TransportId::new(1);
    let current_addr = TransportAddr::from_string("127.0.0.1:9");
    // One heartbeat interval plus one second (in ms) before "now".
    let stale_at = Node::now_ms().saturating_sub(
        node.config
            .node
            .heartbeat_interval_secs
            .saturating_add(1)
            .saturating_mul(1000),
    );
    let mut active_peer = ActivePeer::new(peer_identity, LinkId::new(7), stale_at);
    // Fix: restored `&current_addr` — the borrow was mojibake-corrupted in the
    // original source ("¤t_addr"), which is not a valid Rust token.
    active_peer.set_current_addr(transport_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);
    let candidate = crate::config::PeerAddress::new("udp", "127.0.0.1:9");
    assert!(!node.active_peer_candidate_is_fresh_enough_to_skip(
        &peer_node_addr,
        std::slice::from_ref(&candidate),
    ));
}
#[tokio::test]
async fn update_peers_races_new_alternative_even_when_current_path_is_still_known() {
    // Even while the current path (:9) is still listed in config, a refresh
    // that adds a second candidate (:10) must race that new alternative as a
    // pending handshake without touching the live link.
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);
    let transport_id = TransportId::new(1);
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));
    let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let current_addr = TransportAddr::from_string("127.0.0.1:9");
    let new_addr = TransportAddr::from_string("127.0.0.1:10");
    let old_link_id = LinkId::new(7);
    let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
    // Fix: restored `&current_addr` — the borrow was mojibake-corrupted in the
    // original source ("¤t_addr"), which is not a valid Rust token.
    active_peer.set_current_addr(transport_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);
    // Live link on the current address so the peer counts as connected.
    node.links.insert(
        old_link_id,
        Link::connectionless(
            old_link_id,
            transport_id,
            current_addr.clone(),
            LinkDirection::Outbound,
            Duration::from_millis(100),
        ),
    );
    let peer = crate::config::PeerConfig {
        npub: peer_full.npub(),
        alias: None,
        addresses: vec![
            crate::config::PeerAddress::new("udp", "127.0.0.1:9"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:10"),
        ],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    node.config.peers = vec![peer.clone()];
    let outcome = node.update_peers(vec![peer]).await.unwrap();
    assert_eq!(outcome.unchanged, 1);
    assert_eq!(node.peer_count(), 1, "existing link must stay live");
    assert_eq!(node.connection_count(), 1);
    // The single pending handshake must target the new alternative address.
    assert_eq!(
        node.connections
            .values()
            .next()
            .and_then(|conn| conn.source_addr()),
        Some(&new_addr)
    );
    let active = node.get_peer(&peer_node_addr).unwrap();
    assert_eq!(active.link_id(), old_link_id);
    // (Also restored `&current_addr` here; same corruption as above.)
    assert_eq!(active.current_addr(), Some(&current_addr));
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[tokio::test]
async fn update_peers_races_more_alternatives_while_peer_is_connecting_with_budget() {
    // A refresh that adds several new candidate addresses while one handshake
    // is already in flight must race additional paths only up to the per-peer
    // budget (here: 4 total), leaving the excess candidate undialed.
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);
    let transport_id = TransportId::new(1);
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));
    let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let current_addr = TransportAddr::from_string("127.0.0.1:9");
    let old_link_id = LinkId::new(7);
    let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
    // Fix: restored `&current_addr` — the borrow was mojibake-corrupted in the
    // original source ("¤t_addr"), which is not a valid Rust token.
    active_peer.set_current_addr(transport_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);
    node.links.insert(
        old_link_id,
        Link::connectionless(
            old_link_id,
            transport_id,
            current_addr,
            LinkDirection::Outbound,
            Duration::from_millis(100),
        ),
    );
    // First refresh: two candidates, one new (:10) => one in-flight handshake.
    let first = crate::config::PeerConfig {
        npub: peer_full.npub(),
        alias: None,
        addresses: vec![
            crate::config::PeerAddress::new("udp", "127.0.0.1:9"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:10"),
        ],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    node.config.peers = vec![first.clone()];
    let _ = node.update_peers(vec![first]).await.unwrap();
    assert_eq!(node.connection_count(), 1);
    // Second refresh: four more candidates (:11-:14); with one handshake
    // already pending, only three of them fit within the budget.
    let refreshed = crate::config::PeerConfig {
        npub: peer_full.npub(),
        alias: None,
        addresses: vec![
            crate::config::PeerAddress::new("udp", "127.0.0.1:9"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:10"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:11"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:12"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:13"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:14"),
        ],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let outcome = node.update_peers(vec![refreshed]).await.unwrap();
    assert_eq!(outcome.updated, 1);
    assert_eq!(
        node.connection_count(),
        4,
        "one existing in-flight path plus three new paths should hit the per-peer race budget"
    );
    let attempted: std::collections::HashSet<_> = node
        .connections
        .values()
        .filter_map(|conn| conn.source_addr().map(ToString::to_string))
        .collect();
    for addr in [
        "127.0.0.1:10",
        "127.0.0.1:11",
        "127.0.0.1:12",
        "127.0.0.1:13",
    ] {
        assert!(attempted.contains(addr), "missing attempted path {addr}");
    }
    assert!(
        !attempted.contains("127.0.0.1:14"),
        "candidate racing should be bounded per peer"
    );
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[tokio::test]
async fn update_peers_races_primary_path_when_active_peer_uses_bootstrap_transport() {
    // An active peer currently reached over a bootstrap (NAT) transport must
    // still get a refresh attempt over the primary transport: the bootstrap
    // path should not suppress the configured primary path.
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Two live UDP transports: id 1 is registered as a bootstrap transport,
    // id 2 is the primary ("main") transport.
    let bootstrap_id = TransportId::new(1);
    let primary_id = TransportId::new(2);
    for (transport_id, name) in [(bootstrap_id, "nostr-nat"), (primary_id, "main")] {
        let mut udp = UdpTransport::new(
            transport_id,
            Some(name.to_string()),
            crate::config::UdpConfig {
                bind_addr: Some("127.0.0.1:0".to_string()),
                ..Default::default()
            },
            packet_tx.clone(),
        );
        udp.start_async().await.unwrap();
        node.transports
            .insert(transport_id, TransportHandle::Udp(udp));
    }
    node.bootstrap_transports.insert(bootstrap_id);

    // Active peer currently pinned to the bootstrap transport.
    let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let current_addr = TransportAddr::from_string("127.0.0.1:9");
    let old_link_id = LinkId::new(7);
    let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
    // FIX: this line previously read `set_current_addr(bootstrap_id, ¤t_addr)`
    // — an encoding mojibake of `&current_addr` (`&curren` was rendered as the
    // HTML entity `¤`), which does not compile. Restored the intended borrow.
    active_peer.set_current_addr(bootstrap_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);

    // Re-applying the identical config counts as "unchanged", yet one refresh
    // connection must still be raced — and it must use the primary transport.
    let peer = auto_connect_peer(peer_full.npub(), "127.0.0.1:9");
    node.config.peers = vec![peer.clone()];
    let outcome = node.update_peers(vec![peer]).await.unwrap();
    assert_eq!(outcome.unchanged, 1);
    assert_eq!(node.peer_count(), 1);
    assert_eq!(
        node.connection_count(),
        1,
        "bootstrap NAT path should not suppress a primary-transport refresh"
    );
    let conn = node.connections.values().next().unwrap();
    assert_eq!(conn.transport_id(), Some(primary_id));
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
#[tokio::test]
// Scenario: an active peer is reachable via an old (transport, addr, link)
// tuple; a new outbound handshake on a different transport tuple completes
// via MSG2. Promotion must retire the old link and addr-routing entry and
// move the ActivePeer's routing state onto the new tuple.
async fn outbound_refresh_promotion_moves_active_peer_to_new_transport_tuple() {
let mut node = make_node();
let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
let peer_node_addr = *peer_identity.node_addr();
// Existing active state: transport 1, link 10, addr :7000, registered in
// peers, links, and the addr_to_link routing map.
let old_transport_id = TransportId::new(1);
let old_link_id = LinkId::new(10);
let old_addr = TransportAddr::from_string("127.0.0.1:7000");
let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
active_peer.set_current_addr(old_transport_id, &old_addr);
node.peers.insert(peer_node_addr, active_peer);
node.links.insert(
old_link_id,
Link::connectionless(
old_link_id,
old_transport_id,
old_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
),
);
node.addr_to_link
.insert((old_transport_id, old_addr.clone()), old_link_id);
// New outbound attempt: transport 2, link 11, addr :9000. Start the Noise
// handshake locally so the connection holds real MSG1 state, then register
// it as a pending outbound connection keyed by (transport, our_index).
let new_transport_id = TransportId::new(2);
let new_link_id = LinkId::new(11);
let new_addr = TransportAddr::from_string("127.0.0.1:9000");
let mut conn = PeerConnection::outbound(new_link_id, peer_identity, 2_000);
let our_index = node.index_allocator.allocate().unwrap();
let noise_msg1 = conn
.start_handshake(node.identity.keypair(), node.startup_epoch, 2_000)
.unwrap();
conn.set_our_index(our_index);
conn.set_transport_id(new_transport_id);
conn.set_source_addr(new_addr.clone());
node.links.insert(
new_link_id,
Link::connectionless(
new_link_id,
new_transport_id,
new_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
),
);
node.addr_to_link
.insert((new_transport_id, new_addr.clone()), new_link_id);
node.connections.insert(new_link_id, conn);
node.pending_outbound
.insert((new_transport_id, our_index.as_u32()), new_link_id);
// Simulate the remote peer answering MSG1 (remote epoch 0x42..), then feed
// the wire-encoded MSG2 packet back into the node on the new tuple.
let mut responder = PeerConnection::inbound(LinkId::new(99), 2_000);
let noise_msg2 = responder
.receive_handshake_init(peer_full.keypair(), [0x42; 8], &noise_msg1, 2_000)
.unwrap();
let their_index = SessionIndex::new(77);
let wire_msg2 = build_msg2(their_index, our_index, &noise_msg2);
let packet =
ReceivedPacket::with_timestamp(new_transport_id, new_addr.clone(), wire_msg2, 2_100);
node.handle_msg2(packet).await;
// Promotion consumed the pending connection.
assert_eq!(node.connection_count(), 0);
assert!(node.pending_outbound.is_empty());
assert!(
!node.links.contains_key(&old_link_id),
"old active link should be retired after successful refresh"
);
assert!(
node.links.contains_key(&new_link_id),
"new outbound link should remain active"
);
// Address-to-link routing is swapped from the old tuple to the new one.
assert_eq!(
node.addr_to_link.get(&(old_transport_id, old_addr.clone())),
None
);
assert_eq!(
node.addr_to_link
.get(&(new_transport_id, new_addr.clone()))
.copied(),
Some(new_link_id)
);
// The ActivePeer now carries the new link, transport, address, and both
// session indices, and is discoverable via (transport, our_index).
let active = node.get_peer(&peer_node_addr).unwrap();
assert_eq!(active.link_id(), new_link_id);
assert_eq!(active.transport_id(), Some(new_transport_id));
assert_eq!(active.current_addr(), Some(&new_addr));
assert_eq!(active.our_index(), Some(our_index));
assert_eq!(active.their_index(), Some(their_index));
assert_eq!(
node.peers_by_index
.get(&(new_transport_id, our_index.as_u32()))
.copied(),
Some(peer_node_addr)
);
}
#[tokio::test]
// Scenario: a peer restarts (its startup epoch changes from 0x11.. to 0x22..).
// When a new outbound connection for the same peer wins promotion, any FSP
// session established under the old epoch is stale and must be dropped.
async fn outbound_restart_promotion_clears_stale_fsp_session() {
use crate::node::session::{EndToEndState, SessionEntry};
use crate::noise::HandshakeState;
let mut node = make_node();
let peer_full = Identity::generate();
let peer_identity = PeerIdentity::from_pubkey_full(peer_full.pubkey_full());
let peer_node_addr = *peer_identity.node_addr();
// First (pre-restart) connection: full MSG1/MSG2 handshake with remote
// epoch 0x11.., promoted so the node records that epoch for the peer.
let old_transport_id = TransportId::new(1);
let old_link_id = LinkId::new(10);
let old_addr = TransportAddr::from_string("127.0.0.1:7000");
let mut old_conn = PeerConnection::outbound(old_link_id, peer_identity, 1_000);
let old_msg1 = old_conn
.start_handshake(node.identity.keypair(), node.startup_epoch, 1_000)
.unwrap();
let mut old_responder = PeerConnection::inbound(LinkId::new(98), 1_000);
let old_msg2 = old_responder
.receive_handshake_init(peer_full.keypair(), [0x11; 8], &old_msg1, 1_000)
.unwrap();
old_conn.complete_handshake(&old_msg2, 1_000).unwrap();
let old_our_index = node.index_allocator.allocate().unwrap();
old_conn.set_our_index(old_our_index);
old_conn.set_their_index(SessionIndex::new(66));
old_conn.set_transport_id(old_transport_id);
old_conn.set_source_addr(old_addr.clone());
node.links.insert(
old_link_id,
Link::connectionless(
old_link_id,
old_transport_id,
old_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
),
);
node.addr_to_link
.insert((old_transport_id, old_addr.clone()), old_link_id);
node.connections.insert(old_link_id, old_conn);
node.promote_connection(old_link_id, peer_identity, 1_100)
.unwrap();
assert_eq!(
node.get_peer(&peer_node_addr).unwrap().remote_epoch(),
Some([0x11; 8])
);
// Establish an end-to-end FSP session under the old epoch by running a
// full Noise handshake out-of-band and storing the resulting session.
let mut fsp_initiator =
HandshakeState::new_initiator(node.identity.keypair(), peer_full.pubkey_full());
let mut fsp_responder = HandshakeState::new_responder(peer_full.keypair());
fsp_initiator.set_local_epoch([0x01; 8]);
fsp_responder.set_local_epoch([0x02; 8]);
let fsp_msg1 = fsp_initiator.write_message_1().unwrap();
fsp_responder.read_message_1(&fsp_msg1).unwrap();
let fsp_msg2 = fsp_responder.write_message_2().unwrap();
fsp_initiator.read_message_2(&fsp_msg2).unwrap();
let stale_session = fsp_initiator.into_session().unwrap();
node.sessions.insert(
peer_node_addr,
SessionEntry::new(
peer_node_addr,
peer_full.pubkey_full(),
EndToEndState::Established(stale_session),
1_200,
true,
),
);
assert!(node.sessions.contains_key(&peer_node_addr));
// Second (post-restart) connection: same peer, new transport tuple, and
// the responder now advertises the new epoch 0x22...
let new_transport_id = TransportId::new(2);
let new_link_id = LinkId::new(11);
let new_addr = TransportAddr::from_string("127.0.0.1:9000");
let mut new_conn = PeerConnection::outbound(new_link_id, peer_identity, 2_000);
let new_msg1 = new_conn
.start_handshake(node.identity.keypair(), node.startup_epoch, 2_000)
.unwrap();
let mut new_responder = PeerConnection::inbound(LinkId::new(99), 2_000);
let new_msg2 = new_responder
.receive_handshake_init(peer_full.keypair(), [0x22; 8], &new_msg1, 2_000)
.unwrap();
new_conn.complete_handshake(&new_msg2, 2_100).unwrap();
let new_our_index = node.index_allocator.allocate().unwrap();
let their_index = SessionIndex::new(77);
new_conn.set_our_index(new_our_index);
new_conn.set_their_index(their_index);
new_conn.set_transport_id(new_transport_id);
new_conn.set_source_addr(new_addr.clone());
node.links.insert(
new_link_id,
Link::connectionless(
new_link_id,
new_transport_id,
new_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
),
);
node.addr_to_link
.insert((new_transport_id, new_addr.clone()), new_link_id);
node.connections.insert(new_link_id, new_conn);
// Promoting the new connection replaces the old one (CrossConnectionWon),
// updates the recorded epoch, and must evict the stale FSP session.
let result = node
.promote_connection(new_link_id, peer_identity, 2_100)
.unwrap();
assert!(matches!(result, PromotionResult::CrossConnectionWon { .. }));
let active = node.get_peer(&peer_node_addr).unwrap();
assert_eq!(active.link_id(), new_link_id);
assert_eq!(active.remote_epoch(), Some([0x22; 8]));
assert!(
!node.sessions.contains_key(&peer_node_addr),
"old FSP session must be removed when the peer's startup epoch changes"
);
}
#[tokio::test]
// Scenario: a rekey initiated over FMP discovers that the peer's startup
// epoch changed (peer restarted). The rekey itself should still complete
// (pending cutover), but the stale FSP session keyed to the old epoch must
// be evicted.
async fn fmp_recovery_rekey_epoch_change_clears_stale_fsp_session() {
use crate::node::session::{EndToEndState, SessionEntry};
use crate::noise::HandshakeState;
let mut node = make_node();
let peer_full = Identity::generate();
let peer_identity = PeerIdentity::from_pubkey_full(peer_full.pubkey_full());
let peer_node_addr = *peer_identity.node_addr();
// Live UDP transport so the rekey MSG1 can actually be sent.
let transport_id = TransportId::new(1);
let (packet_tx, _packet_rx) = packet_channel(64);
let mut udp = UdpTransport::new(
transport_id,
Some("rekey-test".to_string()),
crate::config::UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
},
packet_tx,
);
udp.start_async().await.unwrap();
node.transports
.insert(transport_id, TransportHandle::Udp(udp));
// Establish and promote an initial connection whose responder advertised
// epoch 0x11.., so the node records that epoch for the peer.
let link_id = LinkId::new(10);
let remote_addr = TransportAddr::from_string("127.0.0.1:9");
let mut conn = PeerConnection::outbound(link_id, peer_identity, 1_000);
let old_msg1 = conn
.start_handshake(node.identity.keypair(), node.startup_epoch, 1_000)
.unwrap();
let mut old_responder = PeerConnection::inbound(LinkId::new(98), 1_000);
let old_msg2 = old_responder
.receive_handshake_init(peer_full.keypair(), [0x11; 8], &old_msg1, 1_000)
.unwrap();
conn.complete_handshake(&old_msg2, 1_000).unwrap();
let our_index = node.index_allocator.allocate().unwrap();
conn.set_our_index(our_index);
conn.set_their_index(SessionIndex::new(66));
conn.set_transport_id(transport_id);
conn.set_source_addr(remote_addr.clone());
node.links.insert(
link_id,
Link::connectionless(
link_id,
transport_id,
remote_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
),
);
node.addr_to_link
.insert((transport_id, remote_addr.clone()), link_id);
node.connections.insert(link_id, conn);
node.promote_connection(link_id, peer_identity, 1_100)
.unwrap();
assert_eq!(
node.get_peer(&peer_node_addr).unwrap().remote_epoch(),
Some([0x11; 8])
);
// Store an established end-to-end FSP session (built out-of-band) that is
// tied to the old epoch; this is what must be evicted later.
let mut fsp_initiator =
HandshakeState::new_initiator(node.identity.keypair(), peer_full.pubkey_full());
let mut fsp_responder = HandshakeState::new_responder(peer_full.keypair());
fsp_initiator.set_local_epoch([0x01; 8]);
fsp_responder.set_local_epoch([0x02; 8]);
let fsp_msg1 = fsp_initiator.write_message_1().unwrap();
fsp_responder.read_message_1(&fsp_msg1).unwrap();
let fsp_msg2 = fsp_responder.write_message_2().unwrap();
fsp_initiator.read_message_2(&fsp_msg2).unwrap();
let stale_session = fsp_initiator.into_session().unwrap();
node.sessions.insert(
peer_node_addr,
SessionEntry::new(
peer_node_addr,
peer_full.pubkey_full(),
EndToEndState::Established(stale_session),
1_200,
true,
),
);
assert!(node.sessions.contains_key(&peer_node_addr));
// Kick off a rekey, then replay its stored MSG1 into a fresh responder
// that now advertises the new epoch 0x22.. (the peer has restarted).
assert!(node.initiate_rekey(&peer_node_addr).await);
let rekey_msg1 = node
.get_peer(&peer_node_addr)
.unwrap()
.rekey_msg1()
.expect("rekey msg1 should be stored")
.to_vec();
let header = Msg1Header::parse(&rekey_msg1).expect("valid rekey msg1");
let noise_msg1 = &rekey_msg1[header.noise_msg1_offset..];
let mut new_responder = HandshakeState::new_responder(peer_full.keypair());
new_responder.set_local_epoch([0x22; 8]);
new_responder.read_message_1(noise_msg1).unwrap();
let new_msg2 = new_responder.write_message_2().unwrap();
let their_index = SessionIndex::new(77);
let wire_msg2 = build_msg2(their_index, header.sender_idx, &new_msg2);
let packet =
ReceivedPacket::with_timestamp(transport_id, remote_addr.clone(), wire_msg2, 2_100);
node.handle_msg2(packet).await;
// The rekey learned the new epoch, still holds a pending session awaiting
// cutover, and the stale FSP session is gone.
let active = node.get_peer(&peer_node_addr).unwrap();
assert_eq!(active.remote_epoch(), Some([0x22; 8]));
assert!(
active.pending_new_session().is_some(),
"FMP recovery rekey should still complete and await cutover"
);
assert!(
!node.sessions.contains_key(&peer_node_addr),
"old FSP session must be removed when FMP rekey learns a new peer startup epoch"
);
// Tear down the transport so the test leaves no live socket behind.
let mut transport = node.transports.remove(&transport_id).unwrap();
transport.stop().await.unwrap();
}
#[tokio::test]
async fn update_peers_treats_seen_at_ms_as_metadata_not_a_change() {
    // A refresh whose only difference is an address's `seen_at_ms` timestamp
    // must be reported as "unchanged": the timestamp is bookkeeping metadata,
    // not a configuration change.
    let mut node = make_node();
    let npub = npub_for_test();

    // Apply the initial peer entry.
    let initial = auto_connect_peer(npub.clone(), "127.0.0.1:9");
    let _ = node.update_peers(vec![initial]).await.unwrap();

    // Re-submit the identical entry, differing only in the seen-at timestamp.
    let mut resubmitted = auto_connect_peer(npub, "127.0.0.1:9");
    let stamped = resubmitted.addresses[0]
        .clone()
        .with_seen_at_ms(1_700_000_000_000);
    resubmitted.addresses[0] = stamped;

    let outcome = node.update_peers(vec![resubmitted]).await.unwrap();
    assert_eq!(outcome.updated, 0);
    assert_eq!(outcome.unchanged, 1);
}
#[tokio::test]
async fn update_peers_rejects_invalid_npub_atomically() {
    // A batch containing any malformed npub must be rejected as a whole:
    // the valid entry in the same batch must not be applied either.
    let mut node = make_node();

    let bad = crate::config::PeerConfig {
        npub: "not-a-real-npub".to_string(),
        alias: None,
        addresses: Vec::new(),
        auto_reconnect: true,
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        discovery_fallback_transit: true,
    };
    let good = auto_connect_peer(npub_for_test(), "127.0.0.1:9");
    let batch = vec![good, bad];

    let result = node.update_peers(batch).await;
    assert!(result.is_err(), "invalid npub must reject the whole batch");
    assert!(
        node.config.peers.is_empty(),
        "rejected batch must not partially apply",
    );
}