Skip to main content

fips_core/node/
lifecycle.rs

1//! Node lifecycle management: start, stop, and peer connection initiation.
2
3use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7    OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Presumably the action to take for a mesh-traversal signal given the state
/// of the session it would travel over.
///
/// NOTE(review): this type is not referenced in the visible part of this
/// file; the variant meanings below are inferred from their names only —
/// confirm against the call sites.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MeshSignalSessionAction {
    /// Presumably: send the signal now.
    Send,
    /// Presumably: hold the signal until the session can carry it.
    Defer,
    /// Presumably: discard the signal.
    Drop,
}
30
/// Append a timestamped line to `nvpn-fips-endpoint-debug.log` in the
/// system temp directory (debug builds only).
///
/// Best-effort: failing to open or write the file is silently ignored.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path);
    let Ok(mut file) = opened else {
        return;
    };
    let stamp = std::time::SystemTime::now();
    let _ = writeln!(file, "{:?} {}", stamp, message.as_ref());
}

/// Release-build stand-in: accepts and discards the message so call sites
/// need no `cfg` guards of their own.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
51
/// True if `ip` is not a viable canonical advert endpoint for peers off the
/// publisher's own LAN.
///
/// IPv4: RFC 1918 private, loopback, link-local, unspecified, multicast,
/// broadcast, documentation (TEST-NET-1/2/3), benchmarking
/// (198.18.0.0/15, RFC 2544) and CGNAT shared space (100.64.0.0/10,
/// RFC 6598).
///
/// IPv6: loopback, unspecified, unique-local (fc00::/7), multicast,
/// link-local (fe80::/10) and documentation (2001:db8::/32, RFC 3849).
/// IPv4-mapped addresses (`::ffff:a.b.c.d`) are judged by the IPv4 address
/// they embed, so `::ffff:10.0.0.1` is rejected exactly like `10.0.0.1`.
///
/// We never publish these as the peer's primary `runtime_endpoint`: an
/// off-LAN consumer can't route to them, and latching onto one of them (and
/// ending up on a slow overlay-relay fallback) is the original bug this
/// guard exists to prevent.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [a, b, _, _] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 198.18.0.0/15 — benchmarking, RFC 2544. Never globally routed.
                || (a == 198 && (b & 0xfe) == 18)
                // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the
                // public internet; behaves like an extra NAT layer.
                || (a == 100 && (b & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            // An IPv4-mapped address is only as routable as the IPv4
            // address it carries (e.g. ::ffff:192.168.1.1 is private).
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(mapped));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // IPv6 link-local: fe80::/10
                || (segments[0] & 0xffc0) == 0xfe80
                // IPv6 documentation: 2001:db8::/32, RFC 3849
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
83
/// True when `local` and `remote` share an address family (both IPv4 or
/// both IPv6). Used to match a bound UDP socket to a destination address.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    local.is_ipv4() == remote.is_ipv4()
}
90
/// Upper bound on simultaneous outbound handshake attempts (in-progress
/// connections plus pending connects) aimed at a single peer.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;

/// Cap on how many graph-session warm-ups (session initiations or lookups)
/// a single `warm_auto_connect_graph_sessions` pass may start.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;

/// Upper limit on the value returned by `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;

/// Lifetime multiplier for open-discovery retries.
///
/// NOTE(review): not referenced in the visible part of this file; which
/// lifetime it scales is not shown here — confirm at its use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
95
96impl Node {
97    /// Initiate connections to configured static peers.
98    ///
99    /// For each peer configured with AutoConnect policy, creates a link and
100    /// peer entry, then starts the Noise handshake by sending the first message.
101    /// Replace the runtime peer list. Newly added auto-connect peers get
102    /// `initiate_peer_connection` immediately; removed peers are dropped from
103    /// the retry queue (the regular liveness timeout reaps any active session
104    /// — we don't proactively disconnect, since the same npub might be on its
105    /// way back via a fresh advert). Existing auto-connect entries with new
106    /// direct addresses are dialed immediately, and retry state is refreshed
107    /// so later attempts also use the latest hints.
108    pub(super) async fn update_peers(
109        &mut self,
110        new_peers: Vec<crate::config::PeerConfig>,
111    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112        use std::collections::{HashMap, HashSet};
113
114        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115            HashMap::with_capacity(new_peers.len());
116        let mut new_order = Vec::with_capacity(new_peers.len());
117        for peer in new_peers {
118            let identity = match PeerIdentity::from_npub(&peer.npub) {
119                Ok(id) => id,
120                Err(e) => {
121                    return Err(crate::node::NodeError::InvalidPeerNpub {
122                        npub: peer.npub.clone(),
123                        reason: e.to_string(),
124                    });
125                }
126            };
127            // Last-write-wins on duplicates so callers passing a multi-source
128            // candidate list (e.g. operator hints + recent-peers cache for
129            // the same npub) get the merge they meant.
130            let node_addr = *identity.node_addr();
131            if !new_by_addr.contains_key(&node_addr) {
132                new_order.push(node_addr);
133            }
134            new_by_addr.insert(node_addr, peer);
135        }
136
137        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138            .config
139            .peers()
140            .iter()
141            .filter_map(|pc| {
142                PeerIdentity::from_npub(&pc.npub)
143                    .ok()
144                    .map(|id| (*id.node_addr(), pc.clone()))
145            })
146            .collect();
147
148        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
153        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();
154
155        let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157        for node_addr in &removed {
158            if self.retry_pending.remove(node_addr).is_some() {
159                debug!(
160                    peer = %self.peer_display_name(node_addr),
161                    "Dropping retry entry for peer removed from runtime peer list"
162                );
163            }
164            self.peer_aliases.remove(node_addr);
165            self.set_discovery_fallback_transit_allowed(*node_addr, false);
166            outcome.removed += 1;
167        }
168
169        let mut auto_connect_refresh_configs = Vec::new();
170        for node_addr in &kept {
171            let new_pc = &new_by_addr[node_addr];
172            let current_pc = &current_by_addr[node_addr];
173            if new_pc.addresses != current_pc.addresses
174                || new_pc.alias != current_pc.alias
175                || new_pc.connect_policy != current_pc.connect_policy
176                || new_pc.auto_reconnect != current_pc.auto_reconnect
177                || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178            {
179                outcome.updated += 1;
180                self.set_discovery_fallback_transit_allowed(
181                    *node_addr,
182                    new_pc.discovery_fallback_transit,
183                );
184                if let Some(state) = self.retry_pending.get_mut(node_addr) {
185                    state.peer_config = new_pc.clone();
186                    state.retry_after_ms = Self::now_ms();
187                }
188                if let Some(alias) = new_pc.alias.clone() {
189                    self.peer_aliases.insert(*node_addr, alias);
190                }
191                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192                    auto_connect_refresh_configs.push(new_pc.clone());
193                }
194            } else {
195                outcome.unchanged += 1;
196                self.set_discovery_fallback_transit_allowed(
197                    *node_addr,
198                    new_pc.discovery_fallback_transit,
199                );
200                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
201                    auto_connect_refresh_configs.push(new_pc.clone());
202                }
203            }
204        }
205
206        let added_configs: Vec<crate::config::PeerConfig> = new_order
207            .iter()
208            .filter(|addr| added.contains(addr))
209            .map(|addr| new_by_addr[addr].clone())
210            .collect();
211
212        // Replace the live config peer list before initiating connections so
213        // any helper that consults `self.config.peers()` during the dial
214        // (alias lookup, retry-state seeding) sees the new entries.
215        self.config.peers = new_order
216            .iter()
217            .filter_map(|addr| new_by_addr.get(addr).cloned())
218            .collect();
219        self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
220
221        for peer_config in added_configs {
222            outcome.added += 1;
223            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
224                continue;
225            };
226            let name = peer_config
227                .alias
228                .clone()
229                .unwrap_or_else(|| identity.short_npub());
230            self.peer_aliases.insert(*identity.node_addr(), name);
231            self.set_discovery_fallback_transit_allowed(
232                *identity.node_addr(),
233                peer_config.discovery_fallback_transit,
234            );
235            self.register_identity(*identity.node_addr(), identity.pubkey_full());
236
237            if !peer_config.is_auto_connect() {
238                continue;
239            }
240
241            match self
242                .try_auto_connect_graph_session(&peer_config, identity)
243                .await
244            {
245                Ok(true) => continue,
246                Ok(false) => {}
247                Err(err) => {
248                    debug!(
249                        npub = %peer_config.npub,
250                        error = %err,
251                        "Existing FIPS graph did not warm newly added peer"
252                    );
253                }
254            }
255
256            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
257                warn!(
258                    npub = %peer_config.npub,
259                    error = %e,
260                    "Failed to initiate connection for newly added peer"
261                );
262                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
263                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
264                }
265                if matches!(e, crate::node::NodeError::NoTransportForType(_))
266                    && let Some(bootstrap) = self.nostr_discovery.clone()
267                {
268                    bootstrap
269                        .request_advert_stale_check(peer_config.npub.clone())
270                        .await;
271                }
272            }
273        }
274
275        for peer_config in auto_connect_refresh_configs {
276            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
277                continue;
278            };
279            let node_addr = *peer_identity.node_addr();
280
281            if self.peers.contains_key(&node_addr) {
282                match self
283                    .initiate_active_peer_alternative_connection(&peer_config)
284                    .await
285                {
286                    Ok(attempted) => {
287                        if attempted {
288                            debug!(
289                                peer = %self.peer_display_name(&node_addr),
290                                "Started non-disruptive alternate-path handshake for active peer"
291                            );
292                        }
293                    }
294                    Err(e) => {
295                        debug!(
296                            npub = %peer_config.npub,
297                            error = %e,
298                            "Active peer alternate-path refresh did not start"
299                        );
300                    }
301                }
302                continue;
303            }
304
305            match self
306                .try_auto_connect_graph_session(&peer_config, peer_identity)
307                .await
308            {
309                Ok(true) => continue,
310                Ok(false) => {}
311                Err(err) => {
312                    debug!(
313                        npub = %peer_config.npub,
314                        error = %err,
315                        "Existing FIPS graph did not warm refreshed peer"
316                    );
317                }
318            }
319
320            match self.initiate_peer_connection(&peer_config).await {
321                Ok(()) => {
322                    let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
323                    if let Some(state) = self.retry_pending.get_mut(&node_addr) {
324                        state.peer_config = peer_config;
325                        state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
326                    }
327                }
328                Err(e) => {
329                    debug!(
330                        npub = %peer_config.npub,
331                        error = %e,
332                        "Refreshed peer addresses did not initiate a direct connection"
333                    );
334                    self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
335                }
336            }
337        }
338
339        self.warm_auto_connect_graph_sessions().await;
340
341        Ok(outcome)
342    }
343
344    pub(super) async fn initiate_peer_connections(&mut self) {
345        // Build display name map from all configured peers (alias or short npub),
346        // and pre-seed the identity cache from each peer's npub so that TUN packets
347        // addressed to a configured peer can be dispatched (and trigger session
348        // initiation) immediately on startup — without waiting for the link-layer
349        // handshake to complete first.
350        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
351            .config
352            .peers()
353            .iter()
354            .filter_map(|pc| {
355                PeerIdentity::from_npub(&pc.npub)
356                    .ok()
357                    .map(|id| (id, pc.alias.clone()))
358            })
359            .collect();
360
361        for (identity, alias) in peer_identities {
362            let name = alias.unwrap_or_else(|| identity.short_npub());
363            self.peer_aliases.insert(*identity.node_addr(), name);
364            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
365            // but will be corrected to the real value when the peer is promoted
366            // after a successful Noise handshake.
367            self.register_identity(*identity.node_addr(), identity.pubkey_full());
368        }
369
370        // Collect peer configs to avoid borrow conflicts
371        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
372
373        if peer_configs.is_empty() {
374            debug!("No static peers configured");
375            return;
376        }
377
378        debug!(
379            count = peer_configs.len(),
380            "Initiating static peer connections"
381        );
382
383        for peer_config in peer_configs {
384            let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
385                Ok(identity) => identity,
386                Err(_) => continue,
387            };
388            match self
389                .try_auto_connect_graph_session(&peer_config, peer_identity)
390                .await
391            {
392                Ok(true) => continue,
393                Ok(false) => {}
394                Err(err) => {
395                    debug!(
396                        npub = %peer_config.npub,
397                        error = %err,
398                        "Existing FIPS graph did not warm auto-connect peer"
399                    );
400                }
401            }
402            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
403                warn!(
404                    npub = %peer_config.npub,
405                    alias = ?peer_config.alias,
406                    error = %e,
407                    "Failed to initiate peer connection"
408                );
409                // Schedule a retry so transient address-resolution failures
410                // (e.g. cached endpoints stale, NAT rebinds, all addresses
411                // currently unreachable) recover without a daemon restart.
412                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
413                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
414                }
415                // No-transport failures most often mean the cached overlay
416                // advert is pointing at a dead post-NAT-rebind address. The
417                // advert cache is read-only inside fetch_advert, so retries
418                // would loop on the same dead address until expiry. Force a
419                // re-fetch so the next retry tick picks up fresh endpoints.
420                if matches!(e, crate::node::NodeError::NoTransportForType(_))
421                    && let Some(bootstrap) = self.nostr_discovery.clone()
422                {
423                    bootstrap
424                        .request_advert_stale_check(peer_config.npub.clone())
425                        .await;
426                }
427            }
428        }
429
430        self.warm_auto_connect_graph_sessions().await;
431    }
432
433    pub(in crate::node) async fn try_auto_connect_graph_session(
434        &mut self,
435        peer_config: &PeerConfig,
436        peer_identity: PeerIdentity,
437    ) -> Result<bool, NodeError> {
438        if !peer_config.is_auto_connect() {
439            return Ok(false);
440        }
441
442        let peer_node_addr = *peer_identity.node_addr();
443        if self.peers.contains_key(&peer_node_addr) {
444            return Ok(false);
445        }
446        if self.auto_connect_should_race_direct_path(peer_config) {
447            return Ok(false);
448        }
449        if self
450            .sessions
451            .get(&peer_node_addr)
452            .is_some_and(|entry| entry.is_established() || entry.is_initiating())
453        {
454            return Ok(true);
455        }
456        if self.find_next_hop(&peer_node_addr).is_none() {
457            return Ok(false);
458        }
459
460        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
461        match self
462            .initiate_session(peer_node_addr, peer_identity.pubkey_full())
463            .await
464        {
465            Ok(()) => {
466                debug!(
467                    peer = %self.peer_display_name(&peer_node_addr),
468                    "Warmed auto-connect peer session over existing FIPS graph"
469                );
470                Ok(true)
471            }
472            Err(NodeError::SendFailed { node_addr, reason })
473                if node_addr == peer_node_addr && reason == "no route to destination" =>
474            {
475                self.maybe_initiate_lookup(&peer_node_addr).await;
476                Ok(false)
477            }
478            Err(err) => Err(err),
479        }
480    }
481
482    fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
483        !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
484    }
485
486    /// Initiate a connection to a single peer.
487    ///
488    /// Creates a link, starts the Noise handshake, and sends the first message.
489    pub(super) async fn initiate_peer_connection(
490        &mut self,
491        peer_config: &crate::config::PeerConfig,
492    ) -> Result<(), NodeError> {
493        self.initiate_peer_connection_inner(peer_config).await
494    }
495
496    /// Initiate a connection from the retry path. Identical to
497    /// [`initiate_peer_connection`] today — both paths fan out across every
498    /// known address (overlay-fresh first, then operator/cache hints) in a
499    /// single pass. The two entry points stay separate so callers can be
500    /// distinguished in tracing.
501    pub(super) async fn initiate_peer_retry_connection(
502        &mut self,
503        peer_config: &crate::config::PeerConfig,
504    ) -> Result<(), NodeError> {
505        self.initiate_peer_connection_inner(peer_config).await
506    }
507
508    pub(in crate::node) async fn initiate_active_peer_alternative_connection(
509        &mut self,
510        peer_config: &crate::config::PeerConfig,
511    ) -> Result<bool, NodeError> {
512        let peer_identity =
513            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
514                npub: peer_config.npub.clone(),
515                reason: e.to_string(),
516            })?;
517        let peer_node_addr = *peer_identity.node_addr();
518
519        if !self.peers.contains_key(&peer_node_addr) {
520            self.initiate_peer_connection(peer_config).await?;
521            return Ok(true);
522        }
523
524        // Keep the current link live and race fresh concrete candidates.
525        // Cross-connection resolution still decides which replacement link
526        // wins if both peers try the same upgrade; the important part here is
527        // that a stale path does not depend on the remote peer receiving our
528        // hint first before either side attempts the better address.
529        self.try_active_peer_alternative_addresses(peer_config, peer_identity)
530            .await
531    }
532
533    async fn initiate_peer_connection_inner(
534        &mut self,
535        peer_config: &crate::config::PeerConfig,
536    ) -> Result<(), NodeError> {
537        // Parse the peer's npub to get their identity
538        let peer_identity =
539            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
540                npub: peer_config.npub.clone(),
541                reason: e.to_string(),
542            })?;
543
544        let peer_node_addr = *peer_identity.node_addr();
545
546        // Check if peer already exists (fully authenticated)
547        if self.peers.contains_key(&peer_node_addr) {
548            debug!(
549                npub = %peer_config.npub,
550                "Peer already exists, skipping"
551            );
552            return Ok(());
553        }
554
555        self.try_peer_addresses(peer_config, peer_identity, true)
556            .await
557    }
558
559    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
560        self.connections.values().any(|conn| {
561            conn.expected_identity()
562                .map(|id| id.node_addr() == peer_node_addr)
563                .unwrap_or(false)
564        })
565    }
566
567    fn is_connecting_to_peer_on_path(
568        &self,
569        peer_node_addr: &NodeAddr,
570        transport_id: TransportId,
571        remote_addr: &TransportAddr,
572    ) -> bool {
573        self.connections.values().any(|conn| {
574            conn.expected_identity()
575                .map(|id| id.node_addr() == peer_node_addr)
576                .unwrap_or(false)
577                && conn.transport_id() == Some(transport_id)
578                && conn.source_addr() == Some(remote_addr)
579        }) || self.pending_connects.iter().any(|pending| {
580            pending.peer_identity.node_addr() == peer_node_addr
581                && pending.transport_id == transport_id
582                && &pending.remote_addr == remote_addr
583        })
584    }
585
586    pub(in crate::node) fn should_warm_auto_connect_session(
587        &self,
588        peer_node_addr: &NodeAddr,
589    ) -> bool {
590        if self.peers.contains_key(peer_node_addr)
591            || self
592                .sessions
593                .get(peer_node_addr)
594                .is_some_and(|entry| entry.is_established())
595        {
596            return false;
597        }
598
599        self.config.peers().iter().any(|peer| {
600            peer.is_auto_connect()
601                && PeerIdentity::from_npub(&peer.npub)
602                    .map(|identity| identity.node_addr() == peer_node_addr)
603                    .unwrap_or(false)
604        })
605    }
606
607    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
608        if !self.peers.values().any(|peer| peer.can_send()) {
609            return 0;
610        }
611
612        let mut budget = self.graph_session_warmup_budget();
613        if budget == 0 {
614            return 0;
615        }
616
617        let peer_identities: Vec<_> = self
618            .config
619            .auto_connect_peers()
620            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
621            .collect();
622
623        let mut warmed = 0;
624        for identity in peer_identities {
625            if budget == 0 {
626                break;
627            }
628
629            let peer_node_addr = *identity.node_addr();
630            if peer_node_addr == *self.identity.node_addr()
631                || !self.should_warm_auto_connect_session(&peer_node_addr)
632                || self
633                    .sessions
634                    .get(&peer_node_addr)
635                    .is_some_and(|entry| entry.is_initiating())
636            {
637                continue;
638            }
639
640            self.register_identity(peer_node_addr, identity.pubkey_full());
641
642            if self.find_next_hop(&peer_node_addr).is_some() {
643                match self
644                    .initiate_session(peer_node_addr, identity.pubkey_full())
645                    .await
646                {
647                    Ok(()) => {
648                        warmed += 1;
649                        budget = budget.saturating_sub(1);
650                        debug!(
651                            peer = %self.peer_display_name(&peer_node_addr),
652                            "Warmed auto-connect peer session over existing FIPS graph"
653                        );
654                    }
655                    Err(NodeError::SendFailed { node_addr, reason })
656                        if node_addr == peer_node_addr && reason == "no route to destination" =>
657                    {
658                        self.maybe_initiate_lookup(&peer_node_addr).await;
659                        warmed += 1;
660                        budget = budget.saturating_sub(1);
661                    }
662                    Err(err) => {
663                        debug!(
664                            peer = %self.peer_display_name(&peer_node_addr),
665                            error = %err,
666                            "Failed to warm auto-connect peer session"
667                        );
668                    }
669                }
670            } else {
671                self.maybe_initiate_lookup(&peer_node_addr).await;
672                warmed += 1;
673                budget = budget.saturating_sub(1);
674            }
675        }
676
677        warmed
678    }
679
680    pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
681        let max_destinations = self.config.node.session.pending_max_destinations;
682        if max_destinations == 0 {
683            return 0;
684        }
685
686        let pending_sessions = self
687            .sessions
688            .values()
689            .filter(|entry| !entry.is_established())
690            .count();
691        let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
692        max_destinations
693            .saturating_sub(pending_total)
694            .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
695    }
696
697    fn outbound_handshake_slots(&self) -> usize {
698        let used = self
699            .connections
700            .len()
701            .saturating_add(self.pending_connects.len());
702        if self.max_connections == 0 {
703            usize::MAX
704        } else {
705            self.max_connections.saturating_sub(used)
706        }
707    }
708
709    fn outbound_link_slots(&self) -> usize {
710        if self.max_links == 0 {
711            usize::MAX
712        } else {
713            self.max_links.saturating_sub(self.links.len())
714        }
715    }
716
717    fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
718        if !self.peers.contains_key(peer_node_addr)
719            && self.max_peers > 0
720            && self.peers.len() >= self.max_peers
721        {
722            return 0;
723        }
724
725        let in_flight_for_peer = self
726            .connections
727            .values()
728            .filter(|conn| {
729                conn.expected_identity()
730                    .map(|id| id.node_addr() == peer_node_addr)
731                    .unwrap_or(false)
732            })
733            .count()
734            .saturating_add(
735                self.pending_connects
736                    .iter()
737                    .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
738                    .count(),
739            );
740
741        self.outbound_handshake_slots()
742            .min(self.outbound_link_slots())
743            .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
744    }
745
746    fn discovery_connect_budget(&self) -> usize {
747        self.outbound_handshake_slots()
748            .min(self.outbound_link_slots())
749            .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
750    }
751
752    /// Find a UDP transport whose bound socket can send to `remote_addr`.
753    ///
754    /// LAN discovery can surface both IPv4 and IPv6 addresses for the same
755    /// service. A wildcard IPv4 socket cannot send to an IPv6 link-local
756    /// target, and vice versa, so callers must choose by socket family rather
757    /// than by transport type alone.
758    fn find_udp_transport_for_remote_addr(
759        &self,
760        remote_addr: SocketAddr,
761    ) -> Option<(TransportId, SocketAddr)> {
762        self.transports
763            .iter()
764            .filter(|(id, handle)| {
765                handle.transport_type().name == "udp"
766                    && handle.is_operational()
767                    && !self.bootstrap_transports.contains(id)
768            })
769            .filter_map(|(id, handle)| {
770                let local_addr = handle.local_addr()?;
771                socket_addr_families_compatible(local_addr, remote_addr)
772                    .then_some((*id, local_addr))
773            })
774            .min_by_key(|(id, _)| id.as_u32())
775    }
776
777    pub(super) fn transport_discovery_candidate(
778        &self,
779        discovered_transport_id: TransportId,
780        discovered_addr: TransportAddr,
781    ) -> Option<(TransportId, TransportAddr, &'static str)> {
782        let transport = self.transports.get(&discovered_transport_id)?;
783        let transport_name = transport.transport_type().name;
784
785        if transport_name != "udp" {
786            return Some((discovered_transport_id, discovered_addr, transport_name));
787        }
788
789        let Some(remote_socket_addr) = discovered_addr
790            .as_str()
791            .and_then(|addr| addr.parse::<SocketAddr>().ok())
792        else {
793            if self.bootstrap_transports.contains(&discovered_transport_id) {
794                debug!(
795                    transport_id = %discovered_transport_id,
796                    remote_addr = %discovered_addr,
797                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
798                );
799                return None;
800            }
801            return Some((discovered_transport_id, discovered_addr, transport_name));
802        };
803
804        let Some((transport_id, local_addr)) =
805            self.find_udp_transport_for_remote_addr(remote_socket_addr)
806        else {
807            debug!(
808                transport_id = %discovered_transport_id,
809                remote_addr = %discovered_addr,
810                "transport discovery: skip UDP peer with no compatible local socket"
811            );
812            return None;
813        };
814
815        if transport_id != discovered_transport_id {
816            debug!(
817                discovered_transport_id = %discovered_transport_id,
818                selected_transport_id = %transport_id,
819                local_addr = %local_addr,
820                remote_addr = %remote_socket_addr,
821                "transport discovery: selected compatible UDP transport"
822            );
823        }
824
825        Some((
826            transport_id,
827            TransportAddr::from_socket_addr(remote_socket_addr),
828            transport_name,
829        ))
830    }
831
832    fn peer_address_string_for_transport_candidate(
833        &self,
834        transport_id: TransportId,
835        transport_name: &str,
836        remote_addr: &TransportAddr,
837    ) -> String {
838        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
839        let _ = (transport_id, transport_name);
840
841        #[cfg(any(target_os = "linux", target_os = "macos"))]
842        if transport_name == "ethernet"
843            && remote_addr.as_bytes().len() == 6
844            && let Some(interface) = self
845                .transports
846                .get(&transport_id)
847                .and_then(|transport| transport.interface_name())
848        {
849            let mut mac = [0u8; 6];
850            mac.copy_from_slice(remote_addr.as_bytes());
851            return format!(
852                "{interface}/{}",
853                crate::transport::ethernet::format_mac(&mac)
854            );
855        }
856
857        remote_addr.to_string()
858    }
859
860    fn resolve_peer_address_for_match(
861        &self,
862        candidate: &PeerAddress,
863    ) -> Option<(TransportId, TransportAddr)> {
864        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
865            return None;
866        }
867
868        if candidate.transport == "ethernet" {
869            return self.resolve_ethernet_addr(&candidate.addr).ok();
870        }
871
872        if candidate.transport == "ble" {
873            #[cfg(bluer_available)]
874            {
875                return self.resolve_ble_addr(&candidate.addr).ok();
876            }
877            #[cfg(not(bluer_available))]
878            {
879                return None;
880            }
881        }
882
883        let transport_id = if candidate.transport == "udp"
884            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
885        {
886            self.find_udp_transport_for_remote_addr(remote_socket_addr)
887                .map(|(id, _)| id)?
888        } else {
889            self.find_transport_for_type(&candidate.transport)?
890        };
891
892        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
893    }
894
895    /// Initiate a connection to a peer on a specific transport and address.
896    ///
897    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
898    /// the Noise IK handshake, sends msg1, and registers the connection for
899    /// msg2 dispatch.
900    ///
901    /// For connection-oriented transports (TCP, Tor): allocates a link and
902    /// starts a non-blocking transport connect. The handshake is deferred
903    /// until the transport connection is established — the tick handler
904    /// polls `connection_state()` and initiates the handshake when ready.
905    pub(super) async fn initiate_connection(
906        &mut self,
907        transport_id: TransportId,
908        remote_addr: TransportAddr,
909        peer_identity: PeerIdentity,
910    ) -> Result<(), NodeError> {
911        let peer_node_addr = *peer_identity.node_addr();
912
913        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
914            debug!(
915                peer = %self.peer_display_name(&peer_node_addr),
916                transport_id = %transport_id,
917                remote_addr = %remote_addr,
918                "Connection already in progress for candidate path"
919            );
920            return Ok(());
921        }
922
923        if self.outbound_handshake_slots() == 0 {
924            return Err(NodeError::MaxConnectionsExceeded {
925                max: self.max_connections,
926            });
927        }
928
929        if self.outbound_link_slots() == 0 {
930            return Err(NodeError::MaxLinksExceeded {
931                max: self.max_links,
932            });
933        }
934
935        if !self.peers.contains_key(&peer_node_addr)
936            && self.max_peers > 0
937            && self.peers.len() >= self.max_peers
938        {
939            return Err(NodeError::MaxPeersExceeded {
940                max: self.max_peers,
941            });
942        }
943
944        self.authorize_peer(
945            &peer_identity,
946            PeerAclContext::OutboundConnect,
947            transport_id,
948            &remote_addr,
949        )?;
950
951        let is_connection_oriented = self
952            .transports
953            .get(&transport_id)
954            .map(|t| t.transport_type().connection_oriented)
955            .unwrap_or(false);
956
957        // Allocate link ID and create link
958        let link_id = self.allocate_link_id();
959
960        let link = if is_connection_oriented {
961            Link::new(
962                link_id,
963                transport_id,
964                remote_addr.clone(),
965                LinkDirection::Outbound,
966                Duration::from_millis(self.config.node.base_rtt_ms),
967            )
968        } else {
969            Link::connectionless(
970                link_id,
971                transport_id,
972                remote_addr.clone(),
973                LinkDirection::Outbound,
974                Duration::from_millis(self.config.node.base_rtt_ms),
975            )
976        };
977
978        self.links.insert(link_id, link);
979
980        // Add reverse lookup for packet dispatch
981        self.addr_to_link
982            .insert((transport_id, remote_addr.clone()), link_id);
983
984        if is_connection_oriented {
985            // Connection-oriented: start non-blocking connect, defer handshake
986            if let Some(transport) = self.transports.get(&transport_id) {
987                match transport.connect(&remote_addr).await {
988                    Ok(()) => {
989                        debug!(
990                            peer = %self.peer_display_name(&peer_node_addr),
991                            transport_id = %transport_id,
992                            remote_addr = %remote_addr,
993                            link_id = %link_id,
994                            "Transport connect initiated (non-blocking)"
995                        );
996                        self.pending_connects.push(super::PendingConnect {
997                            link_id,
998                            transport_id,
999                            remote_addr,
1000                            peer_identity,
1001                        });
1002                    }
1003                    Err(e) => {
1004                        // Clean up link
1005                        self.links.remove(&link_id);
1006                        self.addr_to_link.remove(&(transport_id, remote_addr));
1007                        return Err(NodeError::from_transport_error(e));
1008                    }
1009                }
1010            }
1011            Ok(())
1012        } else {
1013            // Connectionless: proceed with immediate handshake
1014            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
1015                .await
1016        }
1017    }
1018
1019    /// Start the Noise handshake on a link and send msg1.
1020    ///
1021    /// Called immediately for connectionless transports, or after the
1022    /// transport connection is established for connection-oriented transports.
1023    pub(super) async fn start_handshake(
1024        &mut self,
1025        link_id: LinkId,
1026        transport_id: TransportId,
1027        remote_addr: TransportAddr,
1028        peer_identity: PeerIdentity,
1029    ) -> Result<(), NodeError> {
1030        let peer_node_addr = *peer_identity.node_addr();
1031
1032        // Create connection in handshake phase (outbound knows expected identity)
1033        let current_time_ms = Self::now_ms();
1034        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
1035
1036        // Allocate a session index for this handshake
1037        let our_index = match self.index_allocator.allocate() {
1038            Ok(idx) => idx,
1039            Err(e) => {
1040                // Clean up the link we just created
1041                self.links.remove(&link_id);
1042                self.addr_to_link.remove(&(transport_id, remote_addr));
1043                return Err(NodeError::IndexAllocationFailed(e.to_string()));
1044            }
1045        };
1046
1047        // Start the Noise handshake and get message 1
1048        let our_keypair = self.identity.keypair();
1049        let noise_msg1 =
1050            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
1051                Ok(msg) => msg,
1052                Err(e) => {
1053                    // Clean up the index and link
1054                    let _ = self.index_allocator.free(our_index);
1055                    self.links.remove(&link_id);
1056                    self.addr_to_link.remove(&(transport_id, remote_addr));
1057                    return Err(NodeError::HandshakeFailed(e.to_string()));
1058                }
1059            };
1060
1061        // Set index and transport info on the connection
1062        connection.set_our_index(our_index);
1063        connection.set_transport_id(transport_id);
1064        connection.set_source_addr(remote_addr.clone());
1065
1066        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
1067        let wire_msg1 = build_msg1(our_index, &noise_msg1);
1068
1069        debug!(
1070            peer = %self.peer_display_name(&peer_node_addr),
1071            transport_id = %transport_id,
1072            remote_addr = %remote_addr,
1073            link_id = %link_id,
1074            our_index = %our_index,
1075            "Connection initiated"
1076        );
1077
1078        // Store msg1 for resend and schedule first resend
1079        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
1080        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
1081
1082        // Track in pending_outbound for msg2 dispatch
1083        self.pending_outbound
1084            .insert((transport_id, our_index.as_u32()), link_id);
1085        self.connections.insert(link_id, connection);
1086
1087        // Send the wire format handshake message. If the very first send fails
1088        // synchronously (for example an IPv6 candidate on an IPv4-only UDP
1089        // socket), undo this candidate so the caller can try the next address
1090        // in the same dial pass.
1091        let send_result = match self.transports.get(&transport_id) {
1092            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
1093            None => None,
1094        };
1095        match send_result {
1096            Some(send_result) => {
1097                self.note_local_send_outcome(&send_result);
1098                match send_result {
1099                    Ok(bytes) => {
1100                        debug!(
1101                            link_id = %link_id,
1102                            our_index = %our_index,
1103                            bytes,
1104                            "Sent Noise handshake message 1 (wire format)"
1105                        );
1106                    }
1107                    Err(e) => {
1108                        warn!(
1109                            link_id = %link_id,
1110                            transport_id = %transport_id,
1111                            remote_addr = %remote_addr,
1112                            our_index = %our_index,
1113                            error = %e,
1114                            "Failed to send handshake message"
1115                        );
1116                        self.pending_outbound
1117                            .remove(&(transport_id, our_index.as_u32()));
1118                        self.connections.remove(&link_id);
1119                        self.links.remove(&link_id);
1120                        self.addr_to_link
1121                            .remove(&(transport_id, remote_addr.clone()));
1122                        let _ = self.index_allocator.free(our_index);
1123                        return Err(NodeError::from_transport_error(e));
1124                    }
1125                }
1126            }
1127            None => {
1128                self.pending_outbound
1129                    .remove(&(transport_id, our_index.as_u32()));
1130                self.connections.remove(&link_id);
1131                self.links.remove(&link_id);
1132                self.addr_to_link
1133                    .remove(&(transport_id, remote_addr.clone()));
1134                let _ = self.index_allocator.free(our_index);
1135                return Err(NodeError::TransportError(format!(
1136                    "transport {transport_id} disappeared before first handshake send"
1137                )));
1138            }
1139        }
1140
1141        Ok(())
1142    }
1143
1144    /// Poll all transports for discovered peers and auto-connect.
1145    ///
1146    /// Called from the tick handler. Iterates operational transports,
1147    /// drains their discovery buffers, and initiates connections to
1148    /// newly discovered peers (if auto_connect is enabled).
1149    pub(super) async fn poll_transport_discovery(&mut self) {
1150        // Collect discoveries first to avoid borrow conflict with self
1151        let mut to_connect = Vec::new();
1152        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1153        let mut connect_budget = self.discovery_connect_budget();
1154        let mut skipped_budget = 0usize;
1155
1156        for transport in self.transports.values() {
1157            if !transport.is_operational() {
1158                continue;
1159            }
1160            if !transport.auto_connect() {
1161                // Still drain the buffer so it doesn't grow unbounded
1162                let _ = transport.discover();
1163                continue;
1164            }
1165            let discovered = match transport.discover() {
1166                Ok(peers) => peers,
1167                Err(_) => continue,
1168            };
1169            for peer in discovered {
1170                let discovered_transport_id = peer.transport_id;
1171                let pubkey = match peer.pubkey_hint {
1172                    Some(pk) => pk,
1173                    None => continue,
1174                };
1175                let identity = PeerIdentity::from_pubkey(pubkey);
1176                let node_addr = *identity.node_addr();
1177
1178                // Skip self
1179                if node_addr == *self.identity.node_addr() {
1180                    continue;
1181                }
1182
1183                let Some((candidate_transport_id, remote_addr, transport_name)) =
1184                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1185                else {
1186                    continue;
1187                };
1188
1189                if self.peers.contains_key(&node_addr) {
1190                    let candidate = PeerAddress::new(
1191                        transport_name,
1192                        self.peer_address_string_for_transport_candidate(
1193                            candidate_transport_id,
1194                            transport_name,
1195                            &remote_addr,
1196                        ),
1197                    );
1198                    if self.active_peer_candidate_is_fresh_enough_to_skip(
1199                        &node_addr,
1200                        std::slice::from_ref(&candidate),
1201                    ) {
1202                        continue;
1203                    }
1204                    if self.is_connecting_to_peer_on_path(
1205                        &node_addr,
1206                        candidate_transport_id,
1207                        &remote_addr,
1208                    ) {
1209                        continue;
1210                    }
1211                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1212                    if connect_budget == 0
1213                        || self
1214                            .path_candidate_attempt_budget(&node_addr)
1215                            .saturating_sub(queued_for_peer)
1216                            == 0
1217                    {
1218                        skipped_budget = skipped_budget.saturating_add(1);
1219                        continue;
1220                    }
1221                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
1222                    *queued_per_peer.entry(node_addr).or_default() += 1;
1223                    connect_budget = connect_budget.saturating_sub(1);
1224                    continue;
1225                }
1226
1227                if self.is_connecting_to_peer_on_path(
1228                    &node_addr,
1229                    candidate_transport_id,
1230                    &remote_addr,
1231                ) {
1232                    continue;
1233                }
1234
1235                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1236                if connect_budget == 0
1237                    || self
1238                        .path_candidate_attempt_budget(&node_addr)
1239                        .saturating_sub(queued_for_peer)
1240                        == 0
1241                {
1242                    skipped_budget = skipped_budget.saturating_add(1);
1243                    continue;
1244                }
1245                to_connect.push((candidate_transport_id, remote_addr, identity, false));
1246                *queued_per_peer.entry(node_addr).or_default() += 1;
1247                connect_budget = connect_budget.saturating_sub(1);
1248            }
1249        }
1250
1251        if skipped_budget > 0 {
1252            debug!(
1253                skipped = skipped_budget,
1254                queued = to_connect.len(),
1255                "Transport discovery connect budget exhausted"
1256            );
1257        }
1258
1259        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1260            info!(
1261                peer = %self.peer_display_name(identity.node_addr()),
1262                transport_id = %transport_id,
1263                remote_addr = %remote_addr,
1264                active_refresh,
1265                "Auto-connecting to discovered peer"
1266            );
1267            if let Err(e) = self
1268                .initiate_connection(transport_id, remote_addr, identity)
1269                .await
1270            {
1271                warn!(error = %e, "Failed to auto-connect to discovered peer");
1272            }
1273        }
1274    }
1275
    /// Run one tick of Nostr-based discovery.
    ///
    /// In order, this:
    /// 1. installs the current open-discovery outbound admission check and the
    ///    direct-refresh admission check on the `NostrDiscovery` handle;
    /// 2. refreshes the local overlay advert (a failure is only debug-logged);
    /// 3. forwards queued mesh traversal signals via `drain_nostr_mesh_signals`;
    /// 4. handles every drained `BootstrapEvent`:
    ///    - adopts established NAT traversals;
    ///    - records failed traversals and schedules retries;
    /// 5. runs the startup open-discovery sweep, then the open-discovery retry
    ///    queue and the active-fallback direct retry queue.
    ///
    /// Returns immediately when `self.nostr_discovery` is `None`.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        // Refresh the admission callbacks each tick so they reflect current
        // capacity.
        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        self.drain_nostr_mesh_signals(&bootstrap).await;

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    // A traversal for a peer already in `self.peers` is a
                    // direct-path refresh and is gated by
                    // `outbound_direct_refresh_admission_check`. Anything else
                    // goes through `outbound_admission_check`.
                    let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
                        .ok()
                        .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
                    let admission_allowed = if active_refresh {
                        self.outbound_direct_refresh_admission_check()
                    } else {
                        self.outbound_admission_check()
                    };
                    if !admission_allowed {
                        debug!(
                            peer_npub = %traversal.peer_npub,
                            peers = self.peers.len(),
                            max_peers = self.max_peers,
                            active_refresh,
                            "Dropping established NAT traversal: at capacity"
                        );
                        continue;
                    }
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            // Put the peer back on the retry schedule.
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    // An unparseable npub gives us no node address to retry.
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    let now_ms = Self::now_ms();
                    if self.peers.contains_key(&node_addr) {
                        // Peer is already connected. Only count the failure
                        // and schedule a retry if
                        // `active_peer_should_keep_direct_retry` still wants a
                        // direct-path upgrade.
                        if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
                            let decision =
                                bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                            if decision.should_warn {
                                warn!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    cooldown_secs = decision
                                        .cooldown_until_ms
                                        .map(|t| t.saturating_sub(now_ms) / 1000),
                                    "Direct-path NAT traversal upgrade failed"
                                );
                            } else {
                                debug!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
                                );
                            }
                            // Same stale-advert check as the unconnected case below.
                            if decision.crossed_threshold {
                                bootstrap
                                    .request_advert_stale_check(peer_config.npub.clone())
                                    .await;
                            }
                            self.schedule_retry(node_addr, now_ms);
                            // Never let the retry fire before the recorded
                            // cooldown expires.
                            if self.nostr_cooldown_applies_to_peer_config(&peer_config)
                                && let Some(cooldown_until_ms) = decision.cooldown_until_ms
                                && let Some(state) = self.retry_pending.get_mut(&node_addr)
                            {
                                state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                            }
                        } else {
                            debug!(
                                npub = %peer_config.npub,
                                error = %reason,
                                "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
                            );
                        }
                        continue;
                    }
                    // A handshake is already underway; this failure is not
                    // recorded against the peer.
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // B6: stale-advert eviction on the streak-threshold
                    // crossing. Fire-and-forget; the outcome is logged so
                    // operators can see when peers get cleaned up.
                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Fall back to the peer's configured addresses. A retry is
                    // scheduled only if that attempt also fails.
                    // NOTE(review): the meaning of the `false` flag is defined
                    // by `try_peer_addresses` — confirm there.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    if self.nostr_cooldown_applies_to_peer_config(&peer_config)
                        && let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        // Push the next retry past the cooldown so the
                        // open-discovery sweep doesn't re-enqueue and the
                        // per-attempt backoff doesn't fire sooner.
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
        self.queue_active_fallback_direct_retries(&bootstrap);
    }
1442
1443    async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1444        let mut deferred = Vec::new();
1445
1446        for signal in bootstrap.drain_mesh_signals().await {
1447            let (peer_npub, msg_type, payload) = match &signal {
1448                MeshTraversalSignal::Offer { peer_npub, offer } => {
1449                    let payload = match serde_json::to_vec(&offer) {
1450                        Ok(payload) => payload,
1451                        Err(error) => {
1452                            debug!(
1453                                peer = %peer_npub,
1454                                error = %error,
1455                                "Failed to encode mesh traversal offer"
1456                            );
1457                            continue;
1458                        }
1459                    };
1460                    (
1461                        peer_npub.clone(),
1462                        SessionMessageType::TraversalOffer.to_byte(),
1463                        payload,
1464                    )
1465                }
1466                MeshTraversalSignal::Answer { peer_npub, answer } => {
1467                    let payload = match serde_json::to_vec(&answer) {
1468                        Ok(payload) => payload,
1469                        Err(error) => {
1470                            debug!(
1471                                peer = %peer_npub,
1472                                error = %error,
1473                                "Failed to encode mesh traversal answer"
1474                            );
1475                            continue;
1476                        }
1477                    };
1478                    (
1479                        peer_npub.clone(),
1480                        SessionMessageType::TraversalAnswer.to_byte(),
1481                        payload,
1482                    )
1483                }
1484            };
1485
1486            let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1487                Ok(identity) => identity,
1488                Err(error) => {
1489                    debug!(
1490                        peer = %peer_npub,
1491                        error = %error,
1492                        "Cannot send mesh traversal signal to invalid peer npub"
1493                    );
1494                    continue;
1495                }
1496            };
1497            let peer_addr = *peer_identity.node_addr();
1498            match self
1499                .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1500                .await
1501            {
1502                MeshSignalSessionAction::Send => {}
1503                MeshSignalSessionAction::Defer => {
1504                    deferred.push(signal);
1505                    continue;
1506                }
1507                MeshSignalSessionAction::Drop => continue,
1508            }
1509
1510            if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1511                debug!(
1512                    peer = %self.peer_display_name(&peer_addr),
1513                    error = %error,
1514                    "Failed to send mesh traversal signal"
1515                );
1516            }
1517        }
1518
1519        for signal in deferred {
1520            bootstrap.requeue_mesh_signal(signal);
1521        }
1522    }
1523
1524    async fn mesh_signal_session_action(
1525        &mut self,
1526        peer_addr: NodeAddr,
1527        peer_pubkey: PublicKey,
1528    ) -> MeshSignalSessionAction {
1529        if let Some(entry) = self.sessions.get(&peer_addr) {
1530            if entry.is_established() {
1531                return MeshSignalSessionAction::Send;
1532            }
1533            if entry.is_initiating() || entry.is_awaiting_msg3() {
1534                debug!(
1535                    peer = %self.peer_display_name(&peer_addr),
1536                    "Deferring mesh traversal signal until end-to-end session is established"
1537                );
1538                return MeshSignalSessionAction::Defer;
1539            }
1540        }
1541
1542        if self.find_next_hop(&peer_addr).is_none() {
1543            debug!(
1544                peer = %self.peer_display_name(&peer_addr),
1545                "Cannot warm mesh traversal signal session without a FIPS route"
1546            );
1547            self.maybe_initiate_lookup(&peer_addr).await;
1548            return MeshSignalSessionAction::Drop;
1549        }
1550
1551        self.register_identity(peer_addr, peer_pubkey);
1552        match self.initiate_session(peer_addr, peer_pubkey).await {
1553            Ok(()) => {
1554                debug!(
1555                    peer = %self.peer_display_name(&peer_addr),
1556                    "Warming end-to-end session for mesh traversal signal"
1557                );
1558                MeshSignalSessionAction::Defer
1559            }
1560            Err(NodeError::SendFailed { node_addr, reason })
1561                if node_addr == peer_addr && reason == "no route to destination" =>
1562            {
1563                debug!(
1564                    peer = %self.peer_display_name(&peer_addr),
1565                    "Cannot warm mesh traversal signal session without a FIPS route"
1566                );
1567                self.maybe_initiate_lookup(&peer_addr).await;
1568                MeshSignalSessionAction::Drop
1569            }
1570            Err(error) => {
1571                debug!(
1572                    peer = %self.peer_display_name(&peer_addr),
1573                    error = %error,
1574                    "Failed to warm end-to-end session for mesh traversal signal"
1575                );
1576                MeshSignalSessionAction::Drop
1577            }
1578        }
1579    }
1580
1581    /// Resolve the LAN-only discovery scope. Applications with explicit
1582    /// connectivity config can set `node.discovery.lan.scope` without
1583    /// changing the public Nostr discovery `app` tag. The older fallback
1584    /// extracts a scope from the Nostr app tag used by default scoped
1585    /// discovery.
1586    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1587        if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1588            let scope = scope.trim();
1589            if !scope.is_empty() {
1590                return Some(scope.to_string());
1591            }
1592        }
1593
1594        let app = self.config.node.discovery.nostr.app.trim();
1595        if app.is_empty() {
1596            return None;
1597        }
1598        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1599            let scope = rest.trim();
1600            if scope.is_empty() {
1601                None
1602            } else {
1603                Some(scope.to_string())
1604            }
1605        } else {
1606            Some(app.to_string())
1607        }
1608    }
1609
1610    pub(super) fn start_local_instance_discovery(&mut self) {
1611        if !self.config.node.discovery.local.enabled {
1612            return;
1613        }
1614        let Some(scope) = self.lan_discovery_scope() else {
1615            debug!("local instance discovery not started: no discovery scope");
1616            return;
1617        };
1618        let now_ms = Self::now_ms();
1619        match crate::discovery::local::LocalInstanceRegistry::new(
1620            self.identity.npub(),
1621            scope,
1622            &self.config.node.discovery.local,
1623            now_ms,
1624        ) {
1625            Ok(registry) => {
1626                self.local_instance_registry = Some(registry);
1627                self.local_instance_started_at_ms = Some(now_ms);
1628                self.last_local_instance_publish_ms = None;
1629                self.last_local_instance_scan_ms = None;
1630                self.publish_local_instance_record(now_ms);
1631                info!("Same-host FIPS instance discovery enabled");
1632            }
1633            Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1634                debug!("same-host FIPS instance discovery disabled");
1635            }
1636            Err(err) => {
1637                debug!(error = %err, "same-host FIPS instance discovery not started");
1638            }
1639        }
1640    }
1641
1642    fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1643        let mut contacts = Vec::new();
1644        for handle in self.transports.values() {
1645            if !handle.is_operational() || !handle.accept_connections() {
1646                continue;
1647            }
1648            let transport = handle.transport_type().name;
1649            if transport != "udp" && transport != "tcp" {
1650                continue;
1651            }
1652            let Some(local_addr) = handle.local_addr() else {
1653                continue;
1654            };
1655            let Some(contact) =
1656                crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1657            else {
1658                continue;
1659            };
1660            if contacts
1661                .iter()
1662                .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1663                    existing.transport == contact.transport && existing.addr == contact.addr
1664                })
1665            {
1666                continue;
1667            }
1668            contacts.push(contact);
1669        }
1670        contacts
1671    }
1672
1673    fn publish_local_instance_record(&mut self, now_ms: u64) {
1674        let Some(registry) = self.local_instance_registry.clone() else {
1675            return;
1676        };
1677        let contacts = self.local_instance_contacts();
1678        match registry.publish(contacts, now_ms) {
1679            Ok(()) => {
1680                self.last_local_instance_publish_ms = Some(now_ms);
1681            }
1682            Err(err) => {
1683                debug!(error = %err, "failed to publish same-host FIPS instance record");
1684            }
1685        }
1686    }
1687
1688    fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1689        if self.local_instance_registry.is_none() {
1690            return;
1691        }
1692        let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1693        let due = self
1694            .last_local_instance_publish_ms
1695            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1696            .unwrap_or(true);
1697        if due {
1698            self.publish_local_instance_record(now_ms);
1699        }
1700    }
1701
1702    fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1703        if self.local_instance_registry.is_none() {
1704            return false;
1705        }
1706        let cfg = &self.config.node.discovery.local;
1707        let interval_ms = if self
1708            .local_instance_started_at_ms
1709            .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1710            .unwrap_or(false)
1711        {
1712            cfg.startup_scan_interval_ms()
1713        } else {
1714            cfg.scan_interval_ms()
1715        };
1716        self.last_local_instance_scan_ms
1717            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1718            .unwrap_or(true)
1719    }
1720
1721    fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1722        if self.config.peers().iter().any(|peer| {
1723            PeerIdentity::from_npub(&peer.npub)
1724                .map(|configured| configured.node_addr() == identity.node_addr())
1725                .unwrap_or(false)
1726        }) {
1727            return true;
1728        }
1729        self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1730    }
1731
1732    fn local_instance_peer_addresses(
1733        &self,
1734        record: &crate::discovery::local::LocalInstanceRecord,
1735    ) -> Vec<PeerAddress> {
1736        let mut addresses = Vec::new();
1737        for contact in &record.contacts {
1738            if contact.transport != "udp" && contact.transport != "tcp" {
1739                continue;
1740            }
1741            let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1742                debug!(
1743                    npub = %record.npub,
1744                    transport = %contact.transport,
1745                    addr = %contact.addr,
1746                    "local instance discovery: skip non-socket contact"
1747                );
1748                continue;
1749            };
1750            if !socket_addr.ip().is_loopback() {
1751                debug!(
1752                    npub = %record.npub,
1753                    addr = %contact.addr,
1754                    "local instance discovery: skip non-loopback contact"
1755                );
1756                continue;
1757            }
1758            let address =
1759                PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1760                    .with_seen_at_ms(record.updated_at_ms);
1761            if addresses.iter().any(|existing: &PeerAddress| {
1762                existing.transport == address.transport && existing.addr == address.addr
1763            }) {
1764                continue;
1765            }
1766            addresses.push(address);
1767        }
1768        addresses
1769    }
1770
1771    /// Scan the same-host registry only on startup and then at a low cadence.
1772    /// This avoids a per-second filesystem poll while preserving the fast path
1773    /// for processes launched around the same time.
1774    pub(super) async fn poll_local_instance_discovery(&mut self) {
1775        let Some(registry) = self.local_instance_registry.clone() else {
1776            return;
1777        };
1778        let now_ms = Self::now_ms();
1779        self.maybe_publish_local_instance_record(now_ms);
1780        if !self.local_instance_scan_due(now_ms) {
1781            return;
1782        }
1783        self.last_local_instance_scan_ms = Some(now_ms);
1784
1785        let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1786        {
1787            Ok(records) => records,
1788            Err(err) => {
1789                debug!(error = %err, "same-host FIPS instance scan failed");
1790                return;
1791            }
1792        };
1793        if records.is_empty() {
1794            return;
1795        }
1796
1797        let mut connect_budget = self.discovery_connect_budget();
1798        let mut skipped_budget = 0usize;
1799        for record in records {
1800            let identity = match PeerIdentity::from_npub(&record.npub) {
1801                Ok(identity) => identity,
1802                Err(err) => {
1803                    debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1804                    continue;
1805                }
1806            };
1807            let peer_node_addr = *identity.node_addr();
1808            if peer_node_addr == *self.identity.node_addr() {
1809                continue;
1810            }
1811            if !self.local_instance_peer_allowed(&identity) {
1812                debug!(
1813                    npub = %identity.short_npub(),
1814                    "local instance discovery: skip unconfigured peer"
1815                );
1816                continue;
1817            }
1818
1819            let addresses = self.local_instance_peer_addresses(&record);
1820            if addresses.is_empty() {
1821                continue;
1822            }
1823
1824            if self.peers.contains_key(&peer_node_addr)
1825                && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1826            {
1827                continue;
1828            }
1829
1830            for address in addresses {
1831                let Some((transport_id, remote_addr)) =
1832                    self.resolve_peer_address_for_match(&address)
1833                else {
1834                    continue;
1835                };
1836                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1837                    continue;
1838                }
1839                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1840                    skipped_budget = skipped_budget.saturating_add(1);
1841                    continue;
1842                }
1843                info!(
1844                    npub = %identity.short_npub(),
1845                    transport = %address.transport,
1846                    addr = %address.addr,
1847                    "same-host FIPS instance discovery: initiating handshake"
1848                );
1849                if let Err(err) = self
1850                    .initiate_connection(transport_id, remote_addr, identity)
1851                    .await
1852                {
1853                    debug!(
1854                        npub = %record.npub,
1855                        error = %err,
1856                        "same-host FIPS instance discovery: failed to initiate connection"
1857                    );
1858                }
1859                connect_budget = connect_budget.saturating_sub(1);
1860            }
1861        }
1862        if skipped_budget > 0 {
1863            debug!(
1864                skipped = skipped_budget,
1865                "same-host FIPS instance discovery connect budget exhausted"
1866            );
1867        }
1868    }
1869
1870    /// Drain mDNS-discovered peers and initiate Noise XX handshakes. For
1871    /// active peers this is a non-disruptive alternate-path refresh: the
1872    /// current link stays live until a new handshake authenticates and
1873    /// promotes. The handshake itself is the authentication — a spoofed
1874    /// mDNS advert with someone else's npub fails the XX exchange and
1875    /// is dropped.
1876    pub(super) async fn poll_lan_discovery(&mut self) {
1877        let Some(runtime) = self.lan_discovery.clone() else {
1878            return;
1879        };
1880        let events = runtime.drain_events().await;
1881        if events.is_empty() {
1882            return;
1883        }
1884        let mut connect_budget = self.discovery_connect_budget();
1885        let mut skipped_budget = 0usize;
1886        for event in events {
1887            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1888            let Some((transport_id, local_addr)) =
1889                self.find_udp_transport_for_remote_addr(peer.addr)
1890            else {
1891                debug!(
1892                    addr = %peer.addr,
1893                    "lan: skip discovered peer with no compatible UDP transport"
1894                );
1895                continue;
1896            };
1897            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1898                Ok(id) => id,
1899                Err(err) => {
1900                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1901                    continue;
1902                }
1903            };
1904            let peer_node_addr = *identity.node_addr();
1905            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1906            if self.peers.contains_key(&peer_node_addr) {
1907                let candidate = PeerAddress::new("udp", peer.addr.to_string());
1908                if self.active_peer_candidate_is_fresh_enough_to_skip(
1909                    &peer_node_addr,
1910                    std::slice::from_ref(&candidate),
1911                ) {
1912                    continue;
1913                }
1914                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1915                    continue;
1916                }
1917                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1918                    skipped_budget = skipped_budget.saturating_add(1);
1919                    continue;
1920                }
1921                info!(
1922                    npub = %identity.short_npub(),
1923                    addr = %peer.addr,
1924                    local_addr = %local_addr,
1925                    "lan: initiating alternate-path handshake to active peer"
1926                );
1927                if let Err(err) = self
1928                    .initiate_connection(transport_id, remote_addr, identity)
1929                    .await
1930                {
1931                    debug!(
1932                        npub = %peer.npub,
1933                        error = %err,
1934                        "lan: failed to initiate active peer alternate-path handshake"
1935                    );
1936                }
1937                connect_budget = connect_budget.saturating_sub(1);
1938                continue;
1939            }
1940            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1941                continue;
1942            }
1943            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1944                skipped_budget = skipped_budget.saturating_add(1);
1945                continue;
1946            }
1947            info!(
1948                npub = %identity.short_npub(),
1949                addr = %peer.addr,
1950                local_addr = %local_addr,
1951                "lan: initiating handshake to discovered peer"
1952            );
1953            if let Err(err) = self
1954                .initiate_connection(transport_id, remote_addr, identity)
1955                .await
1956            {
1957                debug!(
1958                    npub = %peer.npub,
1959                    error = %err,
1960                    "lan: failed to initiate connection to discovered peer"
1961                );
1962            }
1963            connect_budget = connect_budget.saturating_sub(1);
1964        }
1965        if skipped_budget > 0 {
1966            debug!(
1967                skipped = skipped_budget,
1968                "lan: discovery connect budget exhausted"
1969            );
1970        }
1971    }
1972
1973    /// Poll pending transport connects and initiate handshakes for ready ones.
1974    ///
1975    /// Called from the tick handler. For each pending connect, queries the
1976    /// transport's connection state. When a connection is established,
1977    /// marks the link as Connected and starts the Noise handshake.
1978    /// Failed connections are cleaned up and scheduled for retry.
1979    pub(super) async fn poll_pending_connects(&mut self) {
1980        if self.pending_connects.is_empty() {
1981            return;
1982        }
1983
1984        let mut completed = Vec::new();
1985
1986        for (i, pending) in self.pending_connects.iter().enumerate() {
1987            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
1988                transport.connection_state(&pending.remote_addr)
1989            } else {
1990                crate::transport::ConnectionState::Failed("transport removed".into())
1991            };
1992
1993            match state {
1994                crate::transport::ConnectionState::Connected => {
1995                    completed.push((i, true, None));
1996                }
1997                crate::transport::ConnectionState::Failed(reason) => {
1998                    completed.push((i, false, Some(reason)));
1999                }
2000                crate::transport::ConnectionState::Connecting => {
2001                    // Still in progress, check on next tick
2002                }
2003                crate::transport::ConnectionState::None => {
2004                    // Shouldn't happen — treat as failure
2005                    completed.push((i, false, Some("no connection attempt found".into())));
2006                }
2007            }
2008        }
2009
2010        // Process completions in reverse order to preserve indices
2011        for (i, success, reason) in completed.into_iter().rev() {
2012            let pending = self.pending_connects.remove(i);
2013
2014            if success {
2015                // Mark link as Connected
2016                if let Some(link) = self.links.get_mut(&pending.link_id) {
2017                    link.set_connected();
2018                }
2019
2020                debug!(
2021                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2022                    transport_id = %pending.transport_id,
2023                    remote_addr = %pending.remote_addr,
2024                    link_id = %pending.link_id,
2025                    "Transport connected, starting handshake"
2026                );
2027
2028                // Start the handshake now that the transport is connected
2029                if let Err(e) = self
2030                    .start_handshake(
2031                        pending.link_id,
2032                        pending.transport_id,
2033                        pending.remote_addr.clone(),
2034                        pending.peer_identity,
2035                    )
2036                    .await
2037                {
2038                    warn!(
2039                        link_id = %pending.link_id,
2040                        error = %e,
2041                        "Failed to start handshake after transport connect"
2042                    );
2043                    // Clean up link on handshake failure
2044                    self.remove_link(&pending.link_id);
2045                }
2046            } else {
2047                let reason = reason.unwrap_or_default();
2048                warn!(
2049                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2050                    transport_id = %pending.transport_id,
2051                    remote_addr = %pending.remote_addr,
2052                    link_id = %pending.link_id,
2053                    reason = %reason,
2054                    "Transport connect failed"
2055                );
2056
2057                // Clean up link and schedule retry
2058                self.remove_link(&pending.link_id);
2059                self.links.remove(&pending.link_id);
2060                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2061            }
2062        }
2063    }
2064
2065    // === State Transitions ===
2066
2067    /// Start the node.
2068    ///
2069    /// Initializes the TUN interface (if configured), spawns I/O threads,
2070    /// and transitions to the Running state.
2071    pub async fn start(&mut self) -> Result<(), NodeError> {
2072        node_start_debug_log("Node::start begin");
2073        if !self.state.can_start() {
2074            return Err(NodeError::AlreadyStarted);
2075        }
2076        self.state = NodeState::Starting;
2077        node_start_debug_log("Node::start state set to starting");
2078
2079        // Create packet channel for transport -> Node communication
2080        let packet_buffer_size = self.config.node.buffers.packet_channel;
2081        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
2082        self.packet_tx = Some(packet_tx.clone());
2083        self.packet_rx = Some(packet_rx);
2084        node_start_debug_log("Node::start packet channel created");
2085
2086        // Initialize transports first (before TUN, before Nostr discovery).
2087        node_start_debug_log("Node::start create transports begin");
2088        let transport_handles = self.create_transports(&packet_tx).await;
2089        node_start_debug_log(format!(
2090            "Node::start create transports complete count={}",
2091            transport_handles.len()
2092        ));
2093
2094        for mut handle in transport_handles {
2095            let transport_id = handle.transport_id();
2096            let transport_type = handle.transport_type().name;
2097            let name = handle.name().map(|s| s.to_string());
2098
2099            node_start_debug_log(format!(
2100                "Node::start transport start begin id={} type={} name={:?}",
2101                transport_id, transport_type, name
2102            ));
2103            match handle.start().await {
2104                Ok(()) => {
2105                    node_start_debug_log(format!(
2106                        "Node::start transport start ok id={} type={}",
2107                        transport_id, transport_type
2108                    ));
2109                    self.transports.insert(transport_id, handle);
2110                }
2111                Err(e) => {
2112                    node_start_debug_log(format!(
2113                        "Node::start transport start error id={} type={} error={}",
2114                        transport_id, transport_type, e
2115                    ));
2116                    if let Some(ref n) = name {
2117                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
2118                    } else {
2119                        warn!(transport_type, error = %e, "Transport failed to start");
2120                    }
2121                }
2122            }
2123        }
2124
2125        if !self.transports.is_empty() {
2126            info!(count = self.transports.len(), "Transports initialized");
2127        }
2128
2129        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
2130        // worker pools. **Unix only** — both pools issue direct
2131        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
2132        // raw file descriptors via `AsRawFd`, which is a unix-only
2133        // trait. On Windows the rx_loop's tokio-based send/recv
2134        // remain the canonical path; the perf overhaul lands its
2135        // gains on unix.
2136        //
2137        // Worker count defaults to the number of CPUs, overridable
2138        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
2139        // for debug / benchmarking. Hash-by-destination means a
2140        // single TCP flow pins to one worker (preserves wire
2141        // ordering); additional workers light up under multi-flow
2142        // / multi-peer load. See `node::encrypt_worker` /
2143        // `node::decrypt_worker` for full rationale.
2144        #[cfg(unix)]
2145        {
2146            if self.config.node.worker_pools_enabled {
2147                node_start_debug_log("Node::start worker pools begin");
2148                let cpu_default = std::thread::available_parallelism()
2149                    .map(|n| n.get())
2150                    .unwrap_or(1)
2151                    .max(1);
2152                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
2153                    .ok()
2154                    .and_then(|s| s.parse().ok())
2155                    .unwrap_or(cpu_default)
2156                    .max(1);
2157                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
2158                    encrypt_worker_count,
2159                ));
2160                info!(
2161                    workers = encrypt_worker_count,
2162                    "Spawned FMP-encrypt worker pool"
2163                );
2164
2165                // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
2166                // falls through to the in-line rx_loop decrypt path (the
2167                // "test-mode" branch in `handle_encrypted_frame`, which is
2168                // in fact a fully functional synchronous decrypt). Useful
2169                // as an A/B against the worker pipeline when chasing
2170                // scheduling/queueing regressions on the native macOS
2171                // path. Any non-zero value (env or default) spawns the
2172                // pool as before.
2173                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
2174                    .ok()
2175                    .and_then(|s| s.parse().ok())
2176                    .unwrap_or(cpu_default);
2177                if decrypt_worker_count == 0 {
2178                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
2179                } else {
2180                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
2181                        decrypt_worker_count,
2182                    ));
2183                    info!(
2184                        workers = decrypt_worker_count,
2185                        "Spawned FMP+FSP-decrypt worker pool"
2186                    );
2187                }
2188                node_start_debug_log("Node::start worker pools complete");
2189            } else {
2190                node_start_debug_log("Node::start worker pools disabled");
2191                info!("FIPS worker pools disabled; using in-line crypto/send path");
2192            }
2193        }
2194
2195        if self.config.node.discovery.nostr.enabled {
2196            node_start_debug_log("Node::start nostr discovery start begin");
2197            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
2198                .await
2199            {
2200                Ok(runtime) => {
2201                    node_start_debug_log("Node::start nostr discovery runtime created");
2202                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2203                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2204                    }
2205                    node_start_debug_log("Node::start nostr overlay advert refreshed");
2206                    self.nostr_discovery = Some(runtime);
2207                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2208                    info!("Nostr overlay discovery enabled");
2209                }
2210                Err(err) => {
2211                    node_start_debug_log(format!(
2212                        "Node::start nostr discovery start error error={}",
2213                        err
2214                    ));
2215                    warn!(error = %err, "Failed to start Nostr overlay discovery");
2216                }
2217            }
2218        }
2219
2220        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
2221        // when Nostr is disabled, since it gives us sub-second pairing
2222        // on the same link without any relay or NAT-traversal roundtrip.
2223        if self.config.node.discovery.lan.enabled {
2224            node_start_debug_log("Node::start lan discovery start begin");
2225            let advertised_udp_port = self
2226                .transports
2227                .values()
2228                .filter(|h| h.is_operational())
2229                .filter(|h| h.transport_type().name == "udp")
2230                .find_map(|h| h.local_addr().map(|addr| addr.port()))
2231                .unwrap_or(0);
2232            let scope = self.lan_discovery_scope();
2233            match crate::discovery::lan::LanDiscovery::start(
2234                &self.identity,
2235                scope,
2236                advertised_udp_port,
2237                self.config.node.discovery.lan.clone(),
2238            )
2239            .await
2240            {
2241                Ok(runtime) => {
2242                    node_start_debug_log("Node::start lan discovery start ok");
2243                    self.lan_discovery = Some(runtime);
2244                    info!("LAN mDNS discovery enabled");
2245                }
2246                Err(err) => {
2247                    node_start_debug_log(format!(
2248                        "Node::start lan discovery start error error={}",
2249                        err
2250                    ));
2251                    debug!(error = %err, "LAN mDNS discovery not started");
2252                }
2253            }
2254        }
2255
2256        self.start_local_instance_discovery();
2257        self.poll_local_instance_discovery().await;
2258
2259        // Connect to static peers before TUN is active
2260        // This allows handshake messages to be sent before we start accepting packets
2261        node_start_debug_log("Node::start initiate peer connections begin");
2262        self.initiate_peer_connections().await;
2263        node_start_debug_log("Node::start initiate peer connections complete");
2264
2265        // Initialize TUN interface last, after transports and peers are ready
2266        if self.config.tun.enabled {
2267            node_start_debug_log("Node::start tun init begin");
2268            let address = *self.identity.address();
2269            match TunDevice::create(&self.config.tun, address).await {
2270                Ok(device) => {
2271                    let mtu = device.mtu();
2272                    let name = device.name().to_string();
2273                    let our_addr = *device.address();
2274
2275                    info!("TUN device active:");
2276                    info!("     name: {}", name);
2277                    info!("  address: {}", device.address());
2278                    info!("      mtu: {}", mtu);
2279
2280                    // Calculate max MSS for TCP clamping
2281                    let effective_mtu = self.effective_ipv6_mtu();
2282                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
2283
2284                    info!("effective MTU: {} bytes", effective_mtu);
2285                    debug!("   max TCP MSS: {} bytes", max_mss);
2286
2287                    // On macOS, create a shutdown pipe. Writing to it unblocks the
2288                    // reader thread's select() loop without closing the TUN fd
2289                    // (which would cause a double-close when TunDevice drops).
2290                    #[cfg(target_os = "macos")]
2291                    let (shutdown_read_fd, shutdown_write_fd) = {
2292                        let mut fds = [0i32; 2];
2293                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2294                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2295                                "failed to create shutdown pipe".into(),
2296                            )));
2297                        }
2298                        (fds[0], fds[1])
2299                    };
2300
2301                    // Create writer (dups the fd for independent write access).
2302                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
2303                    // per-destination path MTU learned via discovery.
2304                    let (writer, tun_tx) =
2305                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2306
2307                    // Spawn writer thread
2308                    let writer_handle = thread::spawn(move || {
2309                        writer.run();
2310                    });
2311
2312                    // Clone tun_tx for the reader
2313                    let reader_tun_tx = tun_tx.clone();
2314
2315                    // Create outbound channel for TUN reader → Node
2316                    let tun_channel_size = self.config.node.buffers.tun_channel;
2317                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2318
2319                    // Spawn reader thread
2320                    let transport_mtu = self.transport_mtu();
2321                    let path_mtu_lookup = self.path_mtu_lookup.clone();
2322                    #[cfg(target_os = "macos")]
2323                    let reader_handle = thread::spawn(move || {
2324                        run_tun_reader(
2325                            device,
2326                            mtu,
2327                            our_addr,
2328                            reader_tun_tx,
2329                            outbound_tx,
2330                            transport_mtu,
2331                            path_mtu_lookup,
2332                            shutdown_read_fd,
2333                        );
2334                    });
2335                    #[cfg(not(target_os = "macos"))]
2336                    let reader_handle = thread::spawn(move || {
2337                        run_tun_reader(
2338                            device,
2339                            mtu,
2340                            our_addr,
2341                            reader_tun_tx,
2342                            outbound_tx,
2343                            transport_mtu,
2344                            path_mtu_lookup,
2345                        );
2346                    });
2347
2348                    self.tun_state = TunState::Active;
2349                    self.tun_name = Some(name);
2350                    self.tun_tx = Some(tun_tx);
2351                    self.tun_outbound_rx = Some(outbound_rx);
2352                    self.tun_reader_handle = Some(reader_handle);
2353                    self.tun_writer_handle = Some(writer_handle);
2354                    #[cfg(target_os = "macos")]
2355                    {
2356                        self.tun_shutdown_fd = Some(shutdown_write_fd);
2357                    }
2358                }
2359                Err(e) => {
2360                    self.tun_state = TunState::Failed;
2361                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
2362                }
2363            }
2364            node_start_debug_log("Node::start tun init complete");
2365        }
2366
2367        // Initialize DNS responder (independent of TUN).
2368        //
2369        // Default bind_addr is "::1" (IPv6 loopback). The shipped
2370        // fips-dns-setup configures systemd-resolved via a global
2371        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
2372        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
2373        // where self-destined traffic to fips0's address is attributed
2374        // to fips0 in PKTINFO and gets silently dropped by the
2375        // mesh-interface filter in src/upper/dns.rs.
2376        //
2377        // For mesh-reachable resolution (rare), set bind_addr: "::"
2378        // in fips.yaml. The mesh-interface filter remains active to
2379        // prevent hosts-file alias enumeration in that mode.
2380        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
2381        // 127.0.0.1 still reach us regardless of kernel sysctl
2382        // defaults — but only when bind is on a wildcard / IPv6 path.
2383        if self.config.dns.enabled {
2384            node_start_debug_log("Node::start dns init begin");
2385            let addr_str = self.config.dns.bind_addr();
2386            match addr_str.parse::<std::net::IpAddr>() {
2387                Ok(ip) => {
2388                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2389                    match Self::bind_dns_socket(bind) {
2390                        Ok(socket) => {
2391                            let dns_channel_size = self.config.node.buffers.dns_channel;
2392                            let (identity_tx, identity_rx) =
2393                                tokio::sync::mpsc::channel(dns_channel_size);
2394                            let dns_ttl = self.config.dns.ttl();
2395                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2396                                self.config.peers(),
2397                            );
2398                            let reloader = if self.config.node.system_files_enabled {
2399                                let hosts_path = std::path::PathBuf::from(
2400                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
2401                                );
2402                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2403                            } else {
2404                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2405                            };
2406                            // Resolve the TUN ifindex so the responder can
2407                            // drop queries arriving on the mesh interface
2408                            // (fips0). Without this, the `::` bind exposes
2409                            // /etc/fips/hosts alias probing to any mesh peer.
2410                            // When TUN isn't enabled or the name can't be
2411                            // resolved, `None` disables the filter (there
2412                            // is no mesh surface to defend anyway).
2413                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2414                            info!(
2415                                bind = %bind,
2416                                hosts = reloader.hosts().len(),
2417                                mesh_ifindex = ?mesh_ifindex,
2418                                "DNS responder started for .fips domain (auto-reload enabled)"
2419                            );
2420                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2421                                socket,
2422                                identity_tx,
2423                                dns_ttl,
2424                                reloader,
2425                                mesh_ifindex,
2426                            ));
2427                            self.dns_identity_rx = Some(identity_rx);
2428                            self.dns_task = Some(handle);
2429                        }
2430                        Err(e) => {
2431                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2432                        }
2433                    }
2434                }
2435                Err(e) => {
2436                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2437                }
2438            }
2439            node_start_debug_log("Node::start dns init complete");
2440        }
2441
2442        self.state = NodeState::Running;
2443        node_start_debug_log("Node::start running");
2444        info!("Node started:");
2445        info!("       state: {}", self.state);
2446        info!("  transports: {}", self.transports.len());
2447        info!(" connections: {}", self.connections.len());
2448        Ok(())
2449    }
2450
2451    /// Bind a UDP socket for the DNS responder.
2452    ///
2453    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
2454    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
2455    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
2456    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
2457    /// land on the same socket.
2458    ///
2459    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
2460    /// can learn the arrival interface per packet. The responder uses that
2461    /// to drop queries arriving on the mesh TUN, closing the hosts-file
2462    /// probing side-channel created by the `::` bind.
2463    fn bind_dns_socket(
2464        addr: std::net::SocketAddr,
2465    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2466        use socket2::{Domain, Protocol, Socket, Type};
2467        let domain = if addr.is_ipv4() {
2468            Domain::IPV4
2469        } else {
2470            Domain::IPV6
2471        };
2472        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2473        if addr.is_ipv6() {
2474            sock.set_only_v6(false)?;
2475            #[cfg(unix)]
2476            Self::set_recv_pktinfo_v6(&sock)?;
2477        }
2478        sock.set_nonblocking(true)?;
2479        sock.bind(&addr.into())?;
2480        tokio::net::UdpSocket::from_std(sock.into())
2481    }
2482
2483    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
2484    ///
2485    /// After this setsockopt, each `recvmsg()` call on the socket receives
2486    /// an `IPV6_PKTINFO` control message containing the arrival interface
2487    /// index, which the DNS responder uses for its mesh-interface filter.
2488    #[cfg(unix)]
2489    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2490        use std::os::fd::AsRawFd;
2491        let enable: libc::c_int = 1;
2492        let ret = unsafe {
2493            libc::setsockopt(
2494                sock.as_raw_fd(),
2495                libc::IPPROTO_IPV6,
2496                libc::IPV6_RECVPKTINFO,
2497                &enable as *const _ as *const libc::c_void,
2498                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2499            )
2500        };
2501        if ret < 0 {
2502            return Err(std::io::Error::last_os_error());
2503        }
2504        Ok(())
2505    }
2506
2507    /// Resolve the mesh TUN interface index by name.
2508    ///
2509    /// Returns `None` if the interface does not exist (e.g. TUN disabled
2510    /// or not yet created). A `None` result disables the DNS responder's
2511    /// mesh-interface filter — safe, because if there is no fips0 there
2512    /// is no mesh exposure to defend against.
2513    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2514        #[cfg(unix)]
2515        {
2516            let c_name = std::ffi::CString::new(name).ok()?;
2517            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2518            if idx == 0 { None } else { Some(idx) }
2519        }
2520        #[cfg(not(unix))]
2521        {
2522            let _ = name;
2523            None
2524        }
2525    }
2526
2527    /// Stop the node.
2528    ///
2529    /// Shuts down TUN interface, stops I/O threads, and transitions to
2530    /// the Stopped state.
2531    pub async fn stop(&mut self) -> Result<(), NodeError> {
2532        if !self.state.can_stop() {
2533            return Err(NodeError::NotStarted);
2534        }
2535        self.state = NodeState::Stopping;
2536        info!(state = %self.state, "Node stopping");
2537
2538        // Stop DNS responder
2539        if let Some(handle) = self.dns_task.take() {
2540            handle.abort();
2541            debug!("DNS responder stopped");
2542        }
2543
2544        // Send disconnect notifications to all active peers before closing transports
2545        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
2546            .await;
2547
2548        // Stop Nostr overlay discovery background work and withdraw any advert.
2549        if let Some(bootstrap) = self.nostr_discovery.take()
2550            && let Err(e) = bootstrap.shutdown().await
2551        {
2552            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
2553        }
2554
2555        // Tear down LAN mDNS responder + browser. Best-effort: the
2556        // OS will eventually time the advert out via its TTL even if
2557        // we don't get a clean unregister out before the daemon exits.
2558        if let Some(lan) = self.lan_discovery.take() {
2559            lan.shutdown().await;
2560        }
2561
2562        if let Some(registry) = self.local_instance_registry.take()
2563            && let Err(err) = registry.remove()
2564        {
2565            debug!(error = %err, "failed to remove same-host FIPS instance record");
2566        }
2567
2568        // Shutdown transports (they're packet producers)
2569        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
2570        for transport_id in transport_ids {
2571            if let Some(mut handle) = self.transports.remove(&transport_id) {
2572                let transport_type = handle.transport_type().name;
2573                match handle.stop().await {
2574                    Ok(()) => {
2575                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
2576                    }
2577                    Err(e) => {
2578                        warn!(
2579                            transport_id = %transport_id,
2580                            transport_type,
2581                            error = %e,
2582                            "Transport stop failed"
2583                        );
2584                    }
2585                }
2586            }
2587        }
2588
2589        // Drop packet channels
2590        self.packet_tx.take();
2591        self.packet_rx.take();
2592
2593        // Shutdown TUN interface
2594        if let Some(name) = self.tun_name.take() {
2595            info!(name = %name, "Shutting down TUN interface");
2596
2597            // Drop the tun_tx to signal the writer to stop
2598            self.tun_tx.take();
2599
2600            // Delete the interface (on Linux, causes reader to get EFAULT)
2601            if let Err(e) = shutdown_tun_interface(&name).await {
2602                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2603            }
2604
2605            // On macOS, signal the reader thread to exit by writing to the
2606            // shutdown pipe. The reader's select() will wake up and break.
2607            #[cfg(target_os = "macos")]
2608            if let Some(fd) = self.tun_shutdown_fd.take() {
2609                unsafe {
2610                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2611                    libc::close(fd);
2612                }
2613            }
2614
2615            // Wait for threads to finish
2616            if let Some(handle) = self.tun_reader_handle.take() {
2617                let _ = handle.join();
2618            }
2619            if let Some(handle) = self.tun_writer_handle.take() {
2620                let _ = handle.join();
2621            }
2622
2623            self.tun_state = TunState::Disabled;
2624        }
2625
2626        self.state = NodeState::Stopped;
2627        info!(state = %self.state, "Node stopped");
2628        Ok(())
2629    }
2630
2631    /// Send disconnect notifications to all active peers.
2632    ///
2633    /// Best-effort: send failures are logged and ignored since the transport
2634    /// may already be degraded. This runs before transports are shut down.
2635    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2636        let disconnect = Disconnect::new(reason);
2637        let plaintext = disconnect.encode();
2638
2639        // Collect node_addrs to avoid borrow conflict with send helper
2640        let peer_addrs: Vec<NodeAddr> = self
2641            .peers
2642            .iter()
2643            .filter(|(_, peer)| peer.can_send() && peer.has_session())
2644            .map(|(addr, _)| *addr)
2645            .collect();
2646
2647        if peer_addrs.is_empty() {
2648            debug!(
2649                total_peers = self.peers.len(),
2650                "No sendable peers for disconnect notification"
2651            );
2652            return;
2653        }
2654
2655        let mut sent = 0usize;
2656        for node_addr in &peer_addrs {
2657            match self
2658                .send_encrypted_link_message(node_addr, &plaintext)
2659                .await
2660            {
2661                Ok(()) => sent += 1,
2662                Err(e) => {
2663                    debug!(
2664                        peer = %self.peer_display_name(node_addr),
2665                        error = %e,
2666                        "Failed to send disconnect (transport may be down)"
2667                    );
2668                }
2669            }
2670        }
2671
2672        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2673    }
2674
2675    pub(in crate::node) fn static_peer_addresses(
2676        &self,
2677        peer_config: &PeerConfig,
2678    ) -> Vec<PeerAddress> {
2679        peer_config
2680            .addresses_by_priority()
2681            .into_iter()
2682            .cloned()
2683            .collect()
2684    }
2685
2686    async fn nostr_peer_fallback_addresses(
2687        &self,
2688        peer_config: &PeerConfig,
2689        existing: &[PeerAddress],
2690    ) -> Vec<PeerAddress> {
2691        if !self.config.node.discovery.nostr.enabled
2692            || self.config.node.discovery.nostr.policy
2693                == crate::config::NostrDiscoveryPolicy::Disabled
2694        {
2695            return Vec::new();
2696        }
2697
2698        let Some(bootstrap) = self.nostr_discovery.clone() else {
2699            return Vec::new();
2700        };
2701        if self.nostr_cooldown_applies_to_peer_config(peer_config)
2702            && bootstrap
2703                .cooldown_until(&peer_config.npub, Self::now_ms())
2704                .is_some()
2705        {
2706            debug!(
2707                npub = %peer_config.npub,
2708                "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2709            );
2710            return Vec::new();
2711        }
2712        let endpoints = match bootstrap
2713            .cached_advert_endpoints_for_peer(&peer_config.npub)
2714            .await
2715        {
2716            Some(endpoints) => endpoints,
2717            None => {
2718                debug!(
2719                    npub = %peer_config.npub,
2720                    "No cached Nostr advert endpoints for configured peer"
2721                );
2722                return Vec::new();
2723            }
2724        };
2725
2726        let mut fallback = Vec::new();
2727        let mut next_priority = existing
2728            .iter()
2729            .map(|addr| addr.priority)
2730            .max()
2731            .unwrap_or(100)
2732            .saturating_add(1);
2733        // Stamp every overlay-derived candidate with the current wall clock.
2734        // The dialer still honors explicit priority first; this timestamp is
2735        // only a recency tiebreaker within the same priority tier. We use a
2736        // single timestamp per fetch because all candidates in one advert are
2737        // equally fresh.
2738        let seen_at_ms = Self::now_ms();
2739        for endpoint in endpoints {
2740            let Some(candidate) =
2741                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2742            else {
2743                continue;
2744            };
2745            if existing
2746                .iter()
2747                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2748                || fallback.iter().any(|addr: &PeerAddress| {
2749                    addr.transport == candidate.transport && addr.addr == candidate.addr
2750                })
2751            {
2752                continue;
2753            }
2754            fallback.push(candidate);
2755            next_priority = next_priority.saturating_add(1);
2756        }
2757        fallback
2758    }
2759
2760    pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2761        if !self.config.node.discovery.nostr.enabled
2762            || self.config.node.discovery.nostr.policy
2763                == crate::config::NostrDiscoveryPolicy::Disabled
2764        {
2765            return false;
2766        }
2767        let Some(bootstrap) = self.nostr_discovery.clone() else {
2768            return false;
2769        };
2770        let now_ms = Self::now_ms();
2771        if self.nostr_cooldown_applies_to_peer_config(peer_config)
2772            && let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
2773        {
2774            debug!(
2775                npub = %peer_config.npub,
2776                cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2777                "Skipping Nostr traversal request while peer is in cooldown"
2778            );
2779            return false;
2780        }
2781        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2782        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2783        let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2784        bootstrap
2785            .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2786            .await;
2787        info!(
2788            npub = %peer_config.npub,
2789            mesh_signaling_allowed,
2790            "Started background UDP NAT traversal attempt"
2791        );
2792        true
2793    }
2794
2795    fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
2796        !self.mesh_signaling_allowed_for_peer(peer_config)
2797    }
2798
2799    pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2800        &self,
2801        peer_config: &PeerConfig,
2802    ) -> bool {
2803        let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2804            return false;
2805        };
2806        let peer_addr = identity.node_addr();
2807        self.configured_peer(peer_addr).is_some()
2808    }
2809
2810    fn overlay_endpoint_to_peer_address(
2811        endpoint: &OverlayEndpointAdvert,
2812        priority: u8,
2813        seen_at_ms: u64,
2814    ) -> Option<PeerAddress> {
2815        let transport = match endpoint.transport {
2816            OverlayTransportKind::Udp => "udp",
2817            OverlayTransportKind::Tcp => "tcp",
2818            OverlayTransportKind::Tor => "tor",
2819            OverlayTransportKind::WebRtc => "webrtc",
2820        };
2821        Some(
2822            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2823                .with_seen_at_ms(seen_at_ms),
2824        )
2825    }
2826
2827    async fn attempt_peer_address_list(
2828        &mut self,
2829        peer_config: &PeerConfig,
2830        peer_identity: PeerIdentity,
2831        allow_bootstrap_nat: bool,
2832        addresses: &[PeerAddress],
2833    ) -> Result<(), NodeError> {
2834        let mut attempted = false;
2835        let mut local_route_error = None;
2836        let peer_node_addr = *peer_identity.node_addr();
2837        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);
2838
2839        for addr in addresses {
2840            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
2841                if !allow_bootstrap_nat {
2842                    continue;
2843                }
2844                if self.request_nostr_bootstrap(peer_config).await {
2845                    attempted = true;
2846                    continue;
2847                }
2848                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
2849                continue;
2850            }
2851
2852            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
2853                match self.resolve_ethernet_addr(&addr.addr) {
2854                    Ok(result) => result,
2855                    Err(e) => {
2856                        debug!(
2857                            transport = %addr.transport,
2858                            addr = %addr.addr,
2859                            error = %e,
2860                            "Failed to resolve Ethernet address"
2861                        );
2862                        continue;
2863                    }
2864                }
2865            } else if addr.transport == "ble" {
2866                #[cfg(bluer_available)]
2867                {
2868                    match self.resolve_ble_addr(&addr.addr) {
2869                        Ok(result) => result,
2870                        Err(e) => {
2871                            debug!(
2872                                transport = %addr.transport,
2873                                addr = %addr.addr,
2874                                error = %e,
2875                                "Failed to resolve BLE address"
2876                            );
2877                            continue;
2878                        }
2879                    }
2880                }
2881                #[cfg(not(bluer_available))]
2882                {
2883                    debug!(transport = %addr.transport, "BLE transport not available on this build");
2884                    continue;
2885                }
2886            } else {
2887                let tid = if addr.transport == "udp"
2888                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
2889                {
2890                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
2891                        Some((id, _)) => id,
2892                        None => {
2893                            debug!(
2894                                transport = %addr.transport,
2895                                addr = %addr.addr,
2896                                "No compatible operational UDP transport for address"
2897                            );
2898                            continue;
2899                        }
2900                    }
2901                } else {
2902                    match self.find_transport_for_type(&addr.transport) {
2903                        Some(id) => id,
2904                        None => {
2905                            debug!(
2906                                transport = %addr.transport,
2907                                addr = %addr.addr,
2908                                "No operational transport for address type"
2909                            );
2910                            continue;
2911                        }
2912                    }
2913                };
2914                (tid, TransportAddr::from_string(&addr.addr))
2915            };
2916
2917            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
2918                attempted = true;
2919                debug!(
2920                    npub = %peer_config.npub,
2921                    transport_id = %transport_id,
2922                    remote_addr = %remote_addr,
2923                    "Skipping duplicate in-flight candidate path"
2924                );
2925                continue;
2926            }
2927
2928            if concrete_budget == 0 {
2929                debug!(
2930                    npub = %peer_config.npub,
2931                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
2932                    "Path candidate race budget exhausted"
2933                );
2934                break;
2935            }
2936
2937            match self
2938                .initiate_connection(transport_id, remote_addr, peer_identity)
2939                .await
2940            {
2941                Ok(()) => {
2942                    attempted = true;
2943                    concrete_budget = concrete_budget.saturating_sub(1);
2944                }
2945                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
2946                Err(e) => {
2947                    if e.is_local_route_unavailable() && local_route_error.is_none() {
2948                        local_route_error = Some(e.to_string());
2949                    }
2950                    debug!(
2951                        npub = %peer_config.npub,
2952                        transport_id = %transport_id,
2953                        error = %e,
2954                        "Connection attempt failed, trying next address"
2955                    );
2956                }
2957            }
2958        }
2959
2960        if attempted {
2961            return Ok(());
2962        }
2963
2964        if let Some(error) = local_route_error {
2965            return Err(NodeError::LocalRouteUnavailable(error));
2966        }
2967
2968        Err(NodeError::NoTransportForType(format!(
2969            "no operational transport for any of {}'s addresses",
2970            peer_config.npub
2971        )))
2972    }
2973
2974    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2975        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2976            .await;
2977    }
2978
2979    pub(in crate::node) fn queue_active_fallback_direct_retries(
2980        &mut self,
2981        _bootstrap: &std::sync::Arc<NostrDiscovery>,
2982    ) {
2983        let now_ms = Self::now_ms();
2984        let peer_configs = self
2985            .config
2986            .auto_connect_peers()
2987            .cloned()
2988            .collect::<Vec<_>>();
2989
2990        for peer_config in peer_configs {
2991            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2992                continue;
2993            };
2994            let node_addr = *peer_identity.node_addr();
2995
2996            if self.retry_pending.contains_key(&node_addr)
2997                || !self.peers.contains_key(&node_addr)
2998                || self.is_connecting_to_peer(&node_addr)
2999                || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
3000            {
3001                continue;
3002            }
3003
3004            let mut state = super::retry::RetryState::new(peer_config.clone());
3005            state.reconnect = true;
3006            state.retry_after_ms = now_ms;
3007            self.retry_pending.insert(node_addr, state);
3008
3009            debug!(
3010                peer = %self.peer_display_name(&node_addr),
3011                "Queued direct-path retry for active fallback peer"
3012            );
3013        }
3014    }
3015
3016    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
3017    /// queues retries for non-configured, not-yet-connected peers.
3018    ///
3019    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
3020    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
3021    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
3022    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
3023    ///
3024    /// `caller` is a short label included in log lines so per-tick and
3025    /// startup sweeps are distinguishable in operator-facing logs.
3026    pub(in crate::node) async fn run_open_discovery_sweep(
3027        &mut self,
3028        bootstrap: &std::sync::Arc<NostrDiscovery>,
3029        max_age_secs: Option<u64>,
3030        caller: &'static str,
3031    ) {
3032        if !self.config.node.discovery.nostr.enabled
3033            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3034        {
3035            return;
3036        }
3037
3038        let configured_npubs = self
3039            .config
3040            .peers()
3041            .iter()
3042            .map(|peer| peer.npub.clone())
3043            .collect::<HashSet<_>>();
3044        let now_ms = Self::now_ms();
3045        let now_secs = now_ms / 1000;
3046        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
3047        if enqueue_budget == 0 {
3048            debug!(
3049                caller = %caller,
3050                "open-discovery sweep: enqueue budget is 0, skipping"
3051            );
3052            return;
3053        }
3054
3055        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
3056        let cached_count = candidates.len();
3057        let mut enqueued = 0usize;
3058        let mut skipped_age = 0usize;
3059        let mut skipped_configured = 0usize;
3060        let mut skipped_self = 0usize;
3061        let mut skipped_connected = 0usize;
3062        let mut skipped_retry_pending = 0usize;
3063        let mut skipped_connecting = 0usize;
3064        let mut skipped_no_endpoints = 0usize;
3065        let mut skipped_invalid_npub = 0usize;
3066        let mut skipped_cooldown = 0usize;
3067
3068        for (npub, endpoints, created_at_secs) in candidates {
3069            if enqueue_budget == 0 {
3070                break;
3071            }
3072
3073            if let Some(max_age) = max_age_secs
3074                && now_secs.saturating_sub(created_at_secs) > max_age
3075            {
3076                skipped_age = skipped_age.saturating_add(1);
3077                continue;
3078            }
3079
3080            if configured_npubs.contains(&npub) {
3081                // Configured peers don't go through the open-discovery
3082                // enqueue path — their `PeerConfig` is already in
3083                // `self.config.peers()`, so the regular retry queue is
3084                // what drives their reconnect. But on cold start with
3085                // NAT'd peers, every initial `initiate_peer_connection`
3086                // fails (no overlay data yet, static cache hints empty
3087                // or stale), each pushes the peer into `retry_pending`
3088                // with exponential backoff (5/10/20/40/80s), and by the
3089                // time the next backoff slot fires the Nostr advert is
3090                // already cached — we just don't act on it for ~80s.
3091                //
3092                // The arrival of an advert (which this sweep sees) means
3093                // we now have a path to dial. If the peer's retry is
3094                // scheduled in the future, pull it forward to "now" so
3095                // the next `process_pending_retries` tick fires it
3096                // immediately. The retry path (`initiate_peer_retry_
3097                // connection` → `try_peer_addresses`) then refetches
3098                // the advert and dials it — no behavioral change
3099                // beyond schedule timing.
3100                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
3101                    let configured_addr = *identity.node_addr();
3102                    if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3103                        skipped_cooldown = skipped_cooldown.saturating_add(1);
3104                        skipped_configured = skipped_configured.saturating_add(1);
3105                        continue;
3106                    }
3107                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
3108                        && state.retry_after_ms > now_ms
3109                    {
3110                        state.retry_after_ms = now_ms;
3111                        debug!(
3112                            caller = %caller,
3113                            peer = %self.peer_display_name(&configured_addr),
3114                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
3115                            "Expediting configured-peer retry after fresh overlay advert"
3116                        );
3117                    }
3118                }
3119                skipped_configured = skipped_configured.saturating_add(1);
3120                continue;
3121            }
3122
3123            let peer_identity = match PeerIdentity::from_npub(&npub) {
3124                Ok(identity) => identity,
3125                Err(_) => {
3126                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
3127                    continue;
3128                }
3129            };
3130            let node_addr = *peer_identity.node_addr();
3131            if node_addr == *self.identity.node_addr() {
3132                skipped_self = skipped_self.saturating_add(1);
3133                continue;
3134            }
3135            if self.peers.contains_key(&node_addr) {
3136                skipped_connected = skipped_connected.saturating_add(1);
3137                continue;
3138            }
3139            if self.retry_pending.contains_key(&node_addr) {
3140                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
3141                continue;
3142            }
3143            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3144                skipped_cooldown = skipped_cooldown.saturating_add(1);
3145                continue;
3146            }
3147            let connecting = self.connections.values().any(|conn| {
3148                conn.expected_identity()
3149                    .map(|id| id.node_addr() == &node_addr)
3150                    .unwrap_or(false)
3151            });
3152            if connecting {
3153                skipped_connecting = skipped_connecting.saturating_add(1);
3154                continue;
3155            }
3156
3157            let mut addresses = Vec::new();
3158            let mut priority = 120u8;
3159            let seen_at_ms = Self::now_ms();
3160            for endpoint in endpoints {
3161                let Some(candidate) =
3162                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
3163                else {
3164                    continue;
3165                };
3166                if addresses.iter().any(|existing: &PeerAddress| {
3167                    existing.transport == candidate.transport && existing.addr == candidate.addr
3168                }) {
3169                    continue;
3170                }
3171                addresses.push(candidate);
3172                priority = priority.saturating_add(1);
3173            }
3174            if addresses.is_empty() {
3175                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
3176                continue;
3177            }
3178
3179            self.peer_aliases
3180                .entry(node_addr)
3181                .or_insert_with(|| peer_identity.short_npub());
3182            self.register_identity(node_addr, peer_identity.pubkey_full());
3183
3184            let mut state = super::retry::RetryState::new(PeerConfig {
3185                npub: npub.clone(),
3186                alias: None,
3187                addresses,
3188                connect_policy: ConnectPolicy::AutoConnect,
3189                auto_reconnect: true,
3190                discovery_fallback_transit: false,
3191            });
3192            state.reconnect = false;
3193            state.retry_after_ms = now_ms;
3194            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
3195            self.retry_pending.insert(node_addr, state);
3196            info!(
3197                caller = %caller,
3198                peer = %peer_identity.short_npub(),
3199                advert_age_secs = now_secs.saturating_sub(created_at_secs),
3200                "open-discovery sweep: queued retry for cached advert"
3201            );
3202            enqueue_budget = enqueue_budget.saturating_sub(1);
3203            enqueued = enqueued.saturating_add(1);
3204        }
3205
3206        // Always log a one-line summary on the startup sweep so operators
3207        // can verify it ran. Per-tick sweeps are noisier; only summarize
3208        // when something happened.
3209        let total_skipped = skipped_age
3210            + skipped_configured
3211            + skipped_self
3212            + skipped_connected
3213            + skipped_retry_pending
3214            + skipped_connecting
3215            + skipped_no_endpoints
3216            + skipped_invalid_npub
3217            + skipped_cooldown;
3218        let should_summarize = caller == "startup" || enqueued > 0;
3219        if should_summarize {
3220            info!(
3221                caller = %caller,
3222                cached = cached_count,
3223                queued = enqueued,
3224                skipped_age = skipped_age,
3225                skipped_configured = skipped_configured,
3226                skipped_self = skipped_self,
3227                skipped_connected = skipped_connected,
3228                skipped_retry_pending = skipped_retry_pending,
3229                skipped_connecting = skipped_connecting,
3230                skipped_no_endpoints = skipped_no_endpoints,
3231                skipped_invalid_npub = skipped_invalid_npub,
3232                skipped_cooldown = skipped_cooldown,
3233                skipped_total = total_skipped,
3234                "open-discovery sweep complete"
3235            );
3236        }
3237    }
3238
3239    /// One-shot startup sweep: runs once after the configured settle
3240    /// delay, iterating the cached overlay adverts and queueing retries
3241    /// for any peer with a recent enough advert that we haven't already
3242    /// configured statically or established a link to.
3243    ///
3244    /// Gated identically to [`run_open_discovery_sweep`]: requires
3245    /// `node.discovery.nostr.enabled` and `policy == open`.
3246    async fn maybe_run_startup_open_discovery_sweep(
3247        &mut self,
3248        bootstrap: &std::sync::Arc<NostrDiscovery>,
3249    ) {
3250        if self.startup_open_discovery_sweep_done {
3251            return;
3252        }
3253        if !self.config.node.discovery.nostr.enabled
3254            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3255        {
3256            // Mark done so we don't keep re-checking on every tick.
3257            self.startup_open_discovery_sweep_done = true;
3258            return;
3259        }
3260        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3261            return;
3262        };
3263        let now_ms = Self::now_ms();
3264        let delay_ms = self
3265            .config
3266            .node
3267            .discovery
3268            .nostr
3269            .startup_sweep_delay_secs
3270            .saturating_mul(1000);
3271        if now_ms < started_at_ms.saturating_add(delay_ms) {
3272            return;
3273        }
3274
3275        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3276        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3277            .await;
3278        self.startup_open_discovery_sweep_done = true;
3279    }
3280
3281    fn available_outbound_slots(&self) -> usize {
3282        let connection_used = self
3283            .connections
3284            .len()
3285            .saturating_add(self.pending_connects.len());
3286        let connection_slots = if self.max_connections == 0 {
3287            usize::MAX
3288        } else {
3289            self.max_connections.saturating_sub(connection_used)
3290        };
3291
3292        let peer_slots = if self.max_peers == 0 {
3293            usize::MAX
3294        } else {
3295            self.max_peers.saturating_sub(self.peers.len())
3296        };
3297
3298        let link_slots = if self.max_links == 0 {
3299            usize::MAX
3300        } else {
3301            self.max_links.saturating_sub(self.links.len())
3302        };
3303
3304        connection_slots.min(peer_slots).min(link_slots)
3305    }
3306
3307    pub(in crate::node) fn open_discovery_enqueue_budget(
3308        &self,
3309        configured_npubs: &HashSet<String>,
3310    ) -> usize {
3311        let current_open_discovery_active = self
3312            .peers
3313            .values()
3314            .filter(|peer| !configured_npubs.contains(&peer.npub()))
3315            .count();
3316        let current_open_discovery_pending = self
3317            .retry_pending
3318            .values()
3319            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3320            .count();
3321
3322        let cap_remaining = self
3323            .config
3324            .node
3325            .discovery
3326            .nostr
3327            .open_discovery_max_pending
3328            .saturating_sub(current_open_discovery_active)
3329            .saturating_sub(current_open_discovery_pending);
3330
3331        cap_remaining.min(self.available_outbound_slots())
3332    }
3333
3334    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3335        now_ms.saturating_add(
3336            self.config
3337                .node
3338                .discovery
3339                .nostr
3340                .advert_ttl_secs
3341                .saturating_mul(1000)
3342                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3343        )
3344    }
3345
    /// Build this node's overlay advert from its operational transports.
    ///
    /// Returns `None` when Nostr discovery is disabled, or when no transport
    /// produced an endpoint. Transports are skipped when they are not
    /// operational, have no matching config, or have `advertise_on_nostr()`
    /// disabled. Per transport kind:
    /// - `udp`: public transports advertise `external_addr`, then a
    ///   routable non-wildcard bind, then a STUN-learned reflexive address
    ///   (warning if none). Non-public transports advertise the `"nat"`
    ///   placeholder.
    /// - `webrtc`: advertises this node's full public key, hex-encoded.
    /// - `tcp`: advertises `external_addr` or a routable non-wildcard bind,
    ///   otherwise warns and omits the endpoint.
    /// - `tor`: advertises `onion:advertised_port` once an onion address is
    ///   known.
    ///
    /// `signal_relays` and `stun_servers` are attached only when a NAT'd
    /// UDP or a WebRTC endpoint is present.
    ///
    /// NOTE(review): endpoint order follows `self.transports.values()`; if
    /// that is a `HashMap`, order may differ between builds — confirm
    /// whether advert comparison/republishing is order-sensitive.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Set when an endpoint needs relay/STUN help from peers; controls
        // whether the relay and STUN lists are attached to the advert below.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Precedence:
                        // 1. operator-supplied `external_addr` (skips STUN)
                        // 2. non-wildcard *public* `local_addr` (operator
                        //    bound to a specific public IP directly)
                        // 3. STUN auto-discovery against ephemeral socket
                        //    (also taken when bind is wildcard *or* private —
                        //    a private bind is not peer-reachable, so we
                        //    must publish the public reflexive instead)
                        // 4. loud warn + omit endpoint
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                Some(addr) => {
                                    // STUN lookup is keyed by transport id
                                    // and uses the bound port.
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                            or private and STUN observation failed; \
                                            advertising no UDP endpoint. Either set \
                                            transports.udp.external_addr, bind to a \
                                            specific *public* IP, or ensure \
                                            node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                // No bound address reported: silently omit.
                                None => {}
                            }
                        }
                    } else {
                        // Non-public UDP: advertise the "nat" placeholder
                        // rather than a concrete address.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // The WebRTC "address" is this node's serialized full
                    // public key (hex), not a network address.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Precedence:
                    // 1. operator-supplied `external_addr` (only path that
                    //    works on cloud-NAT setups where the public IP is
                    //    not on a host interface).
                    // 2. non-wildcard *public* `local_addr` (operator bound
                    //    to a specific public IP directly).
                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
                    //
                    // A wildcard *or* private bind is never advertised as-is
                    // — peers off-LAN can't reach a private bind, and there
                    // is no TCP STUN to discover a public reflexive.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                    or private IP and no transports.tcp.external_addr set; \
                                    advertising no TCP endpoint. Either set external_addr \
                                    to the public IP (recommended for cloud 1:1-NAT setups) \
                                    or bind explicitly to the public IP"
                                );
                            }
                            // No bound address reported: silently omit.
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Only advertised once the transport reports an onion
                    // address; the advertised port comes from config.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                // Other transport kinds are not advertised on Nostr.
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            // Relay and STUN lists are only attached when some endpoint
            // (NAT'd UDP or WebRTC) has no directly dialable address.
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3524
3525    async fn refresh_overlay_advert(
3526        &self,
3527        bootstrap: &std::sync::Arc<NostrDiscovery>,
3528    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3529        let advert = self.build_overlay_advert(bootstrap).await;
3530        bootstrap.update_local_advert(advert).await
3531    }
3532
3533    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3534        match (&self.config.transports.udp, transport_name) {
3535            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3536            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3537            _ => None,
3538        }
3539    }
3540
3541    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3542        match (&self.config.transports.tcp, transport_name) {
3543            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3544            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3545            _ => None,
3546        }
3547    }
3548
3549    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3550        match (&self.config.transports.tor, transport_name) {
3551            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3552            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3553            _ => None,
3554        }
3555    }
3556
3557    fn lookup_webrtc_config(
3558        &self,
3559        transport_name: Option<&str>,
3560    ) -> Option<&crate::config::WebRtcConfig> {
3561        match (&self.config.transports.webrtc, transport_name) {
3562            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3563            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3564            _ => None,
3565        }
3566    }
3567
3568    pub(in crate::node) async fn try_peer_addresses(
3569        &mut self,
3570        peer_config: &PeerConfig,
3571        peer_identity: PeerIdentity,
3572        allow_bootstrap_nat: bool,
3573    ) -> Result<(), NodeError> {
3574        let peer_node_addr = *peer_identity.node_addr();
3575        if self.peers.contains_key(&peer_node_addr) {
3576            debug!(
3577                npub = %peer_config.npub,
3578                "Peer already exists, skipping address attempts"
3579            );
3580            return Ok(());
3581        }
3582
3583        let candidates = self.peer_address_candidates(peer_config).await;
3584
3585        if candidates.is_empty() {
3586            if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3587                return Ok(());
3588            }
3589            return Err(NodeError::NoTransportForType(format!(
3590                "no addresses known for {}",
3591                peer_config.npub
3592            )));
3593        }
3594
3595        if self
3596            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3597            .await
3598            .is_ok()
3599        {
3600            if allow_bootstrap_nat {
3601                self.request_nostr_bootstrap(peer_config).await;
3602            }
3603            return Ok(());
3604        }
3605
3606        if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3607            return Ok(());
3608        }
3609
3610        Err(NodeError::NoTransportForType(format!(
3611            "no operational transport for any of {}'s addresses",
3612            peer_config.npub
3613        )))
3614    }
3615
3616    async fn try_active_peer_alternative_addresses(
3617        &mut self,
3618        peer_config: &PeerConfig,
3619        peer_identity: PeerIdentity,
3620    ) -> Result<bool, NodeError> {
3621        let peer_node_addr = *peer_identity.node_addr();
3622        let candidates = self.peer_address_candidates(peer_config).await;
3623        let should_try_nostr =
3624            self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);
3625
3626        if candidates.is_empty() {
3627            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3628                return Ok(true);
3629            }
3630            return Err(NodeError::NoTransportForType(format!(
3631                "no addresses known for {}",
3632                peer_config.npub
3633            )));
3634        }
3635
3636        let alternatives: Vec<_> = candidates
3637            .into_iter()
3638            .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
3639            .collect();
3640
3641        if alternatives.is_empty() {
3642            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3643                return Ok(true);
3644            }
3645            return Ok(false);
3646        }
3647
3648        let needs_separate_nostr_attempt = should_try_nostr
3649            && !alternatives
3650                .iter()
3651                .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
3652        let address_result = self
3653            .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
3654            .await;
3655        let nostr_attempted =
3656            needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;
3657
3658        match address_result {
3659            Ok(()) => Ok(true),
3660            Err(err) if nostr_attempted => {
3661                debug!(
3662                    npub = %peer_config.npub,
3663                    error = %err,
3664                    "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
3665                );
3666                Ok(true)
3667            }
3668            Err(err) => Err(err),
3669        }
3670    }
3671
3672    async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3673        // Merge every candidate from every source we have for this peer.
3674        // Explicitly configured addresses keep first shot, then freshly
3675        // fetched overlay adverts are appended as fallback candidates. This
3676        // lets native peers try known LAN/nvpn/static UDP routes before
3677        // slower WebRTC/Nostr-discovered paths, while still racing every
3678        // concrete candidate that fits in the per-peer budget.
3679        let static_addresses = self.static_peer_addresses(peer_config);
3680        let overlay_addresses = self
3681            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3682            .await;
3683
3684        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3685        for addr in overlay_addresses.into_iter().chain(static_addresses) {
3686            if !candidates.iter().any(|existing: &PeerAddress| {
3687                existing.transport == addr.transport && existing.addr == addr.addr
3688            }) {
3689                candidates.push(addr);
3690            }
3691        }
3692
3693        // Stable sort: recently observed candidates win over unstamped static
3694        // hints, then explicit priority and recency break ties. Static hints
3695        // remain candidates, but they are not privileged forever after a
3696        // network move; a fresh advert/STUN-observed endpoint gets the first
3697        // shot when the race budget is tight.
3698        candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
3699            (Some(_), None) => std::cmp::Ordering::Less,
3700            (None, Some(_)) => std::cmp::Ordering::Greater,
3701            _ if a.priority != b.priority => a.priority.cmp(&b.priority),
3702            (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3703            (None, None) => std::cmp::Ordering::Equal,
3704        });
3705
3706        candidates
3707    }
3708
3709    fn active_peer_matches_any_candidate(
3710        &self,
3711        peer_node_addr: &NodeAddr,
3712        candidates: &[PeerAddress],
3713    ) -> bool {
3714        candidates
3715            .iter()
3716            .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3717    }
3718
3719    pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3720        &self,
3721        peer_node_addr: &NodeAddr,
3722        candidates: &[PeerAddress],
3723    ) -> bool {
3724        if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3725            return false;
3726        }
3727        !self.active_peer_needs_same_path_refresh(peer_node_addr)
3728    }
3729
3730    pub(in crate::node) fn active_peer_should_keep_direct_retry(
3731        &self,
3732        peer_node_addr: &NodeAddr,
3733        peer_config: &PeerConfig,
3734    ) -> bool {
3735        let Some(peer) = self.peers.get(peer_node_addr) else {
3736            return false;
3737        };
3738
3739        let static_addresses = self.static_peer_addresses(peer_config);
3740        if !static_addresses.is_empty() {
3741            return !self
3742                .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3743        }
3744
3745        if peer_config.npub.is_empty() {
3746            return false;
3747        }
3748
3749        if !self.config.node.discovery.nostr.enabled
3750            || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3751        {
3752            return false;
3753        }
3754
3755        peer.transport_id()
3756            .and_then(|id| self.transports.get(&id))
3757            .map(|transport| transport.transport_type().name != "udp")
3758            .unwrap_or(true)
3759    }
3760
3761    pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3762        &mut self,
3763        peer_node_addr: &NodeAddr,
3764    ) {
3765        let keep_retry = self
3766            .retry_pending
3767            .get(peer_node_addr)
3768            .map(|state| state.peer_config.clone())
3769            .is_some_and(|peer_config| {
3770                self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3771            });
3772
3773        if !keep_retry {
3774            self.retry_pending.remove(peer_node_addr);
3775        }
3776    }
3777
3778    fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3779        let Some(peer) = self.peers.get(peer_node_addr) else {
3780            return false;
3781        };
3782        let stale_after_ms = self
3783            .config
3784            .node
3785            .heartbeat_interval_secs
3786            .saturating_mul(1000)
3787            .max(1000);
3788        peer.idle_time(Self::now_ms()) > stale_after_ms
3789    }
3790
3791    pub(in crate::node) fn active_peer_matches_candidate(
3792        &self,
3793        peer_node_addr: &NodeAddr,
3794        candidate: &PeerAddress,
3795    ) -> bool {
3796        let Some(peer) = self.peers.get(peer_node_addr) else {
3797            return false;
3798        };
3799        let Some(current_addr) = peer.current_addr() else {
3800            return false;
3801        };
3802        if let Some(peer_transport_id) = peer.transport_id()
3803            && let Some((candidate_transport_id, candidate_addr)) =
3804                self.resolve_peer_address_for_match(candidate)
3805        {
3806            return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3807        }
3808        if peer
3809            .transport_id()
3810            .map(|id| self.bootstrap_transports.contains(&id))
3811            .unwrap_or(false)
3812        {
3813            return false;
3814        }
3815        let current_addr = current_addr.to_string();
3816        let current_transport = peer
3817            .transport_id()
3818            .and_then(|id| self.transports.get(&id))
3819            .map(|transport| transport.transport_type().name);
3820
3821        candidate.addr == current_addr
3822            && current_transport
3823                .map(|transport| transport == candidate.transport)
3824                .unwrap_or(true)
3825    }
3826
3827    pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
3828        &self,
3829        peer_node_addr: &NodeAddr,
3830        peer_config: &PeerConfig,
3831    ) -> bool {
3832        peer_config.addresses.iter().any(|addr| {
3833            addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
3834        })
3835    }
3836
3837    pub(in crate::node) fn active_peer_uses_traversal_path(
3838        &self,
3839        peer_node_addr: &NodeAddr,
3840        peer_config: &PeerConfig,
3841    ) -> bool {
3842        let via_bootstrap_transport = self
3843            .peers
3844            .get(peer_node_addr)
3845            .and_then(|peer| peer.transport_id())
3846            .map(|id| self.bootstrap_transports.contains(&id))
3847            .unwrap_or(false);
3848
3849        via_bootstrap_transport
3850            || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
3851    }
3852
3853    // === Control API methods ===
3854
3855    /// Connect to a peer via the control API.
3856    ///
3857    /// Creates an ephemeral peer connection (not persisted to config, no
3858    /// auto-reconnect). Reuses the same connection path as auto-connect
3859    /// peers. Returns JSON data on success or an error message.
3860    pub(crate) async fn api_connect(
3861        &mut self,
3862        npub: &str,
3863        address: &str,
3864        transport: &str,
3865    ) -> Result<serde_json::Value, String> {
3866        let peer_config = PeerConfig {
3867            npub: npub.to_string(),
3868            alias: None,
3869            addresses: vec![PeerAddress::new(transport, address)],
3870            connect_policy: ConnectPolicy::Manual,
3871            auto_reconnect: false,
3872            discovery_fallback_transit: true,
3873        };
3874
3875        // Pre-seed identity cache (same as initiate_peer_connections does)
3876        if let Ok(identity) = PeerIdentity::from_npub(npub) {
3877            self.peer_aliases
3878                .insert(*identity.node_addr(), identity.short_npub());
3879            self.register_identity(*identity.node_addr(), identity.pubkey_full());
3880        }
3881
3882        self.initiate_peer_connection(&peer_config)
3883            .await
3884            .map(|()| {
3885                info!(
3886                    npub = %npub,
3887                    address = %address,
3888                    transport = %transport,
3889                    "API connect initiated"
3890                );
3891                serde_json::json!({
3892                    "npub": npub,
3893                    "address": address,
3894                    "transport": transport,
3895                })
3896            })
3897            .map_err(|e| e.to_string())
3898    }
3899
3900    /// Disconnect a peer via the control API.
3901    ///
3902    /// Removes the peer and suppresses auto-reconnect.
3903    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3904        let peer_identity =
3905            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3906        let node_addr = *peer_identity.node_addr();
3907
3908        if !self.peers.contains_key(&node_addr) {
3909            return Err(format!("peer not found: {npub}"));
3910        }
3911
3912        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
3913        self.remove_active_peer(&node_addr);
3914
3915        // Suppress any pending auto-reconnect
3916        self.retry_pending.remove(&node_addr);
3917
3918        info!(npub = %npub, "API disconnect completed");
3919
3920        Ok(serde_json::json!({
3921            "npub": npub,
3922            "disconnected": true,
3923        }))
3924    }
3925
3926    /// Adopt an already-established UDP traversal and start the normal FIPS
3927    /// Noise handshake over it.
3928    ///
3929    /// This is intended for integration with an external rendezvous runtime
3930    /// that has already completed relay signaling, STUN observation, and UDP
3931    /// hole punching. After handoff, the adopted socket is owned by FIPS.
3932    pub async fn adopt_established_traversal(
3933        &mut self,
3934        traversal: EstablishedTraversal,
3935    ) -> Result<BootstrapHandoffResult, NodeError> {
3936        debug!(
3937            peer_npub = %traversal.peer_npub,
3938            session_id = %traversal.session_id,
3939            remote_addr = %traversal.remote_addr,
3940            "adopting established traversal socket"
3941        );
3942
3943        if !self.state.is_operational() {
3944            return Err(NodeError::NotStarted);
3945        }
3946
3947        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3948        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3949            NodeError::InvalidPeerNpub {
3950                npub: traversal.peer_npub.clone(),
3951                reason: e.to_string(),
3952            }
3953        })?;
3954        let peer_node_addr = *peer_identity.node_addr();
3955        if self.peers.contains_key(&peer_node_addr) {
3956            debug!(
3957                peer_npub = %traversal.peer_npub,
3958                "Adopting NAT traversal handoff as alternate path for already-connected peer"
3959            );
3960        }
3961
3962        self.peer_aliases
3963            .insert(peer_node_addr, peer_identity.short_npub());
3964        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3965
3966        let transport_id = self.allocate_transport_id();
3967        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
3968        // (and accept_connections / advertise flags) from the operator's
3969        // configured [transports.udp] when the bootstrap runtime doesn't
3970        // pass an explicit override. Lookup tries `transport_name` first
3971        // (covers the `Named` multi-listener variant) and falls back to the
3972        // unnamed `Single` listener, so single- and named-listener configs
3973        // both inherit cleanly.
3974        //
3975        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
3976        // only value guaranteed to survive arbitrary middlebox paths.
3977        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
3978        // initially attempt that MTU and may black-hole on tighter paths
3979        // until reactive `MtuExceeded` recovery kicks in. Operators who
3980        // raise the primary MTU based on known-clean topology accept that
3981        // tradeoff; the silent drop on a too-low default was strictly
3982        // worse for the common case where the primary MTU is reachable.
3983        //
3984        // Bind / external address fields are cleared since the socket is
3985        // already bound.
3986        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3987            let mut cfg = self
3988                .lookup_udp_config(traversal.transport_name.as_deref())
3989                .or_else(|| self.lookup_udp_config(None))
3990                .cloned()
3991                .unwrap_or_default();
3992            cfg.bind_addr = None;
3993            cfg.external_addr = None;
3994            cfg
3995        });
3996        let mut transport = crate::transport::udp::UdpTransport::new(
3997            transport_id,
3998            traversal.transport_name.clone(),
3999            inherited_config,
4000            packet_tx,
4001        );
4002
4003        transport
4004            .adopt_socket_async(traversal.socket)
4005            .await
4006            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
4007
4008        let local_addr = transport.local_addr().ok_or_else(|| {
4009            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
4010        })?;
4011
4012        self.transports.insert(
4013            transport_id,
4014            crate::transport::TransportHandle::Udp(transport),
4015        );
4016        self.bootstrap_transports.insert(transport_id);
4017        self.bootstrap_transport_npubs
4018            .insert(transport_id, traversal.peer_npub.clone());
4019
4020        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
4021        if let Err(err) = self
4022            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
4023            .await
4024        {
4025            self.bootstrap_transports.remove(&transport_id);
4026            self.bootstrap_transport_npubs.remove(&transport_id);
4027            if let Some(mut handle) = self.transports.remove(&transport_id) {
4028                let _ = handle.stop().await;
4029            }
4030            return Err(err);
4031        }
4032
4033        info!(
4034            peer = %self.peer_display_name(&peer_node_addr),
4035            transport_id = %transport_id,
4036            local_addr = %local_addr,
4037            remote_addr = %traversal.remote_addr,
4038            session_id = %traversal.session_id,
4039            "adopted NAT traversal socket; handshake initiated"
4040        );
4041
4042        Ok(BootstrapHandoffResult {
4043            transport_id,
4044            local_addr,
4045            remote_addr: traversal.remote_addr,
4046            peer_node_addr,
4047            session_id: traversal.session_id,
4048        })
4049    }
4050}