Skip to main content

fips_core/node/
lifecycle.rs

1//! Node lifecycle management: start, stop, and peer connection initiation.
2
3use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7    OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Outcome of deciding what to do with a mesh signal with respect to a
/// session.
///
/// NOTE(review): the variants are not used in this part of the file; the
/// per-variant descriptions below are inferred from the names — confirm
/// against the call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MeshSignalSessionAction {
    /// Presumably: deliver the signal now.
    Send,
    /// Presumably: hold the signal for a later attempt.
    Defer,
    /// Presumably: discard the signal.
    Drop,
}
30
/// Debug-build diagnostic sink: appends one timestamped line to
/// `nvpn-fips-endpoint-debug.log` inside the system temp directory.
///
/// Strictly best-effort — if the file cannot be opened or written, the
/// message is silently discarded.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let Ok(mut log_file) = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(log_path)
    else {
        return;
    };

    // Write failures are ignored on purpose: diagnostics must never affect
    // node startup.
    let _ = writeln!(
        log_file,
        "{:?} {}",
        std::time::SystemTime::now(),
        message.as_ref()
    );
}
48
/// Release-build counterpart of the debug logger: a no-op, so call sites
/// need no `cfg` guards of their own.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
51
/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN.
///
/// IPv4: RFC 1918 private ranges, loopback, link-local, unspecified,
/// multicast, broadcast, documentation (TEST-NET), benchmarking
/// (198.18.0.0/15, RFC 2544) and CGNAT (100.64.0.0/10, RFC 6598).
///
/// IPv6: loopback, unspecified, unique-local (fc00::/7), multicast and
/// link-local (fe80::/10). IPv4-mapped addresses (`::ffff:a.b.c.d`) are
/// classified by the IPv4 address they embed, so a private IPv4 endpoint
/// cannot slip through wrapped in IPv6 notation.
///
/// We never publish these as the peer's primary `runtime_endpoint`: an
/// off-LAN consumer can't route to them, and latching onto one leaves the
/// consumer on a slow overlay-relay fallback — the original bug this guard
/// exists to prevent.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [first, second, ..] = v4.octets();
            // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the public
            // internet; behaves like an extra NAT layer.
            let is_cgnat = first == 100 && (second & 0xc0) == 64;
            // 198.18.0.0/15 — benchmarking, RFC 2544. Reserved for lab
            // testing and never routed publicly. Checked by hand because
            // `Ipv4Addr::is_benchmarking` is not stable.
            let is_benchmarking = first == 198 && (second & 0xfe) == 18;

            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                || is_cgnat
                || is_benchmarking
        }
        IpAddr::V6(v6) => match v6.to_ipv4_mapped() {
            // `::ffff:a.b.c.d` reaches exactly the host `a.b.c.d` would, so
            // apply the IPv4 rules to the embedded address.
            Some(mapped) => is_unroutable_advert_ip(IpAddr::V4(mapped)),
            None => {
                v6.is_loopback()
                    || v6.is_unspecified()
                    || v6.is_unique_local()
                    || v6.is_multicast()
                    // IPv6 link-local: fe80::/10
                    || (v6.segments()[0] & 0xffc0) == 0xfe80
            }
        },
    }
}
83
/// Returns `true` when `local` and `remote` belong to the same IP family
/// (both IPv4 or both IPv6).
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // `SocketAddr` has exactly two variants, so agreeing on "is IPv4"
    // means agreeing on the family.
    local.is_ipv4() == remote.is_ipv4()
}
90
/// NOTE(review): not referenced in this part of the file. Presumably a
/// multiplier applied to a retry lifetime for open-discovery peers —
/// confirm at the use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
/// Upper bound on simultaneous in-flight connection attempts (handshaking
/// connections plus pending connects) toward a single peer; applied by
/// `path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Ceiling on the value returned by `graph_session_warmup_budget`.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Ceiling on the value returned by `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
95
96impl Node {
97    /// Initiate connections to configured static peers.
98    ///
99    /// For each peer configured with AutoConnect policy, creates a link and
100    /// peer entry, then starts the Noise handshake by sending the first message.
101    /// Replace the runtime peer list. Newly added auto-connect peers get
102    /// `initiate_peer_connection` immediately; removed peers are dropped from
103    /// the retry queue (the regular liveness timeout reaps any active session
104    /// — we don't proactively disconnect, since the same npub might be on its
105    /// way back via a fresh advert). Existing auto-connect entries with new
106    /// direct addresses are dialed immediately, and retry state is refreshed
107    /// so later attempts also use the latest hints.
108    pub(super) async fn update_peers(
109        &mut self,
110        new_peers: Vec<crate::config::PeerConfig>,
111    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112        use std::collections::{HashMap, HashSet};
113
114        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115            HashMap::with_capacity(new_peers.len());
116        let mut new_order = Vec::with_capacity(new_peers.len());
117        for peer in new_peers {
118            let identity = match PeerIdentity::from_npub(&peer.npub) {
119                Ok(id) => id,
120                Err(e) => {
121                    return Err(crate::node::NodeError::InvalidPeerNpub {
122                        npub: peer.npub.clone(),
123                        reason: e.to_string(),
124                    });
125                }
126            };
127            // Last-write-wins on duplicates so callers passing a multi-source
128            // candidate list (e.g. operator hints + recent-peers cache for
129            // the same npub) get the merge they meant.
130            let node_addr = *identity.node_addr();
131            if !new_by_addr.contains_key(&node_addr) {
132                new_order.push(node_addr);
133            }
134            new_by_addr.insert(node_addr, peer);
135        }
136
137        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138            .config
139            .peers()
140            .iter()
141            .filter_map(|pc| {
142                PeerIdentity::from_npub(&pc.npub)
143                    .ok()
144                    .map(|id| (*id.node_addr(), pc.clone()))
145            })
146            .collect();
147
148        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
153        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();
154
155        let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157        for node_addr in &removed {
158            if self.retry_pending.remove(node_addr).is_some() {
159                debug!(
160                    peer = %self.peer_display_name(node_addr),
161                    "Dropping retry entry for peer removed from runtime peer list"
162                );
163            }
164            self.peer_aliases.remove(node_addr);
165            self.set_discovery_fallback_transit_allowed(*node_addr, false);
166            outcome.removed += 1;
167        }
168
169        let mut auto_connect_refresh_configs = Vec::new();
170        for node_addr in &kept {
171            let new_pc = &new_by_addr[node_addr];
172            let current_pc = &current_by_addr[node_addr];
173            if new_pc.addresses != current_pc.addresses
174                || new_pc.alias != current_pc.alias
175                || new_pc.connect_policy != current_pc.connect_policy
176                || new_pc.auto_reconnect != current_pc.auto_reconnect
177                || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178            {
179                outcome.updated += 1;
180                self.set_discovery_fallback_transit_allowed(
181                    *node_addr,
182                    new_pc.discovery_fallback_transit,
183                );
184                if let Some(state) = self.retry_pending.get_mut(node_addr) {
185                    state.peer_config = new_pc.clone();
186                    state.retry_after_ms = Self::now_ms();
187                }
188                if let Some(alias) = new_pc.alias.clone() {
189                    self.peer_aliases.insert(*node_addr, alias);
190                }
191                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192                    auto_connect_refresh_configs.push(new_pc.clone());
193                }
194            } else {
195                outcome.unchanged += 1;
196                self.set_discovery_fallback_transit_allowed(
197                    *node_addr,
198                    new_pc.discovery_fallback_transit,
199                );
200                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
201                    auto_connect_refresh_configs.push(new_pc.clone());
202                }
203            }
204        }
205
206        let added_configs: Vec<crate::config::PeerConfig> = new_order
207            .iter()
208            .filter(|addr| added.contains(addr))
209            .map(|addr| new_by_addr[addr].clone())
210            .collect();
211
212        // Replace the live config peer list before initiating connections so
213        // any helper that consults `self.config.peers()` during the dial
214        // (alias lookup, retry-state seeding) sees the new entries.
215        self.config.peers = new_order
216            .iter()
217            .filter_map(|addr| new_by_addr.get(addr).cloned())
218            .collect();
219        self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
220
221        for peer_config in added_configs {
222            outcome.added += 1;
223            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
224                continue;
225            };
226            let name = peer_config
227                .alias
228                .clone()
229                .unwrap_or_else(|| identity.short_npub());
230            self.peer_aliases.insert(*identity.node_addr(), name);
231            self.set_discovery_fallback_transit_allowed(
232                *identity.node_addr(),
233                peer_config.discovery_fallback_transit,
234            );
235            self.register_identity(*identity.node_addr(), identity.pubkey_full());
236
237            if !peer_config.is_auto_connect() {
238                continue;
239            }
240
241            match self
242                .try_auto_connect_graph_session(&peer_config, identity)
243                .await
244            {
245                Ok(true) => continue,
246                Ok(false) => {}
247                Err(err) => {
248                    debug!(
249                        npub = %peer_config.npub,
250                        error = %err,
251                        "Existing FIPS graph did not warm newly added peer"
252                    );
253                }
254            }
255
256            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
257                warn!(
258                    npub = %peer_config.npub,
259                    error = %e,
260                    "Failed to initiate connection for newly added peer"
261                );
262                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
263                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
264                }
265                if matches!(e, crate::node::NodeError::NoTransportForType(_))
266                    && let Some(bootstrap) = self.nostr_discovery.clone()
267                {
268                    bootstrap
269                        .request_advert_stale_check(peer_config.npub.clone())
270                        .await;
271                }
272            }
273        }
274
275        for peer_config in auto_connect_refresh_configs {
276            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
277                continue;
278            };
279            let node_addr = *peer_identity.node_addr();
280
281            if self.peers.contains_key(&node_addr) {
282                match self
283                    .initiate_active_peer_alternative_connection(&peer_config)
284                    .await
285                {
286                    Ok(attempted) => {
287                        if attempted {
288                            debug!(
289                                peer = %self.peer_display_name(&node_addr),
290                                "Started non-disruptive alternate-path handshake for active peer"
291                            );
292                        }
293                    }
294                    Err(e) => {
295                        debug!(
296                            npub = %peer_config.npub,
297                            error = %e,
298                            "Active peer alternate-path refresh did not start"
299                        );
300                    }
301                }
302                continue;
303            }
304
305            match self
306                .try_auto_connect_graph_session(&peer_config, peer_identity)
307                .await
308            {
309                Ok(true) => continue,
310                Ok(false) => {}
311                Err(err) => {
312                    debug!(
313                        npub = %peer_config.npub,
314                        error = %err,
315                        "Existing FIPS graph did not warm refreshed peer"
316                    );
317                }
318            }
319
320            match self.initiate_peer_connection(&peer_config).await {
321                Ok(()) => {
322                    let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
323                    if let Some(state) = self.retry_pending.get_mut(&node_addr) {
324                        state.peer_config = peer_config;
325                        state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
326                    }
327                }
328                Err(e) => {
329                    debug!(
330                        npub = %peer_config.npub,
331                        error = %e,
332                        "Refreshed peer addresses did not initiate a direct connection"
333                    );
334                    self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
335                }
336            }
337        }
338
339        self.warm_auto_connect_graph_sessions().await;
340
341        Ok(outcome)
342    }
343
344    pub(super) async fn initiate_peer_connections(&mut self) {
345        // Build display name map from all configured peers (alias or short npub),
346        // and pre-seed the identity cache from each peer's npub so that TUN packets
347        // addressed to a configured peer can be dispatched (and trigger session
348        // initiation) immediately on startup — without waiting for the link-layer
349        // handshake to complete first.
350        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
351            .config
352            .peers()
353            .iter()
354            .filter_map(|pc| {
355                PeerIdentity::from_npub(&pc.npub)
356                    .ok()
357                    .map(|id| (id, pc.alias.clone()))
358            })
359            .collect();
360
361        for (identity, alias) in peer_identities {
362            let name = alias.unwrap_or_else(|| identity.short_npub());
363            self.peer_aliases.insert(*identity.node_addr(), name);
364            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
365            // but will be corrected to the real value when the peer is promoted
366            // after a successful Noise handshake.
367            self.register_identity(*identity.node_addr(), identity.pubkey_full());
368        }
369
370        // Collect peer configs to avoid borrow conflicts
371        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
372
373        if peer_configs.is_empty() {
374            debug!("No static peers configured");
375            return;
376        }
377
378        debug!(
379            count = peer_configs.len(),
380            "Initiating static peer connections"
381        );
382
383        for peer_config in peer_configs {
384            let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
385                Ok(identity) => identity,
386                Err(_) => continue,
387            };
388            match self
389                .try_auto_connect_graph_session(&peer_config, peer_identity)
390                .await
391            {
392                Ok(true) => continue,
393                Ok(false) => {}
394                Err(err) => {
395                    debug!(
396                        npub = %peer_config.npub,
397                        error = %err,
398                        "Existing FIPS graph did not warm auto-connect peer"
399                    );
400                }
401            }
402            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
403                warn!(
404                    npub = %peer_config.npub,
405                    alias = ?peer_config.alias,
406                    error = %e,
407                    "Failed to initiate peer connection"
408                );
409                // Schedule a retry so transient address-resolution failures
410                // (e.g. cached endpoints stale, NAT rebinds, all addresses
411                // currently unreachable) recover without a daemon restart.
412                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
413                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
414                }
415                // No-transport failures most often mean the cached overlay
416                // advert is pointing at a dead post-NAT-rebind address. The
417                // advert cache is read-only inside fetch_advert, so retries
418                // would loop on the same dead address until expiry. Force a
419                // re-fetch so the next retry tick picks up fresh endpoints.
420                if matches!(e, crate::node::NodeError::NoTransportForType(_))
421                    && let Some(bootstrap) = self.nostr_discovery.clone()
422                {
423                    bootstrap
424                        .request_advert_stale_check(peer_config.npub.clone())
425                        .await;
426                }
427            }
428        }
429
430        self.warm_auto_connect_graph_sessions().await;
431    }
432
433    pub(in crate::node) async fn try_auto_connect_graph_session(
434        &mut self,
435        peer_config: &PeerConfig,
436        peer_identity: PeerIdentity,
437    ) -> Result<bool, NodeError> {
438        if !peer_config.is_auto_connect() {
439            return Ok(false);
440        }
441
442        let peer_node_addr = *peer_identity.node_addr();
443        if self
444            .peers
445            .get(&peer_node_addr)
446            .is_some_and(|peer| peer.can_send())
447        {
448            return Ok(false);
449        }
450        if self.auto_connect_should_race_direct_path(peer_config) {
451            return Ok(false);
452        }
453        if self
454            .sessions
455            .get(&peer_node_addr)
456            .is_some_and(|entry| entry.is_established() || entry.is_initiating())
457        {
458            return Ok(true);
459        }
460        if self.find_next_hop(&peer_node_addr).is_none() {
461            return Ok(false);
462        }
463
464        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
465        match self
466            .initiate_session(peer_node_addr, peer_identity.pubkey_full())
467            .await
468        {
469            Ok(()) => {
470                debug!(
471                    peer = %self.peer_display_name(&peer_node_addr),
472                    "Warmed auto-connect peer session over existing FIPS graph"
473                );
474                Ok(true)
475            }
476            Err(NodeError::SendFailed { node_addr, reason })
477                if node_addr == peer_node_addr && reason == "no route to destination" =>
478            {
479                self.maybe_initiate_lookup(&peer_node_addr).await;
480                Ok(false)
481            }
482            Err(err) => Err(err),
483        }
484    }
485
486    fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
487        !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
488    }
489
    /// Initiate a connection to a single peer.
    ///
    /// Creates a link, starts the Noise handshake, and sends the first message.
    ///
    /// Delegates entirely to `initiate_peer_connection_inner`, which returns
    /// `Ok(())` without dialing when the peer is already present in
    /// `self.peers`.
    ///
    /// # Errors
    ///
    /// Whatever `initiate_peer_connection_inner` returns, including
    /// `NodeError::InvalidPeerNpub` when the configured npub does not parse.
    pub(super) async fn initiate_peer_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
499
    /// Initiate a connection from the retry path. Identical to
    /// [`initiate_peer_connection`] today — both paths fan out across every
    /// known address (explicit priority first, then freshness) in a single
    /// pass. The two entry points stay separate so callers can be distinguished
    /// in tracing.
    ///
    /// # Errors
    ///
    /// Whatever `initiate_peer_connection_inner` returns.
    pub(super) async fn initiate_peer_retry_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
511
    /// Race alternate candidate addresses for a peer, with same-path refresh
    /// disabled.
    ///
    /// Calls `initiate_active_peer_alternative_connection_inner` with
    /// `allow_same_path_refresh = false`; see that method for the meaning of
    /// the returned flag and the error cases.
    pub(in crate::node) async fn initiate_active_peer_alternative_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<bool, NodeError> {
        self.initiate_active_peer_alternative_connection_inner(peer_config, false)
            .await
    }
519
    /// Race alternate candidate addresses for a peer, with same-path refresh
    /// enabled.
    ///
    /// Calls `initiate_active_peer_alternative_connection_inner` with
    /// `allow_same_path_refresh = true`; see that method for the meaning of
    /// the returned flag and the error cases.
    pub(in crate::node) async fn initiate_active_peer_direct_refresh_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<bool, NodeError> {
        self.initiate_active_peer_alternative_connection_inner(peer_config, true)
            .await
    }
527
528    async fn initiate_active_peer_alternative_connection_inner(
529        &mut self,
530        peer_config: &crate::config::PeerConfig,
531        allow_same_path_refresh: bool,
532    ) -> Result<bool, NodeError> {
533        let peer_identity =
534            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
535                npub: peer_config.npub.clone(),
536                reason: e.to_string(),
537            })?;
538        let peer_node_addr = *peer_identity.node_addr();
539
540        if !self.peers.contains_key(&peer_node_addr) {
541            self.initiate_peer_connection(peer_config).await?;
542            return Ok(true);
543        }
544
545        // Keep the current link live and race fresh concrete candidates.
546        // Cross-connection resolution still decides which replacement link
547        // wins if both peers try the same upgrade; the important part here is
548        // that a stale path does not depend on the remote peer receiving our
549        // hint first before either side attempts the better address.
550        self.try_active_peer_alternative_addresses(
551            peer_config,
552            peer_identity,
553            allow_same_path_refresh,
554        )
555        .await
556    }
557
558    async fn initiate_peer_connection_inner(
559        &mut self,
560        peer_config: &crate::config::PeerConfig,
561    ) -> Result<(), NodeError> {
562        // Parse the peer's npub to get their identity
563        let peer_identity =
564            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
565                npub: peer_config.npub.clone(),
566                reason: e.to_string(),
567            })?;
568
569        let peer_node_addr = *peer_identity.node_addr();
570
571        // Check if peer already exists (fully authenticated)
572        if self.peers.contains_key(&peer_node_addr) {
573            debug!(
574                npub = %peer_config.npub,
575                "Peer already exists, skipping"
576            );
577            return Ok(());
578        }
579
580        self.try_peer_addresses(peer_config, peer_identity, true)
581            .await
582    }
583
584    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
585        self.connections.values().any(|conn| {
586            conn.expected_identity()
587                .map(|id| id.node_addr() == peer_node_addr)
588                .unwrap_or(false)
589        })
590    }
591
592    fn is_connecting_to_peer_on_path(
593        &self,
594        peer_node_addr: &NodeAddr,
595        transport_id: TransportId,
596        remote_addr: &TransportAddr,
597    ) -> bool {
598        self.connections.values().any(|conn| {
599            conn.expected_identity()
600                .map(|id| id.node_addr() == peer_node_addr)
601                .unwrap_or(false)
602                && conn.transport_id() == Some(transport_id)
603                && conn.source_addr() == Some(remote_addr)
604        }) || self.pending_connects.iter().any(|pending| {
605            pending.peer_identity.node_addr() == peer_node_addr
606                && pending.transport_id == transport_id
607                && &pending.remote_addr == remote_addr
608        })
609    }
610
611    pub(in crate::node) fn should_warm_auto_connect_session(
612        &self,
613        peer_node_addr: &NodeAddr,
614    ) -> bool {
615        if self
616            .peers
617            .get(peer_node_addr)
618            .is_some_and(|peer| peer.can_send())
619            || self
620                .sessions
621                .get(peer_node_addr)
622                .is_some_and(|entry| entry.is_established())
623        {
624            return false;
625        }
626
627        self.config.peers().iter().any(|peer| {
628            peer.is_auto_connect()
629                && PeerIdentity::from_npub(&peer.npub)
630                    .map(|identity| identity.node_addr() == peer_node_addr)
631                    .unwrap_or(false)
632        })
633    }
634
635    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
636        if !self.peers.values().any(|peer| peer.can_send()) {
637            return 0;
638        }
639
640        let mut budget = self.graph_session_warmup_budget();
641        if budget == 0 {
642            return 0;
643        }
644
645        let peer_identities: Vec<_> = self
646            .config
647            .auto_connect_peers()
648            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
649            .collect();
650
651        let mut warmed = 0;
652        for identity in peer_identities {
653            if budget == 0 {
654                break;
655            }
656
657            let peer_node_addr = *identity.node_addr();
658            if peer_node_addr == *self.identity.node_addr()
659                || !self.should_warm_auto_connect_session(&peer_node_addr)
660                || self
661                    .sessions
662                    .get(&peer_node_addr)
663                    .is_some_and(|entry| entry.is_initiating())
664            {
665                continue;
666            }
667
668            self.register_identity(peer_node_addr, identity.pubkey_full());
669
670            if self.find_next_hop(&peer_node_addr).is_some() {
671                match self
672                    .initiate_session(peer_node_addr, identity.pubkey_full())
673                    .await
674                {
675                    Ok(()) => {
676                        warmed += 1;
677                        budget = budget.saturating_sub(1);
678                        debug!(
679                            peer = %self.peer_display_name(&peer_node_addr),
680                            "Warmed auto-connect peer session over existing FIPS graph"
681                        );
682                    }
683                    Err(NodeError::SendFailed { node_addr, reason })
684                        if node_addr == peer_node_addr && reason == "no route to destination" =>
685                    {
686                        self.maybe_initiate_lookup(&peer_node_addr).await;
687                        warmed += 1;
688                        budget = budget.saturating_sub(1);
689                    }
690                    Err(err) => {
691                        debug!(
692                            peer = %self.peer_display_name(&peer_node_addr),
693                            error = %err,
694                            "Failed to warm auto-connect peer session"
695                        );
696                    }
697                }
698            } else {
699                self.maybe_initiate_lookup(&peer_node_addr).await;
700                warmed += 1;
701                budget = budget.saturating_sub(1);
702            }
703        }
704
705        warmed
706    }
707
708    pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
709        let max_destinations = self.config.node.session.pending_max_destinations;
710        if max_destinations == 0 {
711            return 0;
712        }
713
714        let pending_sessions = self
715            .sessions
716            .values()
717            .filter(|entry| !entry.is_established())
718            .count();
719        let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
720        max_destinations
721            .saturating_sub(pending_total)
722            .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
723    }
724
725    fn outbound_handshake_slots(&self) -> usize {
726        let used = self
727            .connections
728            .len()
729            .saturating_add(self.pending_connects.len());
730        if self.max_connections == 0 {
731            usize::MAX
732        } else {
733            self.max_connections.saturating_sub(used)
734        }
735    }
736
737    fn outbound_link_slots(&self) -> usize {
738        if self.max_links == 0 {
739            usize::MAX
740        } else {
741            self.max_links.saturating_sub(self.links.len())
742        }
743    }
744
745    fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
746        if !self.peers.contains_key(peer_node_addr)
747            && self.max_peers > 0
748            && self.peers.len() >= self.max_peers
749        {
750            return 0;
751        }
752
753        let in_flight_for_peer = self
754            .connections
755            .values()
756            .filter(|conn| {
757                conn.expected_identity()
758                    .map(|id| id.node_addr() == peer_node_addr)
759                    .unwrap_or(false)
760            })
761            .count()
762            .saturating_add(
763                self.pending_connects
764                    .iter()
765                    .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
766                    .count(),
767            );
768
769        self.outbound_handshake_slots()
770            .min(self.outbound_link_slots())
771            .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
772    }
773
774    fn discovery_connect_budget(&self) -> usize {
775        self.outbound_handshake_slots()
776            .min(self.outbound_link_slots())
777            .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
778    }
779
780    /// Find a UDP transport whose bound socket can send to `remote_addr`.
781    ///
782    /// LAN discovery can surface both IPv4 and IPv6 addresses for the same
783    /// service. A wildcard IPv4 socket cannot send to an IPv6 link-local
784    /// target, and vice versa, so callers must choose by socket family rather
785    /// than by transport type alone.
786    fn find_udp_transport_for_remote_addr(
787        &self,
788        remote_addr: SocketAddr,
789    ) -> Option<(TransportId, SocketAddr)> {
790        self.transports
791            .iter()
792            .filter(|(id, handle)| {
793                handle.transport_type().name == "udp"
794                    && handle.is_operational()
795                    && !self.bootstrap_transports.contains(id)
796            })
797            .filter_map(|(id, handle)| {
798                let local_addr = handle.local_addr()?;
799                socket_addr_families_compatible(local_addr, remote_addr)
800                    .then_some((*id, local_addr))
801            })
802            .min_by_key(|(id, _)| id.as_u32())
803    }
804
805    pub(super) fn transport_discovery_candidate(
806        &self,
807        discovered_transport_id: TransportId,
808        discovered_addr: TransportAddr,
809    ) -> Option<(TransportId, TransportAddr, &'static str)> {
810        let transport = self.transports.get(&discovered_transport_id)?;
811        let transport_name = transport.transport_type().name;
812
813        if transport_name != "udp" {
814            return Some((discovered_transport_id, discovered_addr, transport_name));
815        }
816
817        let Some(remote_socket_addr) = discovered_addr
818            .as_str()
819            .and_then(|addr| addr.parse::<SocketAddr>().ok())
820        else {
821            if self.bootstrap_transports.contains(&discovered_transport_id) {
822                debug!(
823                    transport_id = %discovered_transport_id,
824                    remote_addr = %discovered_addr,
825                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
826                );
827                return None;
828            }
829            return Some((discovered_transport_id, discovered_addr, transport_name));
830        };
831
832        let Some((transport_id, local_addr)) =
833            self.find_udp_transport_for_remote_addr(remote_socket_addr)
834        else {
835            debug!(
836                transport_id = %discovered_transport_id,
837                remote_addr = %discovered_addr,
838                "transport discovery: skip UDP peer with no compatible local socket"
839            );
840            return None;
841        };
842
843        if transport_id != discovered_transport_id {
844            debug!(
845                discovered_transport_id = %discovered_transport_id,
846                selected_transport_id = %transport_id,
847                local_addr = %local_addr,
848                remote_addr = %remote_socket_addr,
849                "transport discovery: selected compatible UDP transport"
850            );
851        }
852
853        Some((
854            transport_id,
855            TransportAddr::from_socket_addr(remote_socket_addr),
856            transport_name,
857        ))
858    }
859
860    fn peer_address_string_for_transport_candidate(
861        &self,
862        transport_id: TransportId,
863        transport_name: &str,
864        remote_addr: &TransportAddr,
865    ) -> String {
866        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
867        let _ = (transport_id, transport_name);
868
869        #[cfg(any(target_os = "linux", target_os = "macos"))]
870        if transport_name == "ethernet"
871            && remote_addr.as_bytes().len() == 6
872            && let Some(interface) = self
873                .transports
874                .get(&transport_id)
875                .and_then(|transport| transport.interface_name())
876        {
877            let mut mac = [0u8; 6];
878            mac.copy_from_slice(remote_addr.as_bytes());
879            return format!(
880                "{interface}/{}",
881                crate::transport::ethernet::format_mac(&mac)
882            );
883        }
884
885        remote_addr.to_string()
886    }
887
888    fn resolve_peer_address_for_match(
889        &self,
890        candidate: &PeerAddress,
891    ) -> Option<(TransportId, TransportAddr)> {
892        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
893            return None;
894        }
895
896        if candidate.transport == "ethernet" {
897            return self.resolve_ethernet_addr(&candidate.addr).ok();
898        }
899
900        if candidate.transport == "ble" {
901            #[cfg(bluer_available)]
902            {
903                return self.resolve_ble_addr(&candidate.addr).ok();
904            }
905            #[cfg(not(bluer_available))]
906            {
907                return None;
908            }
909        }
910
911        let transport_id = if candidate.transport == "udp"
912            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
913        {
914            self.find_udp_transport_for_remote_addr(remote_socket_addr)
915                .map(|(id, _)| id)?
916        } else {
917            self.find_transport_for_type(&candidate.transport)?
918        };
919
920        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
921    }
922
923    /// Initiate a connection to a peer on a specific transport and address.
924    ///
925    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
926    /// the Noise IK handshake, sends msg1, and registers the connection for
927    /// msg2 dispatch.
928    ///
929    /// For connection-oriented transports (TCP, Tor): allocates a link and
930    /// starts a non-blocking transport connect. The handshake is deferred
931    /// until the transport connection is established — the tick handler
932    /// polls `connection_state()` and initiates the handshake when ready.
933    pub(super) async fn initiate_connection(
934        &mut self,
935        transport_id: TransportId,
936        remote_addr: TransportAddr,
937        peer_identity: PeerIdentity,
938    ) -> Result<(), NodeError> {
939        let peer_node_addr = *peer_identity.node_addr();
940
941        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
942            debug!(
943                peer = %self.peer_display_name(&peer_node_addr),
944                transport_id = %transport_id,
945                remote_addr = %remote_addr,
946                "Connection already in progress for candidate path"
947            );
948            return Ok(());
949        }
950
951        if self.outbound_handshake_slots() == 0 {
952            return Err(NodeError::MaxConnectionsExceeded {
953                max: self.max_connections,
954            });
955        }
956
957        if self.outbound_link_slots() == 0 {
958            return Err(NodeError::MaxLinksExceeded {
959                max: self.max_links,
960            });
961        }
962
963        if !self.peers.contains_key(&peer_node_addr)
964            && self.max_peers > 0
965            && self.peers.len() >= self.max_peers
966        {
967            return Err(NodeError::MaxPeersExceeded {
968                max: self.max_peers,
969            });
970        }
971
972        self.authorize_peer(
973            &peer_identity,
974            PeerAclContext::OutboundConnect,
975            transport_id,
976            &remote_addr,
977        )?;
978
979        let is_connection_oriented = self
980            .transports
981            .get(&transport_id)
982            .map(|t| t.transport_type().connection_oriented)
983            .unwrap_or(false);
984
985        // Allocate link ID and create link
986        let link_id = self.allocate_link_id();
987
988        let link = if is_connection_oriented {
989            Link::new(
990                link_id,
991                transport_id,
992                remote_addr.clone(),
993                LinkDirection::Outbound,
994                Duration::from_millis(self.config.node.base_rtt_ms),
995            )
996        } else {
997            Link::connectionless(
998                link_id,
999                transport_id,
1000                remote_addr.clone(),
1001                LinkDirection::Outbound,
1002                Duration::from_millis(self.config.node.base_rtt_ms),
1003            )
1004        };
1005
1006        self.links.insert(link_id, link);
1007
1008        // Add reverse lookup for packet dispatch
1009        self.addr_to_link
1010            .insert((transport_id, remote_addr.clone()), link_id);
1011
1012        if is_connection_oriented {
1013            // Connection-oriented: start non-blocking connect, defer handshake
1014            if let Some(transport) = self.transports.get(&transport_id) {
1015                match transport.connect(&remote_addr).await {
1016                    Ok(()) => {
1017                        debug!(
1018                            peer = %self.peer_display_name(&peer_node_addr),
1019                            transport_id = %transport_id,
1020                            remote_addr = %remote_addr,
1021                            link_id = %link_id,
1022                            "Transport connect initiated (non-blocking)"
1023                        );
1024                        self.pending_connects.push(super::PendingConnect {
1025                            link_id,
1026                            transport_id,
1027                            remote_addr,
1028                            peer_identity,
1029                        });
1030                    }
1031                    Err(e) => {
1032                        // Clean up link
1033                        self.links.remove(&link_id);
1034                        self.addr_to_link.remove(&(transport_id, remote_addr));
1035                        return Err(NodeError::from_transport_error(e));
1036                    }
1037                }
1038            }
1039            Ok(())
1040        } else {
1041            // Connectionless: proceed with immediate handshake
1042            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
1043                .await
1044        }
1045    }
1046
1047    /// Start the Noise handshake on a link and send msg1.
1048    ///
1049    /// Called immediately for connectionless transports, or after the
1050    /// transport connection is established for connection-oriented transports.
    ///
    /// On success the new connection is stored in `connections` under
    /// `link_id`, registered in `pending_outbound` under
    /// `(transport_id, our_index)`, and keeps a copy of msg1 together with the
    /// time of its first scheduled resend.
    ///
    /// # Errors
    ///
    /// Every failure path removes the link and its `addr_to_link` entry before
    /// returning:
    ///
    /// * `IndexAllocationFailed` — no session index could be allocated.
    /// * `HandshakeFailed` — the Noise handshake could not be started; the
    ///   allocated index is freed.
    /// * A transport error — the first send of msg1 failed; the connection,
    ///   its `pending_outbound` entry and the index are rolled back as well.
    /// * `TransportError` — `transport_id` is no longer in `transports`; same
    ///   rollback as for a failed send.
1051    pub(super) async fn start_handshake(
1052        &mut self,
1053        link_id: LinkId,
1054        transport_id: TransportId,
1055        remote_addr: TransportAddr,
1056        peer_identity: PeerIdentity,
1057    ) -> Result<(), NodeError> {
1058        let peer_node_addr = *peer_identity.node_addr();
1059
1060        // Create connection in handshake phase (outbound knows expected identity)
1061        let current_time_ms = Self::now_ms();
1062        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
1063
1064        // Allocate a session index for this handshake
1065        let our_index = match self.index_allocator.allocate() {
1066            Ok(idx) => idx,
1067            Err(e) => {
1068                // Clean up the link we just created
1069                self.links.remove(&link_id);
1070                self.addr_to_link.remove(&(transport_id, remote_addr));
1071                return Err(NodeError::IndexAllocationFailed(e.to_string()));
1072            }
1073        };
1074
1075        // Start the Noise handshake and get message 1
1076        let our_keypair = self.identity.keypair();
1077        let noise_msg1 =
1078            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
1079                Ok(msg) => msg,
1080                Err(e) => {
1081                    // Clean up the index and link
1082                    let _ = self.index_allocator.free(our_index);
1083                    self.links.remove(&link_id);
1084                    self.addr_to_link.remove(&(transport_id, remote_addr));
1085                    return Err(NodeError::HandshakeFailed(e.to_string()));
1086                }
1087            };
1088
1089        // Set index and transport info on the connection
1090        connection.set_our_index(our_index);
1091        connection.set_transport_id(transport_id);
1092        connection.set_source_addr(remote_addr.clone());
1093
1094        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
1095        let wire_msg1 = build_msg1(our_index, &noise_msg1);
1096
1097        debug!(
1098            peer = %self.peer_display_name(&peer_node_addr),
1099            transport_id = %transport_id,
1100            remote_addr = %remote_addr,
1101            link_id = %link_id,
1102            our_index = %our_index,
1103            "Connection initiated"
1104        );
1105
1106        // Store msg1 for resend and schedule first resend
1107        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
1108        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
1109
1110        // Track in pending_outbound for msg2 dispatch
        // NOTE: the connection is registered before the first send, which is
        // why both failure branches below have to undo these two inserts.
1111        self.pending_outbound
1112            .insert((transport_id, our_index.as_u32()), link_id);
1113        self.connections.insert(link_id, connection);
1114
1115        // Send the wire format handshake message. If the very first send fails
1116        // synchronously (for example an IPv6 candidate on an IPv4-only UDP
1117        // socket), undo this candidate so the caller can try the next address
1118        // in the same dial pass.
        // `None` here means the transport is no longer in `self.transports`.
1119        let send_result = match self.transports.get(&transport_id) {
1120            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
1121            None => None,
1122        };
1123        match send_result {
1124            Some(send_result) => {
                // The outcome is reported whether the send succeeded or not.
1125                self.note_local_send_outcome(&send_result);
1126                match send_result {
1127                    Ok(bytes) => {
1128                        debug!(
1129                            link_id = %link_id,
1130                            our_index = %our_index,
1131                            bytes,
1132                            "Sent Noise handshake message 1 (wire format)"
1133                        );
1134                    }
1135                    Err(e) => {
1136                        warn!(
1137                            link_id = %link_id,
1138                            transport_id = %transport_id,
1139                            remote_addr = %remote_addr,
1140                            our_index = %our_index,
1141                            error = %e,
1142                            "Failed to send handshake message"
1143                        );
                        // Roll back everything registered for this attempt:
                        // msg2 dispatch entry, connection, link, reverse
                        // lookup, and the session index.
1144                        self.pending_outbound
1145                            .remove(&(transport_id, our_index.as_u32()));
1146                        self.connections.remove(&link_id);
1147                        self.links.remove(&link_id);
1148                        self.addr_to_link
1149                            .remove(&(transport_id, remote_addr.clone()));
1150                        let _ = self.index_allocator.free(our_index);
1151                        return Err(NodeError::from_transport_error(e));
1152                    }
1153                }
1154            }
1155            None => {
                // Same rollback as the failed-send branch above.
1156                self.pending_outbound
1157                    .remove(&(transport_id, our_index.as_u32()));
1158                self.connections.remove(&link_id);
1159                self.links.remove(&link_id);
1160                self.addr_to_link
1161                    .remove(&(transport_id, remote_addr.clone()));
1162                let _ = self.index_allocator.free(our_index);
1163                return Err(NodeError::TransportError(format!(
1164                    "transport {transport_id} disappeared before first handshake send"
1165                )));
1166            }
1167        }
1168
1169        Ok(())
1170    }
1171
1172    /// Poll all transports for discovered peers and auto-connect.
1173    ///
1174    /// Called from the tick handler. Iterates operational transports,
1175    /// drains their discovery buffers, and initiates connections to
1176    /// newly discovered peers (if auto_connect is enabled).
1177    pub(super) async fn poll_transport_discovery(&mut self) {
1178        // Collect discoveries first to avoid borrow conflict with self
1179        let mut to_connect = Vec::new();
1180        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1181        let mut connect_budget = self.discovery_connect_budget();
1182        let mut skipped_budget = 0usize;
1183
1184        for transport in self.transports.values() {
1185            if !transport.is_operational() {
1186                continue;
1187            }
1188            if !transport.auto_connect() {
1189                // Still drain the buffer so it doesn't grow unbounded
1190                let _ = transport.discover();
1191                continue;
1192            }
1193            let discovered = match transport.discover() {
1194                Ok(peers) => peers,
1195                Err(_) => continue,
1196            };
1197            for peer in discovered {
1198                let discovered_transport_id = peer.transport_id;
1199                let pubkey = match peer.pubkey_hint {
1200                    Some(pk) => pk,
1201                    None => continue,
1202                };
1203                let identity = PeerIdentity::from_pubkey(pubkey);
1204                let node_addr = *identity.node_addr();
1205
1206                // Skip self
1207                if node_addr == *self.identity.node_addr() {
1208                    continue;
1209                }
1210
1211                let Some((candidate_transport_id, remote_addr, transport_name)) =
1212                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1213                else {
1214                    continue;
1215                };
1216
1217                if self.peers.contains_key(&node_addr) {
1218                    let candidate = PeerAddress::new(
1219                        transport_name,
1220                        self.peer_address_string_for_transport_candidate(
1221                            candidate_transport_id,
1222                            transport_name,
1223                            &remote_addr,
1224                        ),
1225                    );
1226                    if self.active_peer_candidate_is_fresh_enough_to_skip(
1227                        &node_addr,
1228                        std::slice::from_ref(&candidate),
1229                    ) {
1230                        continue;
1231                    }
1232                    if self.is_connecting_to_peer_on_path(
1233                        &node_addr,
1234                        candidate_transport_id,
1235                        &remote_addr,
1236                    ) {
1237                        continue;
1238                    }
1239                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1240                    if connect_budget == 0
1241                        || self
1242                            .path_candidate_attempt_budget(&node_addr)
1243                            .saturating_sub(queued_for_peer)
1244                            == 0
1245                    {
1246                        skipped_budget = skipped_budget.saturating_add(1);
1247                        continue;
1248                    }
1249                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
1250                    *queued_per_peer.entry(node_addr).or_default() += 1;
1251                    connect_budget = connect_budget.saturating_sub(1);
1252                    continue;
1253                }
1254
1255                if self.is_connecting_to_peer_on_path(
1256                    &node_addr,
1257                    candidate_transport_id,
1258                    &remote_addr,
1259                ) {
1260                    continue;
1261                }
1262
1263                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1264                if connect_budget == 0
1265                    || self
1266                        .path_candidate_attempt_budget(&node_addr)
1267                        .saturating_sub(queued_for_peer)
1268                        == 0
1269                {
1270                    skipped_budget = skipped_budget.saturating_add(1);
1271                    continue;
1272                }
1273                to_connect.push((candidate_transport_id, remote_addr, identity, false));
1274                *queued_per_peer.entry(node_addr).or_default() += 1;
1275                connect_budget = connect_budget.saturating_sub(1);
1276            }
1277        }
1278
1279        if skipped_budget > 0 {
1280            debug!(
1281                skipped = skipped_budget,
1282                queued = to_connect.len(),
1283                "Transport discovery connect budget exhausted"
1284            );
1285        }
1286
1287        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1288            info!(
1289                peer = %self.peer_display_name(identity.node_addr()),
1290                transport_id = %transport_id,
1291                remote_addr = %remote_addr,
1292                active_refresh,
1293                "Auto-connecting to discovered peer"
1294            );
1295            if let Err(e) = self
1296                .initiate_connection(transport_id, remote_addr, identity)
1297                .await
1298            {
1299                warn!(error = %e, "Failed to auto-connect to discovered peer");
1300            }
1301        }
1302    }
1303
    /// Service the Nostr discovery handle: forward mesh signals, process
    /// queued bootstrap events, queue retries, and refresh the local advert.
    ///
    /// Does nothing when `self.nostr_discovery` is `None`.
    ///
    /// Event handling:
    ///
    /// * `Established` — the traversal is adopted unless the relevant
    ///   admission check refuses it. A failed adoption schedules a retry for
    ///   the peer when its npub parses.
    /// * `Failed` for a peer already in `peers` — the failure is recorded and
    ///   a link-dead reprobe is scheduled only when
    ///   `active_peer_should_keep_direct_retry` says so; otherwise it is
    ///   logged and ignored.
    /// * `Failed` while a handshake to the peer is in progress — ignored.
    /// * `Failed` otherwise — the failure is recorded, then
    ///   `try_peer_addresses` is attempted; if that also fails a retry is
    ///   scheduled and, where the cooldown applies, pushed past it.
    ///
    /// Events whose npub cannot be parsed into a `PeerIdentity` are skipped.
1304    pub(super) async fn poll_nostr_discovery(&mut self) {
1305        let Some(bootstrap) = self.nostr_discovery.clone() else {
1306            return;
1307        };
1308
        // Hand the current results of both admission checks to the discovery
        // handle before draining anything.
1309        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
1310        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
1311
1312        self.drain_nostr_mesh_signals(&bootstrap).await;
1313
1314        for event in bootstrap.drain_events().await {
1315            match event {
1316                BootstrapEvent::Established { traversal } => {
                    // A traversal for a peer already in `peers` is a refresh
                    // and is gated by the direct-refresh admission check
                    // rather than the general outbound one.
1317                    let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
1318                        .ok()
1319                        .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
1320                    let admission_allowed = if active_refresh {
1321                        self.outbound_direct_refresh_admission_check()
1322                    } else {
1323                        self.outbound_admission_check()
1324                    };
1325                    if !admission_allowed {
1326                        debug!(
1327                            peer_npub = %traversal.peer_npub,
1328                            peers = self.peers.len(),
1329                            max_peers = self.max_peers,
1330                            active_refresh,
1331                            "Dropping established NAT traversal: at capacity"
1332                        );
1333                        continue;
1334                    }
                    // Keep a copy for logging; `traversal` is moved below.
1335                    let peer_npub = traversal.peer_npub.clone();
1336                    match self.adopt_established_traversal(traversal).await {
1337                        Ok(_) => {
1338                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
1339                        }
1340                        Err(err) => {
1341                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
1342                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
1343                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
1344                            }
1345                        }
1346                    }
1347                }
1348                BootstrapEvent::Failed {
1349                    peer_config,
1350                    reason,
1351                } => {
1352                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
1353                        Ok(identity) => identity,
1354                        Err(_) => continue,
1355                    };
1356                    let node_addr = *peer_identity.node_addr();
1357                    let now_ms = Self::now_ms();
                    // Case 1: the peer is already connected.
1358                    if self.peers.contains_key(&node_addr) {
1359                        if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
1360                            let decision =
1361                                bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1362                            if decision.should_warn {
1363                                warn!(
1364                                    npub = %peer_config.npub,
1365                                    error = %reason,
1366                                    consecutive_failures = decision.consecutive_failures,
1367                                    cooldown_secs = decision
1368                                        .cooldown_until_ms
1369                                        .map(|t| t.saturating_sub(now_ms) / 1000),
1370                                    "Direct-path NAT traversal upgrade failed"
1371                                );
1372                            } else {
1373                                debug!(
1374                                    npub = %peer_config.npub,
1375                                    error = %reason,
1376                                    consecutive_failures = decision.consecutive_failures,
1377                                    "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
1378                                );
1379                            }
1380                            if decision.crossed_threshold {
1381                                bootstrap
1382                                    .request_advert_stale_check(peer_config.npub.clone())
1383                                    .await;
1384                            }
1385                            self.schedule_link_dead_reprobe(node_addr, now_ms);
1386                        } else {
1387                            debug!(
1388                                npub = %peer_config.npub,
1389                                error = %reason,
1390                                "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
1391                            );
1392                        }
1393                        continue;
1394                    }
                    // Case 2: a handshake to this peer is already in flight;
                    // the failure is not recorded in that case.
1395                    if self.is_connecting_to_peer(&node_addr) {
1396                        debug!(
1397                            npub = %peer_config.npub,
1398                            error = %reason,
1399                            "Ignoring failed NAT traversal while peer handshake is already in progress"
1400                        );
1401                        continue;
1402                    }
1403
                    // Case 3: not connected and nothing in flight.
1404                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1405                    if decision.should_warn {
1406                        warn!(
1407                            npub = %peer_config.npub,
1408                            error = %reason,
1409                            consecutive_failures = decision.consecutive_failures,
1410                            cooldown_secs = decision
1411                                .cooldown_until_ms
1412                                .map(|t| t.saturating_sub(now_ms) / 1000),
1413                            "NAT traversal failed"
1414                        );
1415                    } else {
1416                        debug!(
1417                            npub = %peer_config.npub,
1418                            error = %reason,
1419                            consecutive_failures = decision.consecutive_failures,
1420                            "NAT traversal failed (suppressed by warn-rate-limit)"
1421                        );
1422                    }
1423
1424                    // B6: stale-advert eviction on the streak-threshold
1425                    // crossing. Fire-and-forget; the outcome is logged so
1426                    // operators can see when peers get cleaned up.
1427                    if decision.crossed_threshold {
1428                        bootstrap
1429                            .request_advert_stale_check(peer_config.npub.clone())
1430                            .await;
1431                    }
1432
                    // NOTE(review): presumably this falls back to dialing the
                    // peer's configured addresses — confirm against
                    // `try_peer_addresses`. No retry is scheduled if it succeeds.
1433                    if self
1434                        .try_peer_addresses(&peer_config, peer_identity, false)
1435                        .await
1436                        .is_ok()
1437                    {
1438                        continue;
1439                    }
1440
1441                    self.schedule_retry(node_addr, now_ms);
1442                    if self.nostr_cooldown_applies_to_peer_config(&peer_config)
1443                        && let Some(cooldown_until_ms) = decision.cooldown_until_ms
1444                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
1445                    {
1446                        // Push the next retry past the cooldown so the
1447                        // open-discovery sweep doesn't re-enqueue and the
1448                        // per-attempt backoff doesn't fire sooner.
1449                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
1450                    }
1451                }
1452            }
1453        }
1454
1455        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
1456            .await;
1457        self.queue_open_discovery_retries(&bootstrap).await;
1458        self.queue_active_fallback_direct_retries(&bootstrap);
1459
1460        // Advert refresh can touch STUN/public-endpoint discovery on some
1461        // configs. Drain traversal events and queue direct retries first so a
1462        // slow refresh cannot delay path recovery work already waiting on us.
        // A failed refresh is only logged at debug level.
1463        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
1464            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
1465        }
1466    }
1467
1468    async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1469        let mut deferred = Vec::new();
1470
1471        for signal in bootstrap.drain_mesh_signals().await {
1472            let (peer_npub, msg_type, payload) = match &signal {
1473                MeshTraversalSignal::Offer { peer_npub, offer } => {
1474                    let payload = match serde_json::to_vec(&offer) {
1475                        Ok(payload) => payload,
1476                        Err(error) => {
1477                            debug!(
1478                                peer = %peer_npub,
1479                                error = %error,
1480                                "Failed to encode mesh traversal offer"
1481                            );
1482                            continue;
1483                        }
1484                    };
1485                    (
1486                        peer_npub.clone(),
1487                        SessionMessageType::TraversalOffer.to_byte(),
1488                        payload,
1489                    )
1490                }
1491                MeshTraversalSignal::Answer { peer_npub, answer } => {
1492                    let payload = match serde_json::to_vec(&answer) {
1493                        Ok(payload) => payload,
1494                        Err(error) => {
1495                            debug!(
1496                                peer = %peer_npub,
1497                                error = %error,
1498                                "Failed to encode mesh traversal answer"
1499                            );
1500                            continue;
1501                        }
1502                    };
1503                    (
1504                        peer_npub.clone(),
1505                        SessionMessageType::TraversalAnswer.to_byte(),
1506                        payload,
1507                    )
1508                }
1509            };
1510
1511            let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1512                Ok(identity) => identity,
1513                Err(error) => {
1514                    debug!(
1515                        peer = %peer_npub,
1516                        error = %error,
1517                        "Cannot send mesh traversal signal to invalid peer npub"
1518                    );
1519                    continue;
1520                }
1521            };
1522            let peer_addr = *peer_identity.node_addr();
1523            match self
1524                .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1525                .await
1526            {
1527                MeshSignalSessionAction::Send => {}
1528                MeshSignalSessionAction::Defer => {
1529                    deferred.push(signal);
1530                    continue;
1531                }
1532                MeshSignalSessionAction::Drop => continue,
1533            }
1534
1535            if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1536                debug!(
1537                    peer = %self.peer_display_name(&peer_addr),
1538                    error = %error,
1539                    "Failed to send mesh traversal signal"
1540                );
1541            }
1542        }
1543
1544        for signal in deferred {
1545            bootstrap.requeue_mesh_signal(signal);
1546        }
1547    }
1548
1549    async fn mesh_signal_session_action(
1550        &mut self,
1551        peer_addr: NodeAddr,
1552        peer_pubkey: PublicKey,
1553    ) -> MeshSignalSessionAction {
1554        if let Some(entry) = self.sessions.get(&peer_addr) {
1555            if entry.is_established() {
1556                return MeshSignalSessionAction::Send;
1557            }
1558            if entry.is_initiating() || entry.is_awaiting_msg3() {
1559                debug!(
1560                    peer = %self.peer_display_name(&peer_addr),
1561                    "Deferring mesh traversal signal until end-to-end session is established"
1562                );
1563                return MeshSignalSessionAction::Defer;
1564            }
1565        }
1566
1567        if self.find_next_hop(&peer_addr).is_none() {
1568            debug!(
1569                peer = %self.peer_display_name(&peer_addr),
1570                "Cannot warm mesh traversal signal session without a FIPS route"
1571            );
1572            self.maybe_initiate_lookup(&peer_addr).await;
1573            return MeshSignalSessionAction::Drop;
1574        }
1575
1576        self.register_identity(peer_addr, peer_pubkey);
1577        match self.initiate_session(peer_addr, peer_pubkey).await {
1578            Ok(()) => {
1579                debug!(
1580                    peer = %self.peer_display_name(&peer_addr),
1581                    "Warming end-to-end session for mesh traversal signal"
1582                );
1583                MeshSignalSessionAction::Defer
1584            }
1585            Err(NodeError::SendFailed { node_addr, reason })
1586                if node_addr == peer_addr && reason == "no route to destination" =>
1587            {
1588                debug!(
1589                    peer = %self.peer_display_name(&peer_addr),
1590                    "Cannot warm mesh traversal signal session without a FIPS route"
1591                );
1592                self.maybe_initiate_lookup(&peer_addr).await;
1593                MeshSignalSessionAction::Drop
1594            }
1595            Err(error) => {
1596                debug!(
1597                    peer = %self.peer_display_name(&peer_addr),
1598                    error = %error,
1599                    "Failed to warm end-to-end session for mesh traversal signal"
1600                );
1601                MeshSignalSessionAction::Drop
1602            }
1603        }
1604    }
1605
1606    /// Resolve the LAN-only discovery scope. Applications with explicit
1607    /// connectivity config can set `node.discovery.lan.scope` without
1608    /// changing the public Nostr discovery `app` tag. The older fallback
1609    /// extracts a scope from the Nostr app tag used by default scoped
1610    /// discovery.
1611    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1612        if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1613            let scope = scope.trim();
1614            if !scope.is_empty() {
1615                return Some(scope.to_string());
1616            }
1617        }
1618
1619        let app = self.config.node.discovery.nostr.app.trim();
1620        if app.is_empty() {
1621            return None;
1622        }
1623        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1624            let scope = rest.trim();
1625            if scope.is_empty() {
1626                None
1627            } else {
1628                Some(scope.to_string())
1629            }
1630        } else {
1631            Some(app.to_string())
1632        }
1633    }
1634
1635    pub(super) fn start_local_instance_discovery(&mut self) {
1636        if !self.config.node.discovery.local.enabled {
1637            return;
1638        }
1639        let Some(scope) = self.lan_discovery_scope() else {
1640            debug!("local instance discovery not started: no discovery scope");
1641            return;
1642        };
1643        let now_ms = Self::now_ms();
1644        match crate::discovery::local::LocalInstanceRegistry::new(
1645            self.identity.npub(),
1646            scope,
1647            &self.config.node.discovery.local,
1648            now_ms,
1649        ) {
1650            Ok(registry) => {
1651                self.local_instance_registry = Some(registry);
1652                self.local_instance_started_at_ms = Some(now_ms);
1653                self.last_local_instance_publish_ms = None;
1654                self.last_local_instance_scan_ms = None;
1655                self.publish_local_instance_record(now_ms);
1656                info!("Same-host FIPS instance discovery enabled");
1657            }
1658            Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1659                debug!("same-host FIPS instance discovery disabled");
1660            }
1661            Err(err) => {
1662                debug!(error = %err, "same-host FIPS instance discovery not started");
1663            }
1664        }
1665    }
1666
1667    fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1668        let mut contacts = Vec::new();
1669        for handle in self.transports.values() {
1670            if !handle.is_operational() || !handle.accept_connections() {
1671                continue;
1672            }
1673            let transport = handle.transport_type().name;
1674            if transport != "udp" && transport != "tcp" {
1675                continue;
1676            }
1677            let Some(local_addr) = handle.local_addr() else {
1678                continue;
1679            };
1680            let Some(contact) =
1681                crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1682            else {
1683                continue;
1684            };
1685            if contacts
1686                .iter()
1687                .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1688                    existing.transport == contact.transport && existing.addr == contact.addr
1689                })
1690            {
1691                continue;
1692            }
1693            contacts.push(contact);
1694        }
1695        contacts
1696    }
1697
1698    fn publish_local_instance_record(&mut self, now_ms: u64) {
1699        let Some(registry) = self.local_instance_registry.clone() else {
1700            return;
1701        };
1702        let contacts = self.local_instance_contacts();
1703        match registry.publish(contacts, now_ms) {
1704            Ok(()) => {
1705                self.last_local_instance_publish_ms = Some(now_ms);
1706            }
1707            Err(err) => {
1708                debug!(error = %err, "failed to publish same-host FIPS instance record");
1709            }
1710        }
1711    }
1712
1713    fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1714        if self.local_instance_registry.is_none() {
1715            return;
1716        }
1717        let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1718        let due = self
1719            .last_local_instance_publish_ms
1720            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1721            .unwrap_or(true);
1722        if due {
1723            self.publish_local_instance_record(now_ms);
1724        }
1725    }
1726
1727    fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1728        if self.local_instance_registry.is_none() {
1729            return false;
1730        }
1731        let cfg = &self.config.node.discovery.local;
1732        let interval_ms = if self
1733            .local_instance_started_at_ms
1734            .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1735            .unwrap_or(false)
1736        {
1737            cfg.startup_scan_interval_ms()
1738        } else {
1739            cfg.scan_interval_ms()
1740        };
1741        self.last_local_instance_scan_ms
1742            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1743            .unwrap_or(true)
1744    }
1745
1746    fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1747        if self.config.peers().iter().any(|peer| {
1748            PeerIdentity::from_npub(&peer.npub)
1749                .map(|configured| configured.node_addr() == identity.node_addr())
1750                .unwrap_or(false)
1751        }) {
1752            return true;
1753        }
1754        self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1755    }
1756
1757    fn local_instance_peer_addresses(
1758        &self,
1759        record: &crate::discovery::local::LocalInstanceRecord,
1760    ) -> Vec<PeerAddress> {
1761        let mut addresses = Vec::new();
1762        for contact in &record.contacts {
1763            if contact.transport != "udp" && contact.transport != "tcp" {
1764                continue;
1765            }
1766            let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1767                debug!(
1768                    npub = %record.npub,
1769                    transport = %contact.transport,
1770                    addr = %contact.addr,
1771                    "local instance discovery: skip non-socket contact"
1772                );
1773                continue;
1774            };
1775            if !socket_addr.ip().is_loopback() {
1776                debug!(
1777                    npub = %record.npub,
1778                    addr = %contact.addr,
1779                    "local instance discovery: skip non-loopback contact"
1780                );
1781                continue;
1782            }
1783            let address =
1784                PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1785                    .with_seen_at_ms(record.updated_at_ms);
1786            if addresses.iter().any(|existing: &PeerAddress| {
1787                existing.transport == address.transport && existing.addr == address.addr
1788            }) {
1789                continue;
1790            }
1791            addresses.push(address);
1792        }
1793        addresses
1794    }
1795
1796    /// Scan the same-host registry only on startup and then at a low cadence.
1797    /// This avoids a per-second filesystem poll while preserving the fast path
1798    /// for processes launched around the same time.
1799    pub(super) async fn poll_local_instance_discovery(&mut self) {
1800        let Some(registry) = self.local_instance_registry.clone() else {
1801            return;
1802        };
1803        let now_ms = Self::now_ms();
1804        self.maybe_publish_local_instance_record(now_ms);
1805        if !self.local_instance_scan_due(now_ms) {
1806            return;
1807        }
1808        self.last_local_instance_scan_ms = Some(now_ms);
1809
1810        let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1811        {
1812            Ok(records) => records,
1813            Err(err) => {
1814                debug!(error = %err, "same-host FIPS instance scan failed");
1815                return;
1816            }
1817        };
1818        if records.is_empty() {
1819            return;
1820        }
1821
1822        let mut connect_budget = self.discovery_connect_budget();
1823        let mut skipped_budget = 0usize;
1824        for record in records {
1825            let identity = match PeerIdentity::from_npub(&record.npub) {
1826                Ok(identity) => identity,
1827                Err(err) => {
1828                    debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1829                    continue;
1830                }
1831            };
1832            let peer_node_addr = *identity.node_addr();
1833            if peer_node_addr == *self.identity.node_addr() {
1834                continue;
1835            }
1836            if !self.local_instance_peer_allowed(&identity) {
1837                debug!(
1838                    npub = %identity.short_npub(),
1839                    "local instance discovery: skip unconfigured peer"
1840                );
1841                continue;
1842            }
1843
1844            let addresses = self.local_instance_peer_addresses(&record);
1845            if addresses.is_empty() {
1846                continue;
1847            }
1848
1849            if self.peers.contains_key(&peer_node_addr)
1850                && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1851            {
1852                continue;
1853            }
1854
1855            for address in addresses {
1856                let Some((transport_id, remote_addr)) =
1857                    self.resolve_peer_address_for_match(&address)
1858                else {
1859                    continue;
1860                };
1861                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1862                    continue;
1863                }
1864                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1865                    skipped_budget = skipped_budget.saturating_add(1);
1866                    continue;
1867                }
1868                info!(
1869                    npub = %identity.short_npub(),
1870                    transport = %address.transport,
1871                    addr = %address.addr,
1872                    "same-host FIPS instance discovery: initiating handshake"
1873                );
1874                if let Err(err) = self
1875                    .initiate_connection(transport_id, remote_addr, identity)
1876                    .await
1877                {
1878                    debug!(
1879                        npub = %record.npub,
1880                        error = %err,
1881                        "same-host FIPS instance discovery: failed to initiate connection"
1882                    );
1883                }
1884                connect_budget = connect_budget.saturating_sub(1);
1885            }
1886        }
1887        if skipped_budget > 0 {
1888            debug!(
1889                skipped = skipped_budget,
1890                "same-host FIPS instance discovery connect budget exhausted"
1891            );
1892        }
1893    }
1894
1895    /// Drain mDNS-discovered peers and initiate Noise XX handshakes. For
1896    /// active peers this is a non-disruptive alternate-path refresh: the
1897    /// current link stays live until a new handshake authenticates and
1898    /// promotes. The handshake itself is the authentication — a spoofed
1899    /// mDNS advert with someone else's npub fails the XX exchange and
1900    /// is dropped.
1901    pub(super) async fn poll_lan_discovery(&mut self) {
1902        let Some(runtime) = self.lan_discovery.clone() else {
1903            return;
1904        };
1905        let events = runtime.drain_events().await;
1906        if events.is_empty() {
1907            return;
1908        }
1909        let mut connect_budget = self.discovery_connect_budget();
1910        let mut skipped_budget = 0usize;
1911        for event in events {
1912            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1913            let Some((transport_id, local_addr)) =
1914                self.find_udp_transport_for_remote_addr(peer.addr)
1915            else {
1916                debug!(
1917                    addr = %peer.addr,
1918                    "lan: skip discovered peer with no compatible UDP transport"
1919                );
1920                continue;
1921            };
1922            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1923                Ok(id) => id,
1924                Err(err) => {
1925                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1926                    continue;
1927                }
1928            };
1929            let peer_node_addr = *identity.node_addr();
1930            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1931            if self.peers.contains_key(&peer_node_addr) {
1932                let candidate = PeerAddress::new("udp", peer.addr.to_string());
1933                if self.active_peer_candidate_is_fresh_enough_to_skip(
1934                    &peer_node_addr,
1935                    std::slice::from_ref(&candidate),
1936                ) {
1937                    continue;
1938                }
1939                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1940                    continue;
1941                }
1942                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1943                    skipped_budget = skipped_budget.saturating_add(1);
1944                    continue;
1945                }
1946                info!(
1947                    npub = %identity.short_npub(),
1948                    addr = %peer.addr,
1949                    local_addr = %local_addr,
1950                    "lan: initiating alternate-path handshake to active peer"
1951                );
1952                if let Err(err) = self
1953                    .initiate_connection(transport_id, remote_addr, identity)
1954                    .await
1955                {
1956                    debug!(
1957                        npub = %peer.npub,
1958                        error = %err,
1959                        "lan: failed to initiate active peer alternate-path handshake"
1960                    );
1961                }
1962                connect_budget = connect_budget.saturating_sub(1);
1963                continue;
1964            }
1965            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1966                continue;
1967            }
1968            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1969                skipped_budget = skipped_budget.saturating_add(1);
1970                continue;
1971            }
1972            info!(
1973                npub = %identity.short_npub(),
1974                addr = %peer.addr,
1975                local_addr = %local_addr,
1976                "lan: initiating handshake to discovered peer"
1977            );
1978            if let Err(err) = self
1979                .initiate_connection(transport_id, remote_addr, identity)
1980                .await
1981            {
1982                debug!(
1983                    npub = %peer.npub,
1984                    error = %err,
1985                    "lan: failed to initiate connection to discovered peer"
1986                );
1987            }
1988            connect_budget = connect_budget.saturating_sub(1);
1989        }
1990        if skipped_budget > 0 {
1991            debug!(
1992                skipped = skipped_budget,
1993                "lan: discovery connect budget exhausted"
1994            );
1995        }
1996    }
1997
1998    /// Poll pending transport connects and initiate handshakes for ready ones.
1999    ///
2000    /// Called from the tick handler. For each pending connect, queries the
2001    /// transport's connection state. When a connection is established,
2002    /// marks the link as Connected and starts the Noise handshake.
2003    /// Failed connections are cleaned up and scheduled for retry.
2004    pub(super) async fn poll_pending_connects(&mut self) {
2005        if self.pending_connects.is_empty() {
2006            return;
2007        }
2008
2009        let mut completed = Vec::new();
2010
2011        for (i, pending) in self.pending_connects.iter().enumerate() {
2012            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
2013                transport.connection_state(&pending.remote_addr)
2014            } else {
2015                crate::transport::ConnectionState::Failed("transport removed".into())
2016            };
2017
2018            match state {
2019                crate::transport::ConnectionState::Connected => {
2020                    completed.push((i, true, None));
2021                }
2022                crate::transport::ConnectionState::Failed(reason) => {
2023                    completed.push((i, false, Some(reason)));
2024                }
2025                crate::transport::ConnectionState::Connecting => {
2026                    // Still in progress, check on next tick
2027                }
2028                crate::transport::ConnectionState::None => {
2029                    // Shouldn't happen — treat as failure
2030                    completed.push((i, false, Some("no connection attempt found".into())));
2031                }
2032            }
2033        }
2034
2035        // Process completions in reverse order to preserve indices
2036        for (i, success, reason) in completed.into_iter().rev() {
2037            let pending = self.pending_connects.remove(i);
2038
2039            if success {
2040                // Mark link as Connected
2041                if let Some(link) = self.links.get_mut(&pending.link_id) {
2042                    link.set_connected();
2043                }
2044
2045                debug!(
2046                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2047                    transport_id = %pending.transport_id,
2048                    remote_addr = %pending.remote_addr,
2049                    link_id = %pending.link_id,
2050                    "Transport connected, starting handshake"
2051                );
2052
2053                // Start the handshake now that the transport is connected
2054                if let Err(e) = self
2055                    .start_handshake(
2056                        pending.link_id,
2057                        pending.transport_id,
2058                        pending.remote_addr.clone(),
2059                        pending.peer_identity,
2060                    )
2061                    .await
2062                {
2063                    warn!(
2064                        link_id = %pending.link_id,
2065                        error = %e,
2066                        "Failed to start handshake after transport connect"
2067                    );
2068                    // Clean up link on handshake failure
2069                    self.remove_link(&pending.link_id);
2070                }
2071            } else {
2072                let reason = reason.unwrap_or_default();
2073                warn!(
2074                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2075                    transport_id = %pending.transport_id,
2076                    remote_addr = %pending.remote_addr,
2077                    link_id = %pending.link_id,
2078                    reason = %reason,
2079                    "Transport connect failed"
2080                );
2081
2082                // Clean up link and schedule retry
2083                self.remove_link(&pending.link_id);
2084                self.links.remove(&pending.link_id);
2085                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2086            }
2087        }
2088    }
2089
2090    // === State Transitions ===
2091
2092    /// Start the node.
2093    ///
2094    /// Initializes the TUN interface (if configured), spawns I/O threads,
2095    /// and transitions to the Running state.
2096    pub async fn start(&mut self) -> Result<(), NodeError> {
2097        node_start_debug_log("Node::start begin");
2098        if !self.state.can_start() {
2099            return Err(NodeError::AlreadyStarted);
2100        }
2101        self.state = NodeState::Starting;
2102        node_start_debug_log("Node::start state set to starting");
2103
2104        // Create packet channel for transport -> Node communication
2105        let packet_buffer_size = self.config.node.buffers.packet_channel;
2106        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
2107        self.packet_tx = Some(packet_tx.clone());
2108        self.packet_rx = Some(packet_rx);
2109        node_start_debug_log("Node::start packet channel created");
2110
2111        // Initialize transports first (before TUN, before Nostr discovery).
2112        node_start_debug_log("Node::start create transports begin");
2113        let transport_handles = self.create_transports(&packet_tx).await;
2114        node_start_debug_log(format!(
2115            "Node::start create transports complete count={}",
2116            transport_handles.len()
2117        ));
2118
2119        for mut handle in transport_handles {
2120            let transport_id = handle.transport_id();
2121            let transport_type = handle.transport_type().name;
2122            let name = handle.name().map(|s| s.to_string());
2123
2124            node_start_debug_log(format!(
2125                "Node::start transport start begin id={} type={} name={:?}",
2126                transport_id, transport_type, name
2127            ));
2128            match handle.start().await {
2129                Ok(()) => {
2130                    node_start_debug_log(format!(
2131                        "Node::start transport start ok id={} type={}",
2132                        transport_id, transport_type
2133                    ));
2134                    self.transports.insert(transport_id, handle);
2135                }
2136                Err(e) => {
2137                    node_start_debug_log(format!(
2138                        "Node::start transport start error id={} type={} error={}",
2139                        transport_id, transport_type, e
2140                    ));
2141                    if let Some(ref n) = name {
2142                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
2143                    } else {
2144                        warn!(transport_type, error = %e, "Transport failed to start");
2145                    }
2146                }
2147            }
2148        }
2149
2150        if !self.transports.is_empty() {
2151            info!(count = self.transports.len(), "Transports initialized");
2152        }
2153
2154        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
2155        // worker pools. **Unix only** — both pools issue direct
2156        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
2157        // raw file descriptors via `AsRawFd`, which is a unix-only
2158        // trait. On Windows the rx_loop's tokio-based send/recv
2159        // remain the canonical path; the perf overhaul lands its
2160        // gains on unix.
2161        //
2162        // Worker count defaults to the number of CPUs, overridable
2163        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
2164        // for debug / benchmarking. Hash-by-destination means a
2165        // single TCP flow pins to one worker (preserves wire
2166        // ordering); additional workers light up under multi-flow
2167        // / multi-peer load. See `node::encrypt_worker` /
2168        // `node::decrypt_worker` for full rationale.
2169        #[cfg(unix)]
2170        {
2171            if self.config.node.worker_pools_enabled {
2172                node_start_debug_log("Node::start worker pools begin");
2173                let cpu_default = std::thread::available_parallelism()
2174                    .map(|n| n.get())
2175                    .unwrap_or(1)
2176                    .max(1);
2177                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
2178                    .ok()
2179                    .and_then(|s| s.parse().ok())
2180                    .unwrap_or(cpu_default)
2181                    .max(1);
2182                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
2183                    encrypt_worker_count,
2184                ));
2185                info!(
2186                    workers = encrypt_worker_count,
2187                    "Spawned FMP-encrypt worker pool"
2188                );
2189
2190                // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
2191                // falls through to the in-line rx_loop decrypt path (the
2192                // "test-mode" branch in `handle_encrypted_frame`, which is
2193                // in fact a fully functional synchronous decrypt). Useful
2194                // as an A/B against the worker pipeline when chasing
2195                // scheduling/queueing regressions on the native macOS
2196                // path. Any non-zero value (env or default) spawns the
2197                // pool as before.
2198                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
2199                    .ok()
2200                    .and_then(|s| s.parse().ok())
2201                    .unwrap_or(cpu_default);
2202                if decrypt_worker_count == 0 {
2203                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
2204                } else {
2205                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
2206                        decrypt_worker_count,
2207                    ));
2208                    info!(
2209                        workers = decrypt_worker_count,
2210                        "Spawned FMP+FSP-decrypt worker pool"
2211                    );
2212                }
2213                node_start_debug_log("Node::start worker pools complete");
2214            } else {
2215                node_start_debug_log("Node::start worker pools disabled");
2216                info!("FIPS worker pools disabled; using in-line crypto/send path");
2217            }
2218        }
2219
2220        if self.config.node.discovery.nostr.enabled {
2221            node_start_debug_log("Node::start nostr discovery start begin");
2222            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
2223                .await
2224            {
2225                Ok(runtime) => {
2226                    node_start_debug_log("Node::start nostr discovery runtime created");
2227                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2228                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2229                    }
2230                    node_start_debug_log("Node::start nostr overlay advert refreshed");
2231                    self.nostr_discovery = Some(runtime);
2232                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2233                    info!("Nostr overlay discovery enabled");
2234                }
2235                Err(err) => {
2236                    node_start_debug_log(format!(
2237                        "Node::start nostr discovery start error error={}",
2238                        err
2239                    ));
2240                    warn!(error = %err, "Failed to start Nostr overlay discovery");
2241                }
2242            }
2243        }
2244
2245        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
2246        // when Nostr is disabled, since it gives us sub-second pairing
2247        // on the same link without any relay or NAT-traversal roundtrip.
2248        if self.config.node.discovery.lan.enabled {
2249            node_start_debug_log("Node::start lan discovery start begin");
2250            let advertised_udp_port = self
2251                .transports
2252                .values()
2253                .filter(|h| h.is_operational())
2254                .filter(|h| h.transport_type().name == "udp")
2255                .find_map(|h| h.local_addr().map(|addr| addr.port()))
2256                .unwrap_or(0);
2257            let scope = self.lan_discovery_scope();
2258            match crate::discovery::lan::LanDiscovery::start(
2259                &self.identity,
2260                scope,
2261                advertised_udp_port,
2262                self.config.node.discovery.lan.clone(),
2263            )
2264            .await
2265            {
2266                Ok(runtime) => {
2267                    node_start_debug_log("Node::start lan discovery start ok");
2268                    self.lan_discovery = Some(runtime);
2269                    info!("LAN mDNS discovery enabled");
2270                }
2271                Err(err) => {
2272                    node_start_debug_log(format!(
2273                        "Node::start lan discovery start error error={}",
2274                        err
2275                    ));
2276                    debug!(error = %err, "LAN mDNS discovery not started");
2277                }
2278            }
2279        }
2280
2281        self.start_local_instance_discovery();
2282        self.poll_local_instance_discovery().await;
2283
2284        // Connect to static peers before TUN is active
2285        // This allows handshake messages to be sent before we start accepting packets
2286        node_start_debug_log("Node::start initiate peer connections begin");
2287        self.initiate_peer_connections().await;
2288        node_start_debug_log("Node::start initiate peer connections complete");
2289
2290        // Initialize TUN interface last, after transports and peers are ready
2291        if self.config.tun.enabled {
2292            node_start_debug_log("Node::start tun init begin");
2293            let address = *self.identity.address();
2294            match TunDevice::create(&self.config.tun, address).await {
2295                Ok(device) => {
2296                    let mtu = device.mtu();
2297                    let name = device.name().to_string();
2298                    let our_addr = *device.address();
2299
2300                    info!("TUN device active:");
2301                    info!("     name: {}", name);
2302                    info!("  address: {}", device.address());
2303                    info!("      mtu: {}", mtu);
2304
2305                    // Calculate max MSS for TCP clamping
2306                    let effective_mtu = self.effective_ipv6_mtu();
2307                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
2308
2309                    info!("effective MTU: {} bytes", effective_mtu);
2310                    debug!("   max TCP MSS: {} bytes", max_mss);
2311
2312                    // On macOS, create a shutdown pipe. Writing to it unblocks the
2313                    // reader thread's select() loop without closing the TUN fd
2314                    // (which would cause a double-close when TunDevice drops).
2315                    #[cfg(target_os = "macos")]
2316                    let (shutdown_read_fd, shutdown_write_fd) = {
2317                        let mut fds = [0i32; 2];
2318                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2319                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2320                                "failed to create shutdown pipe".into(),
2321                            )));
2322                        }
2323                        (fds[0], fds[1])
2324                    };
2325
2326                    // Create writer (dups the fd for independent write access).
2327                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
2328                    // per-destination path MTU learned via discovery.
2329                    let (writer, tun_tx) =
2330                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2331
2332                    // Spawn writer thread
2333                    let writer_handle = thread::spawn(move || {
2334                        writer.run();
2335                    });
2336
2337                    // Clone tun_tx for the reader
2338                    let reader_tun_tx = tun_tx.clone();
2339
2340                    // Create outbound channel for TUN reader → Node
2341                    let tun_channel_size = self.config.node.buffers.tun_channel;
2342                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2343
2344                    // Spawn reader thread
2345                    let transport_mtu = self.transport_mtu();
2346                    let path_mtu_lookup = self.path_mtu_lookup.clone();
2347                    #[cfg(target_os = "macos")]
2348                    let reader_handle = thread::spawn(move || {
2349                        run_tun_reader(
2350                            device,
2351                            mtu,
2352                            our_addr,
2353                            reader_tun_tx,
2354                            outbound_tx,
2355                            transport_mtu,
2356                            path_mtu_lookup,
2357                            shutdown_read_fd,
2358                        );
2359                    });
2360                    #[cfg(not(target_os = "macos"))]
2361                    let reader_handle = thread::spawn(move || {
2362                        run_tun_reader(
2363                            device,
2364                            mtu,
2365                            our_addr,
2366                            reader_tun_tx,
2367                            outbound_tx,
2368                            transport_mtu,
2369                            path_mtu_lookup,
2370                        );
2371                    });
2372
2373                    self.tun_state = TunState::Active;
2374                    self.tun_name = Some(name);
2375                    self.tun_tx = Some(tun_tx);
2376                    self.tun_outbound_rx = Some(outbound_rx);
2377                    self.tun_reader_handle = Some(reader_handle);
2378                    self.tun_writer_handle = Some(writer_handle);
2379                    #[cfg(target_os = "macos")]
2380                    {
2381                        self.tun_shutdown_fd = Some(shutdown_write_fd);
2382                    }
2383                }
2384                Err(e) => {
2385                    self.tun_state = TunState::Failed;
2386                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
2387                }
2388            }
2389            node_start_debug_log("Node::start tun init complete");
2390        }
2391
2392        // Initialize DNS responder (independent of TUN).
2393        //
2394        // Default bind_addr is "::1" (IPv6 loopback). The shipped
2395        // fips-dns-setup configures systemd-resolved via a global
2396        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
2397        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
2398        // where self-destined traffic to fips0's address is attributed
2399        // to fips0 in PKTINFO and gets silently dropped by the
2400        // mesh-interface filter in src/upper/dns.rs.
2401        //
2402        // For mesh-reachable resolution (rare), set bind_addr: "::"
2403        // in fips.yaml. The mesh-interface filter remains active to
2404        // prevent hosts-file alias enumeration in that mode.
2405        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
2406        // 127.0.0.1 still reach us regardless of kernel sysctl
2407        // defaults — but only when bind is on a wildcard / IPv6 path.
2408        if self.config.dns.enabled {
2409            node_start_debug_log("Node::start dns init begin");
2410            let addr_str = self.config.dns.bind_addr();
2411            match addr_str.parse::<std::net::IpAddr>() {
2412                Ok(ip) => {
2413                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2414                    match Self::bind_dns_socket(bind) {
2415                        Ok(socket) => {
2416                            let dns_channel_size = self.config.node.buffers.dns_channel;
2417                            let (identity_tx, identity_rx) =
2418                                tokio::sync::mpsc::channel(dns_channel_size);
2419                            let dns_ttl = self.config.dns.ttl();
2420                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2421                                self.config.peers(),
2422                            );
2423                            let reloader = if self.config.node.system_files_enabled {
2424                                let hosts_path = std::path::PathBuf::from(
2425                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
2426                                );
2427                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2428                            } else {
2429                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2430                            };
2431                            // Resolve the TUN ifindex so the responder can
2432                            // drop queries arriving on the mesh interface
2433                            // (fips0). Without this, the `::` bind exposes
2434                            // /etc/fips/hosts alias probing to any mesh peer.
2435                            // When TUN isn't enabled or the name can't be
2436                            // resolved, `None` disables the filter (there
2437                            // is no mesh surface to defend anyway).
2438                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2439                            info!(
2440                                bind = %bind,
2441                                hosts = reloader.hosts().len(),
2442                                mesh_ifindex = ?mesh_ifindex,
2443                                "DNS responder started for .fips domain (auto-reload enabled)"
2444                            );
2445                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2446                                socket,
2447                                identity_tx,
2448                                dns_ttl,
2449                                reloader,
2450                                mesh_ifindex,
2451                            ));
2452                            self.dns_identity_rx = Some(identity_rx);
2453                            self.dns_task = Some(handle);
2454                        }
2455                        Err(e) => {
2456                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2457                        }
2458                    }
2459                }
2460                Err(e) => {
2461                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2462                }
2463            }
2464            node_start_debug_log("Node::start dns init complete");
2465        }
2466
2467        self.state = NodeState::Running;
2468        node_start_debug_log("Node::start running");
2469        info!("Node started:");
2470        info!("       state: {}", self.state);
2471        info!("  transports: {}", self.transports.len());
2472        info!(" connections: {}", self.connections.len());
2473        Ok(())
2474    }
2475
2476    /// Bind a UDP socket for the DNS responder.
2477    ///
2478    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
2479    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
2480    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
2481    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
2482    /// land on the same socket.
2483    ///
2484    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
2485    /// can learn the arrival interface per packet. The responder uses that
2486    /// to drop queries arriving on the mesh TUN, closing the hosts-file
2487    /// probing side-channel created by the `::` bind.
2488    fn bind_dns_socket(
2489        addr: std::net::SocketAddr,
2490    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2491        use socket2::{Domain, Protocol, Socket, Type};
2492        let domain = if addr.is_ipv4() {
2493            Domain::IPV4
2494        } else {
2495            Domain::IPV6
2496        };
2497        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2498        if addr.is_ipv6() {
2499            sock.set_only_v6(false)?;
2500            #[cfg(unix)]
2501            Self::set_recv_pktinfo_v6(&sock)?;
2502        }
2503        sock.set_nonblocking(true)?;
2504        sock.bind(&addr.into())?;
2505        tokio::net::UdpSocket::from_std(sock.into())
2506    }
2507
2508    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
2509    ///
2510    /// After this setsockopt, each `recvmsg()` call on the socket receives
2511    /// an `IPV6_PKTINFO` control message containing the arrival interface
2512    /// index, which the DNS responder uses for its mesh-interface filter.
2513    #[cfg(unix)]
2514    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2515        use std::os::fd::AsRawFd;
2516        let enable: libc::c_int = 1;
2517        let ret = unsafe {
2518            libc::setsockopt(
2519                sock.as_raw_fd(),
2520                libc::IPPROTO_IPV6,
2521                libc::IPV6_RECVPKTINFO,
2522                &enable as *const _ as *const libc::c_void,
2523                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2524            )
2525        };
2526        if ret < 0 {
2527            return Err(std::io::Error::last_os_error());
2528        }
2529        Ok(())
2530    }
2531
2532    /// Resolve the mesh TUN interface index by name.
2533    ///
2534    /// Returns `None` if the interface does not exist (e.g. TUN disabled
2535    /// or not yet created). A `None` result disables the DNS responder's
2536    /// mesh-interface filter — safe, because if there is no fips0 there
2537    /// is no mesh exposure to defend against.
2538    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2539        #[cfg(unix)]
2540        {
2541            let c_name = std::ffi::CString::new(name).ok()?;
2542            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2543            if idx == 0 { None } else { Some(idx) }
2544        }
2545        #[cfg(not(unix))]
2546        {
2547            let _ = name;
2548            None
2549        }
2550    }
2551
2552    /// Stop the node.
2553    ///
2554    /// Shuts down TUN interface, stops I/O threads, and transitions to
2555    /// the Stopped state.
2556    pub async fn stop(&mut self) -> Result<(), NodeError> {
2557        if !self.state.can_stop() {
2558            return Err(NodeError::NotStarted);
2559        }
2560        self.state = NodeState::Stopping;
2561        info!(state = %self.state, "Node stopping");
2562
2563        // Stop DNS responder
2564        if let Some(handle) = self.dns_task.take() {
2565            handle.abort();
2566            debug!("DNS responder stopped");
2567        }
2568
2569        // Send disconnect notifications to all active peers before closing transports
2570        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
2571            .await;
2572
2573        // Stop Nostr overlay discovery background work and withdraw any advert.
2574        if let Some(bootstrap) = self.nostr_discovery.take()
2575            && let Err(e) = bootstrap.shutdown().await
2576        {
2577            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
2578        }
2579
2580        // Tear down LAN mDNS responder + browser. Best-effort: the
2581        // OS will eventually time the advert out via its TTL even if
2582        // we don't get a clean unregister out before the daemon exits.
2583        if let Some(lan) = self.lan_discovery.take() {
2584            lan.shutdown().await;
2585        }
2586
2587        if let Some(registry) = self.local_instance_registry.take()
2588            && let Err(err) = registry.remove()
2589        {
2590            debug!(error = %err, "failed to remove same-host FIPS instance record");
2591        }
2592
2593        // Shutdown transports (they're packet producers)
2594        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
2595        for transport_id in transport_ids {
2596            if let Some(mut handle) = self.transports.remove(&transport_id) {
2597                let transport_type = handle.transport_type().name;
2598                match handle.stop().await {
2599                    Ok(()) => {
2600                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
2601                    }
2602                    Err(e) => {
2603                        warn!(
2604                            transport_id = %transport_id,
2605                            transport_type,
2606                            error = %e,
2607                            "Transport stop failed"
2608                        );
2609                    }
2610                }
2611            }
2612        }
2613
2614        // Drop packet channels
2615        self.packet_tx.take();
2616        self.packet_rx.take();
2617
2618        // Shutdown TUN interface
2619        if let Some(name) = self.tun_name.take() {
2620            info!(name = %name, "Shutting down TUN interface");
2621
2622            // Drop the tun_tx to signal the writer to stop
2623            self.tun_tx.take();
2624
2625            // Delete the interface (on Linux, causes reader to get EFAULT)
2626            if let Err(e) = shutdown_tun_interface(&name).await {
2627                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2628            }
2629
2630            // On macOS, signal the reader thread to exit by writing to the
2631            // shutdown pipe. The reader's select() will wake up and break.
2632            #[cfg(target_os = "macos")]
2633            if let Some(fd) = self.tun_shutdown_fd.take() {
2634                unsafe {
2635                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2636                    libc::close(fd);
2637                }
2638            }
2639
2640            // Wait for threads to finish
2641            if let Some(handle) = self.tun_reader_handle.take() {
2642                let _ = handle.join();
2643            }
2644            if let Some(handle) = self.tun_writer_handle.take() {
2645                let _ = handle.join();
2646            }
2647
2648            self.tun_state = TunState::Disabled;
2649        }
2650
2651        self.state = NodeState::Stopped;
2652        info!(state = %self.state, "Node stopped");
2653        Ok(())
2654    }
2655
2656    /// Send disconnect notifications to all active peers.
2657    ///
2658    /// Best-effort: send failures are logged and ignored since the transport
2659    /// may already be degraded. This runs before transports are shut down.
2660    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2661        let disconnect = Disconnect::new(reason);
2662        let plaintext = disconnect.encode();
2663
2664        // Collect node_addrs to avoid borrow conflict with send helper
2665        let peer_addrs: Vec<NodeAddr> = self
2666            .peers
2667            .iter()
2668            .filter(|(_, peer)| peer.can_send() && peer.has_session())
2669            .map(|(addr, _)| *addr)
2670            .collect();
2671
2672        if peer_addrs.is_empty() {
2673            debug!(
2674                total_peers = self.peers.len(),
2675                "No sendable peers for disconnect notification"
2676            );
2677            return;
2678        }
2679
2680        let mut sent = 0usize;
2681        for node_addr in &peer_addrs {
2682            match self
2683                .send_encrypted_link_message(node_addr, &plaintext)
2684                .await
2685            {
2686                Ok(()) => sent += 1,
2687                Err(e) => {
2688                    debug!(
2689                        peer = %self.peer_display_name(node_addr),
2690                        error = %e,
2691                        "Failed to send disconnect (transport may be down)"
2692                    );
2693                }
2694            }
2695        }
2696
2697        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2698    }
2699
2700    pub(in crate::node) fn static_peer_addresses(
2701        &self,
2702        peer_config: &PeerConfig,
2703    ) -> Vec<PeerAddress> {
2704        peer_config
2705            .addresses_by_priority()
2706            .into_iter()
2707            .cloned()
2708            .collect()
2709    }
2710
2711    async fn nostr_peer_fallback_addresses(
2712        &self,
2713        peer_config: &PeerConfig,
2714        existing: &[PeerAddress],
2715    ) -> Vec<PeerAddress> {
2716        if !self.config.node.discovery.nostr.enabled
2717            || self.config.node.discovery.nostr.policy
2718                == crate::config::NostrDiscoveryPolicy::Disabled
2719        {
2720            return Vec::new();
2721        }
2722
2723        let Some(bootstrap) = self.nostr_discovery.clone() else {
2724            return Vec::new();
2725        };
2726        if self.nostr_cooldown_applies_to_peer_config(peer_config)
2727            && bootstrap
2728                .cooldown_until(&peer_config.npub, Self::now_ms())
2729                .is_some()
2730        {
2731            debug!(
2732                npub = %peer_config.npub,
2733                "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2734            );
2735            return Vec::new();
2736        }
2737        let endpoints = match bootstrap
2738            .cached_advert_endpoints_for_peer(&peer_config.npub)
2739            .await
2740        {
2741            Some(endpoints) => endpoints,
2742            None => {
2743                debug!(
2744                    npub = %peer_config.npub,
2745                    "No cached Nostr advert endpoints for configured peer"
2746                );
2747                return Vec::new();
2748            }
2749        };
2750
2751        let mut fallback = Vec::new();
2752        let mut next_priority = existing
2753            .iter()
2754            .map(|addr| addr.priority)
2755            .max()
2756            .unwrap_or(100)
2757            .saturating_add(1);
2758        // Stamp every overlay-derived candidate with the current wall clock.
2759        // The dialer still honors explicit priority first; this timestamp is
2760        // only a recency tiebreaker within the same priority tier. We use a
2761        // single timestamp per fetch because all candidates in one advert are
2762        // equally fresh.
2763        let seen_at_ms = Self::now_ms();
2764        for endpoint in endpoints {
2765            let Some(candidate) =
2766                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2767            else {
2768                continue;
2769            };
2770            if existing
2771                .iter()
2772                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2773                || fallback.iter().any(|addr: &PeerAddress| {
2774                    addr.transport == candidate.transport && addr.addr == candidate.addr
2775                })
2776            {
2777                continue;
2778            }
2779            fallback.push(candidate);
2780            next_priority = next_priority.saturating_add(1);
2781        }
2782        fallback
2783    }
2784
2785    pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2786        if !self.config.node.discovery.nostr.enabled
2787            || self.config.node.discovery.nostr.policy
2788                == crate::config::NostrDiscoveryPolicy::Disabled
2789        {
2790            return false;
2791        }
2792        let Some(bootstrap) = self.nostr_discovery.clone() else {
2793            return false;
2794        };
2795        let now_ms = Self::now_ms();
2796        if self.nostr_cooldown_applies_to_peer_config(peer_config)
2797            && let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
2798        {
2799            debug!(
2800                npub = %peer_config.npub,
2801                cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2802                "Skipping Nostr traversal request while peer is in cooldown"
2803            );
2804            return false;
2805        }
2806        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2807        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2808        let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2809        let started = bootstrap
2810            .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2811            .await;
2812        if started {
2813            info!(
2814                npub = %peer_config.npub,
2815                mesh_signaling_allowed,
2816                "Started background UDP NAT traversal attempt"
2817            );
2818        } else {
2819            debug!(
2820                npub = %peer_config.npub,
2821                mesh_signaling_allowed,
2822                "Background UDP NAT traversal attempt already in progress"
2823            );
2824        }
2825        true
2826    }
2827
2828    fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
2829        !self.mesh_signaling_allowed_for_peer(peer_config)
2830    }
2831
2832    pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2833        &self,
2834        peer_config: &PeerConfig,
2835    ) -> bool {
2836        let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2837            return false;
2838        };
2839        let peer_addr = identity.node_addr();
2840        self.configured_peer(peer_addr).is_some()
2841    }
2842
2843    fn overlay_endpoint_to_peer_address(
2844        endpoint: &OverlayEndpointAdvert,
2845        priority: u8,
2846        seen_at_ms: u64,
2847    ) -> Option<PeerAddress> {
2848        let transport = match endpoint.transport {
2849            OverlayTransportKind::Udp => "udp",
2850            OverlayTransportKind::Tcp => "tcp",
2851            OverlayTransportKind::Tor => "tor",
2852            OverlayTransportKind::WebRtc => "webrtc",
2853        };
2854        Some(
2855            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2856                .with_seen_at_ms(seen_at_ms),
2857        )
2858    }
2859
2860    async fn attempt_peer_address_list(
2861        &mut self,
2862        peer_config: &PeerConfig,
2863        peer_identity: PeerIdentity,
2864        allow_bootstrap_nat: bool,
2865        addresses: &[PeerAddress],
2866    ) -> Result<(), NodeError> {
2867        let mut attempted = false;
2868        let mut local_route_error = None;
2869        let peer_node_addr = *peer_identity.node_addr();
2870        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);
2871
2872        for addr in addresses {
2873            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
2874                if !allow_bootstrap_nat {
2875                    continue;
2876                }
2877                if self.request_nostr_bootstrap(peer_config).await {
2878                    attempted = true;
2879                    continue;
2880                }
2881                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
2882                continue;
2883            }
2884
2885            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
2886                match self.resolve_ethernet_addr(&addr.addr) {
2887                    Ok(result) => result,
2888                    Err(e) => {
2889                        debug!(
2890                            transport = %addr.transport,
2891                            addr = %addr.addr,
2892                            error = %e,
2893                            "Failed to resolve Ethernet address"
2894                        );
2895                        continue;
2896                    }
2897                }
2898            } else if addr.transport == "ble" {
2899                #[cfg(bluer_available)]
2900                {
2901                    match self.resolve_ble_addr(&addr.addr) {
2902                        Ok(result) => result,
2903                        Err(e) => {
2904                            debug!(
2905                                transport = %addr.transport,
2906                                addr = %addr.addr,
2907                                error = %e,
2908                                "Failed to resolve BLE address"
2909                            );
2910                            continue;
2911                        }
2912                    }
2913                }
2914                #[cfg(not(bluer_available))]
2915                {
2916                    debug!(transport = %addr.transport, "BLE transport not available on this build");
2917                    continue;
2918                }
2919            } else {
2920                let tid = if addr.transport == "udp"
2921                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
2922                {
2923                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
2924                        Some((id, _)) => id,
2925                        None => {
2926                            debug!(
2927                                transport = %addr.transport,
2928                                addr = %addr.addr,
2929                                "No compatible operational UDP transport for address"
2930                            );
2931                            continue;
2932                        }
2933                    }
2934                } else {
2935                    match self.find_transport_for_type(&addr.transport) {
2936                        Some(id) => id,
2937                        None => {
2938                            debug!(
2939                                transport = %addr.transport,
2940                                addr = %addr.addr,
2941                                "No operational transport for address type"
2942                            );
2943                            continue;
2944                        }
2945                    }
2946                };
2947                (tid, TransportAddr::from_string(&addr.addr))
2948            };
2949
2950            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
2951                attempted = true;
2952                debug!(
2953                    npub = %peer_config.npub,
2954                    transport_id = %transport_id,
2955                    remote_addr = %remote_addr,
2956                    "Skipping duplicate in-flight candidate path"
2957                );
2958                continue;
2959            }
2960
2961            if concrete_budget == 0 {
2962                debug!(
2963                    npub = %peer_config.npub,
2964                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
2965                    "Path candidate race budget exhausted"
2966                );
2967                break;
2968            }
2969
2970            match self
2971                .initiate_connection(transport_id, remote_addr, peer_identity)
2972                .await
2973            {
2974                Ok(()) => {
2975                    attempted = true;
2976                    concrete_budget = concrete_budget.saturating_sub(1);
2977                }
2978                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
2979                Err(e) => {
2980                    if e.is_local_route_unavailable() && local_route_error.is_none() {
2981                        local_route_error = Some(e.to_string());
2982                    }
2983                    debug!(
2984                        npub = %peer_config.npub,
2985                        transport_id = %transport_id,
2986                        error = %e,
2987                        "Connection attempt failed, trying next address"
2988                    );
2989                }
2990            }
2991        }
2992
2993        if attempted {
2994            return Ok(());
2995        }
2996
2997        if let Some(error) = local_route_error {
2998            return Err(NodeError::LocalRouteUnavailable(error));
2999        }
3000
3001        Err(NodeError::NoTransportForType(format!(
3002            "no operational transport for any of {}'s addresses",
3003            peer_config.npub
3004        )))
3005    }
3006
3007    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
3008        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
3009            .await;
3010    }
3011
3012    pub(in crate::node) fn queue_active_fallback_direct_retries(
3013        &mut self,
3014        _bootstrap: &std::sync::Arc<NostrDiscovery>,
3015    ) {
3016        let now_ms = Self::now_ms();
3017        let peer_configs = self
3018            .config
3019            .auto_connect_peers()
3020            .cloned()
3021            .collect::<Vec<_>>();
3022
3023        for peer_config in peer_configs {
3024            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
3025                continue;
3026            };
3027            let node_addr = *peer_identity.node_addr();
3028
3029            if self.retry_pending.contains_key(&node_addr)
3030                || !self.peers.contains_key(&node_addr)
3031                || self.is_connecting_to_peer(&node_addr)
3032                || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
3033            {
3034                continue;
3035            }
3036
3037            let mut state = super::retry::RetryState::new(peer_config.clone());
3038            state.reconnect = true;
3039            state.retry_after_ms = now_ms;
3040            self.retry_pending.insert(node_addr, state);
3041
3042            debug!(
3043                peer = %self.peer_display_name(&node_addr),
3044                "Queued direct-path retry for active fallback peer"
3045            );
3046        }
3047    }
3048
3049    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
3050    /// queues retries for non-configured, not-yet-connected peers.
3051    ///
3052    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
3053    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
3054    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
3055    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
3056    ///
3057    /// `caller` is a short label included in log lines so per-tick and
3058    /// startup sweeps are distinguishable in operator-facing logs.
3059    pub(in crate::node) async fn run_open_discovery_sweep(
3060        &mut self,
3061        bootstrap: &std::sync::Arc<NostrDiscovery>,
3062        max_age_secs: Option<u64>,
3063        caller: &'static str,
3064    ) {
3065        if !self.config.node.discovery.nostr.enabled
3066            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3067        {
3068            return;
3069        }
3070
3071        let configured_npubs = self
3072            .config
3073            .peers()
3074            .iter()
3075            .map(|peer| peer.npub.clone())
3076            .collect::<HashSet<_>>();
3077        let now_ms = Self::now_ms();
3078        let now_secs = now_ms / 1000;
3079        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
3080        if enqueue_budget == 0 {
3081            debug!(
3082                caller = %caller,
3083                "open-discovery sweep: enqueue budget is 0, skipping"
3084            );
3085            return;
3086        }
3087
3088        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
3089        let cached_count = candidates.len();
3090        let mut enqueued = 0usize;
3091        let mut skipped_age = 0usize;
3092        let mut skipped_configured = 0usize;
3093        let mut skipped_self = 0usize;
3094        let mut skipped_connected = 0usize;
3095        let mut skipped_retry_pending = 0usize;
3096        let mut skipped_connecting = 0usize;
3097        let mut skipped_no_endpoints = 0usize;
3098        let mut skipped_invalid_npub = 0usize;
3099        let mut skipped_cooldown = 0usize;
3100
3101        for (npub, endpoints, created_at_secs) in candidates {
3102            if enqueue_budget == 0 {
3103                break;
3104            }
3105
3106            if let Some(max_age) = max_age_secs
3107                && now_secs.saturating_sub(created_at_secs) > max_age
3108            {
3109                skipped_age = skipped_age.saturating_add(1);
3110                continue;
3111            }
3112
3113            if configured_npubs.contains(&npub) {
3114                // Configured peers don't go through the open-discovery
3115                // enqueue path — their `PeerConfig` is already in
3116                // `self.config.peers()`, so the regular retry queue is
3117                // what drives their reconnect. But on cold start with
3118                // NAT'd peers, every initial `initiate_peer_connection`
3119                // fails (no overlay data yet, static cache hints empty
3120                // or stale), each pushes the peer into `retry_pending`
3121                // with exponential backoff (5/10/20/40/80s), and by the
3122                // time the next backoff slot fires the Nostr advert is
3123                // already cached — we just don't act on it for ~80s.
3124                //
3125                // The arrival of an advert (which this sweep sees) means
3126                // we now have a path to dial. If the peer's retry is
3127                // scheduled in the future, pull it forward to "now" so
3128                // the next `process_pending_retries` tick fires it
3129                // immediately. The retry path (`initiate_peer_retry_
3130                // connection` → `try_peer_addresses`) then refetches
3131                // the advert and dials it — no behavioral change
3132                // beyond schedule timing.
3133                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
3134                    let configured_addr = *identity.node_addr();
3135                    if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3136                        skipped_cooldown = skipped_cooldown.saturating_add(1);
3137                        skipped_configured = skipped_configured.saturating_add(1);
3138                        continue;
3139                    }
3140                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
3141                        && state.retry_after_ms > now_ms
3142                    {
3143                        state.retry_after_ms = now_ms;
3144                        debug!(
3145                            caller = %caller,
3146                            peer = %self.peer_display_name(&configured_addr),
3147                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
3148                            "Expediting configured-peer retry after fresh overlay advert"
3149                        );
3150                    }
3151                }
3152                skipped_configured = skipped_configured.saturating_add(1);
3153                continue;
3154            }
3155
3156            let peer_identity = match PeerIdentity::from_npub(&npub) {
3157                Ok(identity) => identity,
3158                Err(_) => {
3159                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
3160                    continue;
3161                }
3162            };
3163            let node_addr = *peer_identity.node_addr();
3164            if node_addr == *self.identity.node_addr() {
3165                skipped_self = skipped_self.saturating_add(1);
3166                continue;
3167            }
3168            if self.peers.contains_key(&node_addr) {
3169                skipped_connected = skipped_connected.saturating_add(1);
3170                continue;
3171            }
3172            if self.retry_pending.contains_key(&node_addr) {
3173                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
3174                continue;
3175            }
3176            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3177                skipped_cooldown = skipped_cooldown.saturating_add(1);
3178                continue;
3179            }
3180            let connecting = self.connections.values().any(|conn| {
3181                conn.expected_identity()
3182                    .map(|id| id.node_addr() == &node_addr)
3183                    .unwrap_or(false)
3184            });
3185            if connecting {
3186                skipped_connecting = skipped_connecting.saturating_add(1);
3187                continue;
3188            }
3189
3190            let mut addresses = Vec::new();
3191            let mut priority = 120u8;
3192            let seen_at_ms = Self::now_ms();
3193            for endpoint in endpoints {
3194                let Some(candidate) =
3195                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
3196                else {
3197                    continue;
3198                };
3199                if addresses.iter().any(|existing: &PeerAddress| {
3200                    existing.transport == candidate.transport && existing.addr == candidate.addr
3201                }) {
3202                    continue;
3203                }
3204                addresses.push(candidate);
3205                priority = priority.saturating_add(1);
3206            }
3207            if addresses.is_empty() {
3208                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
3209                continue;
3210            }
3211
3212            self.peer_aliases
3213                .entry(node_addr)
3214                .or_insert_with(|| peer_identity.short_npub());
3215            self.register_identity(node_addr, peer_identity.pubkey_full());
3216
3217            let mut state = super::retry::RetryState::new(PeerConfig {
3218                npub: npub.clone(),
3219                alias: None,
3220                addresses,
3221                connect_policy: ConnectPolicy::AutoConnect,
3222                auto_reconnect: true,
3223                discovery_fallback_transit: false,
3224            });
3225            state.reconnect = false;
3226            state.retry_after_ms = now_ms;
3227            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
3228            self.retry_pending.insert(node_addr, state);
3229            info!(
3230                caller = %caller,
3231                peer = %peer_identity.short_npub(),
3232                advert_age_secs = now_secs.saturating_sub(created_at_secs),
3233                "open-discovery sweep: queued retry for cached advert"
3234            );
3235            enqueue_budget = enqueue_budget.saturating_sub(1);
3236            enqueued = enqueued.saturating_add(1);
3237        }
3238
3239        // Always log a one-line summary on the startup sweep so operators
3240        // can verify it ran. Per-tick sweeps are noisier; only summarize
3241        // when something happened.
3242        let total_skipped = skipped_age
3243            + skipped_configured
3244            + skipped_self
3245            + skipped_connected
3246            + skipped_retry_pending
3247            + skipped_connecting
3248            + skipped_no_endpoints
3249            + skipped_invalid_npub
3250            + skipped_cooldown;
3251        let should_summarize = caller == "startup" || enqueued > 0;
3252        if should_summarize {
3253            info!(
3254                caller = %caller,
3255                cached = cached_count,
3256                queued = enqueued,
3257                skipped_age = skipped_age,
3258                skipped_configured = skipped_configured,
3259                skipped_self = skipped_self,
3260                skipped_connected = skipped_connected,
3261                skipped_retry_pending = skipped_retry_pending,
3262                skipped_connecting = skipped_connecting,
3263                skipped_no_endpoints = skipped_no_endpoints,
3264                skipped_invalid_npub = skipped_invalid_npub,
3265                skipped_cooldown = skipped_cooldown,
3266                skipped_total = total_skipped,
3267                "open-discovery sweep complete"
3268            );
3269        }
3270    }
3271
3272    /// One-shot startup sweep: runs once after the configured settle
3273    /// delay, iterating the cached overlay adverts and queueing retries
3274    /// for any peer with a recent enough advert that we haven't already
3275    /// configured statically or established a link to.
3276    ///
3277    /// Gated identically to [`run_open_discovery_sweep`]: requires
3278    /// `node.discovery.nostr.enabled` and `policy == open`.
3279    async fn maybe_run_startup_open_discovery_sweep(
3280        &mut self,
3281        bootstrap: &std::sync::Arc<NostrDiscovery>,
3282    ) {
3283        if self.startup_open_discovery_sweep_done {
3284            return;
3285        }
3286        if !self.config.node.discovery.nostr.enabled
3287            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3288        {
3289            // Mark done so we don't keep re-checking on every tick.
3290            self.startup_open_discovery_sweep_done = true;
3291            return;
3292        }
3293        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3294            return;
3295        };
3296        let now_ms = Self::now_ms();
3297        let delay_ms = self
3298            .config
3299            .node
3300            .discovery
3301            .nostr
3302            .startup_sweep_delay_secs
3303            .saturating_mul(1000);
3304        if now_ms < started_at_ms.saturating_add(delay_ms) {
3305            return;
3306        }
3307
3308        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3309        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3310            .await;
3311        self.startup_open_discovery_sweep_done = true;
3312    }
3313
3314    fn available_outbound_slots(&self) -> usize {
3315        let connection_used = self
3316            .connections
3317            .len()
3318            .saturating_add(self.pending_connects.len());
3319        let connection_slots = if self.max_connections == 0 {
3320            usize::MAX
3321        } else {
3322            self.max_connections.saturating_sub(connection_used)
3323        };
3324
3325        let peer_slots = if self.max_peers == 0 {
3326            usize::MAX
3327        } else {
3328            self.max_peers.saturating_sub(self.peers.len())
3329        };
3330
3331        let link_slots = if self.max_links == 0 {
3332            usize::MAX
3333        } else {
3334            self.max_links.saturating_sub(self.links.len())
3335        };
3336
3337        connection_slots.min(peer_slots).min(link_slots)
3338    }
3339
3340    pub(in crate::node) fn open_discovery_enqueue_budget(
3341        &self,
3342        configured_npubs: &HashSet<String>,
3343    ) -> usize {
3344        let current_open_discovery_active = self
3345            .peers
3346            .values()
3347            .filter(|peer| !configured_npubs.contains(&peer.npub()))
3348            .count();
3349        let current_open_discovery_pending = self
3350            .retry_pending
3351            .values()
3352            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3353            .count();
3354
3355        let cap_remaining = self
3356            .config
3357            .node
3358            .discovery
3359            .nostr
3360            .open_discovery_max_pending
3361            .saturating_sub(current_open_discovery_active)
3362            .saturating_sub(current_open_discovery_pending);
3363
3364        cap_remaining.min(self.available_outbound_slots())
3365    }
3366
3367    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3368        now_ms.saturating_add(
3369            self.config
3370                .node
3371                .discovery
3372                .nostr
3373                .advert_ttl_secs
3374                .saturating_mul(1000)
3375                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3376        )
3377    }
3378
3379    async fn build_overlay_advert(
3380        &self,
3381        bootstrap: &std::sync::Arc<NostrDiscovery>,
3382    ) -> Option<OverlayAdvert> {
3383        if !self.config.node.discovery.nostr.enabled {
3384            return None;
3385        }
3386
3387        let mut endpoints = Vec::new();
3388        let mut has_udp_nat = false;
3389        let mut has_webrtc = false;
3390
3391        for handle in self.transports.values() {
3392            if !handle.is_operational() {
3393                continue;
3394            }
3395
3396            match handle.transport_type().name {
3397                "udp" => {
3398                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
3399                        continue;
3400                    };
3401                    if !cfg.advertise_on_nostr() {
3402                        continue;
3403                    }
3404                    if cfg.is_public() {
3405                        // Precedence:
3406                        // 1. operator-supplied `external_addr` (skips STUN)
3407                        // 2. non-wildcard *public* `local_addr` (operator
3408                        //    bound to a specific public IP directly)
3409                        // 3. STUN auto-discovery against ephemeral socket
3410                        //    (also taken when bind is wildcard *or* private —
3411                        //    a private bind is not peer-reachable, so we
3412                        //    must publish the public reflexive instead)
3413                        // 4. loud warn + omit endpoint
3414                        if let Some(explicit) = cfg.external_advert_addr() {
3415                            endpoints.push(OverlayEndpointAdvert {
3416                                transport: OverlayTransportKind::Udp,
3417                                addr: explicit.to_string(),
3418                            });
3419                        } else {
3420                            match handle.local_addr() {
3421                                Some(addr)
3422                                    if !addr.ip().is_unspecified()
3423                                        && !is_unroutable_advert_ip(addr.ip()) =>
3424                                {
3425                                    endpoints.push(OverlayEndpointAdvert {
3426                                        transport: OverlayTransportKind::Udp,
3427                                        addr: addr.to_string(),
3428                                    });
3429                                }
3430                                Some(addr) => {
3431                                    let key = handle.transport_id().as_u32();
3432                                    let port = addr.port();
3433                                    if let Some(public) =
3434                                        bootstrap.learn_public_udp_addr(key, port).await
3435                                    {
3436                                        endpoints.push(OverlayEndpointAdvert {
3437                                            transport: OverlayTransportKind::Udp,
3438                                            addr: public.to_string(),
3439                                        });
3440                                    } else {
3441                                        warn!(
3442                                            transport_id = key,
3443                                            bind_addr = %addr,
3444                                            "advert: udp public=true but bind is wildcard \
3445                                            or private and STUN observation failed; \
3446                                            advertising no UDP endpoint. Either set \
3447                                            transports.udp.external_addr, bind to a \
3448                                            specific *public* IP, or ensure \
3449                                            node.discovery.nostr.stun_servers is reachable"
3450                                        );
3451                                    }
3452                                }
3453                                None => {}
3454                            }
3455                        }
3456                    } else {
3457                        endpoints.push(OverlayEndpointAdvert {
3458                            transport: OverlayTransportKind::Udp,
3459                            addr: "nat".to_string(),
3460                        });
3461                        has_udp_nat = true;
3462                    }
3463                }
3464                "webrtc" => {
3465                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
3466                        continue;
3467                    };
3468                    if !cfg.advertise_on_nostr() {
3469                        continue;
3470                    }
3471                    endpoints.push(OverlayEndpointAdvert {
3472                        transport: OverlayTransportKind::WebRtc,
3473                        addr: hex::encode(self.identity.pubkey_full().serialize()),
3474                    });
3475                    has_webrtc = true;
3476                }
3477                "tcp" => {
3478                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
3479                        continue;
3480                    };
3481                    if !cfg.advertise_on_nostr() {
3482                        continue;
3483                    }
3484                    // Precedence:
3485                    // 1. operator-supplied `external_addr` (only path that
3486                    //    works on cloud-NAT setups where the public IP is
3487                    //    not on a host interface).
3488                    // 2. non-wildcard *public* `local_addr` (operator bound
3489                    //    to a specific public IP directly).
3490                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
3491                    //
3492                    // A wildcard *or* private bind is never advertised as-is
3493                    // — peers off-LAN can't reach a private bind, and there
3494                    // is no TCP STUN to discover a public reflexive.
3495                    if let Some(explicit) = cfg.external_advert_addr() {
3496                        endpoints.push(OverlayEndpointAdvert {
3497                            transport: OverlayTransportKind::Tcp,
3498                            addr: explicit.to_string(),
3499                        });
3500                    } else {
3501                        match handle.local_addr() {
3502                            Some(addr)
3503                                if !addr.ip().is_unspecified()
3504                                    && !is_unroutable_advert_ip(addr.ip()) =>
3505                            {
3506                                endpoints.push(OverlayEndpointAdvert {
3507                                    transport: OverlayTransportKind::Tcp,
3508                                    addr: addr.to_string(),
3509                                });
3510                            }
3511                            Some(addr) => {
3512                                warn!(
3513                                    bind_addr = %addr,
3514                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
3515                                    or private IP and no transports.tcp.external_addr set; \
3516                                    advertising no TCP endpoint. Either set external_addr \
3517                                    to the public IP (recommended for cloud 1:1-NAT setups) \
3518                                    or bind explicitly to the public IP"
3519                                );
3520                            }
3521                            None => {}
3522                        }
3523                    }
3524                }
3525                "tor" => {
3526                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
3527                        continue;
3528                    };
3529                    if !cfg.advertise_on_nostr() {
3530                        continue;
3531                    }
3532                    if let Some(addr) = handle.onion_address() {
3533                        endpoints.push(OverlayEndpointAdvert {
3534                            transport: OverlayTransportKind::Tor,
3535                            addr: format!("{}:{}", addr, cfg.advertised_port()),
3536                        });
3537                    }
3538                }
3539                _ => {}
3540            }
3541        }
3542
3543        if endpoints.is_empty() {
3544            return None;
3545        }
3546
3547        Some(OverlayAdvert {
3548            identifier: ADVERT_IDENTIFIER.to_string(),
3549            version: ADVERT_VERSION,
3550            endpoints,
3551            signal_relays: (has_udp_nat || has_webrtc)
3552                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
3553            stun_servers: (has_udp_nat || has_webrtc)
3554                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
3555        })
3556    }
3557
3558    async fn refresh_overlay_advert(
3559        &self,
3560        bootstrap: &std::sync::Arc<NostrDiscovery>,
3561    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3562        let advert = self.build_overlay_advert(bootstrap).await;
3563        bootstrap.update_local_advert(advert).await
3564    }
3565
3566    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3567        match (&self.config.transports.udp, transport_name) {
3568            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3569            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3570            _ => None,
3571        }
3572    }
3573
3574    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3575        match (&self.config.transports.tcp, transport_name) {
3576            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3577            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3578            _ => None,
3579        }
3580    }
3581
3582    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3583        match (&self.config.transports.tor, transport_name) {
3584            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3585            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3586            _ => None,
3587        }
3588    }
3589
3590    fn lookup_webrtc_config(
3591        &self,
3592        transport_name: Option<&str>,
3593    ) -> Option<&crate::config::WebRtcConfig> {
3594        match (&self.config.transports.webrtc, transport_name) {
3595            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3596            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3597            _ => None,
3598        }
3599    }
3600
3601    pub(in crate::node) async fn try_peer_addresses(
3602        &mut self,
3603        peer_config: &PeerConfig,
3604        peer_identity: PeerIdentity,
3605        allow_bootstrap_nat: bool,
3606    ) -> Result<(), NodeError> {
3607        let peer_node_addr = *peer_identity.node_addr();
3608        if self.peers.contains_key(&peer_node_addr) {
3609            debug!(
3610                npub = %peer_config.npub,
3611                "Peer already exists, skipping address attempts"
3612            );
3613            return Ok(());
3614        }
3615
3616        let candidates = self.peer_address_candidates(peer_config).await;
3617
3618        if candidates.is_empty() {
3619            if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3620                return Ok(());
3621            }
3622            return Err(NodeError::NoTransportForType(format!(
3623                "no addresses known for {}",
3624                peer_config.npub
3625            )));
3626        }
3627
3628        if self
3629            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3630            .await
3631            .is_ok()
3632        {
3633            if allow_bootstrap_nat {
3634                self.request_nostr_bootstrap(peer_config).await;
3635            }
3636            return Ok(());
3637        }
3638
3639        if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3640            return Ok(());
3641        }
3642
3643        Err(NodeError::NoTransportForType(format!(
3644            "no operational transport for any of {}'s addresses",
3645            peer_config.npub
3646        )))
3647    }
3648
3649    async fn try_active_peer_alternative_addresses(
3650        &mut self,
3651        peer_config: &PeerConfig,
3652        peer_identity: PeerIdentity,
3653        allow_same_path_refresh: bool,
3654    ) -> Result<bool, NodeError> {
3655        let peer_node_addr = *peer_identity.node_addr();
3656        let mut candidates = self.peer_address_candidates(peer_config).await;
3657        let same_path_refresh_needed = allow_same_path_refresh
3658            && (self.active_peer_needs_same_path_refresh(&peer_node_addr)
3659                || self
3660                    .peers
3661                    .get(&peer_node_addr)
3662                    .is_some_and(|peer| !peer.can_send()));
3663        if same_path_refresh_needed
3664            && let Some(candidate) = self.active_peer_current_udp_candidate(&peer_node_addr)
3665            && !candidates.iter().any(|existing| {
3666                existing.transport == candidate.transport && existing.addr == candidate.addr
3667            })
3668        {
3669            candidates.push(candidate);
3670            Self::sort_peer_address_candidates(&mut candidates);
3671        }
3672        let should_try_nostr =
3673            self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);
3674
3675        if candidates.is_empty() {
3676            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3677                return Ok(true);
3678            }
3679            return Err(NodeError::NoTransportForType(format!(
3680                "no addresses known for {}",
3681                peer_config.npub
3682            )));
3683        }
3684
3685        let alternatives: Vec<_> = candidates
3686            .into_iter()
3687            .filter(|addr| {
3688                same_path_refresh_needed
3689                    || !self.active_peer_matches_candidate(&peer_node_addr, addr)
3690            })
3691            .collect();
3692
3693        if alternatives.is_empty() {
3694            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3695                return Ok(true);
3696            }
3697            return Ok(false);
3698        }
3699
3700        let needs_separate_nostr_attempt = should_try_nostr
3701            && !alternatives
3702                .iter()
3703                .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
3704        let address_result = self
3705            .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
3706            .await;
3707        let nostr_attempted =
3708            needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;
3709
3710        match address_result {
3711            Ok(()) => Ok(true),
3712            Err(err) if nostr_attempted => {
3713                debug!(
3714                    npub = %peer_config.npub,
3715                    error = %err,
3716                    "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
3717                );
3718                Ok(true)
3719            }
3720            Err(err) => Err(err),
3721        }
3722    }
3723
3724    async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3725        // Merge every candidate from every source we have for this peer.
3726        // Explicitly configured addresses keep first shot, then freshly
3727        // fetched overlay adverts are appended as fallback candidates. This
3728        // lets native peers try known LAN/nvpn/static UDP routes before
3729        // slower WebRTC/Nostr-discovered paths, while still racing every
3730        // concrete candidate that fits in the per-peer budget.
3731        let static_addresses = self.static_peer_addresses(peer_config);
3732        let overlay_addresses = self
3733            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3734            .await;
3735
3736        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3737        for addr in overlay_addresses.into_iter().chain(static_addresses) {
3738            if !candidates.iter().any(|existing: &PeerAddress| {
3739                existing.transport == addr.transport && existing.addr == addr.addr
3740            }) {
3741                candidates.push(addr);
3742            }
3743        }
3744
3745        Self::sort_peer_address_candidates(&mut candidates);
3746
3747        candidates
3748    }
3749
3750    fn sort_peer_address_candidates(candidates: &mut [PeerAddress]) {
3751        // Stable sort: explicit priority is the contract, and freshness only
3752        // breaks ties inside one priority tier. Overlay-discovered endpoints
3753        // are assigned lower priority than configured/static hints when both
3754        // exist, so operator-provided LAN routes keep first shot without
3755        // dropping fresh overlay candidates from the race.
3756        candidates.sort_by(|a, b| {
3757            if a.priority != b.priority {
3758                return a.priority.cmp(&b.priority);
3759            }
3760            match (a.seen_at_ms, b.seen_at_ms) {
3761                (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3762                (Some(_), None) => std::cmp::Ordering::Less,
3763                (None, Some(_)) => std::cmp::Ordering::Greater,
3764                (None, None) => std::cmp::Ordering::Equal,
3765            }
3766        });
3767    }
3768
3769    fn active_peer_matches_any_candidate(
3770        &self,
3771        peer_node_addr: &NodeAddr,
3772        candidates: &[PeerAddress],
3773    ) -> bool {
3774        candidates
3775            .iter()
3776            .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3777    }
3778
3779    pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3780        &self,
3781        peer_node_addr: &NodeAddr,
3782        candidates: &[PeerAddress],
3783    ) -> bool {
3784        if !self
3785            .peers
3786            .get(peer_node_addr)
3787            .is_some_and(|peer| peer.can_send())
3788        {
3789            return false;
3790        }
3791        if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3792            return false;
3793        }
3794        !self.active_peer_needs_same_path_refresh(peer_node_addr)
3795    }
3796
3797    pub(in crate::node) fn active_peer_should_keep_direct_retry(
3798        &self,
3799        peer_node_addr: &NodeAddr,
3800        peer_config: &PeerConfig,
3801    ) -> bool {
3802        let Some(peer) = self.peers.get(peer_node_addr) else {
3803            return false;
3804        };
3805
3806        let static_addresses = self.static_peer_addresses(peer_config);
3807        if !static_addresses.is_empty() {
3808            return !self
3809                .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3810        }
3811
3812        if peer_config.npub.is_empty() {
3813            return false;
3814        }
3815
3816        if !self.config.node.discovery.nostr.enabled
3817            || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3818        {
3819            return false;
3820        }
3821
3822        let Some(transport_id) = peer.transport_id() else {
3823            return true;
3824        };
3825
3826        if self.bootstrap_transports.contains(&transport_id) {
3827            return self.active_peer_needs_same_path_refresh(peer_node_addr);
3828        }
3829
3830        let Some(transport) = self.transports.get(&transport_id) else {
3831            return true;
3832        };
3833
3834        if transport.transport_type().name != "udp" {
3835            return true;
3836        }
3837
3838        self.active_peer_needs_same_path_refresh(peer_node_addr)
3839    }
3840
3841    pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3842        &mut self,
3843        peer_node_addr: &NodeAddr,
3844    ) {
3845        let keep_retry = self
3846            .retry_pending
3847            .get(peer_node_addr)
3848            .map(|state| state.peer_config.clone())
3849            .is_some_and(|peer_config| {
3850                self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3851            });
3852
3853        if !keep_retry {
3854            self.retry_pending.remove(peer_node_addr);
3855        }
3856    }
3857
3858    fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3859        let Some(peer) = self.peers.get(peer_node_addr) else {
3860            return false;
3861        };
3862        let stale_after_ms = self
3863            .config
3864            .node
3865            .heartbeat_interval_secs
3866            .saturating_mul(1000)
3867            .max(1000);
3868        peer.idle_time(Self::now_ms()) > stale_after_ms
3869    }
3870
3871    pub(in crate::node) fn active_peer_current_udp_candidate(
3872        &self,
3873        peer_node_addr: &NodeAddr,
3874    ) -> Option<PeerAddress> {
3875        let peer = self.peers.get(peer_node_addr)?;
3876        let current_addr = peer.current_addr()?;
3877        if let Some(transport_id) = peer.transport_id() {
3878            if let Some(transport) = self.transports.get(&transport_id) {
3879                if transport.transport_type().name != "udp" {
3880                    return None;
3881                }
3882            } else if !self
3883                .transports
3884                .values()
3885                .any(|transport| transport.transport_type().name == "udp")
3886            {
3887                return None;
3888            }
3889        } else if !self
3890            .transports
3891            .values()
3892            .any(|transport| transport.transport_type().name == "udp")
3893        {
3894            return None;
3895        }
3896        let socket_addr = current_addr.as_str()?.parse::<SocketAddr>().ok()?;
3897
3898        Some(
3899            PeerAddress::with_priority("udp", socket_addr.to_string(), 240)
3900                .with_seen_at_ms(Self::now_ms()),
3901        )
3902    }
3903
3904    pub(in crate::node) fn active_peer_matches_candidate(
3905        &self,
3906        peer_node_addr: &NodeAddr,
3907        candidate: &PeerAddress,
3908    ) -> bool {
3909        let Some(peer) = self.peers.get(peer_node_addr) else {
3910            return false;
3911        };
3912        let Some(current_addr) = peer.current_addr() else {
3913            return false;
3914        };
3915        if let Some(peer_transport_id) = peer.transport_id()
3916            && let Some((candidate_transport_id, candidate_addr)) =
3917                self.resolve_peer_address_for_match(candidate)
3918        {
3919            return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3920        }
3921        if peer
3922            .transport_id()
3923            .map(|id| self.bootstrap_transports.contains(&id))
3924            .unwrap_or(false)
3925        {
3926            return false;
3927        }
3928        let current_addr = current_addr.to_string();
3929        let current_transport = peer
3930            .transport_id()
3931            .and_then(|id| self.transports.get(&id))
3932            .map(|transport| transport.transport_type().name);
3933
3934        candidate.addr == current_addr
3935            && current_transport
3936                .map(|transport| transport == candidate.transport)
3937                .unwrap_or(true)
3938    }
3939
3940    pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
3941        &self,
3942        peer_node_addr: &NodeAddr,
3943        peer_config: &PeerConfig,
3944    ) -> bool {
3945        peer_config.addresses.iter().any(|addr| {
3946            addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
3947        })
3948    }
3949
3950    pub(in crate::node) fn active_peer_uses_traversal_path(
3951        &self,
3952        peer_node_addr: &NodeAddr,
3953        peer_config: &PeerConfig,
3954    ) -> bool {
3955        let via_bootstrap_transport = self
3956            .peers
3957            .get(peer_node_addr)
3958            .and_then(|peer| peer.transport_id())
3959            .map(|id| self.bootstrap_transports.contains(&id))
3960            .unwrap_or(false);
3961
3962        via_bootstrap_transport
3963            || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
3964    }
3965
3966    // === Control API methods ===
3967
3968    /// Connect to a peer via the control API.
3969    ///
3970    /// Creates an ephemeral peer connection (not persisted to config, no
3971    /// auto-reconnect). Reuses the same connection path as auto-connect
3972    /// peers. Returns JSON data on success or an error message.
3973    pub(crate) async fn api_connect(
3974        &mut self,
3975        npub: &str,
3976        address: &str,
3977        transport: &str,
3978    ) -> Result<serde_json::Value, String> {
3979        let peer_config = PeerConfig {
3980            npub: npub.to_string(),
3981            alias: None,
3982            addresses: vec![PeerAddress::new(transport, address)],
3983            connect_policy: ConnectPolicy::Manual,
3984            auto_reconnect: false,
3985            discovery_fallback_transit: true,
3986        };
3987
3988        // Pre-seed identity cache (same as initiate_peer_connections does)
3989        if let Ok(identity) = PeerIdentity::from_npub(npub) {
3990            self.peer_aliases
3991                .insert(*identity.node_addr(), identity.short_npub());
3992            self.register_identity(*identity.node_addr(), identity.pubkey_full());
3993        }
3994
3995        self.initiate_peer_connection(&peer_config)
3996            .await
3997            .map(|()| {
3998                info!(
3999                    npub = %npub,
4000                    address = %address,
4001                    transport = %transport,
4002                    "API connect initiated"
4003                );
4004                serde_json::json!({
4005                    "npub": npub,
4006                    "address": address,
4007                    "transport": transport,
4008                })
4009            })
4010            .map_err(|e| e.to_string())
4011    }
4012
4013    /// Disconnect a peer via the control API.
4014    ///
4015    /// Removes the peer and suppresses auto-reconnect.
4016    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
4017        let peer_identity =
4018            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
4019        let node_addr = *peer_identity.node_addr();
4020
4021        if !self.peers.contains_key(&node_addr) {
4022            return Err(format!("peer not found: {npub}"));
4023        }
4024
4025        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
4026        self.remove_active_peer(&node_addr);
4027
4028        // Suppress any pending auto-reconnect
4029        self.retry_pending.remove(&node_addr);
4030
4031        info!(npub = %npub, "API disconnect completed");
4032
4033        Ok(serde_json::json!({
4034            "npub": npub,
4035            "disconnected": true,
4036        }))
4037    }
4038
4039    /// Adopt an already-established UDP traversal and start the normal FIPS
4040    /// Noise handshake over it.
4041    ///
4042    /// This is intended for integration with an external rendezvous runtime
4043    /// that has already completed relay signaling, STUN observation, and UDP
4044    /// hole punching. After handoff, the adopted socket is owned by FIPS.
4045    pub async fn adopt_established_traversal(
4046        &mut self,
4047        traversal: EstablishedTraversal,
4048    ) -> Result<BootstrapHandoffResult, NodeError> {
4049        debug!(
4050            peer_npub = %traversal.peer_npub,
4051            session_id = %traversal.session_id,
4052            remote_addr = %traversal.remote_addr,
4053            "adopting established traversal socket"
4054        );
4055
4056        if !self.state.is_operational() {
4057            return Err(NodeError::NotStarted);
4058        }
4059
4060        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
4061        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
4062            NodeError::InvalidPeerNpub {
4063                npub: traversal.peer_npub.clone(),
4064                reason: e.to_string(),
4065            }
4066        })?;
4067        let peer_node_addr = *peer_identity.node_addr();
4068        if self.peers.contains_key(&peer_node_addr) {
4069            debug!(
4070                peer_npub = %traversal.peer_npub,
4071                "Adopting NAT traversal handoff as alternate path for already-connected peer"
4072            );
4073        }
4074
4075        self.peer_aliases
4076            .insert(peer_node_addr, peer_identity.short_npub());
4077        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
4078
4079        let transport_id = self.allocate_transport_id();
4080        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
4081        // (and accept_connections / advertise flags) from the operator's
4082        // configured [transports.udp] when the bootstrap runtime doesn't
4083        // pass an explicit override. Lookup tries `transport_name` first
4084        // (covers the `Named` multi-listener variant) and falls back to the
4085        // unnamed `Single` listener, so single- and named-listener configs
4086        // both inherit cleanly.
4087        //
4088        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
4089        // only value guaranteed to survive arbitrary middlebox paths.
4090        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
4091        // initially attempt that MTU and may black-hole on tighter paths
4092        // until reactive `MtuExceeded` recovery kicks in. Operators who
4093        // raise the primary MTU based on known-clean topology accept that
4094        // tradeoff; the silent drop on a too-low default was strictly
4095        // worse for the common case where the primary MTU is reachable.
4096        //
4097        // Bind / external address fields are cleared since the socket is
4098        // already bound.
4099        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
4100            let mut cfg = self
4101                .lookup_udp_config(traversal.transport_name.as_deref())
4102                .or_else(|| self.lookup_udp_config(None))
4103                .cloned()
4104                .unwrap_or_default();
4105            cfg.bind_addr = None;
4106            cfg.external_addr = None;
4107            cfg
4108        });
4109        let mut transport = crate::transport::udp::UdpTransport::new(
4110            transport_id,
4111            traversal.transport_name.clone(),
4112            inherited_config,
4113            packet_tx,
4114        );
4115
4116        transport
4117            .adopt_socket_async(traversal.socket)
4118            .await
4119            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
4120
4121        let local_addr = transport.local_addr().ok_or_else(|| {
4122            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
4123        })?;
4124
4125        self.transports.insert(
4126            transport_id,
4127            crate::transport::TransportHandle::Udp(transport),
4128        );
4129        self.bootstrap_transports.insert(transport_id);
4130        self.bootstrap_transport_npubs
4131            .insert(transport_id, traversal.peer_npub.clone());
4132
4133        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
4134        if let Err(err) = self
4135            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
4136            .await
4137        {
4138            self.bootstrap_transports.remove(&transport_id);
4139            self.bootstrap_transport_npubs.remove(&transport_id);
4140            if let Some(mut handle) = self.transports.remove(&transport_id) {
4141                let _ = handle.stop().await;
4142            }
4143            return Err(err);
4144        }
4145
4146        info!(
4147            peer = %self.peer_display_name(&peer_node_addr),
4148            transport_id = %transport_id,
4149            local_addr = %local_addr,
4150            remote_addr = %traversal.remote_addr,
4151            session_id = %traversal.session_id,
4152            "adopted NAT traversal socket; handshake initiated"
4153        );
4154
4155        Ok(BootstrapHandoffResult {
4156            transport_id,
4157            local_addr,
4158            remote_addr: traversal.remote_addr,
4159            peer_node_addr,
4160            session_id: traversal.session_id,
4161        })
4162    }
4163}