use super::*;
use crate::discovery::nostr::{BootstrapEvent, NostrDiscovery};
use crate::node::wire::{Msg1Header, build_msg2};
use crate::peer::{ActivePeer, PromotionResult};
use crate::transport::ReceivedPacket;
use crate::transport::udp::UdpTransport;
use crate::transport::{TransportHandle, packet_channel};
use std::sync::Arc;
/// A node built by `make_node` reports `NodeState::Created`, is not
/// leaf-only, and has no peers, connections, or links.
#[test]
fn test_node_creation() {
    let fresh = make_node();

    // Lifecycle / mode flags.
    assert_eq!(fresh.state(), NodeState::Created);
    assert!(!fresh.is_leaf_only());

    // All registries start out empty.
    assert_eq!(fresh.peer_count(), 0);
    assert_eq!(fresh.connection_count(), 0);
    assert_eq!(fresh.link_count(), 0);
}
/// `Node::with_identity` exposes the node address of the identity it was
/// constructed from.
#[test]
fn test_node_with_identity() {
    let supplied = Identity::generate();
    // Copy the address out before the identity is moved into the node.
    let expected_node_addr = *supplied.node_addr();

    let node = Node::with_identity(supplied, Config::new()).unwrap();

    assert_eq!(node.node_addr(), &expected_node_addr);
}
/// `Node::with_identity` rejects an invalid configuration with
/// `NodeError::Config`.
///
/// The configuration used here has Nostr discovery disabled and a single
/// peer whose only explicitly set field is the npub `"npub1peer"`.
#[test]
fn test_node_with_identity_validates_config() {
    let mut bad_config = Config::new();
    bad_config.node.discovery.nostr.enabled = false;
    bad_config.peers = vec![crate::config::PeerConfig {
        npub: "npub1peer".to_string(),
        ..Default::default()
    }];

    let identity = Identity::generate();
    let outcome = Node::with_identity(identity, bad_config);

    let err = outcome.expect_err("expected config validation error");
    assert!(matches!(err, NodeError::Config(_)));
}
/// `Node::leaf_only` yields a node whose own flag and whose bloom state
/// both report leaf-only mode.
#[test]
fn test_node_leaf_only() {
    let leaf = Node::leaf_only(Config::new()).unwrap();

    assert!(leaf.is_leaf_only());
    assert!(leaf.bloom_state().is_leaf_only());
}
/// When a peer lists a `nat` UDP address ahead of a concrete UDP address,
/// `try_peer_addresses` still succeeds and leaves exactly one connection.
///
/// The node has a single started UDP transport bound to `127.0.0.1:0`.
#[tokio::test]
async fn test_nat_bootstrap_failure_falls_back_to_direct_udp_address() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // One real UDP transport on an ephemeral loopback port.
    let transport_id = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport_id, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));

    // Priority 1 is the `nat` pseudo-address, priority 2 a concrete address.
    let remote = Identity::generate();
    let candidate_addresses = vec![
        crate::config::PeerAddress::with_priority("udp", "nat", 1),
        crate::config::PeerAddress::with_priority("udp", "127.0.0.1:9", 2),
    ];
    let peer_config = crate::config::PeerConfig {
        npub: remote.npub(),
        alias: None,
        addresses: candidate_addresses,
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let peer_identity = PeerIdentity::from_npub(&peer_config.npub).unwrap();

    node.try_peer_addresses(&peer_config, peer_identity, false)
        .await
        .unwrap();

    assert_eq!(node.connection_count(), 1);

    // Best-effort shutdown; stop errors are deliberately ignored.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// With two concrete UDP addresses configured for one peer,
/// `try_peer_addresses` creates a connection for each of them.
#[tokio::test]
async fn test_try_peer_addresses_races_all_concrete_udp_candidates() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // One real UDP transport on an ephemeral loopback port.
    let transport_id = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport_id, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));

    let remote = Identity::generate();
    let candidate_addresses = vec![
        crate::config::PeerAddress::with_priority("udp", "127.0.0.1:9", 1),
        crate::config::PeerAddress::with_priority("udp", "127.0.0.1:10", 2),
    ];
    let peer_config = crate::config::PeerConfig {
        npub: remote.npub(),
        alias: None,
        addresses: candidate_addresses,
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let peer_identity = PeerIdentity::from_npub(&peer_config.npub).unwrap();

    node.try_peer_addresses(&peer_config, peer_identity, false)
        .await
        .unwrap();

    // Collect the source address of every connection; sort so the
    // comparison does not depend on map iteration order.
    let mut dialed: Vec<_> = node
        .connections
        .values()
        .filter_map(|conn| conn.source_addr().and_then(|addr| addr.as_str()))
        .collect();
    dialed.sort();
    assert_eq!(dialed, vec!["127.0.0.1:10", "127.0.0.1:9"]);

    // Best-effort shutdown; stop errors are deliberately ignored.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// With a UDP transport bound to an IPv4 address, `try_peer_addresses`
/// connects only to the peer's IPv4 candidate and allocates no link for
/// the IPv6 candidate.
#[tokio::test]
async fn test_try_peer_addresses_skips_incompatible_udp_address_family() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // One real UDP transport bound to IPv4 loopback.
    let transport_id = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport_id, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));

    // The IPv6 candidate has the better priority; the IPv4 one follows.
    let remote = Identity::generate();
    let candidate_addresses = vec![
        crate::config::PeerAddress::with_priority("udp", "[fd00::1]:9", 1),
        crate::config::PeerAddress::with_priority("udp", "127.0.0.1:9", 2),
    ];
    let peer_config = crate::config::PeerConfig {
        npub: remote.npub(),
        alias: None,
        addresses: candidate_addresses,
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let peer_identity = PeerIdentity::from_npub(&peer_config.npub).unwrap();

    node.try_peer_addresses(&peer_config, peer_identity, false)
        .await
        .unwrap();

    // Exactly one connection, and it targets the IPv4 address.
    assert_eq!(node.connection_count(), 1);
    let only_source = node
        .connections
        .values()
        .next()
        .and_then(|conn| conn.source_addr())
        .and_then(|addr| addr.as_str());
    assert_eq!(only_source, Some("127.0.0.1:9"));

    // No link was registered for the IPv6 address.
    let ipv6_addr = crate::transport::TransportAddr::from_string("[fd00::1]:9");
    assert!(
        node.find_link_by_addr(transport_id, &ipv6_addr).is_none(),
        "IPv6 candidate must not allocate a failed link on an IPv4-only socket"
    );

    // Best-effort shutdown; stop errors are deliberately ignored.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// `transport_discovery_candidate` returns `None` for an IPv6 address when
/// the only transport is a UDP socket bound to IPv4 loopback.
#[tokio::test]
async fn test_transport_discovery_skips_incompatible_udp_address_family() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    let transport_id = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport_id, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));

    let ipv6_addr = crate::transport::TransportAddr::from_string("[fd00::1]:9");
    let candidate = node.transport_discovery_candidate(transport_id, ipv6_addr);
    assert!(
        candidate.is_none(),
        "transport discovery must not feed IPv6 candidates to an IPv4 UDP socket"
    );

    // Best-effort shutdown; stop errors are deliberately ignored.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// When discovery is asked about the bootstrap transport,
/// `transport_discovery_candidate` answers with the non-bootstrap UDP
/// transport instead.
#[tokio::test]
async fn test_transport_discovery_avoids_bootstrap_udp_transport() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    let bootstrap_id = TransportId::new(1);
    let primary_id = TransportId::new(2);

    // Start two UDP transports on ephemeral loopback ports.
    let named_transports = [(bootstrap_id, "bootstrap"), (primary_id, "main")];
    for (id, label) in named_transports {
        let udp_config = crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        };
        let mut udp = UdpTransport::new(id, Some(label.to_string()), udp_config, packet_tx.clone());
        udp.start_async().await.unwrap();
        node.transports.insert(id, TransportHandle::Udp(udp));
    }
    // Only the first one is flagged as a bootstrap transport.
    node.bootstrap_transports.insert(bootstrap_id);

    let probe_addr = crate::transport::TransportAddr::from_string("127.0.0.1:9");
    let candidate = node
        .transport_discovery_candidate(bootstrap_id, probe_addr)
        .expect("primary UDP transport should be eligible");
    assert_eq!(candidate.0, primary_id);

    // Best-effort shutdown; stop errors are deliberately ignored.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// `find_transport_for_type("udp")` never returns a bootstrap transport.
///
/// Three UDP transports are registered with ids 1 (bootstrap), 2 and 3;
/// the expected pick is id 2.
#[tokio::test]
async fn test_udp_transport_picker_ignores_bootstrap_transports() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    let bootstrap_id = TransportId::new(1);
    let primary_id = TransportId::new(2);
    let other_primary_id = TransportId::new(3);

    // Registration order is intentionally not id order (1, 3, 2).
    let named_transports = [
        (bootstrap_id, "bootstrap"),
        (other_primary_id, "other-primary"),
        (primary_id, "primary"),
    ];
    for (id, label) in named_transports {
        let udp_config = crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        };
        let mut udp = UdpTransport::new(id, Some(label.to_string()), udp_config, packet_tx.clone());
        udp.start_async().await.unwrap();
        node.transports.insert(id, TransportHandle::Udp(udp));
    }
    node.bootstrap_transports.insert(bootstrap_id);

    let picked = node.find_transport_for_type("udp");
    assert_eq!(picked, Some(primary_id));

    // Best-effort shutdown; stop errors are deliberately ignored.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Walks a node through start and stop and checks the reported state at
/// each step.
#[tokio::test]
async fn test_node_state_transitions() {
    let mut lifecycle = make_node();

    // Before start: idle and startable.
    assert!(!lifecycle.is_running());
    assert!(lifecycle.state().can_start());

    // After start: running and no longer startable.
    lifecycle.start().await.unwrap();
    assert!(lifecycle.is_running());
    assert!(!lifecycle.state().can_start());

    // After stop: not running, state is `Stopped`.
    lifecycle.stop().await.unwrap();
    assert!(!lifecycle.is_running());
    assert_eq!(lifecycle.state(), NodeState::Stopped);
}
/// `Node::start` must finish within 500 ms even though Nostr discovery is
/// enabled and both relay lists point at `wss://127.0.0.1:9`.
///
/// After start the node is running and exposes a Nostr discovery handle.
#[tokio::test]
async fn test_node_start_does_not_wait_for_nostr_relay_startup() {
    let relay_url = "wss://127.0.0.1:9".to_string();

    let mut cfg = Config::new();
    cfg.node.control.enabled = false;

    // Nostr discovery: enabled, advertising, open policy.
    cfg.node.discovery.nostr.enabled = true;
    cfg.node.discovery.nostr.advertise = true;
    cfg.node.discovery.nostr.policy = crate::config::NostrDiscoveryPolicy::Open;
    cfg.node.discovery.nostr.advert_relays = vec![relay_url.clone()];
    cfg.node.discovery.nostr.dm_relays = vec![relay_url];

    // A single UDP transport on an ephemeral loopback port.
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        advertise_on_nostr: Some(true),
        public: Some(false),
        accept_connections: Some(true),
        ..Default::default()
    };
    cfg.transports.udp = crate::config::TransportInstances::Single(udp_config);

    let mut node = Node::new(cfg).unwrap();
    let start_budget = std::time::Duration::from_millis(500);
    tokio::time::timeout(start_budget, node.start())
        .await
        .expect("node start should not wait for relay I/O")
        .unwrap();

    assert!(node.is_running());
    assert!(node.nostr_discovery_handle().is_some());

    node.stop().await.unwrap();
}
/// Starting an already started node fails with `NodeError::AlreadyStarted`.
#[tokio::test]
async fn test_node_double_start() {
    let mut node = make_node();
    node.start().await.unwrap();

    assert!(matches!(
        node.start().await,
        Err(NodeError::AlreadyStarted)
    ));

    node.stop().await.unwrap();
}
/// Stopping a node that was never started fails with
/// `NodeError::NotStarted`.
#[tokio::test]
async fn test_node_stop_not_started() {
    let mut never_started = make_node();

    assert!(matches!(
        never_started.stop().await,
        Err(NodeError::NotStarted)
    ));
}
/// Adding a link makes it visible by id and by `(transport, address)`;
/// removing it clears both lookups.
#[test]
fn test_node_link_management() {
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let addr = TransportAddr::from_string("test");

    let link_id = node.allocate_link_id();
    let link = Link::connectionless(
        link_id,
        transport_id,
        addr.clone(),
        LinkDirection::Outbound,
        Duration::from_millis(50),
    );
    node.add_link(link).unwrap();

    // Present after insertion.
    assert_eq!(node.link_count(), 1);
    assert!(node.get_link(&link_id).is_some());
    assert_eq!(node.find_link_by_addr(transport_id, &addr), Some(link_id));

    // Gone after removal, including the address index.
    node.remove_link(&link_id);
    assert_eq!(node.link_count(), 0);
    assert!(node.find_link_by_addr(transport_id, &addr).is_none());
}
/// With `set_max_links(2)`, two links are accepted and a third is rejected
/// with `NodeError::MaxLinksExceeded`.
#[test]
fn test_node_link_limit() {
    let mut node = make_node();
    node.set_max_links(2);
    let transport_id = TransportId::new(1);

    // Fill the node up to its limit.
    for suffix in 0..2 {
        let id = node.allocate_link_id();
        let within_limit = Link::connectionless(
            id,
            transport_id,
            TransportAddr::from_string(&format!("test{suffix}")),
            LinkDirection::Outbound,
            Duration::from_millis(50),
        );
        node.add_link(within_limit).unwrap();
    }

    // One more must be refused.
    let extra_id = node.allocate_link_id();
    let over_limit = Link::connectionless(
        extra_id,
        transport_id,
        TransportAddr::from_string("test_extra"),
        LinkDirection::Outbound,
        Duration::from_millis(50),
    );
    assert!(matches!(
        node.add_link(over_limit),
        Err(NodeError::MaxLinksExceeded { .. })
    ));
}
/// A connection can be added, looked up by link id, and removed again.
#[test]
fn test_node_connection_management() {
    let mut node = make_node();
    let link_id = LinkId::new(1);
    let outbound = PeerConnection::outbound(link_id, make_peer_identity(), 1000);

    node.add_connection(outbound).unwrap();
    assert_eq!(node.connection_count(), 1);
    assert!(node.get_connection(&link_id).is_some());

    node.remove_connection(&link_id);
    assert_eq!(node.connection_count(), 0);
}
/// Adding a second connection under the same link id fails with
/// `NodeError::ConnectionAlreadyExists`.
#[test]
fn test_node_connection_duplicate() {
    let mut node = make_node();
    let identity = make_peer_identity();
    let shared_link_id = LinkId::new(1);

    let first = PeerConnection::outbound(shared_link_id, identity, 1000);
    node.add_connection(first).unwrap();

    let second = PeerConnection::outbound(shared_link_id, identity, 2000);
    assert!(matches!(
        node.add_connection(second),
        Err(NodeError::ConnectionAlreadyExists(_))
    ));
}
/// Promoting a completed connection moves it from the connection table to
/// the peer table and registers the peer under its session index.
#[test]
fn test_node_promote_connection() {
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (handshaken, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();

    // Before promotion: a connection but no peer.
    node.add_connection(handshaken).unwrap();
    assert_eq!(node.connection_count(), 1);
    assert_eq!(node.peer_count(), 0);

    let outcome = node.promote_connection(link_id, identity, 2000).unwrap();
    assert!(matches!(outcome, PromotionResult::Promoted(_)));

    // After promotion: the connection is gone and the peer exists.
    assert_eq!(node.connection_count(), 0);
    assert_eq!(node.peer_count(), 1);

    let promoted = node.get_peer(&node_addr).unwrap();
    assert_eq!(promoted.authenticated_at(), 2000);
    assert!(promoted.has_session(), "Promoted peer should have NoiseSession");
    assert!(
        promoted.our_index().is_some(),
        "Promoted peer should have our_index"
    );
    assert!(
        promoted.their_index().is_some(),
        "Promoted peer should have their_index"
    );

    // The index table maps (transport, our index) back to the peer.
    let index_key = (transport_id, promoted.our_index().unwrap().as_u32());
    assert_eq!(node.peers_by_index.get(&index_key), Some(&node_addr));
}
/// A peer promoted while it has a pending retry entry whose config sets
/// `discovery_fallback_transit: false` is recorded in
/// `discovery_fallback_transit_blocked_peers`.
#[test]
fn test_promote_open_discovery_retry_blocks_fallback_transit() {
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();

    // Seed a retry entry that opts the peer out of fallback transit.
    let no_transit_config = crate::config::PeerConfig {
        npub: identity.npub(),
        alias: None,
        addresses: Vec::new(),
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: false,
    };
    let pending_retry = crate::node::retry::RetryState::new(no_transit_config);
    node.retry_pending.insert(node_addr, pending_retry);

    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();

    assert!(
        node.discovery_fallback_transit_blocked_peers
            .contains(&node_addr),
        "open-discovery retry peers should not become ambient lookup transit"
    );
}
/// Under the `Open` Nostr discovery policy, a promoted peer that has no
/// entry in the node's configuration is recorded in
/// `discovery_fallback_transit_blocked_peers`.
#[test]
fn test_promote_nonconfigured_open_discovery_peer_blocks_fallback_transit() {
    let mut node = make_node();
    node.config.node.discovery.nostr.policy = crate::config::NostrDiscoveryPolicy::Open;

    let link_id = LinkId::new(1);
    let (conn, identity) =
        make_completed_connection(&mut node, link_id, TransportId::new(1), 1000);
    let node_addr = *identity.node_addr();

    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();

    let is_blocked = node
        .discovery_fallback_transit_blocked_peers
        .contains(&node_addr);
    assert!(
        is_blocked,
        "nonconfigured peers accepted under open discovery should not be fallback transit"
    );
}
/// With a decrypt worker pool installed, promoting a connection records
/// the new session's `(transport, our index)` key in
/// `decrypt_registered_sessions`.
#[test]
fn test_promote_registers_decrypt_worker() {
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    // A pool spawned with argument 1.
    node.decrypt_workers = Some(crate::node::decrypt_worker::DecryptWorkerPool::spawn(1));

    let link_id = LinkId::new(1);
    let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();
    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();

    let session_index = node.get_peer(&node_addr).unwrap().our_index().unwrap();
    let session_key = (transport_id, session_index.as_u32());
    assert!(
        node.decrypt_registered_sessions.contains(&session_key),
        "decrypt_registered_sessions must contain the new session after promote"
    );
}
/// Deregistering a peer's old session index removes only that index: a
/// second index registered for the same peer and the peer entry itself
/// both survive.
///
/// The second index (9999) is inserted by hand into `peers_by_index`,
/// presumably to mimic the state after a rekey; the test name suggests so.
#[cfg(target_os = "linux")]
#[test]
fn test_deregister_session_index_preserves_connected_udp_on_rekey_drain() {
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id = LinkId::new(1);
    let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
    let node_addr = *identity.node_addr();
    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();

    // Key of the session created by promotion, plus a hand-made second key.
    let promoted_index = node.get_peer(&node_addr).unwrap().our_index().unwrap();
    let old_key = (transport_id, promoted_index.as_u32());
    let new_key = (transport_id, 9999u32);
    node.peers_by_index.insert(new_key, node_addr);

    node.deregister_session_index(old_key);

    assert!(
        !node.peers_by_index.contains_key(&old_key),
        "old index must be evicted"
    );
    assert!(
        node.peers_by_index.contains_key(&new_key),
        "new index must survive the deregister"
    );
    assert!(
        node.get_peer(&node_addr).is_some(),
        "peer must still be present after rekey-drain deregistration"
    );
    assert!(
        !node.decrypt_registered_sessions.contains(&old_key),
        "old session must be evicted from decrypt_registered_sessions"
    );
}
/// After promoting one connection, the peer is bound to that link and is
/// reachable through `peers_by_index`.
///
/// NOTE(review): despite the name, only a single connection is promoted
/// here; no competing connection is created.
#[test]
fn test_node_cross_connection_resolution() {
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    let link_id1 = LinkId::new(1);
    let (first_conn, identity) =
        make_completed_connection(&mut node, link_id1, transport_id, 1000);
    let node_addr = *identity.node_addr();

    node.add_connection(first_conn).unwrap();
    node.promote_connection(link_id1, identity, 1500).unwrap();

    assert_eq!(node.peer_count(), 1);
    assert_eq!(node.get_peer(&node_addr).unwrap().link_id(), link_id1);

    let our_idx = node.get_peer(&node_addr).unwrap().our_index().unwrap();
    let index_key = (transport_id, our_idx.as_u32());
    assert_eq!(node.peers_by_index.get(&index_key), Some(&node_addr));

    // The peer count is unchanged by the lookups above.
    assert_eq!(node.peer_count(), 1);
}
/// With `set_max_peers(2)`, two promotions succeed and a third fails with
/// `NodeError::MaxPeersExceeded`.
#[test]
fn test_node_peer_limit() {
    let mut node = make_node();
    let transport_id = TransportId::new(1);
    node.set_max_peers(2);

    // Link ids 1 and 2 fill the peer table.
    for raw_id in 1..=2u64 {
        let link_id = LinkId::new(raw_id);
        let (conn, identity) = make_completed_connection(&mut node, link_id, transport_id, 1000);
        node.add_connection(conn).unwrap();
        node.promote_connection(link_id, identity, 2000).unwrap();
    }
    assert_eq!(node.peer_count(), 2);

    // The connection is still accepted, but promoting it is refused.
    let overflow_link = LinkId::new(3);
    let (conn, identity) = make_completed_connection(&mut node, overflow_link, transport_id, 3000);
    node.add_connection(conn).unwrap();
    assert!(matches!(
        node.promote_connection(overflow_link, identity, 4000),
        Err(NodeError::MaxPeersExceeded { .. })
    ));
}
/// `allocate_link_id` hands out distinct ids; on a fresh node the first
/// three are 1, 2 and 3.
#[test]
fn test_node_link_id_allocation() {
    let mut node = make_node();

    let first = node.allocate_link_id();
    let second = node.allocate_link_id();
    let third = node.allocate_link_id();

    // Neighbouring ids differ.
    assert_ne!(first, second);
    assert_ne!(second, third);

    // The numeric values count up from 1.
    assert_eq!(first.as_u64(), 1);
    assert_eq!(second.as_u64(), 2);
    assert_eq!(third.as_u64(), 3);
}
/// Allocating transport ids does not register any transport: the ids are
/// distinct, but lookups by id and the id iterator stay empty.
#[test]
fn test_node_transport_management() {
    let mut node = make_node();
    assert_eq!(node.transport_count(), 0);

    let first_id = node.allocate_transport_id();
    let second_id = node.allocate_transport_id();
    assert_ne!(first_id, second_id);

    // Neither id resolves to a transport.
    assert!(node.get_transport(&first_id).is_none());
    assert!(node.get_transport(&second_id).is_none());
    assert_eq!(node.transport_ids().count(), 0);
}
/// `sendable_peers` / `sendable_peer_count` leave out a peer that has been
/// marked disconnected.
///
/// Three peers are promoted and the third is then marked disconnected.
/// The test checks the counts and the exact membership of the sendable
/// set: peers 1 and 2 are present, peer 3 is absent.
#[test]
fn test_node_sendable_peers() {
    let mut node = make_node();
    let transport_id = TransportId::new(1);

    let link_id1 = LinkId::new(1);
    let (conn1, identity1) = make_completed_connection(&mut node, link_id1, transport_id, 1000);
    let node_addr1 = *identity1.node_addr();
    node.add_connection(conn1).unwrap();
    node.promote_connection(link_id1, identity1, 2000).unwrap();

    let link_id2 = LinkId::new(2);
    let (conn2, identity2) = make_completed_connection(&mut node, link_id2, transport_id, 1000);
    let node_addr2 = *identity2.node_addr();
    node.add_connection(conn2).unwrap();
    node.promote_connection(link_id2, identity2, 2000).unwrap();

    let link_id3 = LinkId::new(3);
    let (conn3, identity3) = make_completed_connection(&mut node, link_id3, transport_id, 1000);
    let node_addr3 = *identity3.node_addr();
    node.add_connection(conn3).unwrap();
    node.promote_connection(link_id3, identity3, 2000).unwrap();

    // Only the third peer is taken out of the sendable set.
    node.get_peer_mut(&node_addr3).unwrap().mark_disconnected();

    assert_eq!(node.peer_count(), 3);
    assert_eq!(node.sendable_peer_count(), 2);

    let sendable: Vec<_> = node.sendable_peers().collect();
    assert_eq!(sendable.len(), 2);
    assert!(sendable.iter().any(|p| p.node_addr() == &node_addr1));
    // Pin the full membership, not just the count, so a filter that drops
    // the wrong peer cannot pass.
    assert!(
        sendable.iter().any(|p| p.node_addr() == &node_addr2),
        "second connected peer should be sendable"
    );
    assert!(
        !sendable.iter().any(|p| p.node_addr() == &node_addr3),
        "disconnected peer must not be sendable"
    );
}
/// A fresh node's index allocator has no indices allocated.
#[test]
fn test_node_index_allocator_initialized() {
    let allocated = make_node().index_allocator.count();
    assert_eq!(allocated, 0);
}
/// Round-trips an entry through `pending_outbound`: insert under
/// `(transport, index)`, look it up, remove it, and free the index.
#[test]
fn test_node_pending_outbound_tracking() {
    let mut node = make_node();
    let link_id = LinkId::new(1);
    let index = node.index_allocator.allocate().unwrap();
    let key = (TransportId::new(1), index.as_u32());

    node.pending_outbound.insert(key, link_id);
    assert_eq!(node.pending_outbound.get(&key), Some(&link_id));

    // Tear down; the result of `free` is intentionally ignored.
    node.pending_outbound.remove(&key);
    let _ = node.index_allocator.free(index);

    assert_eq!(node.index_allocator.count(), 0);
    assert!(node.pending_outbound.is_empty());
}
/// Round-trips an entry through `peers_by_index`: insert under
/// `(transport, index)`, look it up, remove it, and free the index.
#[test]
fn test_node_peers_by_index_tracking() {
    let mut node = make_node();
    let node_addr = make_node_addr(42);
    let index = node.index_allocator.allocate().unwrap();
    let key = (TransportId::new(1), index.as_u32());

    node.peers_by_index.insert(key, node_addr);
    assert_eq!(node.peers_by_index.get(&key), Some(&node_addr));

    // Tear down; the result of `free` is intentionally ignored.
    node.peers_by_index.remove(&key);
    let _ = node.index_allocator.free(index);

    assert!(node.peers_by_index.is_empty());
}
/// `run_rx_loop` on a node that was never started fails with
/// `NodeError::NotStarted`.
#[tokio::test]
async fn test_node_rx_loop_requires_start() {
    let mut never_started = make_node();

    assert!(matches!(
        never_started.run_rx_loop().await,
        Err(NodeError::NotStarted)
    ));
}
/// After `start`, the node holds a packet receiver that can be taken out
/// of `packet_rx`, leaving `None` behind.
#[tokio::test]
async fn test_node_rx_loop_takes_channel() {
    let mut node = make_node();
    node.start().await.unwrap();
    assert!(node.packet_rx.is_some());

    // Keep the taken receiver bound until the end of the test so it is not
    // dropped before `stop` runs.
    let taken = node.packet_rx.take();
    assert!(taken.is_some());
    assert!(node.packet_rx.is_none());

    node.stop().await.unwrap();
}
/// A fresh node's msg1 rate limiter admits a handshake, counts it as
/// pending, and drops the count back to zero on completion.
#[test]
fn test_rate_limiter_initialized() {
    let mut node = make_node();
    let limiter = &mut node.msg1_rate_limiter;

    assert!(limiter.can_start_handshake());
    assert!(limiter.start_handshake());
    assert_eq!(limiter.pending_count(), 1);

    limiter.complete_handshake();
    assert_eq!(limiter.pending_count(), 0);
}
/// Promoting a completed connection to a peer while an older outbound
/// handshake to the same peer is still pending.
///
/// NOTE(review): despite the "cleans_up" in the name, the assertions check
/// that the pending outbound is *kept* at promotion time: its connection,
/// its `pending_outbound` entry and its allocated index all remain. The
/// assertion messages call this "deferred cleanup" and "awaiting msg2".
#[test]
fn test_promote_cleans_up_pending_outbound_to_same_peer() {
let mut node = make_node();
let transport_id = TransportId::new(1);
// Remote peer B: full identity (used to answer the handshake) plus the
// public-key-only view the node works with.
let peer_b_full = Identity::generate();
let peer_b_identity = PeerIdentity::from_pubkey_full(peer_b_full.pubkey_full());
let peer_b_node_addr = *peer_b_identity.node_addr();
// --- Connection 1: outbound to B, handshake started but never completed.
let pending_link_id = LinkId::new(1);
let pending_time_ms = 1000;
let mut pending_conn =
PeerConnection::outbound(pending_link_id, peer_b_identity, pending_time_ms);
let our_keypair = node.identity.keypair();
// The msg1 bytes are not needed; only the connection's state matters.
let _msg1 = pending_conn
.start_handshake(our_keypair, node.startup_epoch, pending_time_ms)
.unwrap();
let pending_index = node.index_allocator.allocate().unwrap();
pending_conn.set_our_index(pending_index);
pending_conn.set_transport_id(transport_id);
let pending_addr = TransportAddr::from_string("10.0.0.2:2121");
pending_conn.set_source_addr(pending_addr.clone());
let pending_link = Link::connectionless(
pending_link_id,
transport_id,
pending_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
);
// Register the pending attempt directly in the node's tables: link,
// address index, connection, and pending-outbound index.
node.links.insert(pending_link_id, pending_link);
node.addr_to_link
.insert((transport_id, pending_addr.clone()), pending_link_id);
node.connections.insert(pending_link_id, pending_conn);
node.pending_outbound
.insert((transport_id, pending_index.as_u32()), pending_link_id);
assert_eq!(node.connection_count(), 1);
assert_eq!(node.link_count(), 1);
assert_eq!(node.index_allocator.count(), 1);
// --- Connection 2: a second outbound to B that completes its handshake.
let completing_link_id = LinkId::new(2);
let completing_time_ms = 2000;
let mut completing_conn =
PeerConnection::outbound(completing_link_id, peer_b_identity, completing_time_ms);
let our_keypair = node.identity.keypair();
let msg1 = completing_conn
.start_handshake(our_keypair, node.startup_epoch, completing_time_ms)
.unwrap();
// Play B's side with a throwaway inbound connection to produce msg2.
let mut resp_conn = PeerConnection::inbound(LinkId::new(999), completing_time_ms);
let peer_keypair = peer_b_full.keypair();
// Random 8-byte epoch for the responder.
let mut resp_epoch = [0u8; 8];
rand::Rng::fill_bytes(&mut rand::rng(), &mut resp_epoch);
let msg2 = resp_conn
.receive_handshake_init(peer_keypair, resp_epoch, &msg1, completing_time_ms)
.unwrap();
completing_conn
.complete_handshake(&msg2, completing_time_ms)
.unwrap();
let completing_index = node.index_allocator.allocate().unwrap();
completing_conn.set_our_index(completing_index);
completing_conn.set_their_index(SessionIndex::new(99));
completing_conn.set_transport_id(transport_id);
// Note: a different source address than the pending attempt.
completing_conn.set_source_addr(TransportAddr::from_string("10.0.0.2:4001"));
node.add_connection(completing_conn).unwrap();
assert_eq!(node.connection_count(), 2);
assert_eq!(node.index_allocator.count(), 2);
// --- Promote connection 2 and check what happens to connection 1.
let result = node
.promote_connection(completing_link_id, peer_b_identity, completing_time_ms)
.unwrap();
assert!(matches!(result, PromotionResult::Promoted(_)));
// Only the promoted connection leaves the table; the pending one stays.
assert_eq!(
node.connection_count(),
1,
"Pending outbound should be preserved (deferred cleanup)"
);
assert_eq!(node.peer_count(), 1, "Promoted peer should exist");
assert!(
node.pending_outbound
.contains_key(&(transport_id, pending_index.as_u32())),
"pending_outbound entry should still exist (awaiting msg2)"
);
assert_eq!(
node.index_allocator.count(),
2,
"Both indices should remain until msg2 cleanup"
);
// The peer is bound to the link of the connection that completed.
let peer = node.get_peer(&peer_b_node_addr).unwrap();
assert_eq!(peer.link_id(), completing_link_id);
}
/// The first `schedule_retry` for a configured peer creates a retry entry
/// with `retry_count == 1`, `reconnect == true`, and a deadline 10 000 ms
/// after the supplied time.
#[test]
fn test_schedule_retry_creates_entry() {
    let peer_npub = Identity::generate().npub();
    let peer_node_addr = *PeerIdentity::from_npub(&peer_npub).unwrap().node_addr();

    let mut config = Config::new();
    let configured_peer = crate::config::PeerConfig::new(peer_npub, "udp", "10.0.0.2:2121");
    config.peers.push(configured_peer);
    let mut node = Node::new(config).unwrap();
    assert!(node.retry_pending.is_empty());

    node.schedule_retry(peer_node_addr, 1000);

    assert_eq!(node.retry_pending.len(), 1);
    let entry = node.retry_pending.get(&peer_node_addr).unwrap();
    assert_eq!(entry.retry_count, 1);
    assert!(
        entry.reconnect,
        "Auto-connect peers always get reconnect=true"
    );
    assert_eq!(entry.retry_after_ms, 1000 + 10_000);
}
/// A second `schedule_retry` for the same peer bumps `retry_count` to 2
/// and sets the deadline 20 000 ms after the supplied time.
#[test]
fn test_schedule_retry_increments() {
    let peer_npub = Identity::generate().npub();
    let peer_node_addr = *PeerIdentity::from_npub(&peer_npub).unwrap().node_addr();

    let mut config = Config::new();
    let configured_peer = crate::config::PeerConfig::new(peer_npub, "udp", "10.0.0.2:2121");
    config.peers.push(configured_peer);
    let mut node = Node::new(config).unwrap();

    // First attempt.
    node.schedule_retry(peer_node_addr, 1000);
    let count_after_first = node.retry_pending.get(&peer_node_addr).unwrap().retry_count;
    assert_eq!(count_after_first, 1);

    // Second attempt.
    node.schedule_retry(peer_node_addr, 11_000);
    let entry = node.retry_pending.get(&peer_node_addr).unwrap();
    assert_eq!(entry.retry_count, 2);
    assert_eq!(entry.retry_after_ms, 11_000 + 20_000);
}
/// One call to `process_pending_retries` handles only part of the queue.
///
/// Twenty entries are queued with `retry_after_ms: 0`. After one call,
/// 16 entries have a non-zero `retry_count`, 4 are untouched, and all 20
/// are still present.
#[tokio::test]
async fn test_process_pending_retries_is_budgeted_per_tick() {
    let mut node = make_node();
    let mut addrs = Vec::new();

    for _ in 0..20 {
        let npub = Identity::generate().npub();
        let node_addr = *PeerIdentity::from_npub(&npub).unwrap().node_addr();
        let queued = crate::node::retry::RetryState {
            peer_config: crate::config::PeerConfig::new(npub, "udp", "10.0.0.2:2121"),
            retry_count: 0,
            retry_after_ms: 0,
            reconnect: true,
            expires_at_ms: None,
        };
        node.retry_pending.insert(node_addr, queued);
        addrs.push(node_addr);
    }

    node.process_pending_retries(1).await;

    // An entry counts as processed once its retry counter has moved.
    let processed = addrs
        .iter()
        .filter(|addr| {
            node.retry_pending
                .get(addr)
                .is_some_and(|state| state.retry_count > 0)
        })
        .count();
    let deferred = addrs.len().saturating_sub(processed);

    assert_eq!(processed, 16);
    assert_eq!(deferred, 4);
    assert_eq!(node.retry_pending.len(), 20);
}
/// With `max_retries = 2`, a configured peer still has a retry entry after
/// three `schedule_retry` calls, and its `retry_count` is 3.
#[test]
fn test_schedule_retry_auto_connect_never_exhausts() {
    let peer_npub = Identity::generate().npub();
    let peer_node_addr = *PeerIdentity::from_npub(&peer_npub).unwrap().node_addr();

    let mut config = Config::new();
    config.node.retry.max_retries = 2;
    let configured_peer = crate::config::PeerConfig::new(peer_npub, "udp", "10.0.0.2:2121");
    config.peers.push(configured_peer);
    let mut node = Node::new(config).unwrap();

    // Attempts one and two stay within the configured maximum.
    node.schedule_retry(peer_node_addr, 1000);
    assert!(node.retry_pending.contains_key(&peer_node_addr));
    node.schedule_retry(peer_node_addr, 2000);
    assert!(node.retry_pending.contains_key(&peer_node_addr));

    // Attempt three exceeds it, yet the entry must remain.
    node.schedule_retry(peer_node_addr, 3000);
    assert!(
        node.retry_pending.contains_key(&peer_node_addr),
        "Auto-connect peers should never exhaust retries"
    );
    let final_count = node.retry_pending.get(&peer_node_addr).unwrap().retry_count;
    assert_eq!(final_count, 3);
}
/// With `max_retries = 0`, `schedule_retry` creates no entry even for a
/// configured peer.
#[test]
fn test_schedule_retry_disabled() {
    let peer_npub = Identity::generate().npub();
    let peer_node_addr = *PeerIdentity::from_npub(&peer_npub).unwrap().node_addr();

    let mut config = Config::new();
    config.node.retry.max_retries = 0;
    let configured_peer = crate::config::PeerConfig::new(peer_npub, "udp", "10.0.0.2:2121");
    config.peers.push(configured_peer);
    let mut node = Node::new(config).unwrap();

    node.schedule_retry(peer_node_addr, 1000);

    assert!(
        node.retry_pending.is_empty(),
        "No retry should be scheduled when max_retries=0"
    );
}
/// `schedule_retry` for an address that is not among the node's
/// configured peers creates no retry entry.
#[test]
fn test_schedule_retry_ignores_non_autoconnect() {
    let unknown_addr = *Identity::generate().node_addr();
    let mut node = make_node();

    node.schedule_retry(unknown_addr, 1000);

    assert!(
        node.retry_pending.is_empty(),
        "No retry for unconfigured peer"
    );
}
/// `schedule_retry` for a peer that has already been promoted creates no
/// retry entry.
#[test]
fn test_schedule_retry_skips_connected_peer() {
    let mut node = make_node();
    let link_id = LinkId::new(1);
    let (conn, identity) =
        make_completed_connection(&mut node, link_id, TransportId::new(1), 1000);
    let connected_addr = *identity.node_addr();

    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, identity, 2000).unwrap();
    assert_eq!(node.peer_count(), 1);

    node.schedule_retry(connected_addr, 3000);

    assert!(
        node.retry_pending.is_empty(),
        "No retry for already-connected peer"
    );
}
/// A configured peer whose current address is on a bootstrap transport
/// still gets a retry entry from `schedule_retry`, even though it is
/// present in the peer table.
#[test]
fn test_schedule_retry_keeps_connected_bootstrap_peer_refreshable() {
    let peer_full = Identity::generate();
    let peer_identity = PeerIdentity::from_pubkey_full(peer_full.pubkey_full());
    let peer_node_addr = *peer_identity.node_addr();

    let mut config = Config::new();
    let configured_peer = crate::config::PeerConfig::new(peer_full.npub(), "udp", "127.0.0.1:9");
    config.peers.push(configured_peer);
    let mut node = Node::new(config).unwrap();

    // Transport 99 is flagged as bootstrap; no transport object is
    // registered for it.
    let bootstrap_id = TransportId::new(99);
    node.bootstrap_transports.insert(bootstrap_id);

    // Insert the peer directly, with its current address on that transport.
    let mut via_bootstrap = ActivePeer::new(peer_identity, LinkId::new(7), 1_000);
    via_bootstrap.set_current_addr(bootstrap_id, TransportAddr::from_string("127.0.0.1:9"));
    node.peers.insert(peer_node_addr, via_bootstrap);

    node.schedule_retry(peer_node_addr, 3_000);

    assert!(
        node.retry_pending.contains_key(&peer_node_addr),
        "bootstrap/fallback paths should not permanently suppress direct refresh retries"
    );
}
/// `try_peer_addresses` for a peer that is already promoted succeeds
/// without adding any link or connection.
#[tokio::test]
async fn test_try_peer_addresses_skips_connected_peer() {
    let mut node = make_node();
    let link_id = LinkId::new(1);
    let (conn, peer_identity) =
        make_completed_connection(&mut node, link_id, TransportId::new(1), 1000);
    let peer_config = crate::config::PeerConfig::new(peer_identity.npub(), "udp", "127.0.0.1:9");

    node.add_connection(conn).unwrap();
    node.promote_connection(link_id, peer_identity, 2000)
        .unwrap();

    // Snapshot the table sizes, then attempt another dial.
    let links_before = node.link_count();
    let connections_before = node.connection_count();
    node.try_peer_addresses(&peer_config, peer_identity, true)
        .await
        .unwrap();

    assert_eq!(
        node.link_count(),
        links_before,
        "stale retry/traversal fallback must not create a duplicate link"
    );
    assert_eq!(
        node.connection_count(),
        connections_before,
        "stale retry/traversal fallback must not create a duplicate handshake"
    );
}
/// `try_peer_addresses` for a peer that already has an outbound connection
/// in the connection table succeeds without adding a second connection or
/// a link.
#[tokio::test]
async fn test_try_peer_addresses_skips_connecting_peer() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // One real UDP transport on an ephemeral loopback port.
    let transport_id = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport_id, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));

    let peer_identity = make_peer_identity();
    let peer_config = crate::config::PeerConfig::new(peer_identity.npub(), "udp", "127.0.0.1:9");

    // An in-flight outbound attempt to the same address. It is added as a
    // connection only, so the link table starts out empty.
    let mut in_flight = PeerConnection::outbound(LinkId::new(1), peer_identity, 1000);
    in_flight.set_transport_id(transport_id);
    in_flight.set_source_addr(TransportAddr::from_string("127.0.0.1:9"));
    node.add_connection(in_flight).unwrap();

    node.try_peer_addresses(&peer_config, peer_identity, true)
        .await
        .unwrap();

    assert_eq!(
        node.connection_count(),
        1,
        "stale retry/traversal fallback must not start a second handshake"
    );
    assert_eq!(
        node.link_count(),
        0,
        "stale retry/traversal fallback must not allocate a link for the duplicate path"
    );

    // Best-effort shutdown; stop errors are deliberately ignored.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Asserts that a `BootstrapEvent::Failed` queued for an already-promoted peer
/// leaves the discovery failure snapshot and `retry_pending` empty after
/// `poll_nostr_discovery` runs.
#[tokio::test]
async fn test_nostr_traversal_failure_skips_connected_peer() {
    let mut node = make_node();
    let transport = TransportId::new(1);
    let link = LinkId::new(1);
    let promoted_at = Node::now_ms();

    // Register and promote a completed connection for the remote peer.
    let (completed, remote_identity) =
        make_completed_connection(&mut node, link, transport, promoted_at);
    node.add_connection(completed).unwrap();
    node.promote_connection(link, remote_identity, promoted_at)
        .unwrap();

    // Read back the promoted peer's current address, then touch the peer with
    // the current time.
    let remote_node_addr = *remote_identity.node_addr();
    let live_addr = node
        .peers
        .get(&remote_node_addr)
        .and_then(|peer| peer.current_addr().cloned())
        .expect("promoted test peer has a current address");
    node.peers
        .get_mut(&remote_node_addr)
        .expect("promoted test peer")
        .touch(Node::now_ms());

    // Queue a failure event that targets the address the peer is already using.
    let discovery = Arc::new(NostrDiscovery::new_for_test());
    let stale_failure = BootstrapEvent::Failed {
        peer_config: crate::config::PeerConfig::new(
            remote_identity.npub(),
            "udp",
            live_addr.to_string(),
        ),
        reason: "stale traversal failure".to_string(),
    };
    discovery.push_event_for_test(stale_failure);
    node.nostr_discovery = Some(discovery.clone());

    node.poll_nostr_discovery().await;

    assert!(
        discovery.failure_state_snapshot().is_empty(),
        "stale failures for connected peers must not affect traversal cooldown"
    );
    assert!(
        node.retry_pending.is_empty(),
        "stale failures for connected peers must not enqueue reconnect attempts"
    );
}
/// Feeds three datagrams through `process_packet` on a bootstrap transport and
/// asserts that the first two (a punch-magic packet and a packet starting with
/// `0x45`) leave the failure snapshot empty, while the third (starting with
/// `0x11`) produces exactly one entry.
#[tokio::test]
async fn process_packet_ignores_punch_and_non_fmp_noise_for_bootstrap_cooldown() {
    let mut node = make_node();
    let discovery = Arc::new(NostrDiscovery::new_for_test());
    let transport = TransportId::new(44);
    node.nostr_discovery = Some(discovery.clone());

    // Register transport 44 as a bootstrap transport associated with a fresh npub.
    node.bootstrap_transports.insert(transport);
    node.bootstrap_transport_npubs
        .insert(transport, Identity::generate().npub());

    let sender = crate::transport::TransportAddr::from_string("127.0.0.1:9");

    // 24-byte datagram whose first four bytes are the big-endian punch magic.
    let mut punch_datagram = vec![0u8; 24];
    punch_datagram[..4].copy_from_slice(&crate::discovery::PUNCH_MAGIC.to_be_bytes());
    // Four bytes beginning with 0x45 (the "IPv4-looking" case named in the assertion).
    let ipv4_like = vec![0x45, 0x00, 0x00, 0x00];
    // Four bytes beginning with 0x11 (the "different version" case named in the assertion).
    let other_version = vec![0x11, 0x00, 0x00, 0x00];

    node.process_packet(ReceivedPacket::new(transport, sender.clone(), punch_datagram))
        .await;
    node.process_packet(ReceivedPacket::new(transport, sender.clone(), ipv4_like))
        .await;
    assert!(
        discovery.failure_state_snapshot().is_empty(),
        "stray punch/IPv4-looking datagrams must not poison bootstrap cooldown"
    );

    node.process_packet(ReceivedPacket::new(transport, sender, other_version))
        .await;
    assert_eq!(
        discovery.failure_state_snapshot().len(),
        1,
        "a plausible FMP packet with a different version should still be treated as structural"
    );
}
/// Asserts that `process_pending_retries(1_000)` removes a retry entry whose
/// `expires_at_ms` is `Some(1_000)`.
#[tokio::test]
async fn test_process_pending_retries_drops_expired_entries() {
    let mut node = make_node();
    let remote_npub = Identity::generate().npub();
    let remote_node_addr = *PeerIdentity::from_npub(&remote_npub).unwrap().node_addr();

    // Reconnect-flagged entry that is due immediately and expires at t = 1_000.
    let remote_config = crate::config::PeerConfig::new(remote_npub, "udp", "127.0.0.1:9");
    let mut expired = super::super::retry::RetryState::new(remote_config);
    expired.retry_after_ms = 0;
    expired.expires_at_ms = Some(1_000);
    expired.reconnect = true;
    node.retry_pending.insert(remote_node_addr, expired);

    node.process_pending_retries(1_000).await;

    let still_pending = node.retry_pending.contains_key(&remote_node_addr);
    assert!(
        !still_pending,
        "expired retry entries should be dropped before retry processing"
    );
}
/// Asserts that `schedule_reconnect` on a peer with two prior `schedule_retry`
/// calls bumps `retry_count` to 3, marks the entry as a reconnect, and sets
/// `retry_after_ms` to the call time plus the state's computed backoff.
#[test]
fn test_schedule_reconnect_preserves_backoff() {
    let remote_npub = Identity::generate().npub();
    let remote_node_addr = *PeerIdentity::from_npub(&remote_npub).unwrap().node_addr();

    let mut config = Config::new();
    config
        .peers
        .push(crate::config::PeerConfig::new(remote_npub, "udp", "10.0.0.2:2121"));
    let mut node = Node::new(config).unwrap();

    // Two plain retries first, so there is existing backoff state to preserve.
    node.schedule_retry(remote_node_addr, 1_000);
    node.schedule_retry(remote_node_addr, 11_000);
    assert_eq!(
        node.retry_pending.get(&remote_node_addr).unwrap().retry_count,
        2,
        "Two failures should yield count=2"
    );

    node.schedule_reconnect(remote_node_addr, 31_000);

    let entry = node.retry_pending.get(&remote_node_addr).unwrap();
    assert!(entry.reconnect, "Entry should be marked as reconnect");
    assert_eq!(
        entry.retry_count, 3,
        "schedule_reconnect should increment existing count (was 2), not reset to 0 (regression: issue #5)"
    );

    // Expected deadline: reconnect time plus the backoff reported by the entry
    // for the configured base interval and cap (both converted to milliseconds).
    let retry_cfg = &node.config.node.retry;
    let base_ms = retry_cfg.base_interval_secs * 1000;
    let max_ms = retry_cfg.max_backoff_secs * 1000;
    let backoff = entry.backoff_ms(base_ms, max_ms);
    assert_eq!(
        entry.retry_after_ms,
        31_000 + backoff,
        "retry_after_ms should reflect count=3 backoff"
    );
}
/// Asserts that `schedule_reconnect` on a peer with no prior retry state
/// creates a reconnect entry with `retry_count == 0` and a `retry_after_ms`
/// equal to the call time plus the state's computed backoff.
#[test]
fn test_schedule_reconnect_fresh_state() {
    let remote_npub = Identity::generate().npub();
    let remote_node_addr = *PeerIdentity::from_npub(&remote_npub).unwrap().node_addr();

    let mut config = Config::new();
    config
        .peers
        .push(crate::config::PeerConfig::new(remote_npub, "udp", "10.0.0.2:2121"));
    let mut node = Node::new(config).unwrap();

    node.schedule_reconnect(remote_node_addr, 1_000);

    let entry = node.retry_pending.get(&remote_node_addr).unwrap();
    assert!(entry.reconnect, "Entry should be marked as reconnect");
    assert_eq!(
        entry.retry_count, 0,
        "Fresh reconnect should start at count=0"
    );

    // Configured base interval and cap, converted from seconds to milliseconds.
    let retry_cfg = &node.config.node.retry;
    let base_ms = retry_cfg.base_interval_secs * 1000;
    let max_ms = retry_cfg.max_backoff_secs * 1000;
    let backoff = entry.backoff_ms(base_ms, max_ms);
    assert_eq!(entry.retry_after_ms, 1_000 + backoff);
}
/// Asserts that `handle_disconnect` with an encoded `Shutdown` disconnect for
/// a configured peer leaves a reconnect-flagged `retry_pending` entry with
/// `retry_count == 0`.
#[test]
fn test_disconnect_schedules_reconnect() {
    use crate::protocol::{Disconnect, DisconnectReason};

    let remote_npub = Identity::generate().npub();
    let remote_node_addr = *PeerIdentity::from_npub(&remote_npub).unwrap().node_addr();

    let mut config = Config::new();
    config
        .peers
        .push(crate::config::PeerConfig::new(remote_npub, "udp", "10.0.0.2:2121"));
    let mut node = Node::new(config).unwrap();

    let shutdown_payload = Disconnect::new(DisconnectReason::Shutdown).encode();
    node.handle_disconnect(&remote_node_addr, &shutdown_payload);

    let entry = node
        .retry_pending
        .get(&remote_node_addr)
        .expect("handle_disconnect should schedule reconnect for auto-connect peer");
    assert!(entry.reconnect, "Entry should be marked as reconnect");
    assert_eq!(
        entry.retry_count, 0,
        "Fresh reconnect after disconnect should start at count=0"
    );
}
/// Asserts that `promote_connection` removes a pre-existing `retry_pending`
/// entry for the promoted peer.
#[test]
fn test_promote_clears_retry_pending() {
    let mut node = make_node();
    let link = LinkId::new(1);
    let transport = TransportId::new(1);
    let (completed, remote_identity) = make_completed_connection(&mut node, link, transport, 1000);
    let remote_node_addr = *remote_identity.node_addr();

    // Seed a retry entry so there is something for promotion to clear.
    let seeded = super::super::retry::RetryState::new(crate::config::PeerConfig::default());
    node.retry_pending.insert(remote_node_addr, seeded);
    assert_eq!(node.retry_pending.len(), 1);

    node.add_connection(completed).unwrap();
    node.promote_connection(link, remote_identity, 2000).unwrap();

    let still_pending = node.retry_pending.contains_key(&remote_node_addr);
    assert!(
        !still_pending,
        "retry_pending should be cleared on successful promotion"
    );
}
/// Asserts that `promote_connection` keeps the peer's `retry_pending` entry
/// when the promoted connection uses a transport registered in
/// `bootstrap_transports`.
#[test]
fn test_promote_keeps_retry_pending_for_bootstrap_path() {
    let mut node = make_node();

    // Transport 1 is registered as a bootstrap transport before the handshake.
    let bootstrap_transport = TransportId::new(1);
    node.bootstrap_transports.insert(bootstrap_transport);

    let link = LinkId::new(1);
    let (completed, remote_identity) =
        make_completed_connection(&mut node, link, bootstrap_transport, 1000);
    let remote_node_addr = *remote_identity.node_addr();

    // Seed a retry entry carrying the peer's configured address.
    let remote_config =
        crate::config::PeerConfig::new(remote_identity.npub(), "udp", "127.0.0.1:5000");
    let seeded = super::super::retry::RetryState::new(remote_config);
    node.retry_pending.insert(remote_node_addr, seeded);

    node.add_connection(completed).unwrap();
    node.promote_connection(link, remote_identity, 2000).unwrap();

    let still_pending = node.retry_pending.contains_key(&remote_node_addr);
    assert!(
        still_pending,
        "promotion over bootstrap/fallback transport should keep direct refresh retry state"
    );
}
/// Asserts that `initiate_peer_connections` on a node with one configured peer
/// and no transports registered by the test leaves a `retry_pending` entry for
/// that peer.
#[tokio::test]
async fn test_initiate_peer_connections_schedules_retry_on_no_transport() {
    let remote_npub = Identity::generate().npub();
    let remote_node_addr = *PeerIdentity::from_npub(&remote_npub).unwrap().node_addr();

    let mut config = Config::new();
    config
        .peers
        .push(crate::config::PeerConfig::new(remote_npub, "udp", "10.0.0.2:2121"));
    let mut node = Node::new(config).unwrap();

    // Nothing is queued before the connection attempt.
    assert!(node.retry_pending.is_empty());

    node.initiate_peer_connections().await;

    let retry_scheduled = node.retry_pending.contains_key(&remote_node_addr);
    assert!(
        retry_scheduled,
        "startup peer-init failure must enqueue a retry so the peer can recover \
         without a daemon restart"
    );
}
/// Builds and starts a UDP transport named `udp<id>`, bound to `127.0.0.1:0`
/// with the given configured `mtu`, and returns it wrapped in
/// `TransportHandle::Udp`.
///
/// Panics if the transport fails to start.
async fn make_udp_transport_with_mtu(id: u32, mtu: u16) -> TransportHandle {
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        mtu: Some(mtu),
        ..Default::default()
    };
    // The receiving half is unused; the named binding keeps it alive until
    // this function returns.
    let (packet_tx, _packet_rx) = packet_channel(64);
    let mut transport = UdpTransport::new(
        TransportId::new(id),
        Some(format!("udp{id}")),
        udp_config,
        packet_tx,
    );
    transport.start_async().await.unwrap();
    TransportHandle::Udp(transport)
}
/// Registers three started UDP transports with MTUs 1497, 1280 and 1400 and
/// asserts that `transport_mtu()` reports 1280 and `effective_ipv6_mtu()`
/// reports 1203.
#[tokio::test]
async fn test_transport_mtu_returns_min_across_operational() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx);
    node.packet_rx = Some(packet_rx);

    // (transport id, configured MTU) for each transport registered on the node.
    for (id, mtu) in [(1, 1497), (2, 1280), (3, 1400)] {
        let handle = make_udp_transport_with_mtu(id, mtu).await;
        node.transports.insert(TransportId::new(id), handle);
    }

    assert_eq!(node.transport_mtu(), 1280);
    assert_eq!(node.effective_ipv6_mtu(), 1203);

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Asserts that `transport_mtu()` reports 1280 on a node for which the test
/// has registered no transports.
#[tokio::test]
async fn test_transport_mtu_fallback_when_no_operational_transports() {
    let bare_node = make_node();
    let reported = bare_node.transport_mtu();
    assert_eq!(reported, 1280);
}
/// Asserts that `transport_mtu()` reports 1452 when the only registered
/// transport is a started UDP transport configured with MTU 1452.
#[tokio::test]
async fn test_transport_mtu_min_with_single_operational() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx);
    node.packet_rx = Some(packet_rx);

    let only_transport = make_udp_transport_with_mtu(1, 1452).await;
    node.transports.insert(TransportId::new(1), only_transport);

    assert_eq!(node.transport_mtu(), 1452);

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Asserts that `seed_path_mtu_for_link_peer` stores 1452 in `path_mtu_lookup`
/// for a peer with no existing entry, when the link's transport is configured
/// with MTU 1452.
#[tokio::test]
async fn test_seed_path_mtu_inserts_when_empty() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx);
    node.packet_rx = Some(packet_rx);

    let transport = TransportId::new(1);
    node.transports
        .insert(transport, make_udp_transport_with_mtu(1, 1452).await);

    let remote_node_addr = make_node_addr(0xAA);
    let lookup_key = crate::FipsAddress::from_node_addr(&remote_node_addr);
    let remote_transport_addr = TransportAddr::from_string("10.0.0.2:2121");

    node.seed_path_mtu_for_link_peer(&remote_node_addr, transport, &remote_transport_addr);

    // Scope the read guard so it is released before the awaits below.
    let seeded = {
        let lookup = node.path_mtu_lookup.read().unwrap();
        lookup.get(&lookup_key).copied()
    };
    assert_eq!(
        seeded,
        Some(1452),
        "Empty lookup should be seeded with the link MTU"
    );

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Asserts that `seed_path_mtu_for_link_peer` leaves an existing lookup value
/// of 1280 untouched when the link's transport is configured with MTU 1452.
#[tokio::test]
async fn test_seed_path_mtu_keeps_tighter_existing_value() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx);
    node.packet_rx = Some(packet_rx);

    let transport = TransportId::new(1);
    node.transports
        .insert(transport, make_udp_transport_with_mtu(1, 1452).await);

    let remote_node_addr = make_node_addr(0xBB);
    let lookup_key = crate::FipsAddress::from_node_addr(&remote_node_addr);
    let remote_transport_addr = TransportAddr::from_string("10.0.0.3:2121");

    // Pre-populate the lookup with a value smaller than the link MTU.
    {
        let mut lookup = node.path_mtu_lookup.write().unwrap();
        lookup.insert(lookup_key, 1280);
    }

    node.seed_path_mtu_for_link_peer(&remote_node_addr, transport, &remote_transport_addr);

    // Scope the read guard so it is released before the awaits below.
    let after_seed = {
        let lookup = node.path_mtu_lookup.read().unwrap();
        lookup.get(&lookup_key).copied()
    };
    assert_eq!(
        after_seed,
        Some(1280),
        "Existing tighter value (1280) must not be loosened by direct-link seed (1452)"
    );

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Asserts that `seed_path_mtu_for_link_peer` replaces an existing lookup
/// value of 1452 with 1280 when the link's transport is configured with
/// MTU 1280.
#[tokio::test]
async fn test_seed_path_mtu_tightens_looser_existing_value() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx);
    node.packet_rx = Some(packet_rx);

    let transport = TransportId::new(1);
    node.transports
        .insert(transport, make_udp_transport_with_mtu(1, 1280).await);

    let remote_node_addr = make_node_addr(0xCC);
    let lookup_key = crate::FipsAddress::from_node_addr(&remote_node_addr);
    let remote_transport_addr = TransportAddr::from_string("10.0.0.4:2121");

    // Pre-populate the lookup with a value larger than the link MTU.
    {
        let mut lookup = node.path_mtu_lookup.write().unwrap();
        lookup.insert(lookup_key, 1452);
    }

    node.seed_path_mtu_for_link_peer(&remote_node_addr, transport, &remote_transport_addr);

    // Scope the read guard so it is released before the awaits below.
    let after_seed = {
        let lookup = node.path_mtu_lookup.read().unwrap();
        lookup.get(&lookup_key).copied()
    };
    assert_eq!(
        after_seed,
        Some(1280),
        "Direct-link seed (1280) must overwrite looser existing value (1452)"
    );

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Asserts that `initiate_peer_retry_connection` creates a link for both the
/// peer's configured static UDP address and the UDP endpoint from its cached
/// overlay advert, yielding two connections.
#[tokio::test]
async fn test_retry_races_overlay_advert_alongside_static_udp_hint() {
    use crate::config::NostrDiscoveryPolicy;
    use crate::discovery::nostr::{NostrDiscovery, OverlayEndpointAdvert, OverlayTransportKind};

    // Node with Nostr discovery enabled under the `ConfiguredOnly` policy.
    let mut config = Config::new();
    config.node.discovery.nostr.enabled = true;
    config.node.discovery.nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
    let mut node = Node::new(config).unwrap();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Start a loopback UDP transport and register it under id 1.
    let transport_id = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport_id, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));

    let peer_npub = Identity::generate().npub();

    // The static address belongs to a socket bound by this test; the binding
    // is held until the end of the test.
    let static_sink = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind static sink");
    let stale_static_addr = static_sink
        .local_addr()
        .expect("static sink local addr")
        .to_string();
    let fresh_overlay_addr = "127.0.0.1:55180";

    // Cache an overlay advert for the peer, timestamped with the current wall clock.
    let discovery = Arc::new(NostrDiscovery::new_for_test());
    let overlay_endpoint = OverlayEndpointAdvert {
        transport: OverlayTransportKind::Udp,
        addr: fresh_overlay_addr.to_string(),
    };
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    let cached = NostrDiscovery::cached_advert_for_test(peer_npub.clone(), overlay_endpoint, now_secs);
    discovery
        .insert_advert_for_test(peer_npub.clone(), cached)
        .await;
    node.nostr_discovery = Some(discovery);

    // Configured peer whose only static address is the sink socket above.
    let static_hint = crate::config::PeerAddress::new("udp", stale_static_addr.clone());
    let peer_config = crate::config::PeerConfig {
        npub: peer_npub.clone(),
        alias: None,
        addresses: vec![static_hint],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    node.config.peers.push(peer_config.clone());

    node.initiate_peer_retry_connection(&peer_config)
        .await
        .unwrap();

    let fresh = TransportAddr::from_string(fresh_overlay_addr);
    let stale = TransportAddr::from_string(&stale_static_addr);
    let fresh_link = node.find_link_by_addr(transport_id, &fresh);
    let stale_link = node.find_link_by_addr(transport_id, &stale);
    assert!(
        fresh_link.is_some(),
        "retry should race fresh overlay advert {fresh_overlay_addr} alongside the static candidate"
    );
    assert!(
        stale_link.is_some(),
        "retry should keep stale static {stale_static_addr} in the bounded path race"
    );
    assert_eq!(node.connection_count(), 2);

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Asserts that `initiate_peer_connection` creates a link for both the peer's
/// configured static UDP address and the UDP endpoint from its cached overlay
/// advert, yielding two connections.
#[tokio::test]
async fn test_bootstrap_races_static_address_and_overlay_advert() {
    use crate::config::NostrDiscoveryPolicy;
    use crate::discovery::nostr::{NostrDiscovery, OverlayEndpointAdvert, OverlayTransportKind};

    // Node with Nostr discovery enabled under the `ConfiguredOnly` policy.
    let mut config = Config::new();
    config.node.discovery.nostr.enabled = true;
    config.node.discovery.nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
    let mut node = Node::new(config).unwrap();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Start a loopback UDP transport and register it under id 1.
    let transport = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports.insert(transport, TransportHandle::Udp(udp));

    let remote_npub = Identity::generate().npub();

    // The static address belongs to a socket bound by this test; the binding
    // is held until the end of the test.
    let static_sink = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind static sink");
    let static_target = static_sink
        .local_addr()
        .expect("static sink local addr")
        .to_string();
    let overlay_target = "127.0.0.1:55181";

    // Cache an overlay advert for the peer, timestamped with the current wall clock.
    let discovery = Arc::new(NostrDiscovery::new_for_test());
    let overlay_endpoint = OverlayEndpointAdvert {
        transport: OverlayTransportKind::Udp,
        addr: overlay_target.to_string(),
    };
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    let cached =
        NostrDiscovery::cached_advert_for_test(remote_npub.clone(), overlay_endpoint, now_secs);
    discovery
        .insert_advert_for_test(remote_npub.clone(), cached)
        .await;
    node.nostr_discovery = Some(discovery);

    // Configured peer whose only static address is the sink socket above.
    let static_hint = crate::config::PeerAddress::new("udp", static_target.clone());
    let remote_config = crate::config::PeerConfig {
        npub: remote_npub.clone(),
        alias: None,
        addresses: vec![static_hint],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    node.config.peers.push(remote_config.clone());

    node.initiate_peer_connection(&remote_config).await.unwrap();

    let static_transport_addr = TransportAddr::from_string(&static_target);
    let overlay_transport_addr = TransportAddr::from_string(overlay_target);
    let overlay_link = node.find_link_by_addr(transport, &overlay_transport_addr);
    let static_link = node.find_link_by_addr(transport, &static_transport_addr);
    assert!(
        overlay_link.is_some(),
        "cold-start should race fresh overlay fallback alongside a static candidate"
    );
    assert!(
        static_link.is_some(),
        "cold-start should keep the unstamped static address in the bounded path race"
    );
    assert_eq!(node.connection_count(), 2);

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Asserts that `initiate_peer_retry_connection` creates a link for each of
/// two UDP endpoints in the peer's cached overlay advert when the peer has no
/// configured static addresses, yielding two connections.
#[tokio::test]
async fn test_retry_races_fresh_overlay_udp_candidates_without_static_direct() {
    use crate::config::NostrDiscoveryPolicy;
    use crate::discovery::nostr::{NostrDiscovery, OverlayEndpointAdvert, OverlayTransportKind};

    // Node with Nostr discovery enabled under the `ConfiguredOnly` policy.
    let mut config = Config::new();
    config.node.discovery.nostr.enabled = true;
    config.node.discovery.nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
    let mut node = Node::new(config).unwrap();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Start a loopback UDP transport and register it under id 1.
    let transport = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports.insert(transport, TransportHandle::Udp(udp));

    let remote_npub = Identity::generate().npub();

    // Two sockets bound by this test provide the advertised endpoints; both
    // bindings are held until the end of the test.
    let first_sink = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind first sink");
    let second_sink = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind second sink");
    let first_target = first_sink
        .local_addr()
        .expect("first sink addr")
        .to_string();
    let second_target = second_sink
        .local_addr()
        .expect("second sink addr")
        .to_string();

    // Build a cached advert with the first endpoint, then append the second.
    let discovery = Arc::new(NostrDiscovery::new_for_test());
    let first_endpoint = OverlayEndpointAdvert {
        transport: OverlayTransportKind::Udp,
        addr: first_target.clone(),
    };
    let now_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs());
    let mut cached =
        NostrDiscovery::cached_advert_for_test(remote_npub.clone(), first_endpoint, now_secs);
    let second_endpoint = OverlayEndpointAdvert {
        transport: OverlayTransportKind::Udp,
        addr: second_target.clone(),
    };
    cached.advert.endpoints.push(second_endpoint);
    discovery
        .insert_advert_for_test(remote_npub.clone(), cached)
        .await;
    node.nostr_discovery = Some(discovery);

    // Configured peer with an empty static address list.
    let remote_config = crate::config::PeerConfig {
        npub: remote_npub.clone(),
        alias: None,
        addresses: Vec::new(),
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    node.config.peers.push(remote_config.clone());

    node.initiate_peer_retry_connection(&remote_config)
        .await
        .unwrap();

    let first_link = node.find_link_by_addr(transport, &TransportAddr::from_string(&first_target));
    assert!(
        first_link.is_some(),
        "first overlay UDP candidate should be raced"
    );
    let second_link =
        node.find_link_by_addr(transport, &TransportAddr::from_string(&second_target));
    assert!(
        second_link.is_some(),
        "a fresh overlay attempt must not suppress a later direct UDP candidate"
    );
    assert_eq!(node.connection_count(), 2);

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Asserts that `seed_path_mtu_for_link_peer` adds no `path_mtu_lookup` entry
/// when called with a transport id (99) that the test never registered.
#[tokio::test]
async fn test_seed_path_mtu_noop_for_unknown_transport() {
    let node = make_node();
    let remote_node_addr = make_node_addr(0xDD);
    let lookup_key = crate::FipsAddress::from_node_addr(&remote_node_addr);
    let remote_transport_addr = TransportAddr::from_string("10.0.0.5:2121");
    let unregistered = TransportId::new(99);

    node.seed_path_mtu_for_link_peer(&remote_node_addr, unregistered, &remote_transport_addr);

    let lookup = node.path_mtu_lookup.read().unwrap();
    let entry = lookup.get(&lookup_key);
    assert!(
        entry.is_none(),
        "Seed must be a no-op when transport_id is not registered"
    );
}
/// Returns the npub of a freshly generated identity.
fn npub_for_test() -> String {
    let throwaway = Identity::generate();
    throwaway.npub()
}
/// Generates identities until one is found whose node address compares
/// greater than `node`'s own, and returns it together with its `PeerIdentity`.
fn peer_identity_for_outbound_refresh_owner(node: &Node) -> (Identity, PeerIdentity) {
    loop {
        let candidate = Identity::generate();
        let candidate_identity = PeerIdentity::from_pubkey_full(candidate.pubkey_full());
        let local_is_lower = node.identity.node_addr() < candidate_identity.node_addr();
        if !local_is_lower {
            continue;
        }
        break (candidate, candidate_identity);
    }
}
/// Generates identities until one is found whose node address compares
/// less than `node`'s own, and returns it together with its `PeerIdentity`.
fn peer_identity_for_outbound_refresh_loser(node: &Node) -> (Identity, PeerIdentity) {
    loop {
        let candidate = Identity::generate();
        let candidate_identity = PeerIdentity::from_pubkey_full(candidate.pubkey_full());
        let local_is_higher = node.identity.node_addr() > candidate_identity.node_addr();
        if !local_is_higher {
            continue;
        }
        break (candidate, candidate_identity);
    }
}
fn auto_connect_peer(npub: String, addr: &str) -> crate::config::PeerConfig {
crate::config::PeerConfig {
npub,
alias: None,
addresses: vec![crate::config::PeerAddress::new("udp", addr)],
connect_policy: crate::config::ConnectPolicy::AutoConnect,
auto_reconnect: true,
discovery_fallback_transit: true,
}
}
/// Passes four entries to `update_peers`, the first and last sharing an npub,
/// and asserts that three peers are added, that the stored order is
/// first/second/third, and that the first slot carries the later entry's
/// addresses.
#[tokio::test]
async fn update_peers_preserves_input_priority_order() {
    let mut node = make_node();
    let id_a = Identity::generate();
    let id_b = Identity::generate();
    let id_c = Identity::generate();

    let a_initial = auto_connect_peer(id_a.npub(), "127.0.0.1:9");
    let b_entry = auto_connect_peer(id_b.npub(), "127.0.0.1:10");
    let c_entry = auto_connect_peer(id_c.npub(), "127.0.0.1:11");
    // Same npub as `a_initial`, different address.
    let a_revised = auto_connect_peer(id_a.npub(), "127.0.0.1:12");

    let submitted = vec![
        a_initial,
        b_entry.clone(),
        c_entry.clone(),
        a_revised.clone(),
    ];
    let outcome = node.update_peers(submitted).await.unwrap();
    assert_eq!(outcome.added, 3);

    let stored_order: Vec<&str> = node
        .config
        .peers
        .iter()
        .map(|peer| peer.npub.as_str())
        .collect();
    let expected_order = vec![
        a_revised.npub.as_str(),
        b_entry.npub.as_str(),
        c_entry.npub.as_str(),
    ];
    assert_eq!(
        stored_order, expected_order,
        "caller priority order should survive de-duplication"
    );
    assert_eq!(node.config.peers[0].addresses, a_revised.addresses);
}
/// With an active peer on `127.0.0.1:7` and an unchanged config entry pointing
/// at `127.0.0.1:9`, asserts that `update_peers` reports the entry as
/// unchanged, adds one connection, and leaves the active peer's link id and
/// current address as they were. The peer is chosen so that its node address
/// compares less than the local node's.
#[tokio::test]
async fn update_peers_races_alternate_path_even_when_outbound_would_lose() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Start a loopback UDP transport and register it under id 1.
    let transport = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports.insert(transport, TransportHandle::Udp(udp));

    let (remote, remote_identity) = peer_identity_for_outbound_refresh_loser(&node);
    let remote_node_addr = *remote_identity.node_addr();

    // Install an active peer plus its link on the existing address.
    let existing_addr = TransportAddr::from_string("127.0.0.1:7");
    let existing_link_id = LinkId::new(7);
    let mut live_peer = ActivePeer::new(remote_identity, existing_link_id, 1_000);
    live_peer.set_current_addr(transport, &existing_addr);
    node.peers.insert(remote_node_addr, live_peer);
    let existing_link = Link::connectionless(
        existing_link_id,
        transport,
        existing_addr.clone(),
        LinkDirection::Outbound,
        Duration::from_millis(100),
    );
    node.links.insert(existing_link_id, existing_link);

    // The configured address differs from the one the active peer is using.
    let entry = auto_connect_peer(remote.npub(), "127.0.0.1:9");
    node.config.peers = vec![entry.clone()];

    let outcome = node.update_peers(vec![entry]).await.unwrap();

    assert_eq!(outcome.unchanged, 1);
    assert_eq!(node.peer_count(), 1, "current active peer must remain live");
    assert_eq!(
        node.connection_count(),
        1,
        "alternate path should be attempted even when our outbound would lose cross-connection"
    );
    let live = node.get_peer(&remote_node_addr).unwrap();
    assert_eq!(live.link_id(), existing_link_id);
    assert_eq!(live.current_addr(), Some(&existing_addr));

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Asserts that `update_peers` with an empty list on a fresh node reports zero
/// added, removed, updated and unchanged peers.
#[tokio::test]
async fn update_peers_returns_zero_on_empty_diff() {
    let mut node = make_node();
    let no_peers = Vec::new();

    let outcome = node.update_peers(no_peers).await.unwrap();

    assert_eq!(outcome.added, 0);
    assert_eq!(outcome.removed, 0);
    assert_eq!(outcome.updated, 0);
    assert_eq!(outcome.unchanged, 0);
}
/// Asserts that `update_peers` with one new aliased peer reports it as added,
/// stores it in `config.peers`, and records the alias in `peer_aliases` under
/// the peer's node address.
#[tokio::test]
async fn update_peers_adds_new_peer_and_registers_alias() {
    let mut node = make_node();
    let mut aliased = auto_connect_peer(npub_for_test(), "127.0.0.1:9");
    aliased.alias = Some("alice".to_string());

    let outcome = node.update_peers(vec![aliased.clone()]).await.unwrap();

    assert_eq!(outcome.added, 1);
    assert_eq!(outcome.removed, 0);
    assert_eq!(outcome.updated, 0);
    assert_eq!(outcome.unchanged, 0);
    assert_eq!(node.config.peers.len(), 1);

    // The alias is looked up by the node address derived from the npub.
    let aliased_identity = PeerIdentity::from_npub(&aliased.npub).unwrap();
    let registered = node.peer_aliases.get(aliased_identity.node_addr());
    assert_eq!(registered, Some(&"alice".to_string()));
}
/// Adds one peer via `update_peers`, checks it has a `retry_pending` entry,
/// then asserts that a second call with an empty list reports one removal and
/// clears the peer from `config.peers`, `retry_pending` and `peer_aliases`.
#[tokio::test]
async fn update_peers_removes_dropped_peer_and_clears_retry_state() {
    let mut node = make_node();
    let entry = auto_connect_peer(npub_for_test(), "127.0.0.1:9");

    // First pass: add the peer and confirm a retry entry exists for it.
    let _ = node.update_peers(vec![entry.clone()]).await.unwrap();
    let remote_node_addr = *PeerIdentity::from_npub(&entry.npub).unwrap().node_addr();
    assert!(node.retry_pending.contains_key(&remote_node_addr));

    // Second pass: submit an empty list.
    let outcome = node.update_peers(Vec::new()).await.unwrap();

    assert_eq!(outcome.added, 0);
    assert_eq!(outcome.removed, 1);
    assert!(node.config.peers.is_empty());
    assert!(!node.retry_pending.contains_key(&remote_node_addr));
    assert!(!node.peer_aliases.contains_key(&remote_node_addr));
}
/// Asserts that resubmitting a known npub with a different address is
/// reported as one update and that `config.peers` holds the new address.
#[tokio::test]
async fn update_peers_reports_updated_when_addresses_change() {
    let mut node = make_node();
    let npub = npub_for_test();

    let initial = auto_connect_peer(npub.clone(), "127.0.0.1:9");
    let _ = node.update_peers(vec![initial]).await.unwrap();

    // Same npub, different address.
    let revised = auto_connect_peer(npub, "127.0.0.1:55180");
    let outcome = node.update_peers(vec![revised.clone()]).await.unwrap();

    assert_eq!(outcome.added, 0);
    assert_eq!(outcome.removed, 0);
    assert_eq!(outcome.updated, 1);
    assert_eq!(outcome.unchanged, 0);
    assert_eq!(node.config.peers.len(), 1);
    assert_eq!(node.config.peers[0].addresses[0].addr, "127.0.0.1:55180");
}
/// Asserts that submitting the same peer entry twice reports it as unchanged
/// on the second call.
#[tokio::test]
async fn update_peers_reports_unchanged_for_identical_entry() {
    let mut node = make_node();
    let entry = auto_connect_peer(npub_for_test(), "127.0.0.1:9");

    let _ = node.update_peers(vec![entry.clone()]).await.unwrap();
    let second_pass = node.update_peers(vec![entry]).await.unwrap();

    assert_eq!(second_pass.added, 0);
    assert_eq!(second_pass.removed, 0);
    assert_eq!(second_pass.updated, 0);
    assert_eq!(second_pass.unchanged, 1);
}
/// Starts from a configured auto-connect peer with no addresses, resubmits it
/// with one UDP address, and asserts that `update_peers` reports one update
/// and that the node ends up with one connection.
#[tokio::test]
async fn update_peers_redials_existing_auto_peer_with_direct_hint() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Start a loopback UDP transport and register it under id 1.
    let transport = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports.insert(transport, TransportHandle::Udp(udp));

    // Existing config entry: auto-connect, but with an empty address list.
    let npub = npub_for_test();
    let addressless = crate::config::PeerConfig {
        npub: npub.clone(),
        alias: None,
        addresses: Vec::new(),
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    node.config.peers = vec![addressless];

    // Resubmit the same npub, now with a UDP address.
    let with_address = auto_connect_peer(npub, "127.0.0.1:9");
    let outcome = node.update_peers(vec![with_address]).await.unwrap();

    assert_eq!(outcome.added, 0);
    assert_eq!(outcome.removed, 0);
    assert_eq!(outcome.updated, 1);
    assert_eq!(node.connection_count(), 1);

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// Resubmits an already-configured auto-connect peer that has no active peer
/// or link, and asserts that `update_peers` reports it as unchanged while the
/// node ends up with one connection.
#[tokio::test]
async fn update_peers_redials_unchanged_auto_peer_without_link() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Start a loopback UDP transport and register it under id 1.
    let transport = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports.insert(transport, TransportHandle::Udp(udp));

    // The entry is already in the config; the update submits it unchanged.
    let entry = auto_connect_peer(npub_for_test(), "127.0.0.1:9");
    node.config.peers = vec![entry.clone()];

    let outcome = node.update_peers(vec![entry]).await.unwrap();

    assert_eq!(outcome.added, 0);
    assert_eq!(outcome.removed, 0);
    assert_eq!(outcome.updated, 0);
    assert_eq!(outcome.unchanged, 1);
    assert_eq!(node.connection_count(), 1);

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// With an active peer on `127.0.0.1:7` and an unchanged config entry pointing
/// at `127.0.0.1:9`, asserts that `update_peers` reports the entry as
/// unchanged, adds one connection, and leaves the active peer's link id and
/// current address as they were. The peer is chosen so that its node address
/// compares greater than the local node's.
#[tokio::test]
async fn update_peers_races_alternate_path_for_active_peer_without_dropping_current_link() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Start a loopback UDP transport and register it under id 1.
    let transport = TransportId::new(1);
    let udp_config = crate::config::UdpConfig {
        bind_addr: Some("127.0.0.1:0".to_string()),
        ..Default::default()
    };
    let mut udp = UdpTransport::new(transport, Some("main".to_string()), udp_config, packet_tx);
    udp.start_async().await.unwrap();
    node.transports.insert(transport, TransportHandle::Udp(udp));

    let (remote, remote_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let remote_node_addr = *remote_identity.node_addr();

    // Install an active peer plus its link on the existing address.
    let existing_addr = TransportAddr::from_string("127.0.0.1:7");
    let existing_link_id = LinkId::new(7);
    let mut live_peer = ActivePeer::new(remote_identity, existing_link_id, 1_000);
    live_peer.set_current_addr(transport, &existing_addr);
    node.peers.insert(remote_node_addr, live_peer);
    let existing_link = Link::connectionless(
        existing_link_id,
        transport,
        existing_addr.clone(),
        LinkDirection::Outbound,
        Duration::from_millis(100),
    );
    node.links.insert(existing_link_id, existing_link);

    // The configured address differs from the one the active peer is using.
    let entry = auto_connect_peer(remote.npub(), "127.0.0.1:9");
    node.config.peers = vec![entry.clone()];

    let outcome = node.update_peers(vec![entry]).await.unwrap();

    assert_eq!(outcome.unchanged, 1);
    assert_eq!(node.peer_count(), 1, "current active peer must remain live");
    assert_eq!(
        node.connection_count(),
        1,
        "alternate path should be a pending handshake, not a peer replacement"
    );
    let live = node.get_peer(&remote_node_addr).unwrap();
    assert_eq!(live.link_id(), existing_link_id);
    assert_eq!(live.current_addr(), Some(&existing_addr));

    // Best-effort shutdown of every transport started by this test.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// An active peer whose current address equals its only configured candidate
/// (`127.0.0.1:9`) must not get a new pending connection when the same peer list is applied.
#[tokio::test]
async fn update_peers_does_not_churn_active_peer_already_on_known_candidate() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Register a started loopback UDP transport under id 1.
    let transport_id = TransportId::new(1);
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));

    // The active peer already sits on the address that the config will list.
    let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let current_addr = TransportAddr::from_string("127.0.0.1:9");
    let old_link_id = LinkId::new(7);
    let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
    active_peer.set_current_addr(transport_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);

    let peer = auto_connect_peer(peer_full.npub(), "127.0.0.1:9");
    node.config.peers = vec![peer.clone()];
    let outcome = node.update_peers(vec![peer]).await.unwrap();

    assert_eq!(outcome.unchanged, 1);
    assert_eq!(node.peer_count(), 1);
    assert_eq!(
        node.connection_count(),
        0,
        "known-good active concrete path should not be redialed every refresh"
    );

    // Shut the transports down; stop errors are deliberately ignored.
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
/// An active peer constructed with the current `Node::now_ms()` value and sitting on the
/// same UDP address as the candidate is reported as fresh enough to skip.
#[test]
fn active_peer_same_path_discovery_skips_fresh_peer() {
    let mut node = make_node();
    let (_peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let transport_id = TransportId::new(1);
    let current_addr = TransportAddr::from_string("127.0.0.1:9");

    let mut active_peer = ActivePeer::new(peer_identity, LinkId::new(7), Node::now_ms());
    active_peer.set_current_addr(transport_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);

    // The candidate names the same address the peer is already using.
    let candidate = crate::config::PeerAddress::new("udp", "127.0.0.1:9");
    assert!(node.active_peer_candidate_is_fresh_enough_to_skip(
        &peer_node_addr,
        std::slice::from_ref(&candidate),
    ));
}
/// An active peer constructed with a timestamp `(heartbeat_interval_secs + 1) * 1000` before
/// `Node::now_ms()` is not considered fresh enough to skip, even though the candidate names
/// the address the peer already uses.
#[test]
fn active_peer_same_path_discovery_refreshes_stale_peer() {
    let mut node = make_node();
    let (_peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let transport_id = TransportId::new(1);
    let current_addr = TransportAddr::from_string("127.0.0.1:9");

    // Saturating arithmetic keeps the computation panic-free for extreme config values.
    let stale_at = Node::now_ms().saturating_sub(
        node.config
            .node
            .heartbeat_interval_secs
            .saturating_add(1)
            .saturating_mul(1000),
    );
    let mut active_peer = ActivePeer::new(peer_identity, LinkId::new(7), stale_at);
    active_peer.set_current_addr(transport_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);

    let candidate = crate::config::PeerAddress::new("udp", "127.0.0.1:9");
    assert!(!node.active_peer_candidate_is_fresh_enough_to_skip(
        &peer_node_addr,
        std::slice::from_ref(&candidate),
    ));
}
/// When the config lists both the peer's current address (`127.0.0.1:9`) and a new one
/// (`127.0.0.1:10`), applying it must create one pending connection whose source address is
/// the new one, while the active peer keeps its link and current address.
#[tokio::test]
async fn update_peers_races_new_alternative_even_when_current_path_is_still_known() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Register a started loopback UDP transport under id 1.
    let transport_id = TransportId::new(1);
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));

    // Seed the active peer and its link on the current address.
    let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let current_addr = TransportAddr::from_string("127.0.0.1:9");
    let new_addr = TransportAddr::from_string("127.0.0.1:10");
    let old_link_id = LinkId::new(7);
    let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
    active_peer.set_current_addr(transport_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);
    node.links.insert(
        old_link_id,
        Link::connectionless(
            old_link_id,
            transport_id,
            current_addr.clone(),
            LinkDirection::Outbound,
            Duration::from_millis(100),
        ),
    );

    // Config carries the known address first and the new alternative second.
    let peer = crate::config::PeerConfig {
        npub: peer_full.npub(),
        alias: None,
        addresses: vec![
            crate::config::PeerAddress::new("udp", "127.0.0.1:9"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:10"),
        ],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    node.config.peers = vec![peer.clone()];
    let outcome = node.update_peers(vec![peer]).await.unwrap();

    assert_eq!(outcome.unchanged, 1);
    assert_eq!(node.peer_count(), 1, "existing link must stay live");
    assert_eq!(node.connection_count(), 1);
    assert_eq!(
        node.connections
            .values()
            .next()
            .and_then(|conn| conn.source_addr()),
        Some(&new_addr)
    );
    let active = node.get_peer(&peer_node_addr).unwrap();
    assert_eq!(active.link_id(), old_link_id);
    assert_eq!(active.current_addr(), Some(&current_addr));

    // Shut the transports down; stop errors are deliberately ignored.
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
/// After a first update leaves one pending connection (to `127.0.0.1:10`), a second update
/// that adds four more candidate addresses must bring the pending count to four: ports 10
/// through 13 are attempted and port 14 is not.
#[tokio::test]
async fn update_peers_races_more_alternatives_while_peer_is_connecting_with_budget() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Register a started loopback UDP transport under id 1.
    let transport_id = TransportId::new(1);
    let mut udp = UdpTransport::new(
        transport_id,
        Some("main".to_string()),
        crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        },
        packet_tx,
    );
    udp.start_async().await.unwrap();
    node.transports
        .insert(transport_id, TransportHandle::Udp(udp));

    // Seed the active peer and its link on the current address.
    let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let current_addr = TransportAddr::from_string("127.0.0.1:9");
    let old_link_id = LinkId::new(7);
    let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
    active_peer.set_current_addr(transport_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);
    node.links.insert(
        old_link_id,
        Link::connectionless(
            old_link_id,
            transport_id,
            current_addr,
            LinkDirection::Outbound,
            Duration::from_millis(100),
        ),
    );

    // First pass: current address plus one alternative leaves one pending connection.
    let first = crate::config::PeerConfig {
        npub: peer_full.npub(),
        alias: None,
        addresses: vec![
            crate::config::PeerAddress::new("udp", "127.0.0.1:9"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:10"),
        ],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    node.config.peers = vec![first.clone()];
    let _ = node.update_peers(vec![first]).await.unwrap();
    assert_eq!(node.connection_count(), 1);

    // Second pass: four additional candidates, more than the asserted total of four allows.
    let refreshed = crate::config::PeerConfig {
        npub: peer_full.npub(),
        alias: None,
        addresses: vec![
            crate::config::PeerAddress::new("udp", "127.0.0.1:9"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:10"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:11"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:12"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:13"),
            crate::config::PeerAddress::new("udp", "127.0.0.1:14"),
        ],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let outcome = node.update_peers(vec![refreshed]).await.unwrap();
    assert_eq!(outcome.updated, 1);
    assert_eq!(
        node.connection_count(),
        4,
        "one existing in-flight path plus three new paths should hit the per-peer race budget"
    );

    // Collect the addresses of all pending connections as strings for membership checks.
    let attempted: std::collections::HashSet<_> = node
        .connections
        .values()
        .filter_map(|conn| conn.source_addr().map(ToString::to_string))
        .collect();
    for addr in [
        "127.0.0.1:10",
        "127.0.0.1:11",
        "127.0.0.1:12",
        "127.0.0.1:13",
    ] {
        assert!(attempted.contains(addr), "missing attempted path {addr}");
    }
    assert!(
        !attempted.contains("127.0.0.1:14"),
        "candidate racing should be bounded per peer"
    );

    // Shut the transports down; stop errors are deliberately ignored.
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
/// An active peer whose current address lives on a transport registered in
/// `bootstrap_transports` must still get a pending connection on the other ("main")
/// transport when its configured address is applied, even though the address string matches.
#[tokio::test]
async fn update_peers_races_primary_path_when_active_peer_uses_bootstrap_transport() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Two started loopback UDP transports; only the first is marked as bootstrap.
    let bootstrap_id = TransportId::new(1);
    let primary_id = TransportId::new(2);
    for (transport_id, name) in [(bootstrap_id, "nostr-nat"), (primary_id, "main")] {
        let mut udp = UdpTransport::new(
            transport_id,
            Some(name.to_string()),
            crate::config::UdpConfig {
                bind_addr: Some("127.0.0.1:0".to_string()),
                ..Default::default()
            },
            packet_tx.clone(),
        );
        udp.start_async().await.unwrap();
        node.transports
            .insert(transport_id, TransportHandle::Udp(udp));
    }
    node.bootstrap_transports.insert(bootstrap_id);

    // The active peer is reached through the bootstrap transport.
    let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let current_addr = TransportAddr::from_string("127.0.0.1:9");
    let old_link_id = LinkId::new(7);
    let mut active_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
    active_peer.set_current_addr(bootstrap_id, &current_addr);
    node.peers.insert(peer_node_addr, active_peer);

    let peer = auto_connect_peer(peer_full.npub(), "127.0.0.1:9");
    node.config.peers = vec![peer.clone()];
    let outcome = node.update_peers(vec![peer]).await.unwrap();

    assert_eq!(outcome.unchanged, 1);
    assert_eq!(node.peer_count(), 1);
    assert_eq!(
        node.connection_count(),
        1,
        "bootstrap NAT path should not suppress a primary-transport refresh"
    );
    let conn = node.connections.values().next().unwrap();
    assert_eq!(conn.transport_id(), Some(primary_id));

    // Shut the transports down; stop errors are deliberately ignored.
    for transport in node.transports.values_mut() {
        transport.stop().await.ok();
    }
}
/// With an active peer reached over a bootstrap transport and a due retry entry for the same
/// peer, `process_pending_retries` must open one pending connection on the primary transport
/// and leave the retry entry in place with a `retry_after_ms` later than the call time.
#[tokio::test]
async fn process_pending_retries_races_primary_path_for_active_bootstrap_peer() {
    let mut node = make_node();
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Two started loopback UDP transports; only the first is marked as bootstrap.
    let bootstrap_id = TransportId::new(1);
    let primary_id = TransportId::new(2);
    for (id, label) in [(bootstrap_id, "nostr-nat"), (primary_id, "main")] {
        let udp_config = crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        };
        let mut udp = UdpTransport::new(id, Some(label.to_string()), udp_config, packet_tx.clone());
        udp.start_async().await.unwrap();
        node.transports.insert(id, TransportHandle::Udp(udp));
    }
    node.bootstrap_transports.insert(bootstrap_id);

    // Active peer currently attached to the bootstrap transport.
    let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();
    let mut fallback_peer = ActivePeer::new(peer_identity, LinkId::new(7), 1_000);
    let fallback_addr = TransportAddr::from_string("127.0.0.1:8");
    fallback_peer.set_current_addr(bootstrap_id, &fallback_addr);
    node.peers.insert(peer_node_addr, fallback_peer);

    // Configure a direct address and queue a retry that is already due.
    let configured = auto_connect_peer(peer_full.npub(), "127.0.0.1:9");
    node.config.peers = vec![configured.clone()];
    let mut retry = super::super::retry::RetryState::new(configured);
    retry.retry_after_ms = 0;
    retry.reconnect = true;
    node.retry_pending.insert(peer_node_addr, retry);

    node.process_pending_retries(1_000).await;

    assert_eq!(node.peer_count(), 1);
    assert_eq!(
        node.connection_count(),
        1,
        "retry maintenance should race the configured direct path even while fallback remains active"
    );
    let pending = node.connections.values().next().unwrap();
    assert_eq!(pending.transport_id(), Some(primary_id));

    let retry_still_deferred = node
        .retry_pending
        .get(&peer_node_addr)
        .is_some_and(|entry| entry.retry_after_ms > 1_000);
    assert!(
        retry_still_deferred,
        "retry state should stay pending until the direct-path handshake wins or times out"
    );

    // Shut the transports down; stop errors are deliberately ignored.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// With Nostr discovery enabled, an active peer on a bootstrap transport, an established
/// end-to-end session and a due retry entry, `process_pending_retries` must both open one
/// pending connection for the static address and register one active initiator on the
/// discovery handle.
#[tokio::test]
async fn active_fallback_static_hint_also_queues_nostr_traversal() {
    use crate::config::NostrDiscoveryPolicy;
    use crate::node::session::{EndToEndState, SessionEntry};
    use crate::noise::HandshakeState;

    let peer_full = Identity::generate();
    let peer_identity = PeerIdentity::from_pubkey_full(peer_full.pubkey_full());
    let peer_node_addr = *peer_identity.node_addr();
    let static_hint = crate::config::PeerConfig {
        npub: peer_full.npub(),
        alias: None,
        addresses: vec![crate::config::PeerAddress::new("udp", "127.0.0.1:9")],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: false,
    };

    // Node configured with Nostr discovery restricted to configured peers.
    let mut config = Config::new();
    config.node.discovery.nostr.enabled = true;
    config.node.discovery.nostr.policy = NostrDiscoveryPolicy::ConfiguredOnly;
    config.peers = vec![static_hint.clone()];
    let mut node = Node::new(config).expect("node");
    let (packet_tx, packet_rx) = packet_channel(64);
    node.packet_tx = Some(packet_tx.clone());
    node.packet_rx = Some(packet_rx);

    // Two started loopback UDP transports; only the first is marked as bootstrap.
    let bootstrap_id = TransportId::new(1);
    let primary_id = TransportId::new(2);
    for (id, label) in [(bootstrap_id, "fips-mesh"), (primary_id, "main")] {
        let udp_config = crate::config::UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            ..Default::default()
        };
        let mut udp = UdpTransport::new(id, Some(label.to_string()), udp_config, packet_tx.clone());
        udp.start_async().await.unwrap();
        node.transports.insert(id, TransportHandle::Udp(udp));
    }
    node.bootstrap_transports.insert(bootstrap_id);

    // Active peer currently attached to the bootstrap transport.
    let mut fallback_peer = ActivePeer::new(peer_identity, LinkId::new(7), 1_000);
    let fallback_addr = TransportAddr::from_string("127.0.0.1:8");
    fallback_peer.set_current_addr(bootstrap_id, &fallback_addr);
    node.peers.insert(peer_node_addr, fallback_peer);

    // Run a two-message handshake locally so the initiator can yield an established session.
    let mut e2e_initiator =
        HandshakeState::new_initiator(node.identity.keypair(), peer_full.pubkey_full());
    let mut e2e_responder = HandshakeState::new_responder(peer_full.keypair());
    e2e_initiator.set_local_epoch([0x01; 8]);
    e2e_responder.set_local_epoch([0x02; 8]);
    let first_msg = e2e_initiator.write_message_1().expect("msg1");
    e2e_responder.read_message_1(&first_msg).expect("read msg1");
    let second_msg = e2e_responder.write_message_2().expect("msg2");
    e2e_initiator.read_message_2(&second_msg).expect("read msg2");
    let established = EndToEndState::Established(e2e_initiator.into_session().expect("session"));
    let session_entry = SessionEntry::new(
        peer_node_addr,
        peer_full.pubkey_full(),
        established,
        1_000,
        true,
    );
    node.sessions.insert(peer_node_addr, session_entry);

    // Keep a handle on the discovery object so its initiator count can be inspected later.
    let bootstrap = Arc::new(NostrDiscovery::new_for_test());
    node.nostr_discovery = Some(Arc::clone(&bootstrap));

    // Queue a retry that is already due.
    let mut retry = super::super::retry::RetryState::new(static_hint);
    retry.retry_after_ms = 0;
    retry.reconnect = true;
    node.retry_pending.insert(peer_node_addr, retry);

    node.process_pending_retries(1_000).await;

    assert_eq!(
        node.connection_count(),
        1,
        "static direct hint should still be raced while fallback remains active"
    );
    let active_initiators = bootstrap.active_initiator_count_for_test().await;
    assert_eq!(
        active_initiators,
        1,
        "stale static hints must not suppress Nostr/mesh traversal refresh"
    );

    // Shut the transports down; stop errors are deliberately ignored.
    for handle in node.transports.values_mut() {
        handle.stop().await.ok();
    }
}
/// On a converged two-node tree, polling discovery with one queued mesh traversal offer must
/// leave node 0 with an initiating session toward node 1 and keep the offer queued (one
/// signal is still drainable afterwards).
#[tokio::test]
async fn mesh_signal_warms_session_instead_of_dropping_without_established_session() {
    use super::spanning_tree::{run_tree_test, verify_tree_convergence};
    use crate::discovery::nostr::{MeshTraversalSignal, TraversalOffer};

    let mut nodes = run_tree_test(2, &[(0, 1)], false).await;
    verify_tree_convergence(&nodes);

    let peer_node_addr = *nodes[1].node.node_addr();
    let peer_npub = nodes[1].node.identity().npub();
    let peer_config = crate::config::PeerConfig {
        npub: peer_npub.clone(),
        alias: None,
        addresses: vec![crate::config::PeerAddress::with_priority("udp", "nat", 1)],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: false,
    };

    // Queue a single offer addressed from node 0 to node 1 on the test discovery handle.
    let bootstrap = Arc::new(NostrDiscovery::new_for_test());
    let offer = TraversalOffer {
        message_type: "offer".to_string(),
        session_id: "session".to_string(),
        issued_at: 1,
        expires_at: 2,
        nonce: "nonce".to_string(),
        sender_npub: nodes[0].node.identity().npub(),
        recipient_npub: peer_npub.clone(),
        reflexive_address: None,
        local_addresses: Vec::new(),
        stun_server: None,
    };
    bootstrap.push_mesh_signal_for_test(MeshTraversalSignal::Offer { peer_npub, offer });

    // Wire discovery into node 0 and run one poll.
    let local = &mut nodes[0].node;
    local.config.node.discovery.nostr.enabled = true;
    local.config.peers = vec![peer_config];
    local.nostr_discovery = Some(Arc::clone(&bootstrap));
    local.poll_nostr_discovery().await;

    let session_is_initiating = nodes[0]
        .node
        .sessions
        .get(&peer_node_addr)
        .is_some_and(|entry| entry.is_initiating());
    assert!(
        session_is_initiating,
        "mesh signal delivery should warm an end-to-end session over the existing mesh route"
    );

    let deferred = bootstrap.drain_mesh_signals().await;
    assert_eq!(
        deferred.len(),
        1,
        "mesh signal should be deferred until the warmed session is established"
    );
}
/// Delivers a valid msg2 for a pending outbound connection to a peer that is already active
/// on another transport/address, then checks that the peer, link tables and index tables all
/// point at the new transport tuple and that the old link is gone.
#[tokio::test]
async fn outbound_refresh_promotion_moves_active_peer_to_new_transport_tuple() {
    let mut node = make_node();
    let (peer_full, peer_identity) = peer_identity_for_outbound_refresh_owner(&node);
    let peer_node_addr = *peer_identity.node_addr();

    // Existing active peer, link and address mapping on the old transport tuple.
    let old_transport_id = TransportId::new(1);
    let old_link_id = LinkId::new(10);
    let old_addr = TransportAddr::from_string("127.0.0.1:7000");
    let mut established_peer = ActivePeer::new(peer_identity, old_link_id, 1_000);
    established_peer.set_current_addr(old_transport_id, &old_addr);
    node.peers.insert(peer_node_addr, established_peer);
    let old_link = Link::connectionless(
        old_link_id,
        old_transport_id,
        old_addr.clone(),
        LinkDirection::Outbound,
        Duration::from_millis(100),
    );
    node.links.insert(old_link_id, old_link);
    node.addr_to_link
        .insert((old_transport_id, old_addr.clone()), old_link_id);

    // Pending outbound connection on the new transport tuple, with its handshake started.
    let new_transport_id = TransportId::new(2);
    let new_link_id = LinkId::new(11);
    let new_addr = TransportAddr::from_string("127.0.0.1:9000");
    let mut refresh_conn = PeerConnection::outbound(new_link_id, peer_identity, 2_000);
    let our_index = node.index_allocator.allocate().unwrap();
    let noise_msg1 = refresh_conn
        .start_handshake(node.identity.keypair(), node.startup_epoch, 2_000)
        .unwrap();
    refresh_conn.set_our_index(our_index);
    refresh_conn.set_transport_id(new_transport_id);
    refresh_conn.set_source_addr(new_addr.clone());
    let new_link = Link::connectionless(
        new_link_id,
        new_transport_id,
        new_addr.clone(),
        LinkDirection::Outbound,
        Duration::from_millis(100),
    );
    node.links.insert(new_link_id, new_link);
    node.addr_to_link
        .insert((new_transport_id, new_addr.clone()), new_link_id);
    node.connections.insert(new_link_id, refresh_conn);
    node.pending_outbound
        .insert((new_transport_id, our_index.as_u32()), new_link_id);

    // Play the remote side locally to obtain a msg2 that answers our msg1.
    let mut remote_side = PeerConnection::inbound(LinkId::new(99), 2_000);
    let noise_msg2 = remote_side
        .receive_handshake_init(peer_full.keypair(), [0x42; 8], &noise_msg1, 2_000)
        .unwrap();
    let their_index = SessionIndex::new(77);
    let wire_msg2 = build_msg2(their_index, our_index, &noise_msg2);
    let inbound_packet =
        ReceivedPacket::with_timestamp(new_transport_id, new_addr.clone(), wire_msg2, 2_100);
    node.handle_msg2(inbound_packet).await;

    // Pending state is consumed.
    assert_eq!(node.connection_count(), 0);
    assert!(node.pending_outbound.is_empty());

    // Link bookkeeping moved from the old tuple to the new one.
    assert!(
        !node.links.contains_key(&old_link_id),
        "old active link should be retired after successful refresh"
    );
    assert!(
        node.links.contains_key(&new_link_id),
        "new outbound link should remain active"
    );
    assert_eq!(
        node.addr_to_link.get(&(old_transport_id, old_addr.clone())),
        None
    );
    let mapped_new_link = node
        .addr_to_link
        .get(&(new_transport_id, new_addr.clone()))
        .copied();
    assert_eq!(mapped_new_link, Some(new_link_id));

    // The active peer now carries the new link, transport, address and indices.
    let promoted = node.get_peer(&peer_node_addr).unwrap();
    assert_eq!(promoted.link_id(), new_link_id);
    assert_eq!(promoted.transport_id(), Some(new_transport_id));
    assert_eq!(promoted.current_addr(), Some(&new_addr));
    assert_eq!(promoted.our_index(), Some(our_index));
    assert_eq!(promoted.their_index(), Some(their_index));
    let indexed_peer = node
        .peers_by_index
        .get(&(new_transport_id, our_index.as_u32()))
        .copied();
    assert_eq!(indexed_peer, Some(peer_node_addr));
}
// Promotes one completed outbound connection, installs an established end-to-end session
// entry for the peer, then promotes a second completed outbound connection whose responder
// was given a different epoch value. Asserts that the second promotion reports
// `CrossConnectionWon`, that the active peer moved to the new link with the new remote
// epoch, and that the session entry was removed.
#[tokio::test]
async fn outbound_restart_promotion_clears_stale_fsp_session() {
use crate::node::session::{EndToEndState, SessionEntry};
use crate::noise::HandshakeState;
let mut node = make_node();
let peer_full = Identity::generate();
let peer_identity = PeerIdentity::from_pubkey_full(peer_full.pubkey_full());
let peer_node_addr = *peer_identity.node_addr();
// --- First connection: handshake completed locally against a throwaway responder. ---
let old_transport_id = TransportId::new(1);
let old_link_id = LinkId::new(10);
let old_addr = TransportAddr::from_string("127.0.0.1:7000");
let mut old_conn = PeerConnection::outbound(old_link_id, peer_identity, 1_000);
let old_msg1 = old_conn
.start_handshake(node.identity.keypair(), node.startup_epoch, 1_000)
.unwrap();
let mut old_responder = PeerConnection::inbound(LinkId::new(98), 1_000);
// The `[0x11; 8]` passed here is the value the assert below expects as `remote_epoch()`.
let old_msg2 = old_responder
.receive_handshake_init(peer_full.keypair(), [0x11; 8], &old_msg1, 1_000)
.unwrap();
old_conn.complete_handshake(&old_msg2, 1_000).unwrap();
let old_our_index = node.index_allocator.allocate().unwrap();
old_conn.set_our_index(old_our_index);
old_conn.set_their_index(SessionIndex::new(66));
old_conn.set_transport_id(old_transport_id);
old_conn.set_source_addr(old_addr.clone());
// Register the link, address mapping and connection before promoting it.
node.links.insert(
old_link_id,
Link::connectionless(
old_link_id,
old_transport_id,
old_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
),
);
node.addr_to_link
.insert((old_transport_id, old_addr.clone()), old_link_id);
node.connections.insert(old_link_id, old_conn);
node.promote_connection(old_link_id, peer_identity, 1_100)
.unwrap();
assert_eq!(
node.get_peer(&peer_node_addr).unwrap().remote_epoch(),
Some([0x11; 8])
);
// --- Established end-to-end session built from a locally run two-message handshake. ---
let mut fsp_initiator =
HandshakeState::new_initiator(node.identity.keypair(), peer_full.pubkey_full());
let mut fsp_responder = HandshakeState::new_responder(peer_full.keypair());
fsp_initiator.set_local_epoch([0x01; 8]);
fsp_responder.set_local_epoch([0x02; 8]);
let fsp_msg1 = fsp_initiator.write_message_1().unwrap();
fsp_responder.read_message_1(&fsp_msg1).unwrap();
let fsp_msg2 = fsp_responder.write_message_2().unwrap();
fsp_initiator.read_message_2(&fsp_msg2).unwrap();
let stale_session = fsp_initiator.into_session().unwrap();
node.sessions.insert(
peer_node_addr,
SessionEntry::new(
peer_node_addr,
peer_full.pubkey_full(),
EndToEndState::Established(stale_session),
1_200,
true,
),
);
// Sanity check: the session entry exists before the second promotion.
assert!(node.sessions.contains_key(&peer_node_addr));
// --- Second connection: same peer identity, responder given `[0x22; 8]` instead. ---
let new_transport_id = TransportId::new(2);
let new_link_id = LinkId::new(11);
let new_addr = TransportAddr::from_string("127.0.0.1:9000");
let mut new_conn = PeerConnection::outbound(new_link_id, peer_identity, 2_000);
let new_msg1 = new_conn
.start_handshake(node.identity.keypair(), node.startup_epoch, 2_000)
.unwrap();
let mut new_responder = PeerConnection::inbound(LinkId::new(99), 2_000);
let new_msg2 = new_responder
.receive_handshake_init(peer_full.keypair(), [0x22; 8], &new_msg1, 2_000)
.unwrap();
new_conn.complete_handshake(&new_msg2, 2_100).unwrap();
let new_our_index = node.index_allocator.allocate().unwrap();
let their_index = SessionIndex::new(77);
new_conn.set_our_index(new_our_index);
new_conn.set_their_index(their_index);
new_conn.set_transport_id(new_transport_id);
new_conn.set_source_addr(new_addr.clone());
node.links.insert(
new_link_id,
Link::connectionless(
new_link_id,
new_transport_id,
new_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
),
);
node.addr_to_link
.insert((new_transport_id, new_addr.clone()), new_link_id);
node.connections.insert(new_link_id, new_conn);
// Promoting while the peer is already active is expected to resolve in favor of the new link.
let result = node
.promote_connection(new_link_id, peer_identity, 2_100)
.unwrap();
assert!(matches!(result, PromotionResult::CrossConnectionWon { .. }));
let active = node.get_peer(&peer_node_addr).unwrap();
assert_eq!(active.link_id(), new_link_id);
assert_eq!(active.remote_epoch(), Some([0x22; 8]));
assert!(
!node.sessions.contains_key(&peer_node_addr),
"old FSP session must be removed when the peer's startup epoch changes"
);
}
// Promotes a completed outbound connection, installs an established end-to-end session
// entry, initiates a rekey, and answers the stored rekey msg1 with a msg2 produced by a
// responder that uses a different local epoch. Asserts that the peer's remote epoch is
// updated, that a pending new session exists, and that the session entry was removed.
#[tokio::test]
async fn fmp_recovery_rekey_epoch_change_clears_stale_fsp_session() {
use crate::node::session::{EndToEndState, SessionEntry};
use crate::noise::HandshakeState;
let mut node = make_node();
let peer_full = Identity::generate();
let peer_identity = PeerIdentity::from_pubkey_full(peer_full.pubkey_full());
let peer_node_addr = *peer_identity.node_addr();
// A started loopback UDP transport is registered under `transport_id`.
// NOTE(review): presumably `initiate_rekey` sends through it; confirm against the implementation.
let transport_id = TransportId::new(1);
let (packet_tx, _packet_rx) = packet_channel(64);
let mut udp = UdpTransport::new(
transport_id,
Some("rekey-test".to_string()),
crate::config::UdpConfig {
bind_addr: Some("127.0.0.1:0".to_string()),
..Default::default()
},
packet_tx,
);
udp.start_async().await.unwrap();
node.transports
.insert(transport_id, TransportHandle::Udp(udp));
// --- Initial connection: handshake completed locally against a throwaway responder. ---
let link_id = LinkId::new(10);
let remote_addr = TransportAddr::from_string("127.0.0.1:9");
let mut conn = PeerConnection::outbound(link_id, peer_identity, 1_000);
let old_msg1 = conn
.start_handshake(node.identity.keypair(), node.startup_epoch, 1_000)
.unwrap();
let mut old_responder = PeerConnection::inbound(LinkId::new(98), 1_000);
// The `[0x11; 8]` passed here is the value the assert below expects as `remote_epoch()`.
let old_msg2 = old_responder
.receive_handshake_init(peer_full.keypair(), [0x11; 8], &old_msg1, 1_000)
.unwrap();
conn.complete_handshake(&old_msg2, 1_000).unwrap();
let our_index = node.index_allocator.allocate().unwrap();
conn.set_our_index(our_index);
conn.set_their_index(SessionIndex::new(66));
conn.set_transport_id(transport_id);
conn.set_source_addr(remote_addr.clone());
// Register the link, address mapping and connection before promoting it.
node.links.insert(
link_id,
Link::connectionless(
link_id,
transport_id,
remote_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(100),
),
);
node.addr_to_link
.insert((transport_id, remote_addr.clone()), link_id);
node.connections.insert(link_id, conn);
node.promote_connection(link_id, peer_identity, 1_100)
.unwrap();
assert_eq!(
node.get_peer(&peer_node_addr).unwrap().remote_epoch(),
Some([0x11; 8])
);
// --- Established end-to-end session built from a locally run two-message handshake. ---
let mut fsp_initiator =
HandshakeState::new_initiator(node.identity.keypair(), peer_full.pubkey_full());
let mut fsp_responder = HandshakeState::new_responder(peer_full.keypair());
fsp_initiator.set_local_epoch([0x01; 8]);
fsp_responder.set_local_epoch([0x02; 8]);
let fsp_msg1 = fsp_initiator.write_message_1().unwrap();
fsp_responder.read_message_1(&fsp_msg1).unwrap();
let fsp_msg2 = fsp_responder.write_message_2().unwrap();
fsp_initiator.read_message_2(&fsp_msg2).unwrap();
let stale_session = fsp_initiator.into_session().unwrap();
node.sessions.insert(
peer_node_addr,
SessionEntry::new(
peer_node_addr,
peer_full.pubkey_full(),
EndToEndState::Established(stale_session),
1_200,
true,
),
);
// Sanity check: the session entry exists before the rekey.
assert!(node.sessions.contains_key(&peer_node_addr));
// --- Rekey: start it, then recover the msg1 bytes the peer entry stored. ---
assert!(node.initiate_rekey(&peer_node_addr).await);
let rekey_msg1 = node
.get_peer(&peer_node_addr)
.unwrap()
.rekey_msg1()
.expect("rekey msg1 should be stored")
.to_vec();
// Parse the wire header and slice off the noise payload that follows it.
let header = Msg1Header::parse(&rekey_msg1).expect("valid rekey msg1");
let noise_msg1 = &rekey_msg1[header.noise_msg1_offset..];
// Answer as the peer, but with local epoch `[0x22; 8]` instead of the earlier `[0x11; 8]`.
let mut new_responder = HandshakeState::new_responder(peer_full.keypair());
new_responder.set_local_epoch([0x22; 8]);
new_responder.read_message_1(noise_msg1).unwrap();
let new_msg2 = new_responder.write_message_2().unwrap();
let their_index = SessionIndex::new(77);
// The msg2 is addressed to the sender index taken from the parsed msg1 header.
let wire_msg2 = build_msg2(their_index, header.sender_idx, &new_msg2);
let packet =
ReceivedPacket::with_timestamp(transport_id, remote_addr.clone(), wire_msg2, 2_100);
node.handle_msg2(packet).await;
let active = node.get_peer(&peer_node_addr).unwrap();
assert_eq!(active.remote_epoch(), Some([0x22; 8]));
assert!(
active.pending_new_session().is_some(),
"FMP recovery rekey should still complete and await cutover"
);
assert!(
!node.sessions.contains_key(&peer_node_addr),
"old FSP session must be removed when FMP rekey learns a new peer startup epoch"
);
// Remove and stop the transport; unlike the other tests here, a stop failure panics.
let mut transport = node.transports.remove(&transport_id).unwrap();
transport.stop().await.unwrap();
}
/// Applying the same peer twice, where the second copy only differs by a `seen_at_ms` stamp
/// on its first address, must be counted as unchanged rather than updated.
#[tokio::test]
async fn update_peers_treats_seen_at_ms_as_metadata_not_a_change() {
    let mut node = make_node();
    let npub = npub_for_test();

    // First application establishes the baseline entry.
    let baseline = auto_connect_peer(npub.clone(), "127.0.0.1:9");
    let _ = node.update_peers(vec![baseline]).await.unwrap();

    // Same peer and address, now carrying a seen-at timestamp.
    let mut refreshed = auto_connect_peer(npub, "127.0.0.1:9");
    let stamped_addr = refreshed.addresses[0]
        .clone()
        .with_seen_at_ms(1_700_000_000_000);
    refreshed.addresses[0] = stamped_addr;

    let outcome = node.update_peers(vec![refreshed]).await.unwrap();
    assert_eq!(outcome.updated, 0);
    assert_eq!(outcome.unchanged, 1);
}
/// A batch containing one valid peer and one peer with an unparseable npub must fail as a
/// whole and leave `node.config.peers` empty.
#[tokio::test]
async fn update_peers_rejects_invalid_npub_atomically() {
    let mut node = make_node();

    let well_formed = auto_connect_peer(npub_for_test(), "127.0.0.1:9");
    let malformed = crate::config::PeerConfig {
        npub: "not-a-real-npub".to_string(),
        alias: None,
        addresses: Vec::new(),
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };

    // The valid entry comes first so a partial apply would be observable.
    let batch = vec![well_formed, malformed];
    let result = node.update_peers(batch).await;

    assert!(result.is_err(), "invalid npub must reject the whole batch");
    assert!(
        node.config.peers.is_empty(),
        "rejected batch must not partially apply",
    );
}
/// Inserts `count` peers built from `make_peer_identity()` directly into `node.peers`.
///
/// Each peer gets link id `1..=count` (numbering restarts on every call) and a `0` third
/// constructor argument. No links or connections are created.
fn inject_dummy_peers(node: &mut Node, count: usize) {
    for link_seq in 1..=count {
        let identity = make_peer_identity();
        let node_addr = *identity.node_addr();
        let dummy = ActivePeer::new(identity, LinkId::new(link_seq as u64), 0);
        node.peers.insert(node_addr, dummy);
    }
}
/// `outbound_admission_check` passes while the peer count is below `max_peers`, fails at and
/// above it, and always passes when `max_peers` is set to 0.
#[test]
fn outbound_admission_check_direct() {
    let mut capped = make_node();
    capped.set_max_peers(3);

    // 0 peers, then 2 peers: still below the cap of 3.
    assert!(capped.outbound_admission_check());
    inject_dummy_peers(&mut capped, 2);
    assert!(capped.outbound_admission_check());

    // 3 peers (at the cap), then 4 peers (over the cap): rejected.
    inject_dummy_peers(&mut capped, 1);
    assert!(!capped.outbound_admission_check());
    inject_dummy_peers(&mut capped, 1);
    assert!(!capped.outbound_admission_check());

    // A cap of 0 admits regardless of how many peers exist.
    let mut unlimited = make_node();
    unlimited.set_max_peers(0);
    assert!(unlimited.outbound_admission_check());
    inject_dummy_peers(&mut unlimited, 50);
    assert!(unlimited.outbound_admission_check());
}
/// With `open_discovery_max_pending = 2` and an empty configured-npub set, every injected
/// active peer reduces `open_discovery_enqueue_budget` by one until it reaches zero.
#[test]
fn open_discovery_budget_counts_active_non_configured_peers() {
    let mut config = Config::new();
    config.node.discovery.nostr.open_discovery_max_pending = 2;
    let mut node = Node::new(config).unwrap();

    // Nothing is configured, so every injected peer counts as non-configured.
    let no_configured_npubs = std::collections::HashSet::new();

    assert_eq!(node.open_discovery_enqueue_budget(&no_configured_npubs), 2);

    inject_dummy_peers(&mut node, 1);
    assert_eq!(node.open_discovery_enqueue_budget(&no_configured_npubs), 1);

    inject_dummy_peers(&mut node, 1);
    let remaining = node.open_discovery_enqueue_budget(&no_configured_npubs);
    assert_eq!(
        remaining,
        0,
        "live open-discovery peers must consume the same cap as pending retries"
    );
}
/// `outbound_admission_check` must fail once the link count reaches `max_links`, and
/// separately once the pending connection count reaches `max_connections`.
#[test]
fn outbound_admission_check_respects_connection_and_link_caps() {
    // --- Link cap: two links against max_links = 2. ---
    let mut link_capped = make_node();
    link_capped.set_max_connections(2);
    link_capped.set_max_links(2);
    assert!(link_capped.outbound_admission_check());

    for (raw_id, addr) in [(1, "127.0.0.1:10"), (2, "127.0.0.1:11")] {
        let link_id = LinkId::new(raw_id);
        let link = Link::connectionless(
            link_id,
            TransportId::new(1),
            TransportAddr::from_string(addr),
            LinkDirection::Outbound,
            Duration::from_millis(100),
        );
        link_capped.links.insert(link_id, link);
    }
    assert!(
        !link_capped.outbound_admission_check(),
        "bootstrap/open-discovery work must stop at max_links, not only max_peers"
    );

    // --- Connection cap: one pending connection against max_connections = 1. ---
    let mut conn_capped = make_node();
    conn_capped.set_max_connections(1);

    let peer_identity = make_peer_identity();
    let link_id = LinkId::new(3);
    let remote_addr = TransportAddr::from_string("127.0.0.1:12");
    let mut pending = PeerConnection::outbound(link_id, peer_identity, 1_000);
    pending.set_transport_id(TransportId::new(1));
    pending.set_source_addr(remote_addr.clone());

    let link = Link::connectionless(
        link_id,
        TransportId::new(1),
        remote_addr,
        LinkDirection::Outbound,
        Duration::from_millis(100),
    );
    conn_capped.links.insert(link_id, link);
    conn_capped.connections.insert(link_id, pending);
    assert!(
        !conn_capped.outbound_admission_check(),
        "bootstrap/open-discovery work must stop at max_connections"
    );
}
/// With the node already at `max_peers`, a due retry entry must be left untouched by
/// `process_pending_retries`: same retry count, same `retry_after_ms`, and no new peers or
/// connections.
#[tokio::test]
async fn process_pending_retries_gated_at_capacity() {
    let mut node = make_node();
    node.set_max_peers(2);
    inject_dummy_peers(&mut node, 2);

    // A third peer that only exists as a pending retry.
    let candidate = Identity::generate();
    let candidate_npub = candidate.npub();
    let candidate_node_addr = *PeerIdentity::from_npub(&candidate_npub)
        .unwrap()
        .node_addr();
    let retry_target = crate::config::PeerConfig::new(candidate_npub, "udp", "127.0.0.1:9");
    let mut retry = super::super::retry::RetryState::new(retry_target);
    retry.retry_after_ms = 0;
    retry.reconnect = true;
    node.retry_pending.insert(candidate_node_addr, retry);

    let peers_before = node.peer_count();
    let connections_before = node.connection_count();

    node.process_pending_retries(1_000).await;

    let preserved = node
        .retry_pending
        .get(&candidate_node_addr)
        .expect("retry entry must be preserved when suppressed at capacity");
    assert_eq!(preserved.retry_count, 0);
    assert_eq!(preserved.retry_after_ms, 0);
    assert_eq!(node.peer_count(), peers_before);
    assert_eq!(node.connection_count(), connections_before);
}
/// An `Established` traversal event for a freshly generated identity must not
/// change the peer, link or connection counts while all peer slots are in use.
#[tokio::test]
async fn poll_nostr_discovery_established_gated_at_capacity() {
    use crate::discovery::EstablishedTraversal;
    use std::net::UdpSocket;

    // Fill the peer table up to the configured cap.
    let mut node = make_node();
    node.set_max_peers(2);
    inject_dummy_peers(&mut node, 2);

    // Queue one finished traversal on a test discovery instance.
    let discovery = Arc::new(NostrDiscovery::new_for_test());
    let local_socket = UdpSocket::bind("127.0.0.1:0").expect("bind local UDP socket");
    let far_end = "127.0.0.1:9999".parse().expect("parse remote addr");
    let newcomer = Identity::generate();
    let traversal =
        EstablishedTraversal::new("cap-test-session", newcomer.npub(), far_end, local_socket);
    discovery.push_event_for_test(BootstrapEvent::Established { traversal });
    node.nostr_discovery = Some(discovery);

    let peers_at_start = node.peer_count();
    let links_at_start = node.link_count();
    let connections_at_start = node.connection_count();

    node.poll_nostr_discovery().await;

    assert_eq!(node.peer_count(), peers_at_start);
    assert_eq!(node.link_count(), links_at_start);
    assert_eq!(node.connection_count(), connections_at_start);
}
/// A `Failed` bootstrap event for a configured peer that is already in the
/// peer table must leave a retry entry with `retry_count == 1` and a non-zero
/// `retry_after_ms`.
#[tokio::test]
async fn poll_nostr_discovery_failed_active_peer_keeps_retry_with_backoff() {
    let remote = Identity::generate();
    let roster_entry = crate::config::PeerConfig {
        npub: remote.npub(),
        alias: None,
        addresses: vec![crate::config::PeerAddress::with_priority("udp", "nat", 1)],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let remote_id = PeerIdentity::from_npub(&roster_entry.npub).expect("peer identity");
    let remote_addr = *remote_id.node_addr();

    // Node with nostr discovery enabled and the peer listed in its config.
    let mut node = {
        let mut cfg = Config::new();
        cfg.node.discovery.nostr.enabled = true;
        cfg.peers.push(roster_entry.clone());
        Node::new(cfg).expect("node")
    };
    // Register the peer as active before the failure event arrives.
    node.peers
        .insert(remote_addr, ActivePeer::new(remote_id, LinkId::new(7), 0));

    let discovery = Arc::new(NostrDiscovery::new_for_test());
    discovery.push_event_for_test(BootstrapEvent::Failed {
        peer_config: roster_entry.clone(),
        reason: "signal timeout waiting for answer".to_string(),
    });
    node.nostr_discovery = Some(discovery);

    node.poll_nostr_discovery().await;

    let retry = node
        .retry_pending
        .get(&remote_addr)
        .expect("failed direct upgrade should keep active-peer retry");
    assert_eq!(retry.retry_count, 1);
    assert!(
        retry.retry_after_ms > 0,
        "failed direct upgrade should back off instead of retrying every tick"
    );
    assert_eq!(retry.peer_config.npub, roster_entry.npub);
}
/// `queue_active_fallback_direct_retries` must create a fresh reconnect retry
/// entry for a configured peer that is already in the peer table and has
/// `discovery_fallback_transit` enabled.
#[test]
fn queue_active_fallback_direct_retries_seeds_configured_relayed_peer() {
    let remote = Identity::generate();
    let roster_entry = crate::config::PeerConfig {
        npub: remote.npub(),
        alias: None,
        addresses: vec![crate::config::PeerAddress::with_priority("udp", "nat", 1)],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let remote_id = PeerIdentity::from_npub(&roster_entry.npub).expect("peer identity");
    let remote_addr = *remote_id.node_addr();

    // Node with nostr discovery enabled and the peer listed in its config.
    let mut node = {
        let mut cfg = Config::new();
        cfg.node.discovery.nostr.enabled = true;
        cfg.peers.push(roster_entry.clone());
        Node::new(cfg).expect("node")
    };
    node.peers
        .insert(remote_addr, ActivePeer::new(remote_id, LinkId::new(7), 0));

    let discovery = Arc::new(NostrDiscovery::new_for_test());
    node.queue_active_fallback_direct_retries(&discovery);

    let seeded = node
        .retry_pending
        .get(&remote_addr)
        .expect("active fallback peer should get direct retry state");
    assert_eq!(seeded.peer_config.npub, roster_entry.npub);
    assert_eq!(seeded.retry_count, 0);
    assert!(seeded.reconnect);
}
/// A pending retry whose target is already in the peer table must still be
/// processed when `max_peers` is reached; the test observes this through the
/// retry counter moving from 0 to 1.
#[tokio::test]
async fn process_pending_retries_allows_active_direct_refresh_at_peer_capacity() {
    let remote = Identity::generate();
    let roster_entry = crate::config::PeerConfig {
        npub: remote.npub(),
        alias: None,
        addresses: vec![crate::config::PeerAddress::with_priority("udp", "nat", 1)],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: true,
    };
    let remote_id = PeerIdentity::from_npub(&roster_entry.npub).expect("peer identity");
    let remote_addr = *remote_id.node_addr();

    // Node with nostr discovery enabled and the peer listed in its config.
    let mut node = {
        let mut cfg = Config::new();
        cfg.node.discovery.nostr.enabled = true;
        cfg.peers.push(roster_entry.clone());
        Node::new(cfg).expect("node")
    };
    // One slot, occupied by the very peer the retry targets.
    node.set_max_peers(1);
    node.peers
        .insert(remote_addr, ActivePeer::new(remote_id, LinkId::new(7), 0));

    let mut due_retry = super::super::retry::RetryState::new(roster_entry);
    due_retry.reconnect = true;
    due_retry.retry_after_ms = 0;
    node.retry_pending.insert(remote_addr, due_retry);

    node.process_pending_retries(1_000).await;

    let after = node
        .retry_pending
        .get(&remote_addr)
        .expect("active peer retry should remain scheduled after failed initiation");
    assert_eq!(
        after.retry_count, 1,
        "active direct refresh should be processed even when peer slots are full"
    );
}
/// Both admission flags on a test `NostrDiscovery` start out allowed and
/// reflect each value written through their setters.
#[test]
fn nostr_discovery_outbound_admission_atomic_roundtrip() {
    let discovery = NostrDiscovery::new_for_test();

    // Outbound admission: allowed -> denied -> allowed.
    assert!(discovery.outbound_admission_allowed());
    discovery.set_outbound_admission(false);
    assert!(!discovery.outbound_admission_allowed());
    discovery.set_outbound_admission(true);
    assert!(discovery.outbound_admission_allowed());

    // Direct-refresh admission: allowed -> denied -> allowed.
    assert!(discovery.direct_refresh_admission_allowed());
    discovery.set_direct_refresh_admission(false);
    assert!(!discovery.direct_refresh_admission_allowed());
    discovery.set_direct_refresh_admission(true);
    assert!(discovery.direct_refresh_admission_allowed());
}
/// An `Established` traversal for a peer that is already in the peer table
/// must not be stopped by the peer cap; the test observes this through a
/// retry entry existing for that peer after polling.
#[tokio::test]
async fn poll_nostr_discovery_established_active_peer_bypasses_peer_capacity() {
    use crate::discovery::EstablishedTraversal;
    use std::net::UdpSocket;

    let remote = Identity::generate();
    let roster_entry = crate::config::PeerConfig {
        npub: remote.npub(),
        alias: None,
        addresses: vec![crate::config::PeerAddress::with_priority("udp", "nat", 1)],
        connect_policy: crate::config::ConnectPolicy::AutoConnect,
        auto_reconnect: true,
        discovery_fallback_transit: false,
    };
    let remote_id = PeerIdentity::from_npub(&roster_entry.npub).expect("peer identity");
    let remote_addr = *remote_id.node_addr();

    // Node with nostr discovery enabled and the peer listed in its config.
    let mut node = {
        let mut cfg = Config::new();
        cfg.node.discovery.nostr.enabled = true;
        cfg.peers.push(roster_entry.clone());
        Node::new(cfg).expect("node")
    };
    // One slot, occupied by the very peer the traversal belongs to.
    node.set_max_peers(1);
    node.peers
        .insert(remote_addr, ActivePeer::new(remote_id, LinkId::new(7), 0));

    let discovery = Arc::new(NostrDiscovery::new_for_test());
    let local_socket = UdpSocket::bind("127.0.0.1:0").expect("bind local UDP socket");
    let far_end = "127.0.0.1:9999".parse().expect("parse remote addr");
    let traversal = EstablishedTraversal::new(
        "active-refresh-session",
        remote.npub(),
        far_end,
        local_socket,
    );
    discovery.push_event_for_test(BootstrapEvent::Established { traversal });
    node.nostr_discovery = Some(discovery);

    node.poll_nostr_discovery().await;

    assert!(
        node.retry_pending.contains_key(&remote_addr),
        "active-peer traversal should reach adoption even when peer slots are full"
    );
}
/// Checks `mesh_signaling_allowed_for_peer` for a peer listed in the node's
/// config: it must return true both before and after an established session
/// entry exists for that peer, and false for a peer absent from the config.
#[test]
fn mesh_signaling_allows_configured_roster_peer_without_established_session() {
use crate::node::session::{EndToEndState, SessionEntry};
use crate::noise::HandshakeState;
let peer_identity = Identity::generate();
// Roster entry with `discovery_fallback_transit` switched off; the final
// transit assertion below depends on this value.
let peer_config = crate::config::PeerConfig {
npub: peer_identity.npub(),
alias: None,
addresses: vec![crate::config::PeerAddress::with_priority("udp", "nat", 1)],
connect_policy: crate::config::ConnectPolicy::AutoConnect,
auto_reconnect: true,
discovery_fallback_transit: false,
};
let mut config = Config::new();
config.node.discovery.nostr.enabled = true;
config.peers.push(peer_config.clone());
let mut node = Node::new(config).expect("node");
// No session entry has been inserted yet: configuration alone must be enough.
assert!(
node.mesh_signaling_allowed_for_peer(&peer_config),
"configured roster peers should be allowed to use mesh signaling before the end-to-end session is warm"
);
// Run a two-message handshake in-process (node as initiator, peer as
// responder) to obtain a session value for the entry inserted below.
let mut initiator =
HandshakeState::new_initiator(node.identity.keypair(), peer_identity.pubkey_full());
let mut responder = HandshakeState::new_responder(peer_identity.keypair());
initiator.set_local_epoch([0x01; 8]);
responder.set_local_epoch([0x02; 8]);
let msg1 = initiator.write_message_1().expect("msg1");
responder.read_message_1(&msg1).expect("read msg1");
let msg2 = responder.write_message_2().expect("msg2");
initiator.read_message_2(&msg2).expect("read msg2");
let session = initiator.into_session().expect("session");
let peer_addr = *PeerIdentity::from_npub(&peer_config.npub)
.expect("peer identity")
.node_addr();
// NOTE(review): the trailing `1_000` and `true` arguments look like a
// timestamp and an initiator flag — confirm against `SessionEntry::new`.
node.sessions.insert(
peer_addr,
SessionEntry::new(
peer_addr,
peer_identity.pubkey_full(),
EndToEndState::Established(session),
1_000,
true,
),
);
// Still allowed once the established session entry is present.
assert!(node.mesh_signaling_allowed_for_peer(&peer_config));
// The configured transit flag still reads back as false.
assert!(
!node
.configured_discovery_fallback_transit(&peer_addr)
.expect("peer should still be configured"),
"mesh signaling should not require ambient transit permission"
);
// A peer that was never added to the config must be rejected.
let unconfigured = Identity::generate();
let unconfigured_peer = crate::config::PeerConfig::new(unconfigured.npub(), "udp", "nat");
assert!(!node.mesh_signaling_allowed_for_peer(&unconfigured_peer));
}
/// Builds a handshake msg1 from `sender_identity` towards `node_b`, sends it as
/// one datagram from `socket_a` to `addr_b`, and returns the sender's node
/// address.
///
/// `timestamp_ms` is passed both to the outbound connection constructor and to
/// `start_handshake`. Panics if the handshake cannot be started or the send
/// fails.
async fn craft_and_send_msg1(
    node_b: &Node,
    sender_identity: &Identity,
    socket_a: &tokio::net::UdpSocket,
    addr_b: std::net::SocketAddr,
    timestamp_ms: u64,
) -> NodeAddr {
    use crate::node::wire::build_msg1;
    use crate::utils::index::SessionIndex;

    let responder_id = PeerIdentity::from_pubkey_full(node_b.identity.pubkey_full());
    let originator_addr =
        *PeerIdentity::from_pubkey_full(sender_identity.pubkey_full()).node_addr();

    // Throwaway outbound connection used only to produce the handshake payload.
    let mut handshake =
        PeerConnection::outbound(LinkId::new(0xDEAD_BEEF), responder_id, timestamp_ms);
    let originator_keypair = sender_identity.keypair();
    let mut epoch = [0u8; 8];
    rand::Rng::fill_bytes(&mut rand::rng(), &mut epoch);
    let noise_payload = handshake
        .start_handshake(originator_keypair, epoch, timestamp_ms)
        .expect("start_handshake should produce noise msg1");

    // Wrap the payload in the wire format with a fixed sender index and send it.
    let datagram = build_msg1(SessionIndex::new(0x5151), &noise_payload);
    socket_a
        .send_to(&datagram, addr_b)
        .await
        .expect("sender_socket.send_to");

    originator_addr
}
/// Waits up to `timeout_ms` milliseconds for one packet on `packet_rx` and
/// passes it to `node.handle_msg1`.
///
/// Returns an error string if the wait times out or the channel yields `None`.
async fn pump_one_msg1_into_node(
    node: &mut Node,
    packet_rx: &mut crate::transport::PacketRx,
    timeout_ms: u64,
) -> Result<(), &'static str> {
    use tokio::time::{Duration, timeout};

    let deadline = Duration::from_millis(timeout_ms);
    let outcome = timeout(deadline, packet_rx.recv()).await;
    match outcome {
        Err(_) => Err("timed out waiting for msg1 on packet_rx"),
        Ok(None) => Err("packet channel closed"),
        Ok(Some(packet)) => {
            node.handle_msg1(packet).await;
            Ok(())
        }
    }
}
/// A msg1 from an unknown sender arriving while the peer table is full must
/// add no peer, leave the rate limiter's pending count unchanged, and produce
/// no reply datagram within 300 ms.
#[tokio::test]
async fn handle_msg1_silent_drops_at_cap_for_new_peer() {
    use crate::config::UdpConfig;
    use tokio::time::{Duration, timeout};

    // Fill the peer table up to the configured cap.
    let mut node = make_node();
    node.set_max_peers(2);
    inject_dummy_peers(&mut node, 2);
    assert_eq!(node.peer_count(), 2);

    // Give the node a UDP transport bound to a loopback port chosen by the OS.
    let listen_id = TransportId::new(1);
    let (inbound_tx, mut inbound_rx) = packet_channel(64);
    let mut listener = UdpTransport::new(
        listen_id,
        None,
        UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            mtu: Some(1280),
            ..Default::default()
        },
        inbound_tx,
    );
    listener.start_async().await.unwrap();
    let listen_addr = listener.local_addr().unwrap();
    node.transports
        .insert(listen_id, TransportHandle::Udp(listener));

    let stranger_socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind sender socket");

    let peers_at_start = node.peer_count();
    let pending_at_start = node.msg1_rate_limiter.pending_count();

    // Send a msg1 from a brand-new identity and feed it to the node.
    let stranger = Identity::generate();
    let stranger_addr =
        craft_and_send_msg1(&node, &stranger, &stranger_socket, listen_addr, 1000).await;
    assert!(!node.peers.contains_key(&stranger_addr));
    pump_one_msg1_into_node(&mut node, &mut inbound_rx, 1000)
        .await
        .expect("msg1 must reach packet_rx_b");

    assert_eq!(node.peer_count(), peers_at_start);
    assert!(!node.peers.contains_key(&stranger_addr));
    assert_eq!(node.msg1_rate_limiter.pending_count(), pending_at_start);

    // Nothing may come back on the sender's socket; a timeout or a receive
    // error both count as "no reply".
    let mut scratch = [0u8; 2048];
    let received_bytes = match timeout(
        Duration::from_millis(300),
        stranger_socket.recv_from(&mut scratch),
    )
    .await
    {
        Ok(Ok((len, _))) => Some(len),
        _ => None,
    };
    assert!(
        received_bytes.is_none(),
        "Msg2 must not be sent at max_peers cap; observed {received_bytes:?} bytes"
    );
}
/// A msg1 from an identity that is already in the peer table, arriving while
/// the table is full, must leave that peer in place, keep the peer count at 2,
/// and leave the rate limiter's pending count unchanged.
#[tokio::test]
async fn handle_msg1_admits_existing_peer_at_cap() {
    use crate::config::UdpConfig;

    // One dummy peer plus one known identity fills the two available slots.
    let mut node = make_node();
    node.set_max_peers(2);
    inject_dummy_peers(&mut node, 1);
    let known_sender = Identity::generate();
    let known_id = PeerIdentity::from_pubkey_full(known_sender.pubkey_full());
    let known_addr = *known_id.node_addr();
    let known_peer = ActivePeer::new(known_id, LinkId::new(7777), 0);
    node.peers.insert(known_addr, known_peer);
    assert_eq!(node.peer_count(), 2);

    // Give the node a UDP transport bound to a loopback port chosen by the OS.
    let listen_id = TransportId::new(1);
    let (inbound_tx, mut inbound_rx) = packet_channel(64);
    let mut listener = UdpTransport::new(
        listen_id,
        None,
        UdpConfig {
            bind_addr: Some("127.0.0.1:0".to_string()),
            mtu: Some(1280),
            ..Default::default()
        },
        inbound_tx,
    );
    listener.start_async().await.unwrap();
    let listen_addr = listener.local_addr().unwrap();
    node.transports
        .insert(listen_id, TransportHandle::Udp(listener));

    let sender_socket = tokio::net::UdpSocket::bind("127.0.0.1:0")
        .await
        .expect("bind sender socket");

    let pending_at_start = node.msg1_rate_limiter.pending_count();

    // The helper must report the same node address the peer table already holds.
    let reported_addr =
        craft_and_send_msg1(&node, &known_sender, &sender_socket, listen_addr, 2000).await;
    assert_eq!(reported_addr, known_addr);
    pump_one_msg1_into_node(&mut node, &mut inbound_rx, 1000)
        .await
        .expect("msg1 must reach packet_rx_b");

    assert_eq!(node.peer_count(), 2);
    assert!(node.peers.contains_key(&known_addr));
    assert_eq!(node.msg1_rate_limiter.pending_count(), pending_at_start);
}