Skip to main content

fips_core/node/lifecycle.rs

1//! Node lifecycle management: start, stop, and peer connection initiation.
2
3use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7    OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Three-way decision for handling a mesh-traversal signal relative to a
/// session: send it, defer it, or drop it.
///
/// NOTE(review): not referenced in the visible part of this file; the variant
/// meanings above are inferred from their names — confirm at the use site.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MeshSignalSessionAction {
    Send,
    Defer,
    Drop,
}
30
/// Debug builds only: append a timestamped line to
/// `$TMPDIR/nvpn-fips-endpoint-debug.log`.
///
/// Best-effort diagnostics — any failure to open or write the file is
/// silently ignored so logging can never disturb node start-up.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(log_path);
    let Ok(mut file) = opened else {
        return;
    };

    let timestamp = std::time::SystemTime::now();
    // Write errors are deliberately discarded (see above).
    let _ = writeln!(file, "{:?} {}", timestamp, message.as_ref());
}

/// Release builds: debug logging compiles down to a no-op.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
51
/// True if `ip` is not a viable canonical advert endpoint for peers off the
/// publisher's own LAN.
///
/// Rejected IPv4 ranges:
/// - RFC 1918 private, loopback, link-local, unspecified, multicast and the
///   limited broadcast address;
/// - documentation space (TEST-NET-1/2/3);
/// - benchmarking space (198.18.0.0/15, RFC 2544);
/// - CGNAT shared space (100.64.0.0/10, RFC 6598).
///
/// Rejected IPv6 ranges: loopback, unspecified, unique-local (fc00::/7),
/// multicast, link-local (fe80::/10) and documentation space (2001:db8::/32).
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) are judged by their embedded
/// IPv4 address, so `::ffff:192.168.1.1` is rejected just like `192.168.1.1`.
///
/// We never publish these as the peer's primary `runtime_endpoint`: an off-LAN
/// consumer can't route to them. Latching onto one pushes the peer onto a slow
/// overlay-relay fallback — the original bug this guard exists to prevent.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 198.18.0.0/15 — benchmarking, RFC 2544. Promised by the doc
                // comment but previously unchecked (`is_benchmarking` is unstable).
                || (first == 198 && (second & 0xfe) == 18)
                // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the public
                // internet; behaves like an extra NAT layer.
                || (first == 100 && (second & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            // `::ffff:10.0.0.1` is just a private IPv4 address in IPv6 clothing;
            // classify it by the address it wraps.
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(v4));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // IPv6 link-local: fe80::/10
                || (segments[0] & 0xffc0) == 0xfe80
                // IPv6 documentation: 2001:db8::/32 (RFC 3849)
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
83
/// True when `local` and `remote` share an address family (both IPv4 or both
/// IPv6), i.e. a socket bound to `local` can address `remote`.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // A SocketAddr is either V4 or V6, so comparing one predicate suffices.
    local.is_ipv4() == remote.is_ipv4()
}
90
/// Cap on outbound handshakes/pending connects in flight toward a single peer
/// (applied in `path_candidate_attempt_budget`).
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Per-call cap on sessions warmed over the existing graph (applied in
/// `graph_session_warmup_budget`).
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Per-call cap on discovery-driven connection attempts (applied in
/// `discovery_connect_budget`).
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
/// NOTE(review): not used in the visible part of this file; presumably scales
/// a retry lifetime for open-discovery peers — confirm at the use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
95
96impl Node {
97    /// Initiate connections to configured static peers.
98    ///
99    /// For each peer configured with AutoConnect policy, creates a link and
100    /// peer entry, then starts the Noise handshake by sending the first message.
101    /// Replace the runtime peer list. Newly added auto-connect peers get
102    /// `initiate_peer_connection` immediately; removed peers are dropped from
103    /// the retry queue (the regular liveness timeout reaps any active session
104    /// — we don't proactively disconnect, since the same npub might be on its
105    /// way back via a fresh advert). Existing auto-connect entries with new
106    /// direct addresses are dialed immediately, and retry state is refreshed
107    /// so later attempts also use the latest hints.
108    pub(super) async fn update_peers(
109        &mut self,
110        new_peers: Vec<crate::config::PeerConfig>,
111    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112        use std::collections::{HashMap, HashSet};
113
114        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115            HashMap::with_capacity(new_peers.len());
116        let mut new_order = Vec::with_capacity(new_peers.len());
117        for peer in new_peers {
118            let identity = match PeerIdentity::from_npub(&peer.npub) {
119                Ok(id) => id,
120                Err(e) => {
121                    return Err(crate::node::NodeError::InvalidPeerNpub {
122                        npub: peer.npub.clone(),
123                        reason: e.to_string(),
124                    });
125                }
126            };
127            // Last-write-wins on duplicates so callers passing a multi-source
128            // candidate list (e.g. operator hints + recent-peers cache for
129            // the same npub) get the merge they meant.
130            let node_addr = *identity.node_addr();
131            if !new_by_addr.contains_key(&node_addr) {
132                new_order.push(node_addr);
133            }
134            new_by_addr.insert(node_addr, peer);
135        }
136
137        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138            .config
139            .peers()
140            .iter()
141            .filter_map(|pc| {
142                PeerIdentity::from_npub(&pc.npub)
143                    .ok()
144                    .map(|id| (*id.node_addr(), pc.clone()))
145            })
146            .collect();
147
148        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
153        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();
154
155        let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157        for node_addr in &removed {
158            if self.retry_pending.remove(node_addr).is_some() {
159                debug!(
160                    peer = %self.peer_display_name(node_addr),
161                    "Dropping retry entry for peer removed from runtime peer list"
162                );
163            }
164            self.peer_aliases.remove(node_addr);
165            self.set_discovery_fallback_transit_allowed(*node_addr, false);
166            outcome.removed += 1;
167        }
168
169        let mut auto_connect_refresh_configs = Vec::new();
170        for node_addr in &kept {
171            let new_pc = &new_by_addr[node_addr];
172            let current_pc = &current_by_addr[node_addr];
173            if new_pc.addresses != current_pc.addresses
174                || new_pc.alias != current_pc.alias
175                || new_pc.connect_policy != current_pc.connect_policy
176                || new_pc.auto_reconnect != current_pc.auto_reconnect
177                || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178            {
179                outcome.updated += 1;
180                self.set_discovery_fallback_transit_allowed(
181                    *node_addr,
182                    new_pc.discovery_fallback_transit,
183                );
184                if let Some(state) = self.retry_pending.get_mut(node_addr) {
185                    state.peer_config = new_pc.clone();
186                    state.retry_after_ms = Self::now_ms();
187                }
188                if let Some(alias) = new_pc.alias.clone() {
189                    self.peer_aliases.insert(*node_addr, alias);
190                }
191                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192                    auto_connect_refresh_configs.push(new_pc.clone());
193                }
194            } else {
195                outcome.unchanged += 1;
196                self.set_discovery_fallback_transit_allowed(
197                    *node_addr,
198                    new_pc.discovery_fallback_transit,
199                );
200                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
201                    auto_connect_refresh_configs.push(new_pc.clone());
202                }
203            }
204        }
205
206        let added_configs: Vec<crate::config::PeerConfig> = new_order
207            .iter()
208            .filter(|addr| added.contains(addr))
209            .map(|addr| new_by_addr[addr].clone())
210            .collect();
211
212        // Replace the live config peer list before initiating connections so
213        // any helper that consults `self.config.peers()` during the dial
214        // (alias lookup, retry-state seeding) sees the new entries.
215        self.config.peers = new_order
216            .iter()
217            .filter_map(|addr| new_by_addr.get(addr).cloned())
218            .collect();
219        self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
220
221        for peer_config in added_configs {
222            outcome.added += 1;
223            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
224                continue;
225            };
226            let name = peer_config
227                .alias
228                .clone()
229                .unwrap_or_else(|| identity.short_npub());
230            self.peer_aliases.insert(*identity.node_addr(), name);
231            self.set_discovery_fallback_transit_allowed(
232                *identity.node_addr(),
233                peer_config.discovery_fallback_transit,
234            );
235            self.register_identity(*identity.node_addr(), identity.pubkey_full());
236
237            if !peer_config.is_auto_connect() {
238                continue;
239            }
240
241            match self
242                .try_auto_connect_graph_session(&peer_config, identity)
243                .await
244            {
245                Ok(true) => continue,
246                Ok(false) => {}
247                Err(err) => {
248                    debug!(
249                        npub = %peer_config.npub,
250                        error = %err,
251                        "Existing FIPS graph did not warm newly added peer"
252                    );
253                }
254            }
255
256            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
257                warn!(
258                    npub = %peer_config.npub,
259                    error = %e,
260                    "Failed to initiate connection for newly added peer"
261                );
262                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
263                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
264                }
265                if matches!(e, crate::node::NodeError::NoTransportForType(_))
266                    && let Some(bootstrap) = self.nostr_discovery.clone()
267                {
268                    bootstrap
269                        .request_advert_stale_check(peer_config.npub.clone())
270                        .await;
271                }
272            }
273        }
274
275        for peer_config in auto_connect_refresh_configs {
276            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
277                continue;
278            };
279            let node_addr = *peer_identity.node_addr();
280
281            if self.peers.contains_key(&node_addr) {
282                match self
283                    .initiate_active_peer_alternative_connection(&peer_config)
284                    .await
285                {
286                    Ok(attempted) => {
287                        if attempted {
288                            debug!(
289                                peer = %self.peer_display_name(&node_addr),
290                                "Started non-disruptive alternate-path handshake for active peer"
291                            );
292                        }
293                    }
294                    Err(e) => {
295                        debug!(
296                            npub = %peer_config.npub,
297                            error = %e,
298                            "Active peer alternate-path refresh did not start"
299                        );
300                    }
301                }
302                continue;
303            }
304
305            match self
306                .try_auto_connect_graph_session(&peer_config, peer_identity)
307                .await
308            {
309                Ok(true) => continue,
310                Ok(false) => {}
311                Err(err) => {
312                    debug!(
313                        npub = %peer_config.npub,
314                        error = %err,
315                        "Existing FIPS graph did not warm refreshed peer"
316                    );
317                }
318            }
319
320            match self.initiate_peer_connection(&peer_config).await {
321                Ok(()) => {
322                    let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
323                    if let Some(state) = self.retry_pending.get_mut(&node_addr) {
324                        state.peer_config = peer_config;
325                        state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
326                    }
327                }
328                Err(e) => {
329                    debug!(
330                        npub = %peer_config.npub,
331                        error = %e,
332                        "Refreshed peer addresses did not initiate a direct connection"
333                    );
334                    self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
335                }
336            }
337        }
338
339        self.warm_auto_connect_graph_sessions().await;
340
341        Ok(outcome)
342    }
343
344    pub(super) async fn initiate_peer_connections(&mut self) {
345        // Build display name map from all configured peers (alias or short npub),
346        // and pre-seed the identity cache from each peer's npub so that TUN packets
347        // addressed to a configured peer can be dispatched (and trigger session
348        // initiation) immediately on startup — without waiting for the link-layer
349        // handshake to complete first.
350        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
351            .config
352            .peers()
353            .iter()
354            .filter_map(|pc| {
355                PeerIdentity::from_npub(&pc.npub)
356                    .ok()
357                    .map(|id| (id, pc.alias.clone()))
358            })
359            .collect();
360
361        for (identity, alias) in peer_identities {
362            let name = alias.unwrap_or_else(|| identity.short_npub());
363            self.peer_aliases.insert(*identity.node_addr(), name);
364            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
365            // but will be corrected to the real value when the peer is promoted
366            // after a successful Noise handshake.
367            self.register_identity(*identity.node_addr(), identity.pubkey_full());
368        }
369
370        // Collect peer configs to avoid borrow conflicts
371        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
372
373        if peer_configs.is_empty() {
374            debug!("No static peers configured");
375            return;
376        }
377
378        debug!(
379            count = peer_configs.len(),
380            "Initiating static peer connections"
381        );
382
383        for peer_config in peer_configs {
384            let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
385                Ok(identity) => identity,
386                Err(_) => continue,
387            };
388            match self
389                .try_auto_connect_graph_session(&peer_config, peer_identity)
390                .await
391            {
392                Ok(true) => continue,
393                Ok(false) => {}
394                Err(err) => {
395                    debug!(
396                        npub = %peer_config.npub,
397                        error = %err,
398                        "Existing FIPS graph did not warm auto-connect peer"
399                    );
400                }
401            }
402            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
403                warn!(
404                    npub = %peer_config.npub,
405                    alias = ?peer_config.alias,
406                    error = %e,
407                    "Failed to initiate peer connection"
408                );
409                // Schedule a retry so transient address-resolution failures
410                // (e.g. cached endpoints stale, NAT rebinds, all addresses
411                // currently unreachable) recover without a daemon restart.
412                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
413                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
414                }
415                // No-transport failures most often mean the cached overlay
416                // advert is pointing at a dead post-NAT-rebind address. The
417                // advert cache is read-only inside fetch_advert, so retries
418                // would loop on the same dead address until expiry. Force a
419                // re-fetch so the next retry tick picks up fresh endpoints.
420                if matches!(e, crate::node::NodeError::NoTransportForType(_))
421                    && let Some(bootstrap) = self.nostr_discovery.clone()
422                {
423                    bootstrap
424                        .request_advert_stale_check(peer_config.npub.clone())
425                        .await;
426                }
427            }
428        }
429
430        self.warm_auto_connect_graph_sessions().await;
431    }
432
433    pub(in crate::node) async fn try_auto_connect_graph_session(
434        &mut self,
435        peer_config: &PeerConfig,
436        peer_identity: PeerIdentity,
437    ) -> Result<bool, NodeError> {
438        if !peer_config.is_auto_connect() {
439            return Ok(false);
440        }
441
442        let peer_node_addr = *peer_identity.node_addr();
443        if self
444            .peers
445            .get(&peer_node_addr)
446            .is_some_and(|peer| peer.can_send())
447        {
448            return Ok(false);
449        }
450        if self.auto_connect_should_race_direct_path(peer_config) {
451            return Ok(false);
452        }
453        if self
454            .sessions
455            .get(&peer_node_addr)
456            .is_some_and(|entry| entry.is_established() || entry.is_initiating())
457        {
458            return Ok(true);
459        }
460        if self.find_next_hop(&peer_node_addr).is_none() {
461            return Ok(false);
462        }
463
464        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
465        match self
466            .initiate_session(peer_node_addr, peer_identity.pubkey_full())
467            .await
468        {
469            Ok(()) => {
470                debug!(
471                    peer = %self.peer_display_name(&peer_node_addr),
472                    "Warmed auto-connect peer session over existing FIPS graph"
473                );
474                Ok(true)
475            }
476            Err(NodeError::SendFailed { node_addr, reason })
477                if node_addr == peer_node_addr && reason == "no route to destination" =>
478            {
479                self.maybe_initiate_lookup(&peer_node_addr).await;
480                Ok(false)
481            }
482            Err(err) => Err(err),
483        }
484    }
485
486    fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
487        !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
488    }
489
490    /// Initiate a connection to a single peer.
491    ///
492    /// Creates a link, starts the Noise handshake, and sends the first message.
493    pub(super) async fn initiate_peer_connection(
494        &mut self,
495        peer_config: &crate::config::PeerConfig,
496    ) -> Result<(), NodeError> {
497        self.initiate_peer_connection_inner(peer_config).await
498    }
499
500    /// Initiate a connection from the retry path. Identical to
501    /// [`initiate_peer_connection`] today — both paths fan out across every
502    /// known address (overlay-fresh first, then operator/cache hints) in a
503    /// single pass. The two entry points stay separate so callers can be
504    /// distinguished in tracing.
505    pub(super) async fn initiate_peer_retry_connection(
506        &mut self,
507        peer_config: &crate::config::PeerConfig,
508    ) -> Result<(), NodeError> {
509        self.initiate_peer_connection_inner(peer_config).await
510    }
511
512    pub(in crate::node) async fn initiate_active_peer_alternative_connection(
513        &mut self,
514        peer_config: &crate::config::PeerConfig,
515    ) -> Result<bool, NodeError> {
516        self.initiate_active_peer_alternative_connection_inner(peer_config, false)
517            .await
518    }
519
520    pub(in crate::node) async fn initiate_active_peer_direct_refresh_connection(
521        &mut self,
522        peer_config: &crate::config::PeerConfig,
523    ) -> Result<bool, NodeError> {
524        self.initiate_active_peer_alternative_connection_inner(peer_config, true)
525            .await
526    }
527
528    async fn initiate_active_peer_alternative_connection_inner(
529        &mut self,
530        peer_config: &crate::config::PeerConfig,
531        allow_same_path_refresh: bool,
532    ) -> Result<bool, NodeError> {
533        let peer_identity =
534            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
535                npub: peer_config.npub.clone(),
536                reason: e.to_string(),
537            })?;
538        let peer_node_addr = *peer_identity.node_addr();
539
540        if !self.peers.contains_key(&peer_node_addr) {
541            self.initiate_peer_connection(peer_config).await?;
542            return Ok(true);
543        }
544
545        // Keep the current link live and race fresh concrete candidates.
546        // Cross-connection resolution still decides which replacement link
547        // wins if both peers try the same upgrade; the important part here is
548        // that a stale path does not depend on the remote peer receiving our
549        // hint first before either side attempts the better address.
550        self.try_active_peer_alternative_addresses(
551            peer_config,
552            peer_identity,
553            allow_same_path_refresh,
554        )
555        .await
556    }
557
558    async fn initiate_peer_connection_inner(
559        &mut self,
560        peer_config: &crate::config::PeerConfig,
561    ) -> Result<(), NodeError> {
562        // Parse the peer's npub to get their identity
563        let peer_identity =
564            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
565                npub: peer_config.npub.clone(),
566                reason: e.to_string(),
567            })?;
568
569        let peer_node_addr = *peer_identity.node_addr();
570
571        // Check if peer already exists (fully authenticated)
572        if self.peers.contains_key(&peer_node_addr) {
573            debug!(
574                npub = %peer_config.npub,
575                "Peer already exists, skipping"
576            );
577            return Ok(());
578        }
579
580        self.try_peer_addresses(peer_config, peer_identity, true)
581            .await
582    }
583
584    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
585        self.connections.values().any(|conn| {
586            conn.expected_identity()
587                .map(|id| id.node_addr() == peer_node_addr)
588                .unwrap_or(false)
589        })
590    }
591
592    fn is_connecting_to_peer_on_path(
593        &self,
594        peer_node_addr: &NodeAddr,
595        transport_id: TransportId,
596        remote_addr: &TransportAddr,
597    ) -> bool {
598        self.connections.values().any(|conn| {
599            conn.expected_identity()
600                .map(|id| id.node_addr() == peer_node_addr)
601                .unwrap_or(false)
602                && conn.transport_id() == Some(transport_id)
603                && conn.source_addr() == Some(remote_addr)
604        }) || self.pending_connects.iter().any(|pending| {
605            pending.peer_identity.node_addr() == peer_node_addr
606                && pending.transport_id == transport_id
607                && &pending.remote_addr == remote_addr
608        })
609    }
610
611    pub(in crate::node) fn should_warm_auto_connect_session(
612        &self,
613        peer_node_addr: &NodeAddr,
614    ) -> bool {
615        if self
616            .peers
617            .get(peer_node_addr)
618            .is_some_and(|peer| peer.can_send())
619            || self
620                .sessions
621                .get(peer_node_addr)
622                .is_some_and(|entry| entry.is_established())
623        {
624            return false;
625        }
626
627        self.config.peers().iter().any(|peer| {
628            peer.is_auto_connect()
629                && PeerIdentity::from_npub(&peer.npub)
630                    .map(|identity| identity.node_addr() == peer_node_addr)
631                    .unwrap_or(false)
632        })
633    }
634
635    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
636        if !self.peers.values().any(|peer| peer.can_send()) {
637            return 0;
638        }
639
640        let mut budget = self.graph_session_warmup_budget();
641        if budget == 0 {
642            return 0;
643        }
644
645        let peer_identities: Vec<_> = self
646            .config
647            .auto_connect_peers()
648            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
649            .collect();
650
651        let mut warmed = 0;
652        for identity in peer_identities {
653            if budget == 0 {
654                break;
655            }
656
657            let peer_node_addr = *identity.node_addr();
658            if peer_node_addr == *self.identity.node_addr()
659                || !self.should_warm_auto_connect_session(&peer_node_addr)
660                || self
661                    .sessions
662                    .get(&peer_node_addr)
663                    .is_some_and(|entry| entry.is_initiating())
664            {
665                continue;
666            }
667
668            self.register_identity(peer_node_addr, identity.pubkey_full());
669
670            if self.find_next_hop(&peer_node_addr).is_some() {
671                match self
672                    .initiate_session(peer_node_addr, identity.pubkey_full())
673                    .await
674                {
675                    Ok(()) => {
676                        warmed += 1;
677                        budget = budget.saturating_sub(1);
678                        debug!(
679                            peer = %self.peer_display_name(&peer_node_addr),
680                            "Warmed auto-connect peer session over existing FIPS graph"
681                        );
682                    }
683                    Err(NodeError::SendFailed { node_addr, reason })
684                        if node_addr == peer_node_addr && reason == "no route to destination" =>
685                    {
686                        self.maybe_initiate_lookup(&peer_node_addr).await;
687                        warmed += 1;
688                        budget = budget.saturating_sub(1);
689                    }
690                    Err(err) => {
691                        debug!(
692                            peer = %self.peer_display_name(&peer_node_addr),
693                            error = %err,
694                            "Failed to warm auto-connect peer session"
695                        );
696                    }
697                }
698            } else {
699                self.maybe_initiate_lookup(&peer_node_addr).await;
700                warmed += 1;
701                budget = budget.saturating_sub(1);
702            }
703        }
704
705        warmed
706    }
707
708    pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
709        let max_destinations = self.config.node.session.pending_max_destinations;
710        if max_destinations == 0 {
711            return 0;
712        }
713
714        let pending_sessions = self
715            .sessions
716            .values()
717            .filter(|entry| !entry.is_established())
718            .count();
719        let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
720        max_destinations
721            .saturating_sub(pending_total)
722            .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
723    }
724
725    fn outbound_handshake_slots(&self) -> usize {
726        let used = self
727            .connections
728            .len()
729            .saturating_add(self.pending_connects.len());
730        if self.max_connections == 0 {
731            usize::MAX
732        } else {
733            self.max_connections.saturating_sub(used)
734        }
735    }
736
737    fn outbound_link_slots(&self) -> usize {
738        if self.max_links == 0 {
739            usize::MAX
740        } else {
741            self.max_links.saturating_sub(self.links.len())
742        }
743    }
744
745    fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
746        if !self.peers.contains_key(peer_node_addr)
747            && self.max_peers > 0
748            && self.peers.len() >= self.max_peers
749        {
750            return 0;
751        }
752
753        let in_flight_for_peer = self
754            .connections
755            .values()
756            .filter(|conn| {
757                conn.expected_identity()
758                    .map(|id| id.node_addr() == peer_node_addr)
759                    .unwrap_or(false)
760            })
761            .count()
762            .saturating_add(
763                self.pending_connects
764                    .iter()
765                    .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
766                    .count(),
767            );
768
769        self.outbound_handshake_slots()
770            .min(self.outbound_link_slots())
771            .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
772    }
773
774    fn discovery_connect_budget(&self) -> usize {
775        self.outbound_handshake_slots()
776            .min(self.outbound_link_slots())
777            .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
778    }
779
780    /// Find a UDP transport whose bound socket can send to `remote_addr`.
781    ///
782    /// LAN discovery can surface both IPv4 and IPv6 addresses for the same
783    /// service. A wildcard IPv4 socket cannot send to an IPv6 link-local
784    /// target, and vice versa, so callers must choose by socket family rather
785    /// than by transport type alone.
786    fn find_udp_transport_for_remote_addr(
787        &self,
788        remote_addr: SocketAddr,
789    ) -> Option<(TransportId, SocketAddr)> {
790        self.transports
791            .iter()
792            .filter(|(id, handle)| {
793                handle.transport_type().name == "udp"
794                    && handle.is_operational()
795                    && !self.bootstrap_transports.contains(id)
796            })
797            .filter_map(|(id, handle)| {
798                let local_addr = handle.local_addr()?;
799                socket_addr_families_compatible(local_addr, remote_addr)
800                    .then_some((*id, local_addr))
801            })
802            .min_by_key(|(id, _)| id.as_u32())
803    }
804
805    pub(super) fn transport_discovery_candidate(
806        &self,
807        discovered_transport_id: TransportId,
808        discovered_addr: TransportAddr,
809    ) -> Option<(TransportId, TransportAddr, &'static str)> {
810        let transport = self.transports.get(&discovered_transport_id)?;
811        let transport_name = transport.transport_type().name;
812
813        if transport_name != "udp" {
814            return Some((discovered_transport_id, discovered_addr, transport_name));
815        }
816
817        let Some(remote_socket_addr) = discovered_addr
818            .as_str()
819            .and_then(|addr| addr.parse::<SocketAddr>().ok())
820        else {
821            if self.bootstrap_transports.contains(&discovered_transport_id) {
822                debug!(
823                    transport_id = %discovered_transport_id,
824                    remote_addr = %discovered_addr,
825                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
826                );
827                return None;
828            }
829            return Some((discovered_transport_id, discovered_addr, transport_name));
830        };
831
832        let Some((transport_id, local_addr)) =
833            self.find_udp_transport_for_remote_addr(remote_socket_addr)
834        else {
835            debug!(
836                transport_id = %discovered_transport_id,
837                remote_addr = %discovered_addr,
838                "transport discovery: skip UDP peer with no compatible local socket"
839            );
840            return None;
841        };
842
843        if transport_id != discovered_transport_id {
844            debug!(
845                discovered_transport_id = %discovered_transport_id,
846                selected_transport_id = %transport_id,
847                local_addr = %local_addr,
848                remote_addr = %remote_socket_addr,
849                "transport discovery: selected compatible UDP transport"
850            );
851        }
852
853        Some((
854            transport_id,
855            TransportAddr::from_socket_addr(remote_socket_addr),
856            transport_name,
857        ))
858    }
859
860    fn peer_address_string_for_transport_candidate(
861        &self,
862        transport_id: TransportId,
863        transport_name: &str,
864        remote_addr: &TransportAddr,
865    ) -> String {
866        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
867        let _ = (transport_id, transport_name);
868
869        #[cfg(any(target_os = "linux", target_os = "macos"))]
870        if transport_name == "ethernet"
871            && remote_addr.as_bytes().len() == 6
872            && let Some(interface) = self
873                .transports
874                .get(&transport_id)
875                .and_then(|transport| transport.interface_name())
876        {
877            let mut mac = [0u8; 6];
878            mac.copy_from_slice(remote_addr.as_bytes());
879            return format!(
880                "{interface}/{}",
881                crate::transport::ethernet::format_mac(&mac)
882            );
883        }
884
885        remote_addr.to_string()
886    }
887
888    fn resolve_peer_address_for_match(
889        &self,
890        candidate: &PeerAddress,
891    ) -> Option<(TransportId, TransportAddr)> {
892        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
893            return None;
894        }
895
896        if candidate.transport == "ethernet" {
897            return self.resolve_ethernet_addr(&candidate.addr).ok();
898        }
899
900        if candidate.transport == "ble" {
901            #[cfg(bluer_available)]
902            {
903                return self.resolve_ble_addr(&candidate.addr).ok();
904            }
905            #[cfg(not(bluer_available))]
906            {
907                return None;
908            }
909        }
910
911        let transport_id = if candidate.transport == "udp"
912            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
913        {
914            self.find_udp_transport_for_remote_addr(remote_socket_addr)
915                .map(|(id, _)| id)?
916        } else {
917            self.find_transport_for_type(&candidate.transport)?
918        };
919
920        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
921    }
922
923    /// Initiate a connection to a peer on a specific transport and address.
924    ///
925    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
926    /// the Noise IK handshake, sends msg1, and registers the connection for
927    /// msg2 dispatch.
928    ///
929    /// For connection-oriented transports (TCP, Tor): allocates a link and
930    /// starts a non-blocking transport connect. The handshake is deferred
931    /// until the transport connection is established — the tick handler
932    /// polls `connection_state()` and initiates the handshake when ready.
933    pub(super) async fn initiate_connection(
934        &mut self,
935        transport_id: TransportId,
936        remote_addr: TransportAddr,
937        peer_identity: PeerIdentity,
938    ) -> Result<(), NodeError> {
939        let peer_node_addr = *peer_identity.node_addr();
940
941        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
942            debug!(
943                peer = %self.peer_display_name(&peer_node_addr),
944                transport_id = %transport_id,
945                remote_addr = %remote_addr,
946                "Connection already in progress for candidate path"
947            );
948            return Ok(());
949        }
950
951        if self.outbound_handshake_slots() == 0 {
952            return Err(NodeError::MaxConnectionsExceeded {
953                max: self.max_connections,
954            });
955        }
956
957        if self.outbound_link_slots() == 0 {
958            return Err(NodeError::MaxLinksExceeded {
959                max: self.max_links,
960            });
961        }
962
963        if !self.peers.contains_key(&peer_node_addr)
964            && self.max_peers > 0
965            && self.peers.len() >= self.max_peers
966        {
967            return Err(NodeError::MaxPeersExceeded {
968                max: self.max_peers,
969            });
970        }
971
972        self.authorize_peer(
973            &peer_identity,
974            PeerAclContext::OutboundConnect,
975            transport_id,
976            &remote_addr,
977        )?;
978
979        let is_connection_oriented = self
980            .transports
981            .get(&transport_id)
982            .map(|t| t.transport_type().connection_oriented)
983            .unwrap_or(false);
984
985        // Allocate link ID and create link
986        let link_id = self.allocate_link_id();
987
988        let link = if is_connection_oriented {
989            Link::new(
990                link_id,
991                transport_id,
992                remote_addr.clone(),
993                LinkDirection::Outbound,
994                Duration::from_millis(self.config.node.base_rtt_ms),
995            )
996        } else {
997            Link::connectionless(
998                link_id,
999                transport_id,
1000                remote_addr.clone(),
1001                LinkDirection::Outbound,
1002                Duration::from_millis(self.config.node.base_rtt_ms),
1003            )
1004        };
1005
1006        self.links.insert(link_id, link);
1007
1008        // Add reverse lookup for packet dispatch
1009        self.addr_to_link
1010            .insert((transport_id, remote_addr.clone()), link_id);
1011
1012        if is_connection_oriented {
1013            // Connection-oriented: start non-blocking connect, defer handshake
1014            if let Some(transport) = self.transports.get(&transport_id) {
1015                match transport.connect(&remote_addr).await {
1016                    Ok(()) => {
1017                        debug!(
1018                            peer = %self.peer_display_name(&peer_node_addr),
1019                            transport_id = %transport_id,
1020                            remote_addr = %remote_addr,
1021                            link_id = %link_id,
1022                            "Transport connect initiated (non-blocking)"
1023                        );
1024                        self.pending_connects.push(super::PendingConnect {
1025                            link_id,
1026                            transport_id,
1027                            remote_addr,
1028                            peer_identity,
1029                        });
1030                    }
1031                    Err(e) => {
1032                        // Clean up link
1033                        self.links.remove(&link_id);
1034                        self.addr_to_link.remove(&(transport_id, remote_addr));
1035                        return Err(NodeError::from_transport_error(e));
1036                    }
1037                }
1038            }
1039            Ok(())
1040        } else {
1041            // Connectionless: proceed with immediate handshake
1042            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
1043                .await
1044        }
1045    }
1046
1047    /// Start the Noise handshake on a link and send msg1.
1048    ///
1049    /// Called immediately for connectionless transports, or after the
1050    /// transport connection is established for connection-oriented transports.
1051    pub(super) async fn start_handshake(
1052        &mut self,
1053        link_id: LinkId,
1054        transport_id: TransportId,
1055        remote_addr: TransportAddr,
1056        peer_identity: PeerIdentity,
1057    ) -> Result<(), NodeError> {
1058        let peer_node_addr = *peer_identity.node_addr();
1059
1060        // Create connection in handshake phase (outbound knows expected identity)
1061        let current_time_ms = Self::now_ms();
1062        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
1063
1064        // Allocate a session index for this handshake
1065        let our_index = match self.index_allocator.allocate() {
1066            Ok(idx) => idx,
1067            Err(e) => {
1068                // Clean up the link we just created
1069                self.links.remove(&link_id);
1070                self.addr_to_link.remove(&(transport_id, remote_addr));
1071                return Err(NodeError::IndexAllocationFailed(e.to_string()));
1072            }
1073        };
1074
1075        // Start the Noise handshake and get message 1
1076        let our_keypair = self.identity.keypair();
1077        let noise_msg1 =
1078            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
1079                Ok(msg) => msg,
1080                Err(e) => {
1081                    // Clean up the index and link
1082                    let _ = self.index_allocator.free(our_index);
1083                    self.links.remove(&link_id);
1084                    self.addr_to_link.remove(&(transport_id, remote_addr));
1085                    return Err(NodeError::HandshakeFailed(e.to_string()));
1086                }
1087            };
1088
1089        // Set index and transport info on the connection
1090        connection.set_our_index(our_index);
1091        connection.set_transport_id(transport_id);
1092        connection.set_source_addr(remote_addr.clone());
1093
1094        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
1095        let wire_msg1 = build_msg1(our_index, &noise_msg1);
1096
1097        debug!(
1098            peer = %self.peer_display_name(&peer_node_addr),
1099            transport_id = %transport_id,
1100            remote_addr = %remote_addr,
1101            link_id = %link_id,
1102            our_index = %our_index,
1103            "Connection initiated"
1104        );
1105
1106        // Store msg1 for resend and schedule first resend
1107        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
1108        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
1109
1110        // Track in pending_outbound for msg2 dispatch
1111        self.pending_outbound
1112            .insert((transport_id, our_index.as_u32()), link_id);
1113        self.connections.insert(link_id, connection);
1114
1115        // Send the wire format handshake message. If the very first send fails
1116        // synchronously (for example an IPv6 candidate on an IPv4-only UDP
1117        // socket), undo this candidate so the caller can try the next address
1118        // in the same dial pass.
1119        let send_result = match self.transports.get(&transport_id) {
1120            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
1121            None => None,
1122        };
1123        match send_result {
1124            Some(send_result) => {
1125                self.note_local_send_outcome(&send_result);
1126                match send_result {
1127                    Ok(bytes) => {
1128                        debug!(
1129                            link_id = %link_id,
1130                            our_index = %our_index,
1131                            bytes,
1132                            "Sent Noise handshake message 1 (wire format)"
1133                        );
1134                    }
1135                    Err(e) => {
1136                        warn!(
1137                            link_id = %link_id,
1138                            transport_id = %transport_id,
1139                            remote_addr = %remote_addr,
1140                            our_index = %our_index,
1141                            error = %e,
1142                            "Failed to send handshake message"
1143                        );
1144                        self.pending_outbound
1145                            .remove(&(transport_id, our_index.as_u32()));
1146                        self.connections.remove(&link_id);
1147                        self.links.remove(&link_id);
1148                        self.addr_to_link
1149                            .remove(&(transport_id, remote_addr.clone()));
1150                        let _ = self.index_allocator.free(our_index);
1151                        return Err(NodeError::from_transport_error(e));
1152                    }
1153                }
1154            }
1155            None => {
1156                self.pending_outbound
1157                    .remove(&(transport_id, our_index.as_u32()));
1158                self.connections.remove(&link_id);
1159                self.links.remove(&link_id);
1160                self.addr_to_link
1161                    .remove(&(transport_id, remote_addr.clone()));
1162                let _ = self.index_allocator.free(our_index);
1163                return Err(NodeError::TransportError(format!(
1164                    "transport {transport_id} disappeared before first handshake send"
1165                )));
1166            }
1167        }
1168
1169        Ok(())
1170    }
1171
1172    /// Poll all transports for discovered peers and auto-connect.
1173    ///
1174    /// Called from the tick handler. Iterates operational transports,
1175    /// drains their discovery buffers, and initiates connections to
1176    /// newly discovered peers (if auto_connect is enabled).
1177    pub(super) async fn poll_transport_discovery(&mut self) {
1178        // Collect discoveries first to avoid borrow conflict with self
1179        let mut to_connect = Vec::new();
1180        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1181        let mut connect_budget = self.discovery_connect_budget();
1182        let mut skipped_budget = 0usize;
1183
1184        for transport in self.transports.values() {
1185            if !transport.is_operational() {
1186                continue;
1187            }
1188            if !transport.auto_connect() {
1189                // Still drain the buffer so it doesn't grow unbounded
1190                let _ = transport.discover();
1191                continue;
1192            }
1193            let discovered = match transport.discover() {
1194                Ok(peers) => peers,
1195                Err(_) => continue,
1196            };
1197            for peer in discovered {
1198                let discovered_transport_id = peer.transport_id;
1199                let pubkey = match peer.pubkey_hint {
1200                    Some(pk) => pk,
1201                    None => continue,
1202                };
1203                let identity = PeerIdentity::from_pubkey(pubkey);
1204                let node_addr = *identity.node_addr();
1205
1206                // Skip self
1207                if node_addr == *self.identity.node_addr() {
1208                    continue;
1209                }
1210
1211                let Some((candidate_transport_id, remote_addr, transport_name)) =
1212                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1213                else {
1214                    continue;
1215                };
1216
1217                if self.peers.contains_key(&node_addr) {
1218                    let candidate = PeerAddress::new(
1219                        transport_name,
1220                        self.peer_address_string_for_transport_candidate(
1221                            candidate_transport_id,
1222                            transport_name,
1223                            &remote_addr,
1224                        ),
1225                    );
1226                    if self.active_peer_candidate_is_fresh_enough_to_skip(
1227                        &node_addr,
1228                        std::slice::from_ref(&candidate),
1229                    ) {
1230                        continue;
1231                    }
1232                    if self.is_connecting_to_peer_on_path(
1233                        &node_addr,
1234                        candidate_transport_id,
1235                        &remote_addr,
1236                    ) {
1237                        continue;
1238                    }
1239                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1240                    if connect_budget == 0
1241                        || self
1242                            .path_candidate_attempt_budget(&node_addr)
1243                            .saturating_sub(queued_for_peer)
1244                            == 0
1245                    {
1246                        skipped_budget = skipped_budget.saturating_add(1);
1247                        continue;
1248                    }
1249                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
1250                    *queued_per_peer.entry(node_addr).or_default() += 1;
1251                    connect_budget = connect_budget.saturating_sub(1);
1252                    continue;
1253                }
1254
1255                if self.is_connecting_to_peer_on_path(
1256                    &node_addr,
1257                    candidate_transport_id,
1258                    &remote_addr,
1259                ) {
1260                    continue;
1261                }
1262
1263                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1264                if connect_budget == 0
1265                    || self
1266                        .path_candidate_attempt_budget(&node_addr)
1267                        .saturating_sub(queued_for_peer)
1268                        == 0
1269                {
1270                    skipped_budget = skipped_budget.saturating_add(1);
1271                    continue;
1272                }
1273                to_connect.push((candidate_transport_id, remote_addr, identity, false));
1274                *queued_per_peer.entry(node_addr).or_default() += 1;
1275                connect_budget = connect_budget.saturating_sub(1);
1276            }
1277        }
1278
1279        if skipped_budget > 0 {
1280            debug!(
1281                skipped = skipped_budget,
1282                queued = to_connect.len(),
1283                "Transport discovery connect budget exhausted"
1284            );
1285        }
1286
1287        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1288            info!(
1289                peer = %self.peer_display_name(identity.node_addr()),
1290                transport_id = %transport_id,
1291                remote_addr = %remote_addr,
1292                active_refresh,
1293                "Auto-connecting to discovered peer"
1294            );
1295            if let Err(e) = self
1296                .initiate_connection(transport_id, remote_addr, identity)
1297                .await
1298            {
1299                warn!(error = %e, "Failed to auto-connect to discovered peer");
1300            }
1301        }
1302    }
1303
    /// Run one tick of Nostr-based discovery and NAT traversal handling.
    ///
    /// Does nothing when `nostr_discovery` is not configured. Otherwise, in
    /// order:
    ///
    /// 1. Updates the discovery handle's outbound and direct-refresh
    ///    admission flags from the node's current admission checks.
    /// 2. Refreshes the local overlay advert. Failures are only logged.
    /// 3. Relays queued mesh traversal signals via `drain_nostr_mesh_signals`.
    /// 4. Processes every drained `BootstrapEvent`:
    ///    - `Established`: adopts the traversal socket when the applicable
    ///      admission check passes. Peers already in `peers` use the
    ///      direct-refresh check; others use the regular outbound check. A
    ///      failed adoption schedules a retry for that peer.
    ///    - `Failed`: handled differently for connected peers, peers with a
    ///      handshake in progress, and unconnected peers (see inline comments).
    /// 5. Runs the startup open-discovery sweep, then queues open-discovery
    ///    and active-fallback direct retries.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        // Clone the shared handle (an `Arc`, per the signature of
        // `drain_nostr_mesh_signals`) so `self` can be borrowed mutably below.
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        // Push the current admission decisions into the discovery handle
        // before it is asked to do any work this tick.
        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        self.drain_nostr_mesh_signals(&bootstrap).await;

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    // An established traversal for an already-connected peer
                    // is a direct-path refresh and is admitted under the
                    // direct-refresh check instead of the outbound check.
                    let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
                        .ok()
                        .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
                    let admission_allowed = if active_refresh {
                        self.outbound_direct_refresh_admission_check()
                    } else {
                        self.outbound_admission_check()
                    };
                    if !admission_allowed {
                        debug!(
                            peer_npub = %traversal.peer_npub,
                            peers = self.peers.len(),
                            max_peers = self.max_peers,
                            active_refresh,
                            "Dropping established NAT traversal: at capacity"
                        );
                        continue;
                    }
                    // `traversal` is moved into adoption; keep the npub for
                    // logging and retry scheduling.
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    let now_ms = Self::now_ms();
                    // Already-connected peer: only pursue the failed direct
                    // path further when `active_peer_should_keep_direct_retry`
                    // says so. In that case record the failure (rate-limited
                    // warning), request a stale-advert check when the failure
                    // streak crosses its threshold, and schedule a link-dead
                    // reprobe. No fallback dial or retry is scheduled here.
                    if self.peers.contains_key(&node_addr) {
                        if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
                            let decision =
                                bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                            if decision.should_warn {
                                warn!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    cooldown_secs = decision
                                        .cooldown_until_ms
                                        .map(|t| t.saturating_sub(now_ms) / 1000),
                                    "Direct-path NAT traversal upgrade failed"
                                );
                            } else {
                                debug!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
                                );
                            }
                            if decision.crossed_threshold {
                                bootstrap
                                    .request_advert_stale_check(peer_config.npub.clone())
                                    .await;
                            }
                            self.schedule_link_dead_reprobe(node_addr, now_ms);
                        } else {
                            debug!(
                                npub = %peer_config.npub,
                                error = %reason,
                                "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
                            );
                        }
                        continue;
                    }
                    // A handshake to this peer is already in progress: skip
                    // failure accounting and fallback dialing entirely.
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    // Unconnected peer: record the failure (rate-limited
                    // warning, optional cooldown), then fall back to the
                    // configured addresses below.
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // B6: stale-advert eviction on the streak-threshold
                    // crossing. Fire-and-forget; the outcome is logged so
                    // operators can see when peers get cleaned up.
                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Fall back to the peer's configured addresses; only if
                    // that fails is a retry scheduled.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    if self.nostr_cooldown_applies_to_peer_config(&peer_config)
                        && let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        // Push the next retry past the cooldown so the
                        // open-discovery sweep doesn't re-enqueue and the
                        // per-attempt backoff doesn't fire sooner.
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
        self.queue_active_fallback_direct_retries(&bootstrap);
    }
1464
1465    async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1466        let mut deferred = Vec::new();
1467
1468        for signal in bootstrap.drain_mesh_signals().await {
1469            let (peer_npub, msg_type, payload) = match &signal {
1470                MeshTraversalSignal::Offer { peer_npub, offer } => {
1471                    let payload = match serde_json::to_vec(&offer) {
1472                        Ok(payload) => payload,
1473                        Err(error) => {
1474                            debug!(
1475                                peer = %peer_npub,
1476                                error = %error,
1477                                "Failed to encode mesh traversal offer"
1478                            );
1479                            continue;
1480                        }
1481                    };
1482                    (
1483                        peer_npub.clone(),
1484                        SessionMessageType::TraversalOffer.to_byte(),
1485                        payload,
1486                    )
1487                }
1488                MeshTraversalSignal::Answer { peer_npub, answer } => {
1489                    let payload = match serde_json::to_vec(&answer) {
1490                        Ok(payload) => payload,
1491                        Err(error) => {
1492                            debug!(
1493                                peer = %peer_npub,
1494                                error = %error,
1495                                "Failed to encode mesh traversal answer"
1496                            );
1497                            continue;
1498                        }
1499                    };
1500                    (
1501                        peer_npub.clone(),
1502                        SessionMessageType::TraversalAnswer.to_byte(),
1503                        payload,
1504                    )
1505                }
1506            };
1507
1508            let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1509                Ok(identity) => identity,
1510                Err(error) => {
1511                    debug!(
1512                        peer = %peer_npub,
1513                        error = %error,
1514                        "Cannot send mesh traversal signal to invalid peer npub"
1515                    );
1516                    continue;
1517                }
1518            };
1519            let peer_addr = *peer_identity.node_addr();
1520            match self
1521                .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1522                .await
1523            {
1524                MeshSignalSessionAction::Send => {}
1525                MeshSignalSessionAction::Defer => {
1526                    deferred.push(signal);
1527                    continue;
1528                }
1529                MeshSignalSessionAction::Drop => continue,
1530            }
1531
1532            if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1533                debug!(
1534                    peer = %self.peer_display_name(&peer_addr),
1535                    error = %error,
1536                    "Failed to send mesh traversal signal"
1537                );
1538            }
1539        }
1540
1541        for signal in deferred {
1542            bootstrap.requeue_mesh_signal(signal);
1543        }
1544    }
1545
1546    async fn mesh_signal_session_action(
1547        &mut self,
1548        peer_addr: NodeAddr,
1549        peer_pubkey: PublicKey,
1550    ) -> MeshSignalSessionAction {
1551        if let Some(entry) = self.sessions.get(&peer_addr) {
1552            if entry.is_established() {
1553                return MeshSignalSessionAction::Send;
1554            }
1555            if entry.is_initiating() || entry.is_awaiting_msg3() {
1556                debug!(
1557                    peer = %self.peer_display_name(&peer_addr),
1558                    "Deferring mesh traversal signal until end-to-end session is established"
1559                );
1560                return MeshSignalSessionAction::Defer;
1561            }
1562        }
1563
1564        if self.find_next_hop(&peer_addr).is_none() {
1565            debug!(
1566                peer = %self.peer_display_name(&peer_addr),
1567                "Cannot warm mesh traversal signal session without a FIPS route"
1568            );
1569            self.maybe_initiate_lookup(&peer_addr).await;
1570            return MeshSignalSessionAction::Drop;
1571        }
1572
1573        self.register_identity(peer_addr, peer_pubkey);
1574        match self.initiate_session(peer_addr, peer_pubkey).await {
1575            Ok(()) => {
1576                debug!(
1577                    peer = %self.peer_display_name(&peer_addr),
1578                    "Warming end-to-end session for mesh traversal signal"
1579                );
1580                MeshSignalSessionAction::Defer
1581            }
1582            Err(NodeError::SendFailed { node_addr, reason })
1583                if node_addr == peer_addr && reason == "no route to destination" =>
1584            {
1585                debug!(
1586                    peer = %self.peer_display_name(&peer_addr),
1587                    "Cannot warm mesh traversal signal session without a FIPS route"
1588                );
1589                self.maybe_initiate_lookup(&peer_addr).await;
1590                MeshSignalSessionAction::Drop
1591            }
1592            Err(error) => {
1593                debug!(
1594                    peer = %self.peer_display_name(&peer_addr),
1595                    error = %error,
1596                    "Failed to warm end-to-end session for mesh traversal signal"
1597                );
1598                MeshSignalSessionAction::Drop
1599            }
1600        }
1601    }
1602
1603    /// Resolve the LAN-only discovery scope. Applications with explicit
1604    /// connectivity config can set `node.discovery.lan.scope` without
1605    /// changing the public Nostr discovery `app` tag. The older fallback
1606    /// extracts a scope from the Nostr app tag used by default scoped
1607    /// discovery.
1608    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1609        if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1610            let scope = scope.trim();
1611            if !scope.is_empty() {
1612                return Some(scope.to_string());
1613            }
1614        }
1615
1616        let app = self.config.node.discovery.nostr.app.trim();
1617        if app.is_empty() {
1618            return None;
1619        }
1620        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1621            let scope = rest.trim();
1622            if scope.is_empty() {
1623                None
1624            } else {
1625                Some(scope.to_string())
1626            }
1627        } else {
1628            Some(app.to_string())
1629        }
1630    }
1631
1632    pub(super) fn start_local_instance_discovery(&mut self) {
1633        if !self.config.node.discovery.local.enabled {
1634            return;
1635        }
1636        let Some(scope) = self.lan_discovery_scope() else {
1637            debug!("local instance discovery not started: no discovery scope");
1638            return;
1639        };
1640        let now_ms = Self::now_ms();
1641        match crate::discovery::local::LocalInstanceRegistry::new(
1642            self.identity.npub(),
1643            scope,
1644            &self.config.node.discovery.local,
1645            now_ms,
1646        ) {
1647            Ok(registry) => {
1648                self.local_instance_registry = Some(registry);
1649                self.local_instance_started_at_ms = Some(now_ms);
1650                self.last_local_instance_publish_ms = None;
1651                self.last_local_instance_scan_ms = None;
1652                self.publish_local_instance_record(now_ms);
1653                info!("Same-host FIPS instance discovery enabled");
1654            }
1655            Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1656                debug!("same-host FIPS instance discovery disabled");
1657            }
1658            Err(err) => {
1659                debug!(error = %err, "same-host FIPS instance discovery not started");
1660            }
1661        }
1662    }
1663
1664    fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1665        let mut contacts = Vec::new();
1666        for handle in self.transports.values() {
1667            if !handle.is_operational() || !handle.accept_connections() {
1668                continue;
1669            }
1670            let transport = handle.transport_type().name;
1671            if transport != "udp" && transport != "tcp" {
1672                continue;
1673            }
1674            let Some(local_addr) = handle.local_addr() else {
1675                continue;
1676            };
1677            let Some(contact) =
1678                crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1679            else {
1680                continue;
1681            };
1682            if contacts
1683                .iter()
1684                .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1685                    existing.transport == contact.transport && existing.addr == contact.addr
1686                })
1687            {
1688                continue;
1689            }
1690            contacts.push(contact);
1691        }
1692        contacts
1693    }
1694
1695    fn publish_local_instance_record(&mut self, now_ms: u64) {
1696        let Some(registry) = self.local_instance_registry.clone() else {
1697            return;
1698        };
1699        let contacts = self.local_instance_contacts();
1700        match registry.publish(contacts, now_ms) {
1701            Ok(()) => {
1702                self.last_local_instance_publish_ms = Some(now_ms);
1703            }
1704            Err(err) => {
1705                debug!(error = %err, "failed to publish same-host FIPS instance record");
1706            }
1707        }
1708    }
1709
1710    fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1711        if self.local_instance_registry.is_none() {
1712            return;
1713        }
1714        let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1715        let due = self
1716            .last_local_instance_publish_ms
1717            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1718            .unwrap_or(true);
1719        if due {
1720            self.publish_local_instance_record(now_ms);
1721        }
1722    }
1723
1724    fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1725        if self.local_instance_registry.is_none() {
1726            return false;
1727        }
1728        let cfg = &self.config.node.discovery.local;
1729        let interval_ms = if self
1730            .local_instance_started_at_ms
1731            .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1732            .unwrap_or(false)
1733        {
1734            cfg.startup_scan_interval_ms()
1735        } else {
1736            cfg.scan_interval_ms()
1737        };
1738        self.last_local_instance_scan_ms
1739            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1740            .unwrap_or(true)
1741    }
1742
1743    fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1744        if self.config.peers().iter().any(|peer| {
1745            PeerIdentity::from_npub(&peer.npub)
1746                .map(|configured| configured.node_addr() == identity.node_addr())
1747                .unwrap_or(false)
1748        }) {
1749            return true;
1750        }
1751        self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1752    }
1753
1754    fn local_instance_peer_addresses(
1755        &self,
1756        record: &crate::discovery::local::LocalInstanceRecord,
1757    ) -> Vec<PeerAddress> {
1758        let mut addresses = Vec::new();
1759        for contact in &record.contacts {
1760            if contact.transport != "udp" && contact.transport != "tcp" {
1761                continue;
1762            }
1763            let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1764                debug!(
1765                    npub = %record.npub,
1766                    transport = %contact.transport,
1767                    addr = %contact.addr,
1768                    "local instance discovery: skip non-socket contact"
1769                );
1770                continue;
1771            };
1772            if !socket_addr.ip().is_loopback() {
1773                debug!(
1774                    npub = %record.npub,
1775                    addr = %contact.addr,
1776                    "local instance discovery: skip non-loopback contact"
1777                );
1778                continue;
1779            }
1780            let address =
1781                PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1782                    .with_seen_at_ms(record.updated_at_ms);
1783            if addresses.iter().any(|existing: &PeerAddress| {
1784                existing.transport == address.transport && existing.addr == address.addr
1785            }) {
1786                continue;
1787            }
1788            addresses.push(address);
1789        }
1790        addresses
1791    }
1792
1793    /// Scan the same-host registry only on startup and then at a low cadence.
1794    /// This avoids a per-second filesystem poll while preserving the fast path
1795    /// for processes launched around the same time.
1796    pub(super) async fn poll_local_instance_discovery(&mut self) {
1797        let Some(registry) = self.local_instance_registry.clone() else {
1798            return;
1799        };
1800        let now_ms = Self::now_ms();
1801        self.maybe_publish_local_instance_record(now_ms);
1802        if !self.local_instance_scan_due(now_ms) {
1803            return;
1804        }
1805        self.last_local_instance_scan_ms = Some(now_ms);
1806
1807        let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1808        {
1809            Ok(records) => records,
1810            Err(err) => {
1811                debug!(error = %err, "same-host FIPS instance scan failed");
1812                return;
1813            }
1814        };
1815        if records.is_empty() {
1816            return;
1817        }
1818
1819        let mut connect_budget = self.discovery_connect_budget();
1820        let mut skipped_budget = 0usize;
1821        for record in records {
1822            let identity = match PeerIdentity::from_npub(&record.npub) {
1823                Ok(identity) => identity,
1824                Err(err) => {
1825                    debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1826                    continue;
1827                }
1828            };
1829            let peer_node_addr = *identity.node_addr();
1830            if peer_node_addr == *self.identity.node_addr() {
1831                continue;
1832            }
1833            if !self.local_instance_peer_allowed(&identity) {
1834                debug!(
1835                    npub = %identity.short_npub(),
1836                    "local instance discovery: skip unconfigured peer"
1837                );
1838                continue;
1839            }
1840
1841            let addresses = self.local_instance_peer_addresses(&record);
1842            if addresses.is_empty() {
1843                continue;
1844            }
1845
1846            if self.peers.contains_key(&peer_node_addr)
1847                && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1848            {
1849                continue;
1850            }
1851
1852            for address in addresses {
1853                let Some((transport_id, remote_addr)) =
1854                    self.resolve_peer_address_for_match(&address)
1855                else {
1856                    continue;
1857                };
1858                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1859                    continue;
1860                }
1861                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1862                    skipped_budget = skipped_budget.saturating_add(1);
1863                    continue;
1864                }
1865                info!(
1866                    npub = %identity.short_npub(),
1867                    transport = %address.transport,
1868                    addr = %address.addr,
1869                    "same-host FIPS instance discovery: initiating handshake"
1870                );
1871                if let Err(err) = self
1872                    .initiate_connection(transport_id, remote_addr, identity)
1873                    .await
1874                {
1875                    debug!(
1876                        npub = %record.npub,
1877                        error = %err,
1878                        "same-host FIPS instance discovery: failed to initiate connection"
1879                    );
1880                }
1881                connect_budget = connect_budget.saturating_sub(1);
1882            }
1883        }
1884        if skipped_budget > 0 {
1885            debug!(
1886                skipped = skipped_budget,
1887                "same-host FIPS instance discovery connect budget exhausted"
1888            );
1889        }
1890    }
1891
1892    /// Drain mDNS-discovered peers and initiate Noise XX handshakes. For
1893    /// active peers this is a non-disruptive alternate-path refresh: the
1894    /// current link stays live until a new handshake authenticates and
1895    /// promotes. The handshake itself is the authentication — a spoofed
1896    /// mDNS advert with someone else's npub fails the XX exchange and
1897    /// is dropped.
1898    pub(super) async fn poll_lan_discovery(&mut self) {
1899        let Some(runtime) = self.lan_discovery.clone() else {
1900            return;
1901        };
1902        let events = runtime.drain_events().await;
1903        if events.is_empty() {
1904            return;
1905        }
1906        let mut connect_budget = self.discovery_connect_budget();
1907        let mut skipped_budget = 0usize;
1908        for event in events {
1909            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1910            let Some((transport_id, local_addr)) =
1911                self.find_udp_transport_for_remote_addr(peer.addr)
1912            else {
1913                debug!(
1914                    addr = %peer.addr,
1915                    "lan: skip discovered peer with no compatible UDP transport"
1916                );
1917                continue;
1918            };
1919            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1920                Ok(id) => id,
1921                Err(err) => {
1922                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1923                    continue;
1924                }
1925            };
1926            let peer_node_addr = *identity.node_addr();
1927            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1928            if self.peers.contains_key(&peer_node_addr) {
1929                let candidate = PeerAddress::new("udp", peer.addr.to_string());
1930                if self.active_peer_candidate_is_fresh_enough_to_skip(
1931                    &peer_node_addr,
1932                    std::slice::from_ref(&candidate),
1933                ) {
1934                    continue;
1935                }
1936                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1937                    continue;
1938                }
1939                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1940                    skipped_budget = skipped_budget.saturating_add(1);
1941                    continue;
1942                }
1943                info!(
1944                    npub = %identity.short_npub(),
1945                    addr = %peer.addr,
1946                    local_addr = %local_addr,
1947                    "lan: initiating alternate-path handshake to active peer"
1948                );
1949                if let Err(err) = self
1950                    .initiate_connection(transport_id, remote_addr, identity)
1951                    .await
1952                {
1953                    debug!(
1954                        npub = %peer.npub,
1955                        error = %err,
1956                        "lan: failed to initiate active peer alternate-path handshake"
1957                    );
1958                }
1959                connect_budget = connect_budget.saturating_sub(1);
1960                continue;
1961            }
1962            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1963                continue;
1964            }
1965            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1966                skipped_budget = skipped_budget.saturating_add(1);
1967                continue;
1968            }
1969            info!(
1970                npub = %identity.short_npub(),
1971                addr = %peer.addr,
1972                local_addr = %local_addr,
1973                "lan: initiating handshake to discovered peer"
1974            );
1975            if let Err(err) = self
1976                .initiate_connection(transport_id, remote_addr, identity)
1977                .await
1978            {
1979                debug!(
1980                    npub = %peer.npub,
1981                    error = %err,
1982                    "lan: failed to initiate connection to discovered peer"
1983                );
1984            }
1985            connect_budget = connect_budget.saturating_sub(1);
1986        }
1987        if skipped_budget > 0 {
1988            debug!(
1989                skipped = skipped_budget,
1990                "lan: discovery connect budget exhausted"
1991            );
1992        }
1993    }
1994
1995    /// Poll pending transport connects and initiate handshakes for ready ones.
1996    ///
1997    /// Called from the tick handler. For each pending connect, queries the
1998    /// transport's connection state. When a connection is established,
1999    /// marks the link as Connected and starts the Noise handshake.
2000    /// Failed connections are cleaned up and scheduled for retry.
2001    pub(super) async fn poll_pending_connects(&mut self) {
2002        if self.pending_connects.is_empty() {
2003            return;
2004        }
2005
2006        let mut completed = Vec::new();
2007
2008        for (i, pending) in self.pending_connects.iter().enumerate() {
2009            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
2010                transport.connection_state(&pending.remote_addr)
2011            } else {
2012                crate::transport::ConnectionState::Failed("transport removed".into())
2013            };
2014
2015            match state {
2016                crate::transport::ConnectionState::Connected => {
2017                    completed.push((i, true, None));
2018                }
2019                crate::transport::ConnectionState::Failed(reason) => {
2020                    completed.push((i, false, Some(reason)));
2021                }
2022                crate::transport::ConnectionState::Connecting => {
2023                    // Still in progress, check on next tick
2024                }
2025                crate::transport::ConnectionState::None => {
2026                    // Shouldn't happen — treat as failure
2027                    completed.push((i, false, Some("no connection attempt found".into())));
2028                }
2029            }
2030        }
2031
2032        // Process completions in reverse order to preserve indices
2033        for (i, success, reason) in completed.into_iter().rev() {
2034            let pending = self.pending_connects.remove(i);
2035
2036            if success {
2037                // Mark link as Connected
2038                if let Some(link) = self.links.get_mut(&pending.link_id) {
2039                    link.set_connected();
2040                }
2041
2042                debug!(
2043                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2044                    transport_id = %pending.transport_id,
2045                    remote_addr = %pending.remote_addr,
2046                    link_id = %pending.link_id,
2047                    "Transport connected, starting handshake"
2048                );
2049
2050                // Start the handshake now that the transport is connected
2051                if let Err(e) = self
2052                    .start_handshake(
2053                        pending.link_id,
2054                        pending.transport_id,
2055                        pending.remote_addr.clone(),
2056                        pending.peer_identity,
2057                    )
2058                    .await
2059                {
2060                    warn!(
2061                        link_id = %pending.link_id,
2062                        error = %e,
2063                        "Failed to start handshake after transport connect"
2064                    );
2065                    // Clean up link on handshake failure
2066                    self.remove_link(&pending.link_id);
2067                }
2068            } else {
2069                let reason = reason.unwrap_or_default();
2070                warn!(
2071                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2072                    transport_id = %pending.transport_id,
2073                    remote_addr = %pending.remote_addr,
2074                    link_id = %pending.link_id,
2075                    reason = %reason,
2076                    "Transport connect failed"
2077                );
2078
2079                // Clean up link and schedule retry
2080                self.remove_link(&pending.link_id);
2081                self.links.remove(&pending.link_id);
2082                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2083            }
2084        }
2085    }
2086
2087    // === State Transitions ===
2088
2089    /// Start the node.
2090    ///
2091    /// Initializes the TUN interface (if configured), spawns I/O threads,
2092    /// and transitions to the Running state.
2093    pub async fn start(&mut self) -> Result<(), NodeError> {
2094        node_start_debug_log("Node::start begin");
2095        if !self.state.can_start() {
2096            return Err(NodeError::AlreadyStarted);
2097        }
2098        self.state = NodeState::Starting;
2099        node_start_debug_log("Node::start state set to starting");
2100
2101        // Create packet channel for transport -> Node communication
2102        let packet_buffer_size = self.config.node.buffers.packet_channel;
2103        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
2104        self.packet_tx = Some(packet_tx.clone());
2105        self.packet_rx = Some(packet_rx);
2106        node_start_debug_log("Node::start packet channel created");
2107
2108        // Initialize transports first (before TUN, before Nostr discovery).
2109        node_start_debug_log("Node::start create transports begin");
2110        let transport_handles = self.create_transports(&packet_tx).await;
2111        node_start_debug_log(format!(
2112            "Node::start create transports complete count={}",
2113            transport_handles.len()
2114        ));
2115
2116        for mut handle in transport_handles {
2117            let transport_id = handle.transport_id();
2118            let transport_type = handle.transport_type().name;
2119            let name = handle.name().map(|s| s.to_string());
2120
2121            node_start_debug_log(format!(
2122                "Node::start transport start begin id={} type={} name={:?}",
2123                transport_id, transport_type, name
2124            ));
2125            match handle.start().await {
2126                Ok(()) => {
2127                    node_start_debug_log(format!(
2128                        "Node::start transport start ok id={} type={}",
2129                        transport_id, transport_type
2130                    ));
2131                    self.transports.insert(transport_id, handle);
2132                }
2133                Err(e) => {
2134                    node_start_debug_log(format!(
2135                        "Node::start transport start error id={} type={} error={}",
2136                        transport_id, transport_type, e
2137                    ));
2138                    if let Some(ref n) = name {
2139                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
2140                    } else {
2141                        warn!(transport_type, error = %e, "Transport failed to start");
2142                    }
2143                }
2144            }
2145        }
2146
2147        if !self.transports.is_empty() {
2148            info!(count = self.transports.len(), "Transports initialized");
2149        }
2150
2151        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
2152        // worker pools. **Unix only** — both pools issue direct
2153        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
2154        // raw file descriptors via `AsRawFd`, which is a unix-only
2155        // trait. On Windows the rx_loop's tokio-based send/recv
2156        // remain the canonical path; the perf overhaul lands its
2157        // gains on unix.
2158        //
2159        // Worker count defaults to the number of CPUs, overridable
2160        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
2161        // for debug / benchmarking. Hash-by-destination means a
2162        // single TCP flow pins to one worker (preserves wire
2163        // ordering); additional workers light up under multi-flow
2164        // / multi-peer load. See `node::encrypt_worker` /
2165        // `node::decrypt_worker` for full rationale.
2166        #[cfg(unix)]
2167        {
2168            if self.config.node.worker_pools_enabled {
2169                node_start_debug_log("Node::start worker pools begin");
2170                let cpu_default = std::thread::available_parallelism()
2171                    .map(|n| n.get())
2172                    .unwrap_or(1)
2173                    .max(1);
2174                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
2175                    .ok()
2176                    .and_then(|s| s.parse().ok())
2177                    .unwrap_or(cpu_default)
2178                    .max(1);
2179                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
2180                    encrypt_worker_count,
2181                ));
2182                info!(
2183                    workers = encrypt_worker_count,
2184                    "Spawned FMP-encrypt worker pool"
2185                );
2186
2187                // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
2188                // falls through to the in-line rx_loop decrypt path (the
2189                // "test-mode" branch in `handle_encrypted_frame`, which is
2190                // in fact a fully functional synchronous decrypt). Useful
2191                // as an A/B against the worker pipeline when chasing
2192                // scheduling/queueing regressions on the native macOS
2193                // path. Any non-zero value (env or default) spawns the
2194                // pool as before.
2195                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
2196                    .ok()
2197                    .and_then(|s| s.parse().ok())
2198                    .unwrap_or(cpu_default);
2199                if decrypt_worker_count == 0 {
2200                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
2201                } else {
2202                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
2203                        decrypt_worker_count,
2204                    ));
2205                    info!(
2206                        workers = decrypt_worker_count,
2207                        "Spawned FMP+FSP-decrypt worker pool"
2208                    );
2209                }
2210                node_start_debug_log("Node::start worker pools complete");
2211            } else {
2212                node_start_debug_log("Node::start worker pools disabled");
2213                info!("FIPS worker pools disabled; using in-line crypto/send path");
2214            }
2215        }
2216
2217        if self.config.node.discovery.nostr.enabled {
2218            node_start_debug_log("Node::start nostr discovery start begin");
2219            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
2220                .await
2221            {
2222                Ok(runtime) => {
2223                    node_start_debug_log("Node::start nostr discovery runtime created");
2224                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2225                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2226                    }
2227                    node_start_debug_log("Node::start nostr overlay advert refreshed");
2228                    self.nostr_discovery = Some(runtime);
2229                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2230                    info!("Nostr overlay discovery enabled");
2231                }
2232                Err(err) => {
2233                    node_start_debug_log(format!(
2234                        "Node::start nostr discovery start error error={}",
2235                        err
2236                    ));
2237                    warn!(error = %err, "Failed to start Nostr overlay discovery");
2238                }
2239            }
2240        }
2241
2242        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
2243        // when Nostr is disabled, since it gives us sub-second pairing
2244        // on the same link without any relay or NAT-traversal roundtrip.
2245        if self.config.node.discovery.lan.enabled {
2246            node_start_debug_log("Node::start lan discovery start begin");
2247            let advertised_udp_port = self
2248                .transports
2249                .values()
2250                .filter(|h| h.is_operational())
2251                .filter(|h| h.transport_type().name == "udp")
2252                .find_map(|h| h.local_addr().map(|addr| addr.port()))
2253                .unwrap_or(0);
2254            let scope = self.lan_discovery_scope();
2255            match crate::discovery::lan::LanDiscovery::start(
2256                &self.identity,
2257                scope,
2258                advertised_udp_port,
2259                self.config.node.discovery.lan.clone(),
2260            )
2261            .await
2262            {
2263                Ok(runtime) => {
2264                    node_start_debug_log("Node::start lan discovery start ok");
2265                    self.lan_discovery = Some(runtime);
2266                    info!("LAN mDNS discovery enabled");
2267                }
2268                Err(err) => {
2269                    node_start_debug_log(format!(
2270                        "Node::start lan discovery start error error={}",
2271                        err
2272                    ));
2273                    debug!(error = %err, "LAN mDNS discovery not started");
2274                }
2275            }
2276        }
2277
2278        self.start_local_instance_discovery();
2279        self.poll_local_instance_discovery().await;
2280
2281        // Connect to static peers before TUN is active
2282        // This allows handshake messages to be sent before we start accepting packets
2283        node_start_debug_log("Node::start initiate peer connections begin");
2284        self.initiate_peer_connections().await;
2285        node_start_debug_log("Node::start initiate peer connections complete");
2286
2287        // Initialize TUN interface last, after transports and peers are ready
2288        if self.config.tun.enabled {
2289            node_start_debug_log("Node::start tun init begin");
2290            let address = *self.identity.address();
2291            match TunDevice::create(&self.config.tun, address).await {
2292                Ok(device) => {
2293                    let mtu = device.mtu();
2294                    let name = device.name().to_string();
2295                    let our_addr = *device.address();
2296
2297                    info!("TUN device active:");
2298                    info!("     name: {}", name);
2299                    info!("  address: {}", device.address());
2300                    info!("      mtu: {}", mtu);
2301
2302                    // Calculate max MSS for TCP clamping
2303                    let effective_mtu = self.effective_ipv6_mtu();
2304                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
2305
2306                    info!("effective MTU: {} bytes", effective_mtu);
2307                    debug!("   max TCP MSS: {} bytes", max_mss);
2308
2309                    // On macOS, create a shutdown pipe. Writing to it unblocks the
2310                    // reader thread's select() loop without closing the TUN fd
2311                    // (which would cause a double-close when TunDevice drops).
2312                    #[cfg(target_os = "macos")]
2313                    let (shutdown_read_fd, shutdown_write_fd) = {
2314                        let mut fds = [0i32; 2];
2315                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2316                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2317                                "failed to create shutdown pipe".into(),
2318                            )));
2319                        }
2320                        (fds[0], fds[1])
2321                    };
2322
2323                    // Create writer (dups the fd for independent write access).
2324                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
2325                    // per-destination path MTU learned via discovery.
2326                    let (writer, tun_tx) =
2327                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2328
2329                    // Spawn writer thread
2330                    let writer_handle = thread::spawn(move || {
2331                        writer.run();
2332                    });
2333
2334                    // Clone tun_tx for the reader
2335                    let reader_tun_tx = tun_tx.clone();
2336
2337                    // Create outbound channel for TUN reader → Node
2338                    let tun_channel_size = self.config.node.buffers.tun_channel;
2339                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2340
2341                    // Spawn reader thread
2342                    let transport_mtu = self.transport_mtu();
2343                    let path_mtu_lookup = self.path_mtu_lookup.clone();
2344                    #[cfg(target_os = "macos")]
2345                    let reader_handle = thread::spawn(move || {
2346                        run_tun_reader(
2347                            device,
2348                            mtu,
2349                            our_addr,
2350                            reader_tun_tx,
2351                            outbound_tx,
2352                            transport_mtu,
2353                            path_mtu_lookup,
2354                            shutdown_read_fd,
2355                        );
2356                    });
2357                    #[cfg(not(target_os = "macos"))]
2358                    let reader_handle = thread::spawn(move || {
2359                        run_tun_reader(
2360                            device,
2361                            mtu,
2362                            our_addr,
2363                            reader_tun_tx,
2364                            outbound_tx,
2365                            transport_mtu,
2366                            path_mtu_lookup,
2367                        );
2368                    });
2369
2370                    self.tun_state = TunState::Active;
2371                    self.tun_name = Some(name);
2372                    self.tun_tx = Some(tun_tx);
2373                    self.tun_outbound_rx = Some(outbound_rx);
2374                    self.tun_reader_handle = Some(reader_handle);
2375                    self.tun_writer_handle = Some(writer_handle);
2376                    #[cfg(target_os = "macos")]
2377                    {
2378                        self.tun_shutdown_fd = Some(shutdown_write_fd);
2379                    }
2380                }
2381                Err(e) => {
2382                    self.tun_state = TunState::Failed;
2383                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
2384                }
2385            }
2386            node_start_debug_log("Node::start tun init complete");
2387        }
2388
2389        // Initialize DNS responder (independent of TUN).
2390        //
2391        // Default bind_addr is "::1" (IPv6 loopback). The shipped
2392        // fips-dns-setup configures systemd-resolved via a global
2393        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
2394        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
2395        // where self-destined traffic to fips0's address is attributed
2396        // to fips0 in PKTINFO and gets silently dropped by the
2397        // mesh-interface filter in src/upper/dns.rs.
2398        //
2399        // For mesh-reachable resolution (rare), set bind_addr: "::"
2400        // in fips.yaml. The mesh-interface filter remains active to
2401        // prevent hosts-file alias enumeration in that mode.
2402        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
2403        // 127.0.0.1 still reach us regardless of kernel sysctl
2404        // defaults — but only when bind is on a wildcard / IPv6 path.
2405        if self.config.dns.enabled {
2406            node_start_debug_log("Node::start dns init begin");
2407            let addr_str = self.config.dns.bind_addr();
2408            match addr_str.parse::<std::net::IpAddr>() {
2409                Ok(ip) => {
2410                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2411                    match Self::bind_dns_socket(bind) {
2412                        Ok(socket) => {
2413                            let dns_channel_size = self.config.node.buffers.dns_channel;
2414                            let (identity_tx, identity_rx) =
2415                                tokio::sync::mpsc::channel(dns_channel_size);
2416                            let dns_ttl = self.config.dns.ttl();
2417                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2418                                self.config.peers(),
2419                            );
2420                            let reloader = if self.config.node.system_files_enabled {
2421                                let hosts_path = std::path::PathBuf::from(
2422                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
2423                                );
2424                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2425                            } else {
2426                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2427                            };
2428                            // Resolve the TUN ifindex so the responder can
2429                            // drop queries arriving on the mesh interface
2430                            // (fips0). Without this, the `::` bind exposes
2431                            // /etc/fips/hosts alias probing to any mesh peer.
2432                            // When TUN isn't enabled or the name can't be
2433                            // resolved, `None` disables the filter (there
2434                            // is no mesh surface to defend anyway).
2435                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2436                            info!(
2437                                bind = %bind,
2438                                hosts = reloader.hosts().len(),
2439                                mesh_ifindex = ?mesh_ifindex,
2440                                "DNS responder started for .fips domain (auto-reload enabled)"
2441                            );
2442                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2443                                socket,
2444                                identity_tx,
2445                                dns_ttl,
2446                                reloader,
2447                                mesh_ifindex,
2448                            ));
2449                            self.dns_identity_rx = Some(identity_rx);
2450                            self.dns_task = Some(handle);
2451                        }
2452                        Err(e) => {
2453                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2454                        }
2455                    }
2456                }
2457                Err(e) => {
2458                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2459                }
2460            }
2461            node_start_debug_log("Node::start dns init complete");
2462        }
2463
2464        self.state = NodeState::Running;
2465        node_start_debug_log("Node::start running");
2466        info!("Node started:");
2467        info!("       state: {}", self.state);
2468        info!("  transports: {}", self.transports.len());
2469        info!(" connections: {}", self.connections.len());
2470        Ok(())
2471    }
2472
2473    /// Bind a UDP socket for the DNS responder.
2474    ///
2475    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
2476    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
2477    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
2478    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
2479    /// land on the same socket.
2480    ///
2481    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
2482    /// can learn the arrival interface per packet. The responder uses that
2483    /// to drop queries arriving on the mesh TUN, closing the hosts-file
2484    /// probing side-channel created by the `::` bind.
2485    fn bind_dns_socket(
2486        addr: std::net::SocketAddr,
2487    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2488        use socket2::{Domain, Protocol, Socket, Type};
2489        let domain = if addr.is_ipv4() {
2490            Domain::IPV4
2491        } else {
2492            Domain::IPV6
2493        };
2494        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2495        if addr.is_ipv6() {
2496            sock.set_only_v6(false)?;
2497            #[cfg(unix)]
2498            Self::set_recv_pktinfo_v6(&sock)?;
2499        }
2500        sock.set_nonblocking(true)?;
2501        sock.bind(&addr.into())?;
2502        tokio::net::UdpSocket::from_std(sock.into())
2503    }
2504
2505    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
2506    ///
2507    /// After this setsockopt, each `recvmsg()` call on the socket receives
2508    /// an `IPV6_PKTINFO` control message containing the arrival interface
2509    /// index, which the DNS responder uses for its mesh-interface filter.
2510    #[cfg(unix)]
2511    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2512        use std::os::fd::AsRawFd;
2513        let enable: libc::c_int = 1;
2514        let ret = unsafe {
2515            libc::setsockopt(
2516                sock.as_raw_fd(),
2517                libc::IPPROTO_IPV6,
2518                libc::IPV6_RECVPKTINFO,
2519                &enable as *const _ as *const libc::c_void,
2520                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2521            )
2522        };
2523        if ret < 0 {
2524            return Err(std::io::Error::last_os_error());
2525        }
2526        Ok(())
2527    }
2528
2529    /// Resolve the mesh TUN interface index by name.
2530    ///
2531    /// Returns `None` if the interface does not exist (e.g. TUN disabled
2532    /// or not yet created). A `None` result disables the DNS responder's
2533    /// mesh-interface filter — safe, because if there is no fips0 there
2534    /// is no mesh exposure to defend against.
2535    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2536        #[cfg(unix)]
2537        {
2538            let c_name = std::ffi::CString::new(name).ok()?;
2539            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2540            if idx == 0 { None } else { Some(idx) }
2541        }
2542        #[cfg(not(unix))]
2543        {
2544            let _ = name;
2545            None
2546        }
2547    }
2548
    /// Stop the node.
    ///
    /// Tears the node down in a fixed order and finishes in
    /// `NodeState::Stopped`:
    ///
    /// 1. abort the DNS responder task;
    /// 2. send a `Shutdown` disconnect to every sendable peer, while the
    ///    transports can still carry it;
    /// 3. shut down Nostr overlay discovery and LAN mDNS discovery, and
    ///    remove the same-host instance record;
    /// 4. stop and remove every transport;
    /// 5. drop both packet channel ends;
    /// 6. if a TUN device was brought up:
    ///    - delete the interface;
    ///    - wake the reader thread (via the shutdown pipe on macOS);
    ///    - join the reader and writer threads.
    ///
    /// Failures in the individual discovery, registry, transport and TUN
    /// steps are logged and do not abort the rest of the shutdown.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::NotStarted` if `self.state.can_stop()` is false.
    /// Nothing is torn down in that case.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        // Stop DNS responder. The task is aborted but its handle is not
        // awaited, so cancellation completes asynchronously.
        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Send disconnect notifications to all active peers before closing transports
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        // Stop Nostr overlay discovery background work and withdraw any advert.
        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        // Tear down LAN mDNS responder + browser. Best-effort: the
        // OS will eventually time the advert out via its TTL even if
        // we don't get a clean unregister out before the daemon exits.
        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Remove this instance's same-host registry record; failure is only
        // logged at debug level.
        if let Some(registry) = self.local_instance_registry.take()
            && let Err(err) = registry.remove()
        {
            debug!(error = %err, "failed to remove same-host FIPS instance record");
        }

        // Shutdown transports (they're packet producers).
        // The ids are snapshotted first so each handle can be removed from
        // the map and stopped through a mutable binding.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop packet channels
        self.packet_tx.take();
        self.packet_rx.take();

        // Shutdown TUN interface. `tun_name` is only set when TUN came up in
        // `start`, and `take()` ensures this block runs at most once.
        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Drop the tun_tx to signal the writer to stop
            self.tun_tx.take();

            // Delete the interface (on Linux, causes reader to get EFAULT)
            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // On macOS, signal the reader thread to exit by writing to the
            // shutdown pipe. The reader's select() will wake up and break.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                // SAFETY: `fd` is the write end of the shutdown pipe created in
                // `start`. After `take()` it is owned solely by this block, so
                // it is written and closed exactly once. The buffer is a 1-byte
                // static literal. The write's return value is ignored; closing
                // the write end should also make the read end readable (EOF),
                // so the reader is presumably woken either way.
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // Wait for threads to finish.
            // NOTE(review): `JoinHandle::join` is a blocking call made from an
            // async fn, so it blocks the executor thread until the TUN threads
            // exit. If the reader never wakes (e.g. interface deletion failed
            // above), this can hang.
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        // NOTE(review): the encrypt/decrypt worker pools created in `start`
        // (`encrypt_workers` / `decrypt_workers`) are not referenced here.
        // Confirm they are shut down elsewhere (e.g. on drop).
        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
2652
2653    /// Send disconnect notifications to all active peers.
2654    ///
2655    /// Best-effort: send failures are logged and ignored since the transport
2656    /// may already be degraded. This runs before transports are shut down.
2657    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2658        let disconnect = Disconnect::new(reason);
2659        let plaintext = disconnect.encode();
2660
2661        // Collect node_addrs to avoid borrow conflict with send helper
2662        let peer_addrs: Vec<NodeAddr> = self
2663            .peers
2664            .iter()
2665            .filter(|(_, peer)| peer.can_send() && peer.has_session())
2666            .map(|(addr, _)| *addr)
2667            .collect();
2668
2669        if peer_addrs.is_empty() {
2670            debug!(
2671                total_peers = self.peers.len(),
2672                "No sendable peers for disconnect notification"
2673            );
2674            return;
2675        }
2676
2677        let mut sent = 0usize;
2678        for node_addr in &peer_addrs {
2679            match self
2680                .send_encrypted_link_message(node_addr, &plaintext)
2681                .await
2682            {
2683                Ok(()) => sent += 1,
2684                Err(e) => {
2685                    debug!(
2686                        peer = %self.peer_display_name(node_addr),
2687                        error = %e,
2688                        "Failed to send disconnect (transport may be down)"
2689                    );
2690                }
2691            }
2692        }
2693
2694        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2695    }
2696
2697    pub(in crate::node) fn static_peer_addresses(
2698        &self,
2699        peer_config: &PeerConfig,
2700    ) -> Vec<PeerAddress> {
2701        peer_config
2702            .addresses_by_priority()
2703            .into_iter()
2704            .cloned()
2705            .collect()
2706    }
2707
2708    async fn nostr_peer_fallback_addresses(
2709        &self,
2710        peer_config: &PeerConfig,
2711        existing: &[PeerAddress],
2712    ) -> Vec<PeerAddress> {
2713        if !self.config.node.discovery.nostr.enabled
2714            || self.config.node.discovery.nostr.policy
2715                == crate::config::NostrDiscoveryPolicy::Disabled
2716        {
2717            return Vec::new();
2718        }
2719
2720        let Some(bootstrap) = self.nostr_discovery.clone() else {
2721            return Vec::new();
2722        };
2723        if self.nostr_cooldown_applies_to_peer_config(peer_config)
2724            && bootstrap
2725                .cooldown_until(&peer_config.npub, Self::now_ms())
2726                .is_some()
2727        {
2728            debug!(
2729                npub = %peer_config.npub,
2730                "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2731            );
2732            return Vec::new();
2733        }
2734        let endpoints = match bootstrap
2735            .cached_advert_endpoints_for_peer(&peer_config.npub)
2736            .await
2737        {
2738            Some(endpoints) => endpoints,
2739            None => {
2740                debug!(
2741                    npub = %peer_config.npub,
2742                    "No cached Nostr advert endpoints for configured peer"
2743                );
2744                return Vec::new();
2745            }
2746        };
2747
2748        let mut fallback = Vec::new();
2749        let mut next_priority = existing
2750            .iter()
2751            .map(|addr| addr.priority)
2752            .max()
2753            .unwrap_or(100)
2754            .saturating_add(1);
2755        // Stamp every overlay-derived candidate with the current wall clock.
2756        // The dialer still honors explicit priority first; this timestamp is
2757        // only a recency tiebreaker within the same priority tier. We use a
2758        // single timestamp per fetch because all candidates in one advert are
2759        // equally fresh.
2760        let seen_at_ms = Self::now_ms();
2761        for endpoint in endpoints {
2762            let Some(candidate) =
2763                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2764            else {
2765                continue;
2766            };
2767            if existing
2768                .iter()
2769                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2770                || fallback.iter().any(|addr: &PeerAddress| {
2771                    addr.transport == candidate.transport && addr.addr == candidate.addr
2772                })
2773            {
2774                continue;
2775            }
2776            fallback.push(candidate);
2777            next_priority = next_priority.saturating_add(1);
2778        }
2779        fallback
2780    }
2781
2782    pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2783        if !self.config.node.discovery.nostr.enabled
2784            || self.config.node.discovery.nostr.policy
2785                == crate::config::NostrDiscoveryPolicy::Disabled
2786        {
2787            return false;
2788        }
2789        let Some(bootstrap) = self.nostr_discovery.clone() else {
2790            return false;
2791        };
2792        let now_ms = Self::now_ms();
2793        if self.nostr_cooldown_applies_to_peer_config(peer_config)
2794            && let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
2795        {
2796            debug!(
2797                npub = %peer_config.npub,
2798                cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2799                "Skipping Nostr traversal request while peer is in cooldown"
2800            );
2801            return false;
2802        }
2803        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2804        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2805        let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2806        let started = bootstrap
2807            .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2808            .await;
2809        if started {
2810            info!(
2811                npub = %peer_config.npub,
2812                mesh_signaling_allowed,
2813                "Started background UDP NAT traversal attempt"
2814            );
2815        } else {
2816            debug!(
2817                npub = %peer_config.npub,
2818                mesh_signaling_allowed,
2819                "Background UDP NAT traversal attempt already in progress"
2820            );
2821        }
2822        true
2823    }
2824
2825    fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
2826        !self.mesh_signaling_allowed_for_peer(peer_config)
2827    }
2828
2829    pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2830        &self,
2831        peer_config: &PeerConfig,
2832    ) -> bool {
2833        let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2834            return false;
2835        };
2836        let peer_addr = identity.node_addr();
2837        self.configured_peer(peer_addr).is_some()
2838    }
2839
2840    fn overlay_endpoint_to_peer_address(
2841        endpoint: &OverlayEndpointAdvert,
2842        priority: u8,
2843        seen_at_ms: u64,
2844    ) -> Option<PeerAddress> {
2845        let transport = match endpoint.transport {
2846            OverlayTransportKind::Udp => "udp",
2847            OverlayTransportKind::Tcp => "tcp",
2848            OverlayTransportKind::Tor => "tor",
2849            OverlayTransportKind::WebRtc => "webrtc",
2850        };
2851        Some(
2852            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2853                .with_seen_at_ms(seen_at_ms),
2854        )
2855    }
2856
2857    async fn attempt_peer_address_list(
2858        &mut self,
2859        peer_config: &PeerConfig,
2860        peer_identity: PeerIdentity,
2861        allow_bootstrap_nat: bool,
2862        addresses: &[PeerAddress],
2863    ) -> Result<(), NodeError> {
2864        let mut attempted = false;
2865        let mut local_route_error = None;
2866        let peer_node_addr = *peer_identity.node_addr();
2867        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);
2868
2869        for addr in addresses {
2870            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
2871                if !allow_bootstrap_nat {
2872                    continue;
2873                }
2874                if self.request_nostr_bootstrap(peer_config).await {
2875                    attempted = true;
2876                    continue;
2877                }
2878                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
2879                continue;
2880            }
2881
2882            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
2883                match self.resolve_ethernet_addr(&addr.addr) {
2884                    Ok(result) => result,
2885                    Err(e) => {
2886                        debug!(
2887                            transport = %addr.transport,
2888                            addr = %addr.addr,
2889                            error = %e,
2890                            "Failed to resolve Ethernet address"
2891                        );
2892                        continue;
2893                    }
2894                }
2895            } else if addr.transport == "ble" {
2896                #[cfg(bluer_available)]
2897                {
2898                    match self.resolve_ble_addr(&addr.addr) {
2899                        Ok(result) => result,
2900                        Err(e) => {
2901                            debug!(
2902                                transport = %addr.transport,
2903                                addr = %addr.addr,
2904                                error = %e,
2905                                "Failed to resolve BLE address"
2906                            );
2907                            continue;
2908                        }
2909                    }
2910                }
2911                #[cfg(not(bluer_available))]
2912                {
2913                    debug!(transport = %addr.transport, "BLE transport not available on this build");
2914                    continue;
2915                }
2916            } else {
2917                let tid = if addr.transport == "udp"
2918                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
2919                {
2920                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
2921                        Some((id, _)) => id,
2922                        None => {
2923                            debug!(
2924                                transport = %addr.transport,
2925                                addr = %addr.addr,
2926                                "No compatible operational UDP transport for address"
2927                            );
2928                            continue;
2929                        }
2930                    }
2931                } else {
2932                    match self.find_transport_for_type(&addr.transport) {
2933                        Some(id) => id,
2934                        None => {
2935                            debug!(
2936                                transport = %addr.transport,
2937                                addr = %addr.addr,
2938                                "No operational transport for address type"
2939                            );
2940                            continue;
2941                        }
2942                    }
2943                };
2944                (tid, TransportAddr::from_string(&addr.addr))
2945            };
2946
2947            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
2948                attempted = true;
2949                debug!(
2950                    npub = %peer_config.npub,
2951                    transport_id = %transport_id,
2952                    remote_addr = %remote_addr,
2953                    "Skipping duplicate in-flight candidate path"
2954                );
2955                continue;
2956            }
2957
2958            if concrete_budget == 0 {
2959                debug!(
2960                    npub = %peer_config.npub,
2961                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
2962                    "Path candidate race budget exhausted"
2963                );
2964                break;
2965            }
2966
2967            match self
2968                .initiate_connection(transport_id, remote_addr, peer_identity)
2969                .await
2970            {
2971                Ok(()) => {
2972                    attempted = true;
2973                    concrete_budget = concrete_budget.saturating_sub(1);
2974                }
2975                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
2976                Err(e) => {
2977                    if e.is_local_route_unavailable() && local_route_error.is_none() {
2978                        local_route_error = Some(e.to_string());
2979                    }
2980                    debug!(
2981                        npub = %peer_config.npub,
2982                        transport_id = %transport_id,
2983                        error = %e,
2984                        "Connection attempt failed, trying next address"
2985                    );
2986                }
2987            }
2988        }
2989
2990        if attempted {
2991            return Ok(());
2992        }
2993
2994        if let Some(error) = local_route_error {
2995            return Err(NodeError::LocalRouteUnavailable(error));
2996        }
2997
2998        Err(NodeError::NoTransportForType(format!(
2999            "no operational transport for any of {}'s addresses",
3000            peer_config.npub
3001        )))
3002    }
3003
3004    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
3005        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
3006            .await;
3007    }
3008
3009    pub(in crate::node) fn queue_active_fallback_direct_retries(
3010        &mut self,
3011        _bootstrap: &std::sync::Arc<NostrDiscovery>,
3012    ) {
3013        let now_ms = Self::now_ms();
3014        let peer_configs = self
3015            .config
3016            .auto_connect_peers()
3017            .cloned()
3018            .collect::<Vec<_>>();
3019
3020        for peer_config in peer_configs {
3021            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
3022                continue;
3023            };
3024            let node_addr = *peer_identity.node_addr();
3025
3026            if self.retry_pending.contains_key(&node_addr)
3027                || !self.peers.contains_key(&node_addr)
3028                || self.is_connecting_to_peer(&node_addr)
3029                || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
3030            {
3031                continue;
3032            }
3033
3034            let mut state = super::retry::RetryState::new(peer_config.clone());
3035            state.reconnect = true;
3036            state.retry_after_ms = now_ms;
3037            self.retry_pending.insert(node_addr, state);
3038
3039            debug!(
3040                peer = %self.peer_display_name(&node_addr),
3041                "Queued direct-path retry for active fallback peer"
3042            );
3043        }
3044    }
3045
3046    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
3047    /// queues retries for non-configured, not-yet-connected peers.
3048    ///
3049    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
3050    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
3051    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
3052    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
3053    ///
3054    /// `caller` is a short label included in log lines so per-tick and
3055    /// startup sweeps are distinguishable in operator-facing logs.
3056    pub(in crate::node) async fn run_open_discovery_sweep(
3057        &mut self,
3058        bootstrap: &std::sync::Arc<NostrDiscovery>,
3059        max_age_secs: Option<u64>,
3060        caller: &'static str,
3061    ) {
3062        if !self.config.node.discovery.nostr.enabled
3063            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3064        {
3065            return;
3066        }
3067
3068        let configured_npubs = self
3069            .config
3070            .peers()
3071            .iter()
3072            .map(|peer| peer.npub.clone())
3073            .collect::<HashSet<_>>();
3074        let now_ms = Self::now_ms();
3075        let now_secs = now_ms / 1000;
3076        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
3077        if enqueue_budget == 0 {
3078            debug!(
3079                caller = %caller,
3080                "open-discovery sweep: enqueue budget is 0, skipping"
3081            );
3082            return;
3083        }
3084
3085        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
3086        let cached_count = candidates.len();
3087        let mut enqueued = 0usize;
3088        let mut skipped_age = 0usize;
3089        let mut skipped_configured = 0usize;
3090        let mut skipped_self = 0usize;
3091        let mut skipped_connected = 0usize;
3092        let mut skipped_retry_pending = 0usize;
3093        let mut skipped_connecting = 0usize;
3094        let mut skipped_no_endpoints = 0usize;
3095        let mut skipped_invalid_npub = 0usize;
3096        let mut skipped_cooldown = 0usize;
3097
3098        for (npub, endpoints, created_at_secs) in candidates {
3099            if enqueue_budget == 0 {
3100                break;
3101            }
3102
3103            if let Some(max_age) = max_age_secs
3104                && now_secs.saturating_sub(created_at_secs) > max_age
3105            {
3106                skipped_age = skipped_age.saturating_add(1);
3107                continue;
3108            }
3109
3110            if configured_npubs.contains(&npub) {
3111                // Configured peers don't go through the open-discovery
3112                // enqueue path — their `PeerConfig` is already in
3113                // `self.config.peers()`, so the regular retry queue is
3114                // what drives their reconnect. But on cold start with
3115                // NAT'd peers, every initial `initiate_peer_connection`
3116                // fails (no overlay data yet, static cache hints empty
3117                // or stale), each pushes the peer into `retry_pending`
3118                // with exponential backoff (5/10/20/40/80s), and by the
3119                // time the next backoff slot fires the Nostr advert is
3120                // already cached — we just don't act on it for ~80s.
3121                //
3122                // The arrival of an advert (which this sweep sees) means
3123                // we now have a path to dial. If the peer's retry is
3124                // scheduled in the future, pull it forward to "now" so
3125                // the next `process_pending_retries` tick fires it
3126                // immediately. The retry path (`initiate_peer_retry_
3127                // connection` → `try_peer_addresses`) then refetches
3128                // the advert and dials it — no behavioral change
3129                // beyond schedule timing.
3130                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
3131                    let configured_addr = *identity.node_addr();
3132                    if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3133                        skipped_cooldown = skipped_cooldown.saturating_add(1);
3134                        skipped_configured = skipped_configured.saturating_add(1);
3135                        continue;
3136                    }
3137                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
3138                        && state.retry_after_ms > now_ms
3139                    {
3140                        state.retry_after_ms = now_ms;
3141                        debug!(
3142                            caller = %caller,
3143                            peer = %self.peer_display_name(&configured_addr),
3144                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
3145                            "Expediting configured-peer retry after fresh overlay advert"
3146                        );
3147                    }
3148                }
3149                skipped_configured = skipped_configured.saturating_add(1);
3150                continue;
3151            }
3152
3153            let peer_identity = match PeerIdentity::from_npub(&npub) {
3154                Ok(identity) => identity,
3155                Err(_) => {
3156                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
3157                    continue;
3158                }
3159            };
3160            let node_addr = *peer_identity.node_addr();
3161            if node_addr == *self.identity.node_addr() {
3162                skipped_self = skipped_self.saturating_add(1);
3163                continue;
3164            }
3165            if self.peers.contains_key(&node_addr) {
3166                skipped_connected = skipped_connected.saturating_add(1);
3167                continue;
3168            }
3169            if self.retry_pending.contains_key(&node_addr) {
3170                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
3171                continue;
3172            }
3173            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3174                skipped_cooldown = skipped_cooldown.saturating_add(1);
3175                continue;
3176            }
3177            let connecting = self.connections.values().any(|conn| {
3178                conn.expected_identity()
3179                    .map(|id| id.node_addr() == &node_addr)
3180                    .unwrap_or(false)
3181            });
3182            if connecting {
3183                skipped_connecting = skipped_connecting.saturating_add(1);
3184                continue;
3185            }
3186
3187            let mut addresses = Vec::new();
3188            let mut priority = 120u8;
3189            let seen_at_ms = Self::now_ms();
3190            for endpoint in endpoints {
3191                let Some(candidate) =
3192                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
3193                else {
3194                    continue;
3195                };
3196                if addresses.iter().any(|existing: &PeerAddress| {
3197                    existing.transport == candidate.transport && existing.addr == candidate.addr
3198                }) {
3199                    continue;
3200                }
3201                addresses.push(candidate);
3202                priority = priority.saturating_add(1);
3203            }
3204            if addresses.is_empty() {
3205                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
3206                continue;
3207            }
3208
3209            self.peer_aliases
3210                .entry(node_addr)
3211                .or_insert_with(|| peer_identity.short_npub());
3212            self.register_identity(node_addr, peer_identity.pubkey_full());
3213
3214            let mut state = super::retry::RetryState::new(PeerConfig {
3215                npub: npub.clone(),
3216                alias: None,
3217                addresses,
3218                connect_policy: ConnectPolicy::AutoConnect,
3219                auto_reconnect: true,
3220                discovery_fallback_transit: false,
3221            });
3222            state.reconnect = false;
3223            state.retry_after_ms = now_ms;
3224            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
3225            self.retry_pending.insert(node_addr, state);
3226            info!(
3227                caller = %caller,
3228                peer = %peer_identity.short_npub(),
3229                advert_age_secs = now_secs.saturating_sub(created_at_secs),
3230                "open-discovery sweep: queued retry for cached advert"
3231            );
3232            enqueue_budget = enqueue_budget.saturating_sub(1);
3233            enqueued = enqueued.saturating_add(1);
3234        }
3235
3236        // Always log a one-line summary on the startup sweep so operators
3237        // can verify it ran. Per-tick sweeps are noisier; only summarize
3238        // when something happened.
3239        let total_skipped = skipped_age
3240            + skipped_configured
3241            + skipped_self
3242            + skipped_connected
3243            + skipped_retry_pending
3244            + skipped_connecting
3245            + skipped_no_endpoints
3246            + skipped_invalid_npub
3247            + skipped_cooldown;
3248        let should_summarize = caller == "startup" || enqueued > 0;
3249        if should_summarize {
3250            info!(
3251                caller = %caller,
3252                cached = cached_count,
3253                queued = enqueued,
3254                skipped_age = skipped_age,
3255                skipped_configured = skipped_configured,
3256                skipped_self = skipped_self,
3257                skipped_connected = skipped_connected,
3258                skipped_retry_pending = skipped_retry_pending,
3259                skipped_connecting = skipped_connecting,
3260                skipped_no_endpoints = skipped_no_endpoints,
3261                skipped_invalid_npub = skipped_invalid_npub,
3262                skipped_cooldown = skipped_cooldown,
3263                skipped_total = total_skipped,
3264                "open-discovery sweep complete"
3265            );
3266        }
3267    }
3268
3269    /// One-shot startup sweep: runs once after the configured settle
3270    /// delay, iterating the cached overlay adverts and queueing retries
3271    /// for any peer with a recent enough advert that we haven't already
3272    /// configured statically or established a link to.
3273    ///
3274    /// Gated identically to [`run_open_discovery_sweep`]: requires
3275    /// `node.discovery.nostr.enabled` and `policy == open`.
3276    async fn maybe_run_startup_open_discovery_sweep(
3277        &mut self,
3278        bootstrap: &std::sync::Arc<NostrDiscovery>,
3279    ) {
3280        if self.startup_open_discovery_sweep_done {
3281            return;
3282        }
3283        if !self.config.node.discovery.nostr.enabled
3284            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3285        {
3286            // Mark done so we don't keep re-checking on every tick.
3287            self.startup_open_discovery_sweep_done = true;
3288            return;
3289        }
3290        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3291            return;
3292        };
3293        let now_ms = Self::now_ms();
3294        let delay_ms = self
3295            .config
3296            .node
3297            .discovery
3298            .nostr
3299            .startup_sweep_delay_secs
3300            .saturating_mul(1000);
3301        if now_ms < started_at_ms.saturating_add(delay_ms) {
3302            return;
3303        }
3304
3305        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3306        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3307            .await;
3308        self.startup_open_discovery_sweep_done = true;
3309    }
3310
3311    fn available_outbound_slots(&self) -> usize {
3312        let connection_used = self
3313            .connections
3314            .len()
3315            .saturating_add(self.pending_connects.len());
3316        let connection_slots = if self.max_connections == 0 {
3317            usize::MAX
3318        } else {
3319            self.max_connections.saturating_sub(connection_used)
3320        };
3321
3322        let peer_slots = if self.max_peers == 0 {
3323            usize::MAX
3324        } else {
3325            self.max_peers.saturating_sub(self.peers.len())
3326        };
3327
3328        let link_slots = if self.max_links == 0 {
3329            usize::MAX
3330        } else {
3331            self.max_links.saturating_sub(self.links.len())
3332        };
3333
3334        connection_slots.min(peer_slots).min(link_slots)
3335    }
3336
3337    pub(in crate::node) fn open_discovery_enqueue_budget(
3338        &self,
3339        configured_npubs: &HashSet<String>,
3340    ) -> usize {
3341        let current_open_discovery_active = self
3342            .peers
3343            .values()
3344            .filter(|peer| !configured_npubs.contains(&peer.npub()))
3345            .count();
3346        let current_open_discovery_pending = self
3347            .retry_pending
3348            .values()
3349            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3350            .count();
3351
3352        let cap_remaining = self
3353            .config
3354            .node
3355            .discovery
3356            .nostr
3357            .open_discovery_max_pending
3358            .saturating_sub(current_open_discovery_active)
3359            .saturating_sub(current_open_discovery_pending);
3360
3361        cap_remaining.min(self.available_outbound_slots())
3362    }
3363
3364    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3365        now_ms.saturating_add(
3366            self.config
3367                .node
3368                .discovery
3369                .nostr
3370                .advert_ttl_secs
3371                .saturating_mul(1000)
3372                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3373        )
3374    }
3375
    /// Build the overlay advert describing how remote peers can reach this
    /// node.
    ///
    /// Returns `None` when Nostr discovery is disabled, or when no
    /// operational transport produces an advertisable endpoint. Only
    /// transports whose config returns `advertise_on_nostr() == true`
    /// contribute. The per-transport rules are:
    /// - UDP with `is_public()`: use `external_advert_addr()` if set.
    ///   Otherwise use the bound address when it is neither unspecified nor
    ///   rejected by `is_unroutable_advert_ip`. Otherwise use a public
    ///   address learned through `bootstrap.learn_public_udp_addr`; if that
    ///   fails, warn and omit the endpoint.
    /// - UDP without `is_public()`: advertise the literal `"nat"` placeholder.
    /// - WebRTC: advertise this node's full public key, hex-encoded.
    /// - TCP: use `external_advert_addr()`, otherwise a routable bound
    ///   address. If neither is available, warn and omit the endpoint.
    /// - Tor: advertise `onion:advertised_port` once an onion address exists.
    ///
    /// `signal_relays` and `stun_servers` are filled from the Nostr discovery
    /// config only when a `"nat"` UDP or a WebRTC endpoint was advertised.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Track whether any advertised endpoint needs signalling/STUN help;
        // this decides whether relays and STUN servers go into the advert.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Precedence:
                        // 1. operator-supplied `external_addr` (skips STUN)
                        // 2. non-wildcard *public* `local_addr` (operator
                        //    bound to a specific public IP directly)
                        // 3. STUN auto-discovery against ephemeral socket
                        //    (also taken when bind is wildcard *or* private —
                        //    a private bind is not peer-reachable, so we
                        //    must publish the public reflexive instead)
                        // 4. loud warn + omit endpoint
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                Some(addr) => {
                                    // The STUN lookup is keyed by transport id
                                    // and bound port.
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                            or private and STUN observation failed; \
                                            advertising no UDP endpoint. Either set \
                                            transports.udp.external_addr, bind to a \
                                            specific *public* IP, or ensure \
                                            node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                // No bound address reported: nothing to advertise.
                                None => {}
                            }
                        }
                    } else {
                        // Non-public UDP: advertise the "nat" placeholder
                        // instead of an address. `has_udp_nat` makes the
                        // advert carry signal relays and STUN servers.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // WebRTC endpoints carry our hex-encoded full public key
                    // rather than a socket address.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Precedence:
                    // 1. operator-supplied `external_addr` (only path that
                    //    works on cloud-NAT setups where the public IP is
                    //    not on a host interface).
                    // 2. non-wildcard *public* `local_addr` (operator bound
                    //    to a specific public IP directly).
                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
                    //
                    // A wildcard *or* private bind is never advertised as-is
                    // — peers off-LAN can't reach a private bind, and there
                    // is no TCP STUN to discover a public reflexive.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                    or private IP and no transports.tcp.external_addr set; \
                                    advertising no TCP endpoint. Either set external_addr \
                                    to the public IP (recommended for cloud 1:1-NAT setups) \
                                    or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Only advertised once the transport reports an onion
                    // address; the port comes from config, not the socket.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                // Other transport types are never advertised.
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3554
3555    async fn refresh_overlay_advert(
3556        &self,
3557        bootstrap: &std::sync::Arc<NostrDiscovery>,
3558    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3559        let advert = self.build_overlay_advert(bootstrap).await;
3560        bootstrap.update_local_advert(advert).await
3561    }
3562
3563    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3564        match (&self.config.transports.udp, transport_name) {
3565            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3566            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3567            _ => None,
3568        }
3569    }
3570
3571    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3572        match (&self.config.transports.tcp, transport_name) {
3573            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3574            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3575            _ => None,
3576        }
3577    }
3578
3579    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3580        match (&self.config.transports.tor, transport_name) {
3581            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3582            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3583            _ => None,
3584        }
3585    }
3586
3587    fn lookup_webrtc_config(
3588        &self,
3589        transport_name: Option<&str>,
3590    ) -> Option<&crate::config::WebRtcConfig> {
3591        match (&self.config.transports.webrtc, transport_name) {
3592            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3593            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3594            _ => None,
3595        }
3596    }
3597
3598    pub(in crate::node) async fn try_peer_addresses(
3599        &mut self,
3600        peer_config: &PeerConfig,
3601        peer_identity: PeerIdentity,
3602        allow_bootstrap_nat: bool,
3603    ) -> Result<(), NodeError> {
3604        let peer_node_addr = *peer_identity.node_addr();
3605        if self.peers.contains_key(&peer_node_addr) {
3606            debug!(
3607                npub = %peer_config.npub,
3608                "Peer already exists, skipping address attempts"
3609            );
3610            return Ok(());
3611        }
3612
3613        let candidates = self.peer_address_candidates(peer_config).await;
3614
3615        if candidates.is_empty() {
3616            if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3617                return Ok(());
3618            }
3619            return Err(NodeError::NoTransportForType(format!(
3620                "no addresses known for {}",
3621                peer_config.npub
3622            )));
3623        }
3624
3625        if self
3626            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3627            .await
3628            .is_ok()
3629        {
3630            if allow_bootstrap_nat {
3631                self.request_nostr_bootstrap(peer_config).await;
3632            }
3633            return Ok(());
3634        }
3635
3636        if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3637            return Ok(());
3638        }
3639
3640        Err(NodeError::NoTransportForType(format!(
3641            "no operational transport for any of {}'s addresses",
3642            peer_config.npub
3643        )))
3644    }
3645
3646    async fn try_active_peer_alternative_addresses(
3647        &mut self,
3648        peer_config: &PeerConfig,
3649        peer_identity: PeerIdentity,
3650        allow_same_path_refresh: bool,
3651    ) -> Result<bool, NodeError> {
3652        let peer_node_addr = *peer_identity.node_addr();
3653        let mut candidates = self.peer_address_candidates(peer_config).await;
3654        let same_path_refresh_needed = allow_same_path_refresh
3655            && (self.active_peer_needs_same_path_refresh(&peer_node_addr)
3656                || self
3657                    .peers
3658                    .get(&peer_node_addr)
3659                    .is_some_and(|peer| !peer.can_send()));
3660        if same_path_refresh_needed
3661            && let Some(candidate) = self.active_peer_current_udp_candidate(&peer_node_addr)
3662            && !candidates.iter().any(|existing| {
3663                existing.transport == candidate.transport && existing.addr == candidate.addr
3664            })
3665        {
3666            candidates.push(candidate);
3667            Self::sort_peer_address_candidates(&mut candidates);
3668        }
3669        let should_try_nostr =
3670            self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);
3671
3672        if candidates.is_empty() {
3673            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3674                return Ok(true);
3675            }
3676            return Err(NodeError::NoTransportForType(format!(
3677                "no addresses known for {}",
3678                peer_config.npub
3679            )));
3680        }
3681
3682        let alternatives: Vec<_> = candidates
3683            .into_iter()
3684            .filter(|addr| {
3685                same_path_refresh_needed
3686                    || !self.active_peer_matches_candidate(&peer_node_addr, addr)
3687            })
3688            .collect();
3689
3690        if alternatives.is_empty() {
3691            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3692                return Ok(true);
3693            }
3694            return Ok(false);
3695        }
3696
3697        let needs_separate_nostr_attempt = should_try_nostr
3698            && !alternatives
3699                .iter()
3700                .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
3701        let address_result = self
3702            .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
3703            .await;
3704        let nostr_attempted =
3705            needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;
3706
3707        match address_result {
3708            Ok(()) => Ok(true),
3709            Err(err) if nostr_attempted => {
3710                debug!(
3711                    npub = %peer_config.npub,
3712                    error = %err,
3713                    "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
3714                );
3715                Ok(true)
3716            }
3717            Err(err) => Err(err),
3718        }
3719    }
3720
3721    async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3722        // Merge every candidate from every source we have for this peer.
3723        // Explicitly configured addresses keep first shot, then freshly
3724        // fetched overlay adverts are appended as fallback candidates. This
3725        // lets native peers try known LAN/nvpn/static UDP routes before
3726        // slower WebRTC/Nostr-discovered paths, while still racing every
3727        // concrete candidate that fits in the per-peer budget.
3728        let static_addresses = self.static_peer_addresses(peer_config);
3729        let overlay_addresses = self
3730            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3731            .await;
3732
3733        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3734        for addr in overlay_addresses.into_iter().chain(static_addresses) {
3735            if !candidates.iter().any(|existing: &PeerAddress| {
3736                existing.transport == addr.transport && existing.addr == addr.addr
3737            }) {
3738                candidates.push(addr);
3739            }
3740        }
3741
3742        Self::sort_peer_address_candidates(&mut candidates);
3743
3744        candidates
3745    }
3746
3747    fn sort_peer_address_candidates(candidates: &mut [PeerAddress]) {
3748        // Stable sort: recently observed candidates win over unstamped static
3749        // hints, then explicit priority and recency break ties. Static hints
3750        // remain candidates, but they are not privileged forever after a
3751        // network move; a fresh advert/STUN-observed endpoint gets the first
3752        // shot when the race budget is tight.
3753        candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
3754            (Some(_), None) => std::cmp::Ordering::Less,
3755            (None, Some(_)) => std::cmp::Ordering::Greater,
3756            _ if a.priority != b.priority => a.priority.cmp(&b.priority),
3757            (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3758            (None, None) => std::cmp::Ordering::Equal,
3759        });
3760    }
3761
3762    fn active_peer_matches_any_candidate(
3763        &self,
3764        peer_node_addr: &NodeAddr,
3765        candidates: &[PeerAddress],
3766    ) -> bool {
3767        candidates
3768            .iter()
3769            .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3770    }
3771
3772    pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3773        &self,
3774        peer_node_addr: &NodeAddr,
3775        candidates: &[PeerAddress],
3776    ) -> bool {
3777        if !self
3778            .peers
3779            .get(peer_node_addr)
3780            .is_some_and(|peer| peer.can_send())
3781        {
3782            return false;
3783        }
3784        if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3785            return false;
3786        }
3787        !self.active_peer_needs_same_path_refresh(peer_node_addr)
3788    }
3789
3790    pub(in crate::node) fn active_peer_should_keep_direct_retry(
3791        &self,
3792        peer_node_addr: &NodeAddr,
3793        peer_config: &PeerConfig,
3794    ) -> bool {
3795        let Some(peer) = self.peers.get(peer_node_addr) else {
3796            return false;
3797        };
3798
3799        let static_addresses = self.static_peer_addresses(peer_config);
3800        if !static_addresses.is_empty() {
3801            return !self
3802                .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3803        }
3804
3805        if peer_config.npub.is_empty() {
3806            return false;
3807        }
3808
3809        if !self.config.node.discovery.nostr.enabled
3810            || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3811        {
3812            return false;
3813        }
3814
3815        let Some(transport_id) = peer.transport_id() else {
3816            return true;
3817        };
3818
3819        if self.bootstrap_transports.contains(&transport_id) {
3820            return self.active_peer_needs_same_path_refresh(peer_node_addr);
3821        }
3822
3823        let Some(transport) = self.transports.get(&transport_id) else {
3824            return true;
3825        };
3826
3827        if transport.transport_type().name != "udp" {
3828            return true;
3829        }
3830
3831        self.active_peer_needs_same_path_refresh(peer_node_addr)
3832    }
3833
3834    pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3835        &mut self,
3836        peer_node_addr: &NodeAddr,
3837    ) {
3838        let keep_retry = self
3839            .retry_pending
3840            .get(peer_node_addr)
3841            .map(|state| state.peer_config.clone())
3842            .is_some_and(|peer_config| {
3843                self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3844            });
3845
3846        if !keep_retry {
3847            self.retry_pending.remove(peer_node_addr);
3848        }
3849    }
3850
3851    fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3852        let Some(peer) = self.peers.get(peer_node_addr) else {
3853            return false;
3854        };
3855        let stale_after_ms = self
3856            .config
3857            .node
3858            .heartbeat_interval_secs
3859            .saturating_mul(1000)
3860            .max(1000);
3861        peer.idle_time(Self::now_ms()) > stale_after_ms
3862    }
3863
3864    fn active_peer_current_udp_candidate(&self, peer_node_addr: &NodeAddr) -> Option<PeerAddress> {
3865        let peer = self.peers.get(peer_node_addr)?;
3866        let transport_id = peer.transport_id()?;
3867        let current_addr = peer.current_addr()?;
3868        let transport = self.transports.get(&transport_id)?;
3869        if transport.transport_type().name != "udp" {
3870            return None;
3871        }
3872        let socket_addr = current_addr.as_str()?.parse::<SocketAddr>().ok()?;
3873
3874        Some(
3875            PeerAddress::with_priority("udp", socket_addr.to_string(), 240)
3876                .with_seen_at_ms(Self::now_ms()),
3877        )
3878    }
3879
3880    pub(in crate::node) fn active_peer_matches_candidate(
3881        &self,
3882        peer_node_addr: &NodeAddr,
3883        candidate: &PeerAddress,
3884    ) -> bool {
3885        let Some(peer) = self.peers.get(peer_node_addr) else {
3886            return false;
3887        };
3888        let Some(current_addr) = peer.current_addr() else {
3889            return false;
3890        };
3891        if let Some(peer_transport_id) = peer.transport_id()
3892            && let Some((candidate_transport_id, candidate_addr)) =
3893                self.resolve_peer_address_for_match(candidate)
3894        {
3895            return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3896        }
3897        if peer
3898            .transport_id()
3899            .map(|id| self.bootstrap_transports.contains(&id))
3900            .unwrap_or(false)
3901        {
3902            return false;
3903        }
3904        let current_addr = current_addr.to_string();
3905        let current_transport = peer
3906            .transport_id()
3907            .and_then(|id| self.transports.get(&id))
3908            .map(|transport| transport.transport_type().name);
3909
3910        candidate.addr == current_addr
3911            && current_transport
3912                .map(|transport| transport == candidate.transport)
3913                .unwrap_or(true)
3914    }
3915
3916    pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
3917        &self,
3918        peer_node_addr: &NodeAddr,
3919        peer_config: &PeerConfig,
3920    ) -> bool {
3921        peer_config.addresses.iter().any(|addr| {
3922            addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
3923        })
3924    }
3925
3926    pub(in crate::node) fn active_peer_uses_traversal_path(
3927        &self,
3928        peer_node_addr: &NodeAddr,
3929        peer_config: &PeerConfig,
3930    ) -> bool {
3931        let via_bootstrap_transport = self
3932            .peers
3933            .get(peer_node_addr)
3934            .and_then(|peer| peer.transport_id())
3935            .map(|id| self.bootstrap_transports.contains(&id))
3936            .unwrap_or(false);
3937
3938        via_bootstrap_transport
3939            || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
3940    }
3941
3942    // === Control API methods ===
3943
3944    /// Connect to a peer via the control API.
3945    ///
3946    /// Creates an ephemeral peer connection (not persisted to config, no
3947    /// auto-reconnect). Reuses the same connection path as auto-connect
3948    /// peers. Returns JSON data on success or an error message.
3949    pub(crate) async fn api_connect(
3950        &mut self,
3951        npub: &str,
3952        address: &str,
3953        transport: &str,
3954    ) -> Result<serde_json::Value, String> {
3955        let peer_config = PeerConfig {
3956            npub: npub.to_string(),
3957            alias: None,
3958            addresses: vec![PeerAddress::new(transport, address)],
3959            connect_policy: ConnectPolicy::Manual,
3960            auto_reconnect: false,
3961            discovery_fallback_transit: true,
3962        };
3963
3964        // Pre-seed identity cache (same as initiate_peer_connections does)
3965        if let Ok(identity) = PeerIdentity::from_npub(npub) {
3966            self.peer_aliases
3967                .insert(*identity.node_addr(), identity.short_npub());
3968            self.register_identity(*identity.node_addr(), identity.pubkey_full());
3969        }
3970
3971        self.initiate_peer_connection(&peer_config)
3972            .await
3973            .map(|()| {
3974                info!(
3975                    npub = %npub,
3976                    address = %address,
3977                    transport = %transport,
3978                    "API connect initiated"
3979                );
3980                serde_json::json!({
3981                    "npub": npub,
3982                    "address": address,
3983                    "transport": transport,
3984                })
3985            })
3986            .map_err(|e| e.to_string())
3987    }
3988
3989    /// Disconnect a peer via the control API.
3990    ///
3991    /// Removes the peer and suppresses auto-reconnect.
3992    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3993        let peer_identity =
3994            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3995        let node_addr = *peer_identity.node_addr();
3996
3997        if !self.peers.contains_key(&node_addr) {
3998            return Err(format!("peer not found: {npub}"));
3999        }
4000
4001        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
4002        self.remove_active_peer(&node_addr);
4003
4004        // Suppress any pending auto-reconnect
4005        self.retry_pending.remove(&node_addr);
4006
4007        info!(npub = %npub, "API disconnect completed");
4008
4009        Ok(serde_json::json!({
4010            "npub": npub,
4011            "disconnected": true,
4012        }))
4013    }
4014
4015    /// Adopt an already-established UDP traversal and start the normal FIPS
4016    /// Noise handshake over it.
4017    ///
4018    /// This is intended for integration with an external rendezvous runtime
4019    /// that has already completed relay signaling, STUN observation, and UDP
4020    /// hole punching. After handoff, the adopted socket is owned by FIPS.
4021    pub async fn adopt_established_traversal(
4022        &mut self,
4023        traversal: EstablishedTraversal,
4024    ) -> Result<BootstrapHandoffResult, NodeError> {
4025        debug!(
4026            peer_npub = %traversal.peer_npub,
4027            session_id = %traversal.session_id,
4028            remote_addr = %traversal.remote_addr,
4029            "adopting established traversal socket"
4030        );
4031
4032        if !self.state.is_operational() {
4033            return Err(NodeError::NotStarted);
4034        }
4035
4036        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
4037        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
4038            NodeError::InvalidPeerNpub {
4039                npub: traversal.peer_npub.clone(),
4040                reason: e.to_string(),
4041            }
4042        })?;
4043        let peer_node_addr = *peer_identity.node_addr();
4044        if self.peers.contains_key(&peer_node_addr) {
4045            debug!(
4046                peer_npub = %traversal.peer_npub,
4047                "Adopting NAT traversal handoff as alternate path for already-connected peer"
4048            );
4049        }
4050
4051        self.peer_aliases
4052            .insert(peer_node_addr, peer_identity.short_npub());
4053        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
4054
4055        let transport_id = self.allocate_transport_id();
4056        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
4057        // (and accept_connections / advertise flags) from the operator's
4058        // configured [transports.udp] when the bootstrap runtime doesn't
4059        // pass an explicit override. Lookup tries `transport_name` first
4060        // (covers the `Named` multi-listener variant) and falls back to the
4061        // unnamed `Single` listener, so single- and named-listener configs
4062        // both inherit cleanly.
4063        //
4064        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
4065        // only value guaranteed to survive arbitrary middlebox paths.
4066        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
4067        // initially attempt that MTU and may black-hole on tighter paths
4068        // until reactive `MtuExceeded` recovery kicks in. Operators who
4069        // raise the primary MTU based on known-clean topology accept that
4070        // tradeoff; the silent drop on a too-low default was strictly
4071        // worse for the common case where the primary MTU is reachable.
4072        //
4073        // Bind / external address fields are cleared since the socket is
4074        // already bound.
4075        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
4076            let mut cfg = self
4077                .lookup_udp_config(traversal.transport_name.as_deref())
4078                .or_else(|| self.lookup_udp_config(None))
4079                .cloned()
4080                .unwrap_or_default();
4081            cfg.bind_addr = None;
4082            cfg.external_addr = None;
4083            cfg
4084        });
4085        let mut transport = crate::transport::udp::UdpTransport::new(
4086            transport_id,
4087            traversal.transport_name.clone(),
4088            inherited_config,
4089            packet_tx,
4090        );
4091
4092        transport
4093            .adopt_socket_async(traversal.socket)
4094            .await
4095            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
4096
4097        let local_addr = transport.local_addr().ok_or_else(|| {
4098            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
4099        })?;
4100
4101        self.transports.insert(
4102            transport_id,
4103            crate::transport::TransportHandle::Udp(transport),
4104        );
4105        self.bootstrap_transports.insert(transport_id);
4106        self.bootstrap_transport_npubs
4107            .insert(transport_id, traversal.peer_npub.clone());
4108
4109        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
4110        if let Err(err) = self
4111            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
4112            .await
4113        {
4114            self.bootstrap_transports.remove(&transport_id);
4115            self.bootstrap_transport_npubs.remove(&transport_id);
4116            if let Some(mut handle) = self.transports.remove(&transport_id) {
4117                let _ = handle.stop().await;
4118            }
4119            return Err(err);
4120        }
4121
4122        info!(
4123            peer = %self.peer_display_name(&peer_node_addr),
4124            transport_id = %transport_id,
4125            local_addr = %local_addr,
4126            remote_addr = %traversal.remote_addr,
4127            session_id = %traversal.session_id,
4128            "adopted NAT traversal socket; handshake initiated"
4129        );
4130
4131        Ok(BootstrapHandoffResult {
4132            transport_id,
4133            local_addr,
4134            remote_addr: traversal.remote_addr,
4135            peer_node_addr,
4136            session_id: traversal.session_id,
4137        })
4138    }
4139}