use super::{Node, NodeError, NodeState};
use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
use crate::discovery::nostr::{
ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
};
use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
use crate::node::acl::PeerAclContext;
use crate::node::wire::build_msg1;
use crate::peer::PeerConnection;
use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
use crate::{NodeAddr, PeerIdentity};
use secp256k1::PublicKey;
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
use std::thread;
use std::time::Duration;
use tracing::{debug, info, warn};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MeshSignalSessionAction {
Send,
Defer,
Drop,
}
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
use std::io::Write as _;
if let Ok(mut file) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(std::env::temp_dir().join("nvpn-fips-endpoint-debug.log"))
{
let _ = writeln!(
file,
"{:?} {}",
std::time::SystemTime::now(),
message.as_ref()
);
}
}
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
match ip {
IpAddr::V4(v4) => {
v4.is_private()
|| v4.is_loopback()
|| v4.is_link_local()
|| v4.is_unspecified()
|| v4.is_multicast()
|| v4.is_broadcast()
|| v4.is_documentation()
|| (v4.octets()[0] == 100 && (v4.octets()[1] & 0xc0) == 64)
}
IpAddr::V6(v6) => {
v6.is_loopback()
|| v6.is_unspecified()
|| v6.is_unique_local()
|| v6.is_multicast()
|| (v6.segments()[0] & 0xffc0) == 0xfe80
}
}
}
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
matches!(
(local, remote),
(SocketAddr::V4(_), SocketAddr::V4(_)) | (SocketAddr::V6(_), SocketAddr::V6(_))
)
}
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
impl Node {
pub(super) async fn update_peers(
&mut self,
new_peers: Vec<crate::config::PeerConfig>,
) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
use std::collections::{HashMap, HashSet};
let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
HashMap::with_capacity(new_peers.len());
let mut new_order = Vec::with_capacity(new_peers.len());
for peer in new_peers {
let identity = match PeerIdentity::from_npub(&peer.npub) {
Ok(id) => id,
Err(e) => {
return Err(crate::node::NodeError::InvalidPeerNpub {
npub: peer.npub.clone(),
reason: e.to_string(),
});
}
};
let node_addr = *identity.node_addr();
if !new_by_addr.contains_key(&node_addr) {
new_order.push(node_addr);
}
new_by_addr.insert(node_addr, peer);
}
let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
.config
.peers()
.iter()
.filter_map(|pc| {
PeerIdentity::from_npub(&pc.npub)
.ok()
.map(|id| (*id.node_addr(), pc.clone()))
})
.collect();
let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
let added: Vec<_> = new_addrs.difference(¤t_addrs).copied().collect();
let kept: Vec<_> = new_addrs.intersection(¤t_addrs).copied().collect();
let mut outcome = crate::node::UpdatePeersOutcome::default();
for node_addr in &removed {
if self.retry_pending.remove(node_addr).is_some() {
debug!(
peer = %self.peer_display_name(node_addr),
"Dropping retry entry for peer removed from runtime peer list"
);
}
self.peer_aliases.remove(node_addr);
self.set_discovery_fallback_transit_allowed(*node_addr, false);
outcome.removed += 1;
}
let mut auto_connect_refresh_configs = Vec::new();
for node_addr in &kept {
let new_pc = &new_by_addr[node_addr];
let current_pc = ¤t_by_addr[node_addr];
if new_pc.addresses != current_pc.addresses
|| new_pc.alias != current_pc.alias
|| new_pc.connect_policy != current_pc.connect_policy
|| new_pc.auto_reconnect != current_pc.auto_reconnect
|| new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
{
outcome.updated += 1;
self.set_discovery_fallback_transit_allowed(
*node_addr,
new_pc.discovery_fallback_transit,
);
if let Some(state) = self.retry_pending.get_mut(node_addr) {
state.peer_config = new_pc.clone();
state.retry_after_ms = Self::now_ms();
}
if let Some(alias) = new_pc.alias.clone() {
self.peer_aliases.insert(*node_addr, alias);
}
if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
auto_connect_refresh_configs.push(new_pc.clone());
}
} else {
outcome.unchanged += 1;
self.set_discovery_fallback_transit_allowed(
*node_addr,
new_pc.discovery_fallback_transit,
);
if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
auto_connect_refresh_configs.push(new_pc.clone());
}
}
}
let added_configs: Vec<crate::config::PeerConfig> = new_order
.iter()
.filter(|addr| added.contains(addr))
.map(|addr| new_by_addr[addr].clone())
.collect();
self.config.peers = new_order
.iter()
.filter_map(|addr| new_by_addr.get(addr).cloned())
.collect();
self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
for peer_config in added_configs {
outcome.added += 1;
let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
continue;
};
let name = peer_config
.alias
.clone()
.unwrap_or_else(|| identity.short_npub());
self.peer_aliases.insert(*identity.node_addr(), name);
self.set_discovery_fallback_transit_allowed(
*identity.node_addr(),
peer_config.discovery_fallback_transit,
);
self.register_identity(*identity.node_addr(), identity.pubkey_full());
if !peer_config.is_auto_connect() {
continue;
}
match self
.try_auto_connect_graph_session(&peer_config, identity)
.await
{
Ok(true) => continue,
Ok(false) => {}
Err(err) => {
debug!(
npub = %peer_config.npub,
error = %err,
"Existing FIPS graph did not warm newly added peer"
);
}
}
if let Err(e) = self.initiate_peer_connection(&peer_config).await {
warn!(
npub = %peer_config.npub,
error = %e,
"Failed to initiate connection for newly added peer"
);
if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
}
if matches!(e, crate::node::NodeError::NoTransportForType(_))
&& let Some(bootstrap) = self.nostr_discovery.clone()
{
bootstrap
.request_advert_stale_check(peer_config.npub.clone())
.await;
}
}
}
for peer_config in auto_connect_refresh_configs {
let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
continue;
};
let node_addr = *peer_identity.node_addr();
if self.peers.contains_key(&node_addr) {
match self
.initiate_active_peer_alternative_connection(&peer_config)
.await
{
Ok(attempted) => {
if attempted {
debug!(
peer = %self.peer_display_name(&node_addr),
"Started non-disruptive alternate-path handshake for active peer"
);
}
}
Err(e) => {
debug!(
npub = %peer_config.npub,
error = %e,
"Active peer alternate-path refresh did not start"
);
}
}
continue;
}
match self
.try_auto_connect_graph_session(&peer_config, peer_identity)
.await
{
Ok(true) => continue,
Ok(false) => {}
Err(err) => {
debug!(
npub = %peer_config.npub,
error = %err,
"Existing FIPS graph did not warm refreshed peer"
);
}
}
match self.initiate_peer_connection(&peer_config).await {
Ok(()) => {
let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
if let Some(state) = self.retry_pending.get_mut(&node_addr) {
state.peer_config = peer_config;
state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
}
}
Err(e) => {
debug!(
npub = %peer_config.npub,
error = %e,
"Refreshed peer addresses did not initiate a direct connection"
);
self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
}
}
}
self.warm_auto_connect_graph_sessions().await;
Ok(outcome)
}
pub(super) async fn initiate_peer_connections(&mut self) {
let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
.config
.peers()
.iter()
.filter_map(|pc| {
PeerIdentity::from_npub(&pc.npub)
.ok()
.map(|id| (id, pc.alias.clone()))
})
.collect();
for (identity, alias) in peer_identities {
let name = alias.unwrap_or_else(|| identity.short_npub());
self.peer_aliases.insert(*identity.node_addr(), name);
self.register_identity(*identity.node_addr(), identity.pubkey_full());
}
let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
if peer_configs.is_empty() {
debug!("No static peers configured");
return;
}
debug!(
count = peer_configs.len(),
"Initiating static peer connections"
);
for peer_config in peer_configs {
let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
Ok(identity) => identity,
Err(_) => continue,
};
match self
.try_auto_connect_graph_session(&peer_config, peer_identity)
.await
{
Ok(true) => continue,
Ok(false) => {}
Err(err) => {
debug!(
npub = %peer_config.npub,
error = %err,
"Existing FIPS graph did not warm auto-connect peer"
);
}
}
if let Err(e) = self.initiate_peer_connection(&peer_config).await {
warn!(
npub = %peer_config.npub,
alias = ?peer_config.alias,
error = %e,
"Failed to initiate peer connection"
);
if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
}
if matches!(e, crate::node::NodeError::NoTransportForType(_))
&& let Some(bootstrap) = self.nostr_discovery.clone()
{
bootstrap
.request_advert_stale_check(peer_config.npub.clone())
.await;
}
}
}
self.warm_auto_connect_graph_sessions().await;
}
pub(in crate::node) async fn try_auto_connect_graph_session(
&mut self,
peer_config: &PeerConfig,
peer_identity: PeerIdentity,
) -> Result<bool, NodeError> {
if !peer_config.is_auto_connect() {
return Ok(false);
}
let peer_node_addr = *peer_identity.node_addr();
if self.peers.contains_key(&peer_node_addr) {
return Ok(false);
}
if self.auto_connect_should_race_direct_path(peer_config) {
return Ok(false);
}
if self
.sessions
.get(&peer_node_addr)
.is_some_and(|entry| entry.is_established() || entry.is_initiating())
{
return Ok(true);
}
if self.find_next_hop(&peer_node_addr).is_none() {
return Ok(false);
}
self.register_identity(peer_node_addr, peer_identity.pubkey_full());
match self
.initiate_session(peer_node_addr, peer_identity.pubkey_full())
.await
{
Ok(()) => {
debug!(
peer = %self.peer_display_name(&peer_node_addr),
"Warmed auto-connect peer session over existing FIPS graph"
);
Ok(true)
}
Err(NodeError::SendFailed { node_addr, reason })
if node_addr == peer_node_addr && reason == "no route to destination" =>
{
self.maybe_initiate_lookup(&peer_node_addr).await;
Ok(false)
}
Err(err) => Err(err),
}
}
fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
!peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
}
pub(super) async fn initiate_peer_connection(
&mut self,
peer_config: &crate::config::PeerConfig,
) -> Result<(), NodeError> {
self.initiate_peer_connection_inner(peer_config).await
}
pub(super) async fn initiate_peer_retry_connection(
&mut self,
peer_config: &crate::config::PeerConfig,
) -> Result<(), NodeError> {
self.initiate_peer_connection_inner(peer_config).await
}
pub(in crate::node) async fn initiate_active_peer_alternative_connection(
&mut self,
peer_config: &crate::config::PeerConfig,
) -> Result<bool, NodeError> {
let peer_identity =
PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
npub: peer_config.npub.clone(),
reason: e.to_string(),
})?;
let peer_node_addr = *peer_identity.node_addr();
if !self.peers.contains_key(&peer_node_addr) {
self.initiate_peer_connection(peer_config).await?;
return Ok(true);
}
self.try_active_peer_alternative_addresses(peer_config, peer_identity)
.await
}
async fn initiate_peer_connection_inner(
&mut self,
peer_config: &crate::config::PeerConfig,
) -> Result<(), NodeError> {
let peer_identity =
PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
npub: peer_config.npub.clone(),
reason: e.to_string(),
})?;
let peer_node_addr = *peer_identity.node_addr();
if self.peers.contains_key(&peer_node_addr) {
debug!(
npub = %peer_config.npub,
"Peer already exists, skipping"
);
return Ok(());
}
self.try_peer_addresses(peer_config, peer_identity, true)
.await
}
fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
self.connections.values().any(|conn| {
conn.expected_identity()
.map(|id| id.node_addr() == peer_node_addr)
.unwrap_or(false)
})
}
fn is_connecting_to_peer_on_path(
&self,
peer_node_addr: &NodeAddr,
transport_id: TransportId,
remote_addr: &TransportAddr,
) -> bool {
self.connections.values().any(|conn| {
conn.expected_identity()
.map(|id| id.node_addr() == peer_node_addr)
.unwrap_or(false)
&& conn.transport_id() == Some(transport_id)
&& conn.source_addr() == Some(remote_addr)
}) || self.pending_connects.iter().any(|pending| {
pending.peer_identity.node_addr() == peer_node_addr
&& pending.transport_id == transport_id
&& &pending.remote_addr == remote_addr
})
}
pub(in crate::node) fn should_warm_auto_connect_session(
&self,
peer_node_addr: &NodeAddr,
) -> bool {
if self.peers.contains_key(peer_node_addr)
|| self
.sessions
.get(peer_node_addr)
.is_some_and(|entry| entry.is_established())
{
return false;
}
self.config.peers().iter().any(|peer| {
peer.is_auto_connect()
&& PeerIdentity::from_npub(&peer.npub)
.map(|identity| identity.node_addr() == peer_node_addr)
.unwrap_or(false)
})
}
pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
if !self.peers.values().any(|peer| peer.can_send()) {
return 0;
}
let mut budget = self.graph_session_warmup_budget();
if budget == 0 {
return 0;
}
let peer_identities: Vec<_> = self
.config
.auto_connect_peers()
.filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
.collect();
let mut warmed = 0;
for identity in peer_identities {
if budget == 0 {
break;
}
let peer_node_addr = *identity.node_addr();
if peer_node_addr == *self.identity.node_addr()
|| !self.should_warm_auto_connect_session(&peer_node_addr)
|| self
.sessions
.get(&peer_node_addr)
.is_some_and(|entry| entry.is_initiating())
{
continue;
}
self.register_identity(peer_node_addr, identity.pubkey_full());
if self.find_next_hop(&peer_node_addr).is_some() {
match self
.initiate_session(peer_node_addr, identity.pubkey_full())
.await
{
Ok(()) => {
warmed += 1;
budget = budget.saturating_sub(1);
debug!(
peer = %self.peer_display_name(&peer_node_addr),
"Warmed auto-connect peer session over existing FIPS graph"
);
}
Err(NodeError::SendFailed { node_addr, reason })
if node_addr == peer_node_addr && reason == "no route to destination" =>
{
self.maybe_initiate_lookup(&peer_node_addr).await;
warmed += 1;
budget = budget.saturating_sub(1);
}
Err(err) => {
debug!(
peer = %self.peer_display_name(&peer_node_addr),
error = %err,
"Failed to warm auto-connect peer session"
);
}
}
} else {
self.maybe_initiate_lookup(&peer_node_addr).await;
warmed += 1;
budget = budget.saturating_sub(1);
}
}
warmed
}
pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
let max_destinations = self.config.node.session.pending_max_destinations;
if max_destinations == 0 {
return 0;
}
let pending_sessions = self
.sessions
.values()
.filter(|entry| !entry.is_established())
.count();
let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
max_destinations
.saturating_sub(pending_total)
.min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
}
fn outbound_handshake_slots(&self) -> usize {
let used = self
.connections
.len()
.saturating_add(self.pending_connects.len());
if self.max_connections == 0 {
usize::MAX
} else {
self.max_connections.saturating_sub(used)
}
}
fn outbound_link_slots(&self) -> usize {
if self.max_links == 0 {
usize::MAX
} else {
self.max_links.saturating_sub(self.links.len())
}
}
fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
if !self.peers.contains_key(peer_node_addr)
&& self.max_peers > 0
&& self.peers.len() >= self.max_peers
{
return 0;
}
let in_flight_for_peer = self
.connections
.values()
.filter(|conn| {
conn.expected_identity()
.map(|id| id.node_addr() == peer_node_addr)
.unwrap_or(false)
})
.count()
.saturating_add(
self.pending_connects
.iter()
.filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
.count(),
);
self.outbound_handshake_slots()
.min(self.outbound_link_slots())
.min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
}
fn discovery_connect_budget(&self) -> usize {
self.outbound_handshake_slots()
.min(self.outbound_link_slots())
.min(MAX_DISCOVERY_CONNECTS_PER_TICK)
}
fn find_udp_transport_for_remote_addr(
&self,
remote_addr: SocketAddr,
) -> Option<(TransportId, SocketAddr)> {
self.transports
.iter()
.filter(|(id, handle)| {
handle.transport_type().name == "udp"
&& handle.is_operational()
&& !self.bootstrap_transports.contains(id)
})
.filter_map(|(id, handle)| {
let local_addr = handle.local_addr()?;
socket_addr_families_compatible(local_addr, remote_addr)
.then_some((*id, local_addr))
})
.min_by_key(|(id, _)| id.as_u32())
}
pub(super) fn transport_discovery_candidate(
&self,
discovered_transport_id: TransportId,
discovered_addr: TransportAddr,
) -> Option<(TransportId, TransportAddr, &'static str)> {
let transport = self.transports.get(&discovered_transport_id)?;
let transport_name = transport.transport_type().name;
if transport_name != "udp" {
return Some((discovered_transport_id, discovered_addr, transport_name));
}
let Some(remote_socket_addr) = discovered_addr
.as_str()
.and_then(|addr| addr.parse::<SocketAddr>().ok())
else {
if self.bootstrap_transports.contains(&discovered_transport_id) {
debug!(
transport_id = %discovered_transport_id,
remote_addr = %discovered_addr,
"transport discovery: skip non-numeric UDP address from bootstrap transport"
);
return None;
}
return Some((discovered_transport_id, discovered_addr, transport_name));
};
let Some((transport_id, local_addr)) =
self.find_udp_transport_for_remote_addr(remote_socket_addr)
else {
debug!(
transport_id = %discovered_transport_id,
remote_addr = %discovered_addr,
"transport discovery: skip UDP peer with no compatible local socket"
);
return None;
};
if transport_id != discovered_transport_id {
debug!(
discovered_transport_id = %discovered_transport_id,
selected_transport_id = %transport_id,
local_addr = %local_addr,
remote_addr = %remote_socket_addr,
"transport discovery: selected compatible UDP transport"
);
}
Some((
transport_id,
TransportAddr::from_socket_addr(remote_socket_addr),
transport_name,
))
}
fn peer_address_string_for_transport_candidate(
&self,
transport_id: TransportId,
transport_name: &str,
remote_addr: &TransportAddr,
) -> String {
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
let _ = (transport_id, transport_name);
#[cfg(any(target_os = "linux", target_os = "macos"))]
if transport_name == "ethernet"
&& remote_addr.as_bytes().len() == 6
&& let Some(interface) = self
.transports
.get(&transport_id)
.and_then(|transport| transport.interface_name())
{
let mut mac = [0u8; 6];
mac.copy_from_slice(remote_addr.as_bytes());
return format!(
"{interface}/{}",
crate::transport::ethernet::format_mac(&mac)
);
}
remote_addr.to_string()
}
fn resolve_peer_address_for_match(
&self,
candidate: &PeerAddress,
) -> Option<(TransportId, TransportAddr)> {
if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
return None;
}
if candidate.transport == "ethernet" {
return self.resolve_ethernet_addr(&candidate.addr).ok();
}
if candidate.transport == "ble" {
#[cfg(bluer_available)]
{
return self.resolve_ble_addr(&candidate.addr).ok();
}
#[cfg(not(bluer_available))]
{
return None;
}
}
let transport_id = if candidate.transport == "udp"
&& let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
{
self.find_udp_transport_for_remote_addr(remote_socket_addr)
.map(|(id, _)| id)?
} else {
self.find_transport_for_type(&candidate.transport)?
};
Some((transport_id, TransportAddr::from_string(&candidate.addr)))
}
pub(super) async fn initiate_connection(
&mut self,
transport_id: TransportId,
remote_addr: TransportAddr,
peer_identity: PeerIdentity,
) -> Result<(), NodeError> {
let peer_node_addr = *peer_identity.node_addr();
if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
debug!(
peer = %self.peer_display_name(&peer_node_addr),
transport_id = %transport_id,
remote_addr = %remote_addr,
"Connection already in progress for candidate path"
);
return Ok(());
}
if self.outbound_handshake_slots() == 0 {
return Err(NodeError::MaxConnectionsExceeded {
max: self.max_connections,
});
}
if self.outbound_link_slots() == 0 {
return Err(NodeError::MaxLinksExceeded {
max: self.max_links,
});
}
if !self.peers.contains_key(&peer_node_addr)
&& self.max_peers > 0
&& self.peers.len() >= self.max_peers
{
return Err(NodeError::MaxPeersExceeded {
max: self.max_peers,
});
}
self.authorize_peer(
&peer_identity,
PeerAclContext::OutboundConnect,
transport_id,
&remote_addr,
)?;
let is_connection_oriented = self
.transports
.get(&transport_id)
.map(|t| t.transport_type().connection_oriented)
.unwrap_or(false);
let link_id = self.allocate_link_id();
let link = if is_connection_oriented {
Link::new(
link_id,
transport_id,
remote_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(self.config.node.base_rtt_ms),
)
} else {
Link::connectionless(
link_id,
transport_id,
remote_addr.clone(),
LinkDirection::Outbound,
Duration::from_millis(self.config.node.base_rtt_ms),
)
};
self.links.insert(link_id, link);
self.addr_to_link
.insert((transport_id, remote_addr.clone()), link_id);
if is_connection_oriented {
if let Some(transport) = self.transports.get(&transport_id) {
match transport.connect(&remote_addr).await {
Ok(()) => {
debug!(
peer = %self.peer_display_name(&peer_node_addr),
transport_id = %transport_id,
remote_addr = %remote_addr,
link_id = %link_id,
"Transport connect initiated (non-blocking)"
);
self.pending_connects.push(super::PendingConnect {
link_id,
transport_id,
remote_addr,
peer_identity,
});
}
Err(e) => {
self.links.remove(&link_id);
self.addr_to_link.remove(&(transport_id, remote_addr));
return Err(NodeError::from_transport_error(e));
}
}
}
Ok(())
} else {
self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
.await
}
}
pub(super) async fn start_handshake(
&mut self,
link_id: LinkId,
transport_id: TransportId,
remote_addr: TransportAddr,
peer_identity: PeerIdentity,
) -> Result<(), NodeError> {
let peer_node_addr = *peer_identity.node_addr();
let current_time_ms = Self::now_ms();
let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
let our_index = match self.index_allocator.allocate() {
Ok(idx) => idx,
Err(e) => {
self.links.remove(&link_id);
self.addr_to_link.remove(&(transport_id, remote_addr));
return Err(NodeError::IndexAllocationFailed(e.to_string()));
}
};
let our_keypair = self.identity.keypair();
let noise_msg1 =
match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
Ok(msg) => msg,
Err(e) => {
let _ = self.index_allocator.free(our_index);
self.links.remove(&link_id);
self.addr_to_link.remove(&(transport_id, remote_addr));
return Err(NodeError::HandshakeFailed(e.to_string()));
}
};
connection.set_our_index(our_index);
connection.set_transport_id(transport_id);
connection.set_source_addr(remote_addr.clone());
let wire_msg1 = build_msg1(our_index, &noise_msg1);
debug!(
peer = %self.peer_display_name(&peer_node_addr),
transport_id = %transport_id,
remote_addr = %remote_addr,
link_id = %link_id,
our_index = %our_index,
"Connection initiated"
);
let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
self.pending_outbound
.insert((transport_id, our_index.as_u32()), link_id);
self.connections.insert(link_id, connection);
let send_result = match self.transports.get(&transport_id) {
Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
None => None,
};
match send_result {
Some(send_result) => {
self.note_local_send_outcome(&send_result);
match send_result {
Ok(bytes) => {
debug!(
link_id = %link_id,
our_index = %our_index,
bytes,
"Sent Noise handshake message 1 (wire format)"
);
}
Err(e) => {
warn!(
link_id = %link_id,
transport_id = %transport_id,
remote_addr = %remote_addr,
our_index = %our_index,
error = %e,
"Failed to send handshake message"
);
self.pending_outbound
.remove(&(transport_id, our_index.as_u32()));
self.connections.remove(&link_id);
self.links.remove(&link_id);
self.addr_to_link
.remove(&(transport_id, remote_addr.clone()));
let _ = self.index_allocator.free(our_index);
return Err(NodeError::from_transport_error(e));
}
}
}
None => {
self.pending_outbound
.remove(&(transport_id, our_index.as_u32()));
self.connections.remove(&link_id);
self.links.remove(&link_id);
self.addr_to_link
.remove(&(transport_id, remote_addr.clone()));
let _ = self.index_allocator.free(our_index);
return Err(NodeError::TransportError(format!(
"transport {transport_id} disappeared before first handshake send"
)));
}
}
Ok(())
}
pub(super) async fn poll_transport_discovery(&mut self) {
let mut to_connect = Vec::new();
let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
let mut connect_budget = self.discovery_connect_budget();
let mut skipped_budget = 0usize;
for transport in self.transports.values() {
if !transport.is_operational() {
continue;
}
if !transport.auto_connect() {
let _ = transport.discover();
continue;
}
let discovered = match transport.discover() {
Ok(peers) => peers,
Err(_) => continue,
};
for peer in discovered {
let discovered_transport_id = peer.transport_id;
let pubkey = match peer.pubkey_hint {
Some(pk) => pk,
None => continue,
};
let identity = PeerIdentity::from_pubkey(pubkey);
let node_addr = *identity.node_addr();
if node_addr == *self.identity.node_addr() {
continue;
}
let Some((candidate_transport_id, remote_addr, transport_name)) =
self.transport_discovery_candidate(discovered_transport_id, peer.addr)
else {
continue;
};
if self.peers.contains_key(&node_addr) {
let candidate = PeerAddress::new(
transport_name,
self.peer_address_string_for_transport_candidate(
candidate_transport_id,
transport_name,
&remote_addr,
),
);
if self.active_peer_candidate_is_fresh_enough_to_skip(
&node_addr,
std::slice::from_ref(&candidate),
) {
continue;
}
if self.is_connecting_to_peer_on_path(
&node_addr,
candidate_transport_id,
&remote_addr,
) {
continue;
}
let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
if connect_budget == 0
|| self
.path_candidate_attempt_budget(&node_addr)
.saturating_sub(queued_for_peer)
== 0
{
skipped_budget = skipped_budget.saturating_add(1);
continue;
}
to_connect.push((candidate_transport_id, remote_addr, identity, true));
*queued_per_peer.entry(node_addr).or_default() += 1;
connect_budget = connect_budget.saturating_sub(1);
continue;
}
if self.is_connecting_to_peer_on_path(
&node_addr,
candidate_transport_id,
&remote_addr,
) {
continue;
}
let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
if connect_budget == 0
|| self
.path_candidate_attempt_budget(&node_addr)
.saturating_sub(queued_for_peer)
== 0
{
skipped_budget = skipped_budget.saturating_add(1);
continue;
}
to_connect.push((candidate_transport_id, remote_addr, identity, false));
*queued_per_peer.entry(node_addr).or_default() += 1;
connect_budget = connect_budget.saturating_sub(1);
}
}
if skipped_budget > 0 {
debug!(
skipped = skipped_budget,
queued = to_connect.len(),
"Transport discovery connect budget exhausted"
);
}
for (transport_id, remote_addr, identity, active_refresh) in to_connect {
info!(
peer = %self.peer_display_name(identity.node_addr()),
transport_id = %transport_id,
remote_addr = %remote_addr,
active_refresh,
"Auto-connecting to discovered peer"
);
if let Err(e) = self
.initiate_connection(transport_id, remote_addr, identity)
.await
{
warn!(error = %e, "Failed to auto-connect to discovered peer");
}
}
}
pub(super) async fn poll_nostr_discovery(&mut self) {
let Some(bootstrap) = self.nostr_discovery.clone() else {
return;
};
bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
debug!(error = %err, "Failed to refresh local Nostr overlay advert");
}
self.drain_nostr_mesh_signals(&bootstrap).await;
for event in bootstrap.drain_events().await {
match event {
BootstrapEvent::Established { traversal } => {
let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
.ok()
.is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
let admission_allowed = if active_refresh {
self.outbound_direct_refresh_admission_check()
} else {
self.outbound_admission_check()
};
if !admission_allowed {
debug!(
peer_npub = %traversal.peer_npub,
peers = self.peers.len(),
max_peers = self.max_peers,
active_refresh,
"Dropping established NAT traversal: at capacity"
);
continue;
}
let peer_npub = traversal.peer_npub.clone();
match self.adopt_established_traversal(traversal).await {
Ok(_) => {
info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
}
Err(err) => {
warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
}
}
}
}
BootstrapEvent::Failed {
peer_config,
reason,
} => {
let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
Ok(identity) => identity,
Err(_) => continue,
};
let node_addr = *peer_identity.node_addr();
let now_ms = Self::now_ms();
if self.peers.contains_key(&node_addr) {
if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
let decision =
bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
if decision.should_warn {
warn!(
npub = %peer_config.npub,
error = %reason,
consecutive_failures = decision.consecutive_failures,
cooldown_secs = decision
.cooldown_until_ms
.map(|t| t.saturating_sub(now_ms) / 1000),
"Direct-path NAT traversal upgrade failed"
);
} else {
debug!(
npub = %peer_config.npub,
error = %reason,
consecutive_failures = decision.consecutive_failures,
"Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
);
}
if decision.crossed_threshold {
bootstrap
.request_advert_stale_check(peer_config.npub.clone())
.await;
}
self.schedule_retry(node_addr, now_ms);
if self.nostr_cooldown_applies_to_peer_config(&peer_config)
&& let Some(cooldown_until_ms) = decision.cooldown_until_ms
&& let Some(state) = self.retry_pending.get_mut(&node_addr)
{
state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
}
} else {
debug!(
npub = %peer_config.npub,
error = %reason,
"Ignoring failed NAT traversal for already-connected peer on fresh direct path"
);
}
continue;
}
if self.is_connecting_to_peer(&node_addr) {
debug!(
npub = %peer_config.npub,
error = %reason,
"Ignoring failed NAT traversal while peer handshake is already in progress"
);
continue;
}
let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
if decision.should_warn {
warn!(
npub = %peer_config.npub,
error = %reason,
consecutive_failures = decision.consecutive_failures,
cooldown_secs = decision
.cooldown_until_ms
.map(|t| t.saturating_sub(now_ms) / 1000),
"NAT traversal failed"
);
} else {
debug!(
npub = %peer_config.npub,
error = %reason,
consecutive_failures = decision.consecutive_failures,
"NAT traversal failed (suppressed by warn-rate-limit)"
);
}
if decision.crossed_threshold {
bootstrap
.request_advert_stale_check(peer_config.npub.clone())
.await;
}
if self
.try_peer_addresses(&peer_config, peer_identity, false)
.await
.is_ok()
{
continue;
}
self.schedule_retry(node_addr, now_ms);
if self.nostr_cooldown_applies_to_peer_config(&peer_config)
&& let Some(cooldown_until_ms) = decision.cooldown_until_ms
&& let Some(state) = self.retry_pending.get_mut(&node_addr)
{
state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
}
}
}
}
self.maybe_run_startup_open_discovery_sweep(&bootstrap)
.await;
self.queue_open_discovery_retries(&bootstrap).await;
self.queue_active_fallback_direct_retries(&bootstrap);
}
async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
let mut deferred = Vec::new();
for signal in bootstrap.drain_mesh_signals().await {
let (peer_npub, msg_type, payload) = match &signal {
MeshTraversalSignal::Offer { peer_npub, offer } => {
let payload = match serde_json::to_vec(&offer) {
Ok(payload) => payload,
Err(error) => {
debug!(
peer = %peer_npub,
error = %error,
"Failed to encode mesh traversal offer"
);
continue;
}
};
(
peer_npub.clone(),
SessionMessageType::TraversalOffer.to_byte(),
payload,
)
}
MeshTraversalSignal::Answer { peer_npub, answer } => {
let payload = match serde_json::to_vec(&answer) {
Ok(payload) => payload,
Err(error) => {
debug!(
peer = %peer_npub,
error = %error,
"Failed to encode mesh traversal answer"
);
continue;
}
};
(
peer_npub.clone(),
SessionMessageType::TraversalAnswer.to_byte(),
payload,
)
}
};
let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
Ok(identity) => identity,
Err(error) => {
debug!(
peer = %peer_npub,
error = %error,
"Cannot send mesh traversal signal to invalid peer npub"
);
continue;
}
};
let peer_addr = *peer_identity.node_addr();
match self
.mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
.await
{
MeshSignalSessionAction::Send => {}
MeshSignalSessionAction::Defer => {
deferred.push(signal);
continue;
}
MeshSignalSessionAction::Drop => continue,
}
if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
debug!(
peer = %self.peer_display_name(&peer_addr),
error = %error,
"Failed to send mesh traversal signal"
);
}
}
for signal in deferred {
bootstrap.requeue_mesh_signal(signal);
}
}
async fn mesh_signal_session_action(
&mut self,
peer_addr: NodeAddr,
peer_pubkey: PublicKey,
) -> MeshSignalSessionAction {
if let Some(entry) = self.sessions.get(&peer_addr) {
if entry.is_established() {
return MeshSignalSessionAction::Send;
}
if entry.is_initiating() || entry.is_awaiting_msg3() {
debug!(
peer = %self.peer_display_name(&peer_addr),
"Deferring mesh traversal signal until end-to-end session is established"
);
return MeshSignalSessionAction::Defer;
}
}
if self.find_next_hop(&peer_addr).is_none() {
debug!(
peer = %self.peer_display_name(&peer_addr),
"Cannot warm mesh traversal signal session without a FIPS route"
);
self.maybe_initiate_lookup(&peer_addr).await;
return MeshSignalSessionAction::Drop;
}
self.register_identity(peer_addr, peer_pubkey);
match self.initiate_session(peer_addr, peer_pubkey).await {
Ok(()) => {
debug!(
peer = %self.peer_display_name(&peer_addr),
"Warming end-to-end session for mesh traversal signal"
);
MeshSignalSessionAction::Defer
}
Err(NodeError::SendFailed { node_addr, reason })
if node_addr == peer_addr && reason == "no route to destination" =>
{
debug!(
peer = %self.peer_display_name(&peer_addr),
"Cannot warm mesh traversal signal session without a FIPS route"
);
self.maybe_initiate_lookup(&peer_addr).await;
MeshSignalSessionAction::Drop
}
Err(error) => {
debug!(
peer = %self.peer_display_name(&peer_addr),
error = %error,
"Failed to warm end-to-end session for mesh traversal signal"
);
MeshSignalSessionAction::Drop
}
}
}
pub(super) fn lan_discovery_scope(&self) -> Option<String> {
if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
let scope = scope.trim();
if !scope.is_empty() {
return Some(scope.to_string());
}
}
let app = self.config.node.discovery.nostr.app.trim();
if app.is_empty() {
return None;
}
if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
let scope = rest.trim();
if scope.is_empty() {
None
} else {
Some(scope.to_string())
}
} else {
Some(app.to_string())
}
}
pub(super) fn start_local_instance_discovery(&mut self) {
if !self.config.node.discovery.local.enabled {
return;
}
let Some(scope) = self.lan_discovery_scope() else {
debug!("local instance discovery not started: no discovery scope");
return;
};
let now_ms = Self::now_ms();
match crate::discovery::local::LocalInstanceRegistry::new(
self.identity.npub(),
scope,
&self.config.node.discovery.local,
now_ms,
) {
Ok(registry) => {
self.local_instance_registry = Some(registry);
self.local_instance_started_at_ms = Some(now_ms);
self.last_local_instance_publish_ms = None;
self.last_local_instance_scan_ms = None;
self.publish_local_instance_record(now_ms);
info!("Same-host FIPS instance discovery enabled");
}
Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
debug!("same-host FIPS instance discovery disabled");
}
Err(err) => {
debug!(error = %err, "same-host FIPS instance discovery not started");
}
}
}
fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
let mut contacts = Vec::new();
for handle in self.transports.values() {
if !handle.is_operational() || !handle.accept_connections() {
continue;
}
let transport = handle.transport_type().name;
if transport != "udp" && transport != "tcp" {
continue;
}
let Some(local_addr) = handle.local_addr() else {
continue;
};
let Some(contact) =
crate::discovery::local::contact_for_transport_addr(transport, local_addr)
else {
continue;
};
if contacts
.iter()
.any(|existing: &crate::discovery::local::LocalInstanceContact| {
existing.transport == contact.transport && existing.addr == contact.addr
})
{
continue;
}
contacts.push(contact);
}
contacts
}
fn publish_local_instance_record(&mut self, now_ms: u64) {
let Some(registry) = self.local_instance_registry.clone() else {
return;
};
let contacts = self.local_instance_contacts();
match registry.publish(contacts, now_ms) {
Ok(()) => {
self.last_local_instance_publish_ms = Some(now_ms);
}
Err(err) => {
debug!(error = %err, "failed to publish same-host FIPS instance record");
}
}
}
fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
if self.local_instance_registry.is_none() {
return;
}
let interval_ms = self.config.node.discovery.local.publish_interval_ms();
let due = self
.last_local_instance_publish_ms
.map(|last| now_ms.saturating_sub(last) >= interval_ms)
.unwrap_or(true);
if due {
self.publish_local_instance_record(now_ms);
}
}
fn local_instance_scan_due(&self, now_ms: u64) -> bool {
if self.local_instance_registry.is_none() {
return false;
}
let cfg = &self.config.node.discovery.local;
let interval_ms = if self
.local_instance_started_at_ms
.map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
.unwrap_or(false)
{
cfg.startup_scan_interval_ms()
} else {
cfg.scan_interval_ms()
};
self.last_local_instance_scan_ms
.map(|last| now_ms.saturating_sub(last) >= interval_ms)
.unwrap_or(true)
}
fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
if self.config.peers().iter().any(|peer| {
PeerIdentity::from_npub(&peer.npub)
.map(|configured| configured.node_addr() == identity.node_addr())
.unwrap_or(false)
}) {
return true;
}
self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
}
fn local_instance_peer_addresses(
&self,
record: &crate::discovery::local::LocalInstanceRecord,
) -> Vec<PeerAddress> {
let mut addresses = Vec::new();
for contact in &record.contacts {
if contact.transport != "udp" && contact.transport != "tcp" {
continue;
}
let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
debug!(
npub = %record.npub,
transport = %contact.transport,
addr = %contact.addr,
"local instance discovery: skip non-socket contact"
);
continue;
};
if !socket_addr.ip().is_loopback() {
debug!(
npub = %record.npub,
addr = %contact.addr,
"local instance discovery: skip non-loopback contact"
);
continue;
}
let address =
PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
.with_seen_at_ms(record.updated_at_ms);
if addresses.iter().any(|existing: &PeerAddress| {
existing.transport == address.transport && existing.addr == address.addr
}) {
continue;
}
addresses.push(address);
}
addresses
}
pub(super) async fn poll_local_instance_discovery(&mut self) {
let Some(registry) = self.local_instance_registry.clone() else {
return;
};
let now_ms = Self::now_ms();
self.maybe_publish_local_instance_record(now_ms);
if !self.local_instance_scan_due(now_ms) {
return;
}
self.last_local_instance_scan_ms = Some(now_ms);
let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
{
Ok(records) => records,
Err(err) => {
debug!(error = %err, "same-host FIPS instance scan failed");
return;
}
};
if records.is_empty() {
return;
}
let mut connect_budget = self.discovery_connect_budget();
let mut skipped_budget = 0usize;
for record in records {
let identity = match PeerIdentity::from_npub(&record.npub) {
Ok(identity) => identity,
Err(err) => {
debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
continue;
}
};
let peer_node_addr = *identity.node_addr();
if peer_node_addr == *self.identity.node_addr() {
continue;
}
if !self.local_instance_peer_allowed(&identity) {
debug!(
npub = %identity.short_npub(),
"local instance discovery: skip unconfigured peer"
);
continue;
}
let addresses = self.local_instance_peer_addresses(&record);
if addresses.is_empty() {
continue;
}
if self.peers.contains_key(&peer_node_addr)
&& self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
{
continue;
}
for address in addresses {
let Some((transport_id, remote_addr)) =
self.resolve_peer_address_for_match(&address)
else {
continue;
};
if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
continue;
}
if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
skipped_budget = skipped_budget.saturating_add(1);
continue;
}
info!(
npub = %identity.short_npub(),
transport = %address.transport,
addr = %address.addr,
"same-host FIPS instance discovery: initiating handshake"
);
if let Err(err) = self
.initiate_connection(transport_id, remote_addr, identity)
.await
{
debug!(
npub = %record.npub,
error = %err,
"same-host FIPS instance discovery: failed to initiate connection"
);
}
connect_budget = connect_budget.saturating_sub(1);
}
}
if skipped_budget > 0 {
debug!(
skipped = skipped_budget,
"same-host FIPS instance discovery connect budget exhausted"
);
}
}
pub(super) async fn poll_lan_discovery(&mut self) {
let Some(runtime) = self.lan_discovery.clone() else {
return;
};
let events = runtime.drain_events().await;
if events.is_empty() {
return;
}
let mut connect_budget = self.discovery_connect_budget();
let mut skipped_budget = 0usize;
for event in events {
let crate::discovery::lan::LanEvent::Discovered(peer) = event;
let Some((transport_id, local_addr)) =
self.find_udp_transport_for_remote_addr(peer.addr)
else {
debug!(
addr = %peer.addr,
"lan: skip discovered peer with no compatible UDP transport"
);
continue;
};
let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
Ok(id) => id,
Err(err) => {
debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
continue;
}
};
let peer_node_addr = *identity.node_addr();
let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
if self.peers.contains_key(&peer_node_addr) {
let candidate = PeerAddress::new("udp", peer.addr.to_string());
if self.active_peer_candidate_is_fresh_enough_to_skip(
&peer_node_addr,
std::slice::from_ref(&candidate),
) {
continue;
}
if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
continue;
}
if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
skipped_budget = skipped_budget.saturating_add(1);
continue;
}
info!(
npub = %identity.short_npub(),
addr = %peer.addr,
local_addr = %local_addr,
"lan: initiating alternate-path handshake to active peer"
);
if let Err(err) = self
.initiate_connection(transport_id, remote_addr, identity)
.await
{
debug!(
npub = %peer.npub,
error = %err,
"lan: failed to initiate active peer alternate-path handshake"
);
}
connect_budget = connect_budget.saturating_sub(1);
continue;
}
if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
continue;
}
if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
skipped_budget = skipped_budget.saturating_add(1);
continue;
}
info!(
npub = %identity.short_npub(),
addr = %peer.addr,
local_addr = %local_addr,
"lan: initiating handshake to discovered peer"
);
if let Err(err) = self
.initiate_connection(transport_id, remote_addr, identity)
.await
{
debug!(
npub = %peer.npub,
error = %err,
"lan: failed to initiate connection to discovered peer"
);
}
connect_budget = connect_budget.saturating_sub(1);
}
if skipped_budget > 0 {
debug!(
skipped = skipped_budget,
"lan: discovery connect budget exhausted"
);
}
}
pub(super) async fn poll_pending_connects(&mut self) {
if self.pending_connects.is_empty() {
return;
}
let mut completed = Vec::new();
for (i, pending) in self.pending_connects.iter().enumerate() {
let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
transport.connection_state(&pending.remote_addr)
} else {
crate::transport::ConnectionState::Failed("transport removed".into())
};
match state {
crate::transport::ConnectionState::Connected => {
completed.push((i, true, None));
}
crate::transport::ConnectionState::Failed(reason) => {
completed.push((i, false, Some(reason)));
}
crate::transport::ConnectionState::Connecting => {
}
crate::transport::ConnectionState::None => {
completed.push((i, false, Some("no connection attempt found".into())));
}
}
}
for (i, success, reason) in completed.into_iter().rev() {
let pending = self.pending_connects.remove(i);
if success {
if let Some(link) = self.links.get_mut(&pending.link_id) {
link.set_connected();
}
debug!(
peer = %self.peer_display_name(pending.peer_identity.node_addr()),
transport_id = %pending.transport_id,
remote_addr = %pending.remote_addr,
link_id = %pending.link_id,
"Transport connected, starting handshake"
);
if let Err(e) = self
.start_handshake(
pending.link_id,
pending.transport_id,
pending.remote_addr.clone(),
pending.peer_identity,
)
.await
{
warn!(
link_id = %pending.link_id,
error = %e,
"Failed to start handshake after transport connect"
);
self.remove_link(&pending.link_id);
}
} else {
let reason = reason.unwrap_or_default();
warn!(
peer = %self.peer_display_name(pending.peer_identity.node_addr()),
transport_id = %pending.transport_id,
remote_addr = %pending.remote_addr,
link_id = %pending.link_id,
reason = %reason,
"Transport connect failed"
);
self.remove_link(&pending.link_id);
self.links.remove(&pending.link_id);
self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
}
}
}
pub async fn start(&mut self) -> Result<(), NodeError> {
node_start_debug_log("Node::start begin");
if !self.state.can_start() {
return Err(NodeError::AlreadyStarted);
}
self.state = NodeState::Starting;
node_start_debug_log("Node::start state set to starting");
let packet_buffer_size = self.config.node.buffers.packet_channel;
let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
self.packet_tx = Some(packet_tx.clone());
self.packet_rx = Some(packet_rx);
node_start_debug_log("Node::start packet channel created");
node_start_debug_log("Node::start create transports begin");
let transport_handles = self.create_transports(&packet_tx).await;
node_start_debug_log(format!(
"Node::start create transports complete count={}",
transport_handles.len()
));
for mut handle in transport_handles {
let transport_id = handle.transport_id();
let transport_type = handle.transport_type().name;
let name = handle.name().map(|s| s.to_string());
node_start_debug_log(format!(
"Node::start transport start begin id={} type={} name={:?}",
transport_id, transport_type, name
));
match handle.start().await {
Ok(()) => {
node_start_debug_log(format!(
"Node::start transport start ok id={} type={}",
transport_id, transport_type
));
self.transports.insert(transport_id, handle);
}
Err(e) => {
node_start_debug_log(format!(
"Node::start transport start error id={} type={} error={}",
transport_id, transport_type, e
));
if let Some(ref n) = name {
warn!(transport_type, name = %n, error = %e, "Transport failed to start");
} else {
warn!(transport_type, error = %e, "Transport failed to start");
}
}
}
}
if !self.transports.is_empty() {
info!(count = self.transports.len(), "Transports initialized");
}
#[cfg(unix)]
{
if self.config.node.worker_pools_enabled {
node_start_debug_log("Node::start worker pools begin");
let cpu_default = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
.max(1);
let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(cpu_default)
.max(1);
self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
encrypt_worker_count,
));
info!(
workers = encrypt_worker_count,
"Spawned FMP-encrypt worker pool"
);
let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(cpu_default);
if decrypt_worker_count == 0 {
info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
} else {
self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
decrypt_worker_count,
));
info!(
workers = decrypt_worker_count,
"Spawned FMP+FSP-decrypt worker pool"
);
}
node_start_debug_log("Node::start worker pools complete");
} else {
node_start_debug_log("Node::start worker pools disabled");
info!("FIPS worker pools disabled; using in-line crypto/send path");
}
}
if self.config.node.discovery.nostr.enabled {
node_start_debug_log("Node::start nostr discovery start begin");
match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
.await
{
Ok(runtime) => {
node_start_debug_log("Node::start nostr discovery runtime created");
if let Err(err) = self.refresh_overlay_advert(&runtime).await {
warn!(error = %err, "Failed to publish initial Nostr overlay advert");
}
node_start_debug_log("Node::start nostr overlay advert refreshed");
self.nostr_discovery = Some(runtime);
self.nostr_discovery_started_at_ms = Some(Self::now_ms());
info!("Nostr overlay discovery enabled");
}
Err(err) => {
node_start_debug_log(format!(
"Node::start nostr discovery start error error={}",
err
));
warn!(error = %err, "Failed to start Nostr overlay discovery");
}
}
}
if self.config.node.discovery.lan.enabled {
node_start_debug_log("Node::start lan discovery start begin");
let advertised_udp_port = self
.transports
.values()
.filter(|h| h.is_operational())
.filter(|h| h.transport_type().name == "udp")
.find_map(|h| h.local_addr().map(|addr| addr.port()))
.unwrap_or(0);
let scope = self.lan_discovery_scope();
match crate::discovery::lan::LanDiscovery::start(
&self.identity,
scope,
advertised_udp_port,
self.config.node.discovery.lan.clone(),
)
.await
{
Ok(runtime) => {
node_start_debug_log("Node::start lan discovery start ok");
self.lan_discovery = Some(runtime);
info!("LAN mDNS discovery enabled");
}
Err(err) => {
node_start_debug_log(format!(
"Node::start lan discovery start error error={}",
err
));
debug!(error = %err, "LAN mDNS discovery not started");
}
}
}
self.start_local_instance_discovery();
self.poll_local_instance_discovery().await;
node_start_debug_log("Node::start initiate peer connections begin");
self.initiate_peer_connections().await;
node_start_debug_log("Node::start initiate peer connections complete");
if self.config.tun.enabled {
node_start_debug_log("Node::start tun init begin");
let address = *self.identity.address();
match TunDevice::create(&self.config.tun, address).await {
Ok(device) => {
let mtu = device.mtu();
let name = device.name().to_string();
let our_addr = *device.address();
info!("TUN device active:");
info!(" name: {}", name);
info!(" address: {}", device.address());
info!(" mtu: {}", mtu);
let effective_mtu = self.effective_ipv6_mtu();
let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20);
info!("effective MTU: {} bytes", effective_mtu);
debug!(" max TCP MSS: {} bytes", max_mss);
#[cfg(target_os = "macos")]
let (shutdown_read_fd, shutdown_write_fd) = {
let mut fds = [0i32; 2];
if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
"failed to create shutdown pipe".into(),
)));
}
(fds[0], fds[1])
};
let (writer, tun_tx) =
device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
let writer_handle = thread::spawn(move || {
writer.run();
});
let reader_tun_tx = tun_tx.clone();
let tun_channel_size = self.config.node.buffers.tun_channel;
let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
let transport_mtu = self.transport_mtu();
let path_mtu_lookup = self.path_mtu_lookup.clone();
#[cfg(target_os = "macos")]
let reader_handle = thread::spawn(move || {
run_tun_reader(
device,
mtu,
our_addr,
reader_tun_tx,
outbound_tx,
transport_mtu,
path_mtu_lookup,
shutdown_read_fd,
);
});
#[cfg(not(target_os = "macos"))]
let reader_handle = thread::spawn(move || {
run_tun_reader(
device,
mtu,
our_addr,
reader_tun_tx,
outbound_tx,
transport_mtu,
path_mtu_lookup,
);
});
self.tun_state = TunState::Active;
self.tun_name = Some(name);
self.tun_tx = Some(tun_tx);
self.tun_outbound_rx = Some(outbound_rx);
self.tun_reader_handle = Some(reader_handle);
self.tun_writer_handle = Some(writer_handle);
#[cfg(target_os = "macos")]
{
self.tun_shutdown_fd = Some(shutdown_write_fd);
}
}
Err(e) => {
self.tun_state = TunState::Failed;
warn!(error = %e, "Failed to initialize TUN, continuing without it");
}
}
node_start_debug_log("Node::start tun init complete");
}
if self.config.dns.enabled {
node_start_debug_log("Node::start dns init begin");
let addr_str = self.config.dns.bind_addr();
match addr_str.parse::<std::net::IpAddr>() {
Ok(ip) => {
let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
match Self::bind_dns_socket(bind) {
Ok(socket) => {
let dns_channel_size = self.config.node.buffers.dns_channel;
let (identity_tx, identity_rx) =
tokio::sync::mpsc::channel(dns_channel_size);
let dns_ttl = self.config.dns.ttl();
let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
self.config.peers(),
);
let reloader = if self.config.node.system_files_enabled {
let hosts_path = std::path::PathBuf::from(
crate::upper::hosts::DEFAULT_HOSTS_PATH,
);
crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
} else {
crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
};
let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
info!(
bind = %bind,
hosts = reloader.hosts().len(),
mesh_ifindex = ?mesh_ifindex,
"DNS responder started for .fips domain (auto-reload enabled)"
);
let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
socket,
identity_tx,
dns_ttl,
reloader,
mesh_ifindex,
));
self.dns_identity_rx = Some(identity_rx);
self.dns_task = Some(handle);
}
Err(e) => {
warn!(bind = %bind, error = %e, "Failed to start DNS responder");
}
}
}
Err(e) => {
warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
}
}
node_start_debug_log("Node::start dns init complete");
}
self.state = NodeState::Running;
node_start_debug_log("Node::start running");
info!("Node started:");
info!(" state: {}", self.state);
info!(" transports: {}", self.transports.len());
info!(" connections: {}", self.connections.len());
Ok(())
}
fn bind_dns_socket(
addr: std::net::SocketAddr,
) -> Result<tokio::net::UdpSocket, std::io::Error> {
use socket2::{Domain, Protocol, Socket, Type};
let domain = if addr.is_ipv4() {
Domain::IPV4
} else {
Domain::IPV6
};
let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
if addr.is_ipv6() {
sock.set_only_v6(false)?;
#[cfg(unix)]
Self::set_recv_pktinfo_v6(&sock)?;
}
sock.set_nonblocking(true)?;
sock.bind(&addr.into())?;
tokio::net::UdpSocket::from_std(sock.into())
}
#[cfg(unix)]
fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
use std::os::fd::AsRawFd;
let enable: libc::c_int = 1;
let ret = unsafe {
libc::setsockopt(
sock.as_raw_fd(),
libc::IPPROTO_IPV6,
libc::IPV6_RECVPKTINFO,
&enable as *const _ as *const libc::c_void,
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
)
};
if ret < 0 {
return Err(std::io::Error::last_os_error());
}
Ok(())
}
fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
#[cfg(unix)]
{
let c_name = std::ffi::CString::new(name).ok()?;
let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
if idx == 0 { None } else { Some(idx) }
}
#[cfg(not(unix))]
{
let _ = name;
None
}
}
pub async fn stop(&mut self) -> Result<(), NodeError> {
if !self.state.can_stop() {
return Err(NodeError::NotStarted);
}
self.state = NodeState::Stopping;
info!(state = %self.state, "Node stopping");
if let Some(handle) = self.dns_task.take() {
handle.abort();
debug!("DNS responder stopped");
}
self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
.await;
if let Some(bootstrap) = self.nostr_discovery.take()
&& let Err(e) = bootstrap.shutdown().await
{
warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
}
if let Some(lan) = self.lan_discovery.take() {
lan.shutdown().await;
}
if let Some(registry) = self.local_instance_registry.take()
&& let Err(err) = registry.remove()
{
debug!(error = %err, "failed to remove same-host FIPS instance record");
}
let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
for transport_id in transport_ids {
if let Some(mut handle) = self.transports.remove(&transport_id) {
let transport_type = handle.transport_type().name;
match handle.stop().await {
Ok(()) => {
info!(transport_id = %transport_id, transport_type, "Transport stopped");
}
Err(e) => {
warn!(
transport_id = %transport_id,
transport_type,
error = %e,
"Transport stop failed"
);
}
}
}
}
self.packet_tx.take();
self.packet_rx.take();
if let Some(name) = self.tun_name.take() {
info!(name = %name, "Shutting down TUN interface");
self.tun_tx.take();
if let Err(e) = shutdown_tun_interface(&name).await {
warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
}
#[cfg(target_os = "macos")]
if let Some(fd) = self.tun_shutdown_fd.take() {
unsafe {
libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
libc::close(fd);
}
}
if let Some(handle) = self.tun_reader_handle.take() {
let _ = handle.join();
}
if let Some(handle) = self.tun_writer_handle.take() {
let _ = handle.join();
}
self.tun_state = TunState::Disabled;
}
self.state = NodeState::Stopped;
info!(state = %self.state, "Node stopped");
Ok(())
}
async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
let disconnect = Disconnect::new(reason);
let plaintext = disconnect.encode();
let peer_addrs: Vec<NodeAddr> = self
.peers
.iter()
.filter(|(_, peer)| peer.can_send() && peer.has_session())
.map(|(addr, _)| *addr)
.collect();
if peer_addrs.is_empty() {
debug!(
total_peers = self.peers.len(),
"No sendable peers for disconnect notification"
);
return;
}
let mut sent = 0usize;
for node_addr in &peer_addrs {
match self
.send_encrypted_link_message(node_addr, &plaintext)
.await
{
Ok(()) => sent += 1,
Err(e) => {
debug!(
peer = %self.peer_display_name(node_addr),
error = %e,
"Failed to send disconnect (transport may be down)"
);
}
}
}
info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
}
pub(in crate::node) fn static_peer_addresses(
&self,
peer_config: &PeerConfig,
) -> Vec<PeerAddress> {
peer_config
.addresses_by_priority()
.into_iter()
.cloned()
.collect()
}
async fn nostr_peer_fallback_addresses(
&self,
peer_config: &PeerConfig,
existing: &[PeerAddress],
) -> Vec<PeerAddress> {
if !self.config.node.discovery.nostr.enabled
|| self.config.node.discovery.nostr.policy
== crate::config::NostrDiscoveryPolicy::Disabled
{
return Vec::new();
}
let Some(bootstrap) = self.nostr_discovery.clone() else {
return Vec::new();
};
if self.nostr_cooldown_applies_to_peer_config(peer_config)
&& bootstrap
.cooldown_until(&peer_config.npub, Self::now_ms())
.is_some()
{
debug!(
npub = %peer_config.npub,
"Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
);
return Vec::new();
}
let endpoints = match bootstrap
.cached_advert_endpoints_for_peer(&peer_config.npub)
.await
{
Some(endpoints) => endpoints,
None => {
debug!(
npub = %peer_config.npub,
"No cached Nostr advert endpoints for configured peer"
);
return Vec::new();
}
};
let mut fallback = Vec::new();
let mut next_priority = existing
.iter()
.map(|addr| addr.priority)
.max()
.unwrap_or(100)
.saturating_add(1);
let seen_at_ms = Self::now_ms();
for endpoint in endpoints {
let Some(candidate) =
Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
else {
continue;
};
if existing
.iter()
.any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
|| fallback.iter().any(|addr: &PeerAddress| {
addr.transport == candidate.transport && addr.addr == candidate.addr
})
{
continue;
}
fallback.push(candidate);
next_priority = next_priority.saturating_add(1);
}
fallback
}
pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
if !self.config.node.discovery.nostr.enabled
|| self.config.node.discovery.nostr.policy
== crate::config::NostrDiscoveryPolicy::Disabled
{
return false;
}
let Some(bootstrap) = self.nostr_discovery.clone() else {
return false;
};
let now_ms = Self::now_ms();
if self.nostr_cooldown_applies_to_peer_config(peer_config)
&& let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
{
debug!(
npub = %peer_config.npub,
cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
"Skipping Nostr traversal request while peer is in cooldown"
);
return false;
}
bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
bootstrap
.request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
.await;
info!(
npub = %peer_config.npub,
mesh_signaling_allowed,
"Started background UDP NAT traversal attempt"
);
true
}
fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
!self.mesh_signaling_allowed_for_peer(peer_config)
}
pub(in crate::node) fn mesh_signaling_allowed_for_peer(
&self,
peer_config: &PeerConfig,
) -> bool {
let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
return false;
};
let peer_addr = identity.node_addr();
self.configured_peer(peer_addr).is_some()
}
fn overlay_endpoint_to_peer_address(
endpoint: &OverlayEndpointAdvert,
priority: u8,
seen_at_ms: u64,
) -> Option<PeerAddress> {
let transport = match endpoint.transport {
OverlayTransportKind::Udp => "udp",
OverlayTransportKind::Tcp => "tcp",
OverlayTransportKind::Tor => "tor",
OverlayTransportKind::WebRtc => "webrtc",
};
Some(
PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
.with_seen_at_ms(seen_at_ms),
)
}
async fn attempt_peer_address_list(
&mut self,
peer_config: &PeerConfig,
peer_identity: PeerIdentity,
allow_bootstrap_nat: bool,
addresses: &[PeerAddress],
) -> Result<(), NodeError> {
let mut attempted = false;
let mut local_route_error = None;
let peer_node_addr = *peer_identity.node_addr();
let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);
for addr in addresses {
if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
if !allow_bootstrap_nat {
continue;
}
if self.request_nostr_bootstrap(peer_config).await {
attempted = true;
continue;
}
debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
continue;
}
let (transport_id, remote_addr) = if addr.transport == "ethernet" {
match self.resolve_ethernet_addr(&addr.addr) {
Ok(result) => result,
Err(e) => {
debug!(
transport = %addr.transport,
addr = %addr.addr,
error = %e,
"Failed to resolve Ethernet address"
);
continue;
}
}
} else if addr.transport == "ble" {
#[cfg(bluer_available)]
{
match self.resolve_ble_addr(&addr.addr) {
Ok(result) => result,
Err(e) => {
debug!(
transport = %addr.transport,
addr = %addr.addr,
error = %e,
"Failed to resolve BLE address"
);
continue;
}
}
}
#[cfg(not(bluer_available))]
{
debug!(transport = %addr.transport, "BLE transport not available on this build");
continue;
}
} else {
let tid = if addr.transport == "udp"
&& let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
{
match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
Some((id, _)) => id,
None => {
debug!(
transport = %addr.transport,
addr = %addr.addr,
"No compatible operational UDP transport for address"
);
continue;
}
}
} else {
match self.find_transport_for_type(&addr.transport) {
Some(id) => id,
None => {
debug!(
transport = %addr.transport,
addr = %addr.addr,
"No operational transport for address type"
);
continue;
}
}
};
(tid, TransportAddr::from_string(&addr.addr))
};
if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
attempted = true;
debug!(
npub = %peer_config.npub,
transport_id = %transport_id,
remote_addr = %remote_addr,
"Skipping duplicate in-flight candidate path"
);
continue;
}
if concrete_budget == 0 {
debug!(
npub = %peer_config.npub,
max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
"Path candidate race budget exhausted"
);
break;
}
match self
.initiate_connection(transport_id, remote_addr, peer_identity)
.await
{
Ok(()) => {
attempted = true;
concrete_budget = concrete_budget.saturating_sub(1);
}
Err(e @ NodeError::AccessDenied(_)) => return Err(e),
Err(e) => {
if e.is_local_route_unavailable() && local_route_error.is_none() {
local_route_error = Some(e.to_string());
}
debug!(
npub = %peer_config.npub,
transport_id = %transport_id,
error = %e,
"Connection attempt failed, trying next address"
);
}
}
}
if attempted {
return Ok(());
}
if let Some(error) = local_route_error {
return Err(NodeError::LocalRouteUnavailable(error));
}
Err(NodeError::NoTransportForType(format!(
"no operational transport for any of {}'s addresses",
peer_config.npub
)))
}
async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
self.run_open_discovery_sweep(bootstrap, None, "per-tick")
.await;
}
pub(in crate::node) fn queue_active_fallback_direct_retries(
&mut self,
_bootstrap: &std::sync::Arc<NostrDiscovery>,
) {
let now_ms = Self::now_ms();
let peer_configs = self
.config
.auto_connect_peers()
.cloned()
.collect::<Vec<_>>();
for peer_config in peer_configs {
let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
continue;
};
let node_addr = *peer_identity.node_addr();
if self.retry_pending.contains_key(&node_addr)
|| !self.peers.contains_key(&node_addr)
|| self.is_connecting_to_peer(&node_addr)
|| !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
{
continue;
}
let mut state = super::retry::RetryState::new(peer_config.clone());
state.reconnect = true;
state.retry_after_ms = now_ms;
self.retry_pending.insert(node_addr, state);
debug!(
peer = %self.peer_display_name(&node_addr),
"Queued direct-path retry for active fallback peer"
);
}
}
pub(in crate::node) async fn run_open_discovery_sweep(
&mut self,
bootstrap: &std::sync::Arc<NostrDiscovery>,
max_age_secs: Option<u64>,
caller: &'static str,
) {
if !self.config.node.discovery.nostr.enabled
|| self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
{
return;
}
let configured_npubs = self
.config
.peers()
.iter()
.map(|peer| peer.npub.clone())
.collect::<HashSet<_>>();
let now_ms = Self::now_ms();
let now_secs = now_ms / 1000;
let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
if enqueue_budget == 0 {
debug!(
caller = %caller,
"open-discovery sweep: enqueue budget is 0, skipping"
);
return;
}
let candidates = bootstrap.cached_open_discovery_candidates(64).await;
let cached_count = candidates.len();
let mut enqueued = 0usize;
let mut skipped_age = 0usize;
let mut skipped_configured = 0usize;
let mut skipped_self = 0usize;
let mut skipped_connected = 0usize;
let mut skipped_retry_pending = 0usize;
let mut skipped_connecting = 0usize;
let mut skipped_no_endpoints = 0usize;
let mut skipped_invalid_npub = 0usize;
let mut skipped_cooldown = 0usize;
for (npub, endpoints, created_at_secs) in candidates {
if enqueue_budget == 0 {
break;
}
if let Some(max_age) = max_age_secs
&& now_secs.saturating_sub(created_at_secs) > max_age
{
skipped_age = skipped_age.saturating_add(1);
continue;
}
if configured_npubs.contains(&npub) {
if let Ok(identity) = PeerIdentity::from_npub(&npub) {
let configured_addr = *identity.node_addr();
if bootstrap.cooldown_until(&npub, now_ms).is_some() {
skipped_cooldown = skipped_cooldown.saturating_add(1);
skipped_configured = skipped_configured.saturating_add(1);
continue;
}
if let Some(state) = self.retry_pending.get_mut(&configured_addr)
&& state.retry_after_ms > now_ms
{
state.retry_after_ms = now_ms;
debug!(
caller = %caller,
peer = %self.peer_display_name(&configured_addr),
advert_age_secs = now_secs.saturating_sub(created_at_secs),
"Expediting configured-peer retry after fresh overlay advert"
);
}
}
skipped_configured = skipped_configured.saturating_add(1);
continue;
}
let peer_identity = match PeerIdentity::from_npub(&npub) {
Ok(identity) => identity,
Err(_) => {
skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
continue;
}
};
let node_addr = *peer_identity.node_addr();
if node_addr == *self.identity.node_addr() {
skipped_self = skipped_self.saturating_add(1);
continue;
}
if self.peers.contains_key(&node_addr) {
skipped_connected = skipped_connected.saturating_add(1);
continue;
}
if self.retry_pending.contains_key(&node_addr) {
skipped_retry_pending = skipped_retry_pending.saturating_add(1);
continue;
}
if bootstrap.cooldown_until(&npub, now_ms).is_some() {
skipped_cooldown = skipped_cooldown.saturating_add(1);
continue;
}
let connecting = self.connections.values().any(|conn| {
conn.expected_identity()
.map(|id| id.node_addr() == &node_addr)
.unwrap_or(false)
});
if connecting {
skipped_connecting = skipped_connecting.saturating_add(1);
continue;
}
let mut addresses = Vec::new();
let mut priority = 120u8;
let seen_at_ms = Self::now_ms();
for endpoint in endpoints {
let Some(candidate) =
Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
else {
continue;
};
if addresses.iter().any(|existing: &PeerAddress| {
existing.transport == candidate.transport && existing.addr == candidate.addr
}) {
continue;
}
addresses.push(candidate);
priority = priority.saturating_add(1);
}
if addresses.is_empty() {
skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
continue;
}
self.peer_aliases
.entry(node_addr)
.or_insert_with(|| peer_identity.short_npub());
self.register_identity(node_addr, peer_identity.pubkey_full());
let mut state = super::retry::RetryState::new(PeerConfig {
npub: npub.clone(),
alias: None,
addresses,
connect_policy: ConnectPolicy::AutoConnect,
auto_reconnect: true,
discovery_fallback_transit: false,
});
state.reconnect = false;
state.retry_after_ms = now_ms;
state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
self.retry_pending.insert(node_addr, state);
info!(
caller = %caller,
peer = %peer_identity.short_npub(),
advert_age_secs = now_secs.saturating_sub(created_at_secs),
"open-discovery sweep: queued retry for cached advert"
);
enqueue_budget = enqueue_budget.saturating_sub(1);
enqueued = enqueued.saturating_add(1);
}
let total_skipped = skipped_age
+ skipped_configured
+ skipped_self
+ skipped_connected
+ skipped_retry_pending
+ skipped_connecting
+ skipped_no_endpoints
+ skipped_invalid_npub
+ skipped_cooldown;
let should_summarize = caller == "startup" || enqueued > 0;
if should_summarize {
info!(
caller = %caller,
cached = cached_count,
queued = enqueued,
skipped_age = skipped_age,
skipped_configured = skipped_configured,
skipped_self = skipped_self,
skipped_connected = skipped_connected,
skipped_retry_pending = skipped_retry_pending,
skipped_connecting = skipped_connecting,
skipped_no_endpoints = skipped_no_endpoints,
skipped_invalid_npub = skipped_invalid_npub,
skipped_cooldown = skipped_cooldown,
skipped_total = total_skipped,
"open-discovery sweep complete"
);
}
}
async fn maybe_run_startup_open_discovery_sweep(
&mut self,
bootstrap: &std::sync::Arc<NostrDiscovery>,
) {
if self.startup_open_discovery_sweep_done {
return;
}
if !self.config.node.discovery.nostr.enabled
|| self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
{
self.startup_open_discovery_sweep_done = true;
return;
}
let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
return;
};
let now_ms = Self::now_ms();
let delay_ms = self
.config
.node
.discovery
.nostr
.startup_sweep_delay_secs
.saturating_mul(1000);
if now_ms < started_at_ms.saturating_add(delay_ms) {
return;
}
let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
.await;
self.startup_open_discovery_sweep_done = true;
}
fn available_outbound_slots(&self) -> usize {
let connection_used = self
.connections
.len()
.saturating_add(self.pending_connects.len());
let connection_slots = if self.max_connections == 0 {
usize::MAX
} else {
self.max_connections.saturating_sub(connection_used)
};
let peer_slots = if self.max_peers == 0 {
usize::MAX
} else {
self.max_peers.saturating_sub(self.peers.len())
};
let link_slots = if self.max_links == 0 {
usize::MAX
} else {
self.max_links.saturating_sub(self.links.len())
};
connection_slots.min(peer_slots).min(link_slots)
}
pub(in crate::node) fn open_discovery_enqueue_budget(
&self,
configured_npubs: &HashSet<String>,
) -> usize {
let current_open_discovery_active = self
.peers
.values()
.filter(|peer| !configured_npubs.contains(&peer.npub()))
.count();
let current_open_discovery_pending = self
.retry_pending
.values()
.filter(|state| !configured_npubs.contains(&state.peer_config.npub))
.count();
let cap_remaining = self
.config
.node
.discovery
.nostr
.open_discovery_max_pending
.saturating_sub(current_open_discovery_active)
.saturating_sub(current_open_discovery_pending);
cap_remaining.min(self.available_outbound_slots())
}
fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
now_ms.saturating_add(
self.config
.node
.discovery
.nostr
.advert_ttl_secs
.saturating_mul(1000)
.saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
)
}
async fn build_overlay_advert(
&self,
bootstrap: &std::sync::Arc<NostrDiscovery>,
) -> Option<OverlayAdvert> {
if !self.config.node.discovery.nostr.enabled {
return None;
}
let mut endpoints = Vec::new();
let mut has_udp_nat = false;
let mut has_webrtc = false;
for handle in self.transports.values() {
if !handle.is_operational() {
continue;
}
match handle.transport_type().name {
"udp" => {
let Some(cfg) = self.lookup_udp_config(handle.name()) else {
continue;
};
if !cfg.advertise_on_nostr() {
continue;
}
if cfg.is_public() {
if let Some(explicit) = cfg.external_advert_addr() {
endpoints.push(OverlayEndpointAdvert {
transport: OverlayTransportKind::Udp,
addr: explicit.to_string(),
});
} else {
match handle.local_addr() {
Some(addr)
if !addr.ip().is_unspecified()
&& !is_unroutable_advert_ip(addr.ip()) =>
{
endpoints.push(OverlayEndpointAdvert {
transport: OverlayTransportKind::Udp,
addr: addr.to_string(),
});
}
Some(addr) => {
let key = handle.transport_id().as_u32();
let port = addr.port();
if let Some(public) =
bootstrap.learn_public_udp_addr(key, port).await
{
endpoints.push(OverlayEndpointAdvert {
transport: OverlayTransportKind::Udp,
addr: public.to_string(),
});
} else {
warn!(
transport_id = key,
bind_addr = %addr,
"advert: udp public=true but bind is wildcard \
or private and STUN observation failed; \
advertising no UDP endpoint. Either set \
transports.udp.external_addr, bind to a \
specific *public* IP, or ensure \
node.discovery.nostr.stun_servers is reachable"
);
}
}
None => {}
}
}
} else {
endpoints.push(OverlayEndpointAdvert {
transport: OverlayTransportKind::Udp,
addr: "nat".to_string(),
});
has_udp_nat = true;
}
}
"webrtc" => {
let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
continue;
};
if !cfg.advertise_on_nostr() {
continue;
}
endpoints.push(OverlayEndpointAdvert {
transport: OverlayTransportKind::WebRtc,
addr: hex::encode(self.identity.pubkey_full().serialize()),
});
has_webrtc = true;
}
"tcp" => {
let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
continue;
};
if !cfg.advertise_on_nostr() {
continue;
}
if let Some(explicit) = cfg.external_advert_addr() {
endpoints.push(OverlayEndpointAdvert {
transport: OverlayTransportKind::Tcp,
addr: explicit.to_string(),
});
} else {
match handle.local_addr() {
Some(addr)
if !addr.ip().is_unspecified()
&& !is_unroutable_advert_ip(addr.ip()) =>
{
endpoints.push(OverlayEndpointAdvert {
transport: OverlayTransportKind::Tcp,
addr: addr.to_string(),
});
}
Some(addr) => {
warn!(
bind_addr = %addr,
"advert: tcp advertise_on_nostr=true bound to wildcard \
or private IP and no transports.tcp.external_addr set; \
advertising no TCP endpoint. Either set external_addr \
to the public IP (recommended for cloud 1:1-NAT setups) \
or bind explicitly to the public IP"
);
}
None => {}
}
}
}
"tor" => {
let Some(cfg) = self.lookup_tor_config(handle.name()) else {
continue;
};
if !cfg.advertise_on_nostr() {
continue;
}
if let Some(addr) = handle.onion_address() {
endpoints.push(OverlayEndpointAdvert {
transport: OverlayTransportKind::Tor,
addr: format!("{}:{}", addr, cfg.advertised_port()),
});
}
}
_ => {}
}
}
if endpoints.is_empty() {
return None;
}
Some(OverlayAdvert {
identifier: ADVERT_IDENTIFIER.to_string(),
version: ADVERT_VERSION,
endpoints,
signal_relays: (has_udp_nat || has_webrtc)
.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
stun_servers: (has_udp_nat || has_webrtc)
.then(|| self.config.node.discovery.nostr.stun_servers.clone()),
})
}
async fn refresh_overlay_advert(
&self,
bootstrap: &std::sync::Arc<NostrDiscovery>,
) -> Result<(), crate::discovery::nostr::BootstrapError> {
let advert = self.build_overlay_advert(bootstrap).await;
bootstrap.update_local_advert(advert).await
}
fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
match (&self.config.transports.udp, transport_name) {
(crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
(crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
_ => None,
}
}
fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
match (&self.config.transports.tcp, transport_name) {
(crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
(crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
_ => None,
}
}
fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
match (&self.config.transports.tor, transport_name) {
(crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
(crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
_ => None,
}
}
fn lookup_webrtc_config(
&self,
transport_name: Option<&str>,
) -> Option<&crate::config::WebRtcConfig> {
match (&self.config.transports.webrtc, transport_name) {
(crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
(crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
_ => None,
}
}
pub(in crate::node) async fn try_peer_addresses(
&mut self,
peer_config: &PeerConfig,
peer_identity: PeerIdentity,
allow_bootstrap_nat: bool,
) -> Result<(), NodeError> {
let peer_node_addr = *peer_identity.node_addr();
if self.peers.contains_key(&peer_node_addr) {
debug!(
npub = %peer_config.npub,
"Peer already exists, skipping address attempts"
);
return Ok(());
}
let candidates = self.peer_address_candidates(peer_config).await;
if candidates.is_empty() {
if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
return Ok(());
}
return Err(NodeError::NoTransportForType(format!(
"no addresses known for {}",
peer_config.npub
)));
}
if self
.attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
.await
.is_ok()
{
if allow_bootstrap_nat {
self.request_nostr_bootstrap(peer_config).await;
}
return Ok(());
}
if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
return Ok(());
}
Err(NodeError::NoTransportForType(format!(
"no operational transport for any of {}'s addresses",
peer_config.npub
)))
}
async fn try_active_peer_alternative_addresses(
&mut self,
peer_config: &PeerConfig,
peer_identity: PeerIdentity,
) -> Result<bool, NodeError> {
let peer_node_addr = *peer_identity.node_addr();
let candidates = self.peer_address_candidates(peer_config).await;
let should_try_nostr =
self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);
if candidates.is_empty() {
if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
return Ok(true);
}
return Err(NodeError::NoTransportForType(format!(
"no addresses known for {}",
peer_config.npub
)));
}
let alternatives: Vec<_> = candidates
.into_iter()
.filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
.collect();
if alternatives.is_empty() {
if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
return Ok(true);
}
return Ok(false);
}
let needs_separate_nostr_attempt = should_try_nostr
&& !alternatives
.iter()
.any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
let address_result = self
.attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
.await;
let nostr_attempted =
needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;
match address_result {
Ok(()) => Ok(true),
Err(err) if nostr_attempted => {
debug!(
npub = %peer_config.npub,
error = %err,
"Static active-peer direct-path alternatives failed; Nostr traversal still queued"
);
Ok(true)
}
Err(err) => Err(err),
}
}
async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
let static_addresses = self.static_peer_addresses(peer_config);
let overlay_addresses = self
.nostr_peer_fallback_addresses(peer_config, &static_addresses)
.await;
let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
for addr in overlay_addresses.into_iter().chain(static_addresses) {
if !candidates.iter().any(|existing: &PeerAddress| {
existing.transport == addr.transport && existing.addr == addr.addr
}) {
candidates.push(addr);
}
}
candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
_ if a.priority != b.priority => a.priority.cmp(&b.priority),
(Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
(None, None) => std::cmp::Ordering::Equal,
});
candidates
}
fn active_peer_matches_any_candidate(
&self,
peer_node_addr: &NodeAddr,
candidates: &[PeerAddress],
) -> bool {
candidates
.iter()
.any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
}
pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
&self,
peer_node_addr: &NodeAddr,
candidates: &[PeerAddress],
) -> bool {
if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
return false;
}
!self.active_peer_needs_same_path_refresh(peer_node_addr)
}
pub(in crate::node) fn active_peer_should_keep_direct_retry(
&self,
peer_node_addr: &NodeAddr,
peer_config: &PeerConfig,
) -> bool {
let Some(peer) = self.peers.get(peer_node_addr) else {
return false;
};
let static_addresses = self.static_peer_addresses(peer_config);
if !static_addresses.is_empty() {
return !self
.active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
}
if peer_config.npub.is_empty() {
return false;
}
if !self.config.node.discovery.nostr.enabled
|| self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
{
return false;
}
peer.transport_id()
.and_then(|id| self.transports.get(&id))
.map(|transport| transport.transport_type().name != "udp")
.unwrap_or(true)
}
pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
&mut self,
peer_node_addr: &NodeAddr,
) {
let keep_retry = self
.retry_pending
.get(peer_node_addr)
.map(|state| state.peer_config.clone())
.is_some_and(|peer_config| {
self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
});
if !keep_retry {
self.retry_pending.remove(peer_node_addr);
}
}
fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
let Some(peer) = self.peers.get(peer_node_addr) else {
return false;
};
let stale_after_ms = self
.config
.node
.heartbeat_interval_secs
.saturating_mul(1000)
.max(1000);
peer.idle_time(Self::now_ms()) > stale_after_ms
}
pub(in crate::node) fn active_peer_matches_candidate(
&self,
peer_node_addr: &NodeAddr,
candidate: &PeerAddress,
) -> bool {
let Some(peer) = self.peers.get(peer_node_addr) else {
return false;
};
let Some(current_addr) = peer.current_addr() else {
return false;
};
if let Some(peer_transport_id) = peer.transport_id()
&& let Some((candidate_transport_id, candidate_addr)) =
self.resolve_peer_address_for_match(candidate)
{
return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
}
if peer
.transport_id()
.map(|id| self.bootstrap_transports.contains(&id))
.unwrap_or(false)
{
return false;
}
let current_addr = current_addr.to_string();
let current_transport = peer
.transport_id()
.and_then(|id| self.transports.get(&id))
.map(|transport| transport.transport_type().name);
candidate.addr == current_addr
&& current_transport
.map(|transport| transport == candidate.transport)
.unwrap_or(true)
}
pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
&self,
peer_node_addr: &NodeAddr,
peer_config: &PeerConfig,
) -> bool {
peer_config.addresses.iter().any(|addr| {
addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
})
}
pub(in crate::node) fn active_peer_uses_traversal_path(
&self,
peer_node_addr: &NodeAddr,
peer_config: &PeerConfig,
) -> bool {
let via_bootstrap_transport = self
.peers
.get(peer_node_addr)
.and_then(|peer| peer.transport_id())
.map(|id| self.bootstrap_transports.contains(&id))
.unwrap_or(false);
via_bootstrap_transport
|| self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
}
pub(crate) async fn api_connect(
&mut self,
npub: &str,
address: &str,
transport: &str,
) -> Result<serde_json::Value, String> {
let peer_config = PeerConfig {
npub: npub.to_string(),
alias: None,
addresses: vec![PeerAddress::new(transport, address)],
connect_policy: ConnectPolicy::Manual,
auto_reconnect: false,
discovery_fallback_transit: true,
};
if let Ok(identity) = PeerIdentity::from_npub(npub) {
self.peer_aliases
.insert(*identity.node_addr(), identity.short_npub());
self.register_identity(*identity.node_addr(), identity.pubkey_full());
}
self.initiate_peer_connection(&peer_config)
.await
.map(|()| {
info!(
npub = %npub,
address = %address,
transport = %transport,
"API connect initiated"
);
serde_json::json!({
"npub": npub,
"address": address,
"transport": transport,
})
})
.map_err(|e| e.to_string())
}
pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
let peer_identity =
PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
let node_addr = *peer_identity.node_addr();
if !self.peers.contains_key(&node_addr) {
return Err(format!("peer not found: {npub}"));
}
self.remove_active_peer(&node_addr);
self.retry_pending.remove(&node_addr);
info!(npub = %npub, "API disconnect completed");
Ok(serde_json::json!({
"npub": npub,
"disconnected": true,
}))
}
pub async fn adopt_established_traversal(
&mut self,
traversal: EstablishedTraversal,
) -> Result<BootstrapHandoffResult, NodeError> {
debug!(
peer_npub = %traversal.peer_npub,
session_id = %traversal.session_id,
remote_addr = %traversal.remote_addr,
"adopting established traversal socket"
);
if !self.state.is_operational() {
return Err(NodeError::NotStarted);
}
let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
NodeError::InvalidPeerNpub {
npub: traversal.peer_npub.clone(),
reason: e.to_string(),
}
})?;
let peer_node_addr = *peer_identity.node_addr();
if self.peers.contains_key(&peer_node_addr) {
debug!(
peer_npub = %traversal.peer_npub,
"Adopting NAT traversal handoff as alternate path for already-connected peer"
);
}
self.peer_aliases
.insert(peer_node_addr, peer_identity.short_npub());
self.register_identity(peer_node_addr, peer_identity.pubkey_full());
let transport_id = self.allocate_transport_id();
let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
let mut cfg = self
.lookup_udp_config(traversal.transport_name.as_deref())
.or_else(|| self.lookup_udp_config(None))
.cloned()
.unwrap_or_default();
cfg.bind_addr = None;
cfg.external_addr = None;
cfg
});
let mut transport = crate::transport::udp::UdpTransport::new(
transport_id,
traversal.transport_name.clone(),
inherited_config,
packet_tx,
);
transport
.adopt_socket_async(traversal.socket)
.await
.map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
let local_addr = transport.local_addr().ok_or_else(|| {
NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
})?;
self.transports.insert(
transport_id,
crate::transport::TransportHandle::Udp(transport),
);
self.bootstrap_transports.insert(transport_id);
self.bootstrap_transport_npubs
.insert(transport_id, traversal.peer_npub.clone());
let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
if let Err(err) = self
.initiate_connection(transport_id, remote_addr.clone(), peer_identity)
.await
{
self.bootstrap_transports.remove(&transport_id);
self.bootstrap_transport_npubs.remove(&transport_id);
if let Some(mut handle) = self.transports.remove(&transport_id) {
let _ = handle.stop().await;
}
return Err(err);
}
info!(
peer = %self.peer_display_name(&peer_node_addr),
transport_id = %transport_id,
local_addr = %local_addr,
remote_addr = %traversal.remote_addr,
session_id = %traversal.session_id,
"adopted NAT traversal socket; handshake initiated"
);
Ok(BootstrapHandoffResult {
transport_id,
local_addr,
remote_addr: traversal.remote_addr,
peer_node_addr,
session_id: traversal.session_id,
})
}
}