Skip to main content

fips_core/node/
lifecycle.rs

1//! Node lifecycle management: start, stop, and peer connection initiation.
2
3use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7    OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Action chosen for a mesh-signal session.
///
/// NOTE(review): the code that produces and consumes this enum is not in
/// view; the per-variant notes below are inferred from the names only —
/// confirm against the matching `match` sites.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MeshSignalSessionAction {
    /// Presumably: deliver the signal now.
    Send,
    /// Presumably: hold the signal and revisit it later.
    Defer,
    /// Presumably: discard the signal.
    Drop,
}
30
#[cfg(debug_assertions)]
/// Debug builds only: append a timestamped line containing `message` to
/// `nvpn-fips-endpoint-debug.log` in the system temp directory.
///
/// Best effort — failures to open or write the file are silently ignored so
/// diagnostics can never break node startup.
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(log_path);
    let Ok(mut file) = opened else {
        return;
    };
    let _ = writeln!(
        file,
        "{:?} {}",
        std::time::SystemTime::now(),
        message.as_ref()
    );
}
48
#[cfg(not(debug_assertions))]
/// Release builds: debug logging is compiled down to a no-op.
fn node_start_debug_log(message: impl AsRef<str>) {
    // Intentionally discarded; nothing is written in release builds.
    let _ = message;
}
51
/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN.
///
/// IPv4: RFC1918 private, loopback, link-local, unspecified, multicast,
/// broadcast, documentation (TEST-NET-1/2/3), CGNAT (100.64.0.0/10,
/// RFC 6598) and benchmarking (198.18.0.0/15, RFC 2544).
///
/// IPv6: loopback, unspecified, unique-local (fc00::/7), multicast,
/// link-local (fe80::/10) and documentation (2001:db8::/32). An
/// IPv4-mapped address (`::ffff:a.b.c.d`, as reported by dual-stack
/// sockets) is judged by its embedded IPv4 address, so a mapped private
/// address is not mistaken for a public one.
///
/// We never publish these as the peer's primary `runtime_endpoint`; an
/// off-LAN consumer can't route to them, and latching one in onto a slow
/// overlay-relay fallback is the original bug this guard exists to prevent.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    /// IPv4 classification, shared by native and IPv4-mapped addresses.
    fn unroutable_v4(v4: std::net::Ipv4Addr) -> bool {
        let [first, second, _, _] = v4.octets();
        v4.is_private()
            || v4.is_loopback()
            || v4.is_link_local()
            || v4.is_unspecified()
            || v4.is_multicast()
            || v4.is_broadcast()
            || v4.is_documentation()
            // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the
            // public internet; behaves like an extra NAT layer.
            || (first == 100 && (second & 0xc0) == 64)
            // 198.18.0.0/15 — benchmarking, RFC 2544.
            || (first == 198 && (second & 0xfe) == 18)
    }

    match ip {
        IpAddr::V4(v4) => unroutable_v4(v4),
        IpAddr::V6(v6) => {
            if let Some(v4) = v6.to_ipv4_mapped() {
                return unroutable_v4(v4);
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_multicast()
                // IPv6 unique-local: fc00::/7
                || (segments[0] & 0xfe00) == 0xfc00
                // IPv6 link-local: fe80::/10
                || (segments[0] & 0xffc0) == 0xfe80
                // IPv6 documentation: 2001:db8::/32 (RFC 3849)
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
83
/// Returns `true` when `local` and `remote` share an address family
/// (both IPv4 or both IPv6). IPv4-mapped IPv6 socket addresses count as
/// IPv6 here.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    local.is_ipv4() == remote.is_ipv4()
}
90
/// Ceiling on simultaneous in-flight connections plus pending connects aimed
/// at one peer; `path_candidate_attempt_budget` subtracts the peer's current
/// count from this.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// `graph_session_warmup_budget` never returns more than this.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// NOTE(review): not referenced in the visible part of this file; by its
/// name, a per-tick cap on discovery-driven connects — confirm.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
/// NOTE(review): not referenced in the visible part of this file; by its
/// name, a multiplier applied to a lifetime when retrying open discovery —
/// confirm.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
95
96impl Node {
97    /// Initiate connections to configured static peers.
98    ///
99    /// For each peer configured with AutoConnect policy, creates a link and
100    /// peer entry, then starts the Noise handshake by sending the first message.
101    /// Replace the runtime peer list. Newly added auto-connect peers get
102    /// `initiate_peer_connection` immediately; removed peers are dropped from
103    /// the retry queue (the regular liveness timeout reaps any active session
104    /// — we don't proactively disconnect, since the same npub might be on its
105    /// way back via a fresh advert). Existing auto-connect entries with new
106    /// direct addresses are dialed immediately, and retry state is refreshed
107    /// so later attempts also use the latest hints.
108    pub(super) async fn update_peers(
109        &mut self,
110        new_peers: Vec<crate::config::PeerConfig>,
111    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112        use std::collections::{HashMap, HashSet};
113
114        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115            HashMap::with_capacity(new_peers.len());
116        let mut new_order = Vec::with_capacity(new_peers.len());
117        for peer in new_peers {
118            let identity = match PeerIdentity::from_npub(&peer.npub) {
119                Ok(id) => id,
120                Err(e) => {
121                    return Err(crate::node::NodeError::InvalidPeerNpub {
122                        npub: peer.npub.clone(),
123                        reason: e.to_string(),
124                    });
125                }
126            };
127            // Last-write-wins on duplicates so callers passing a multi-source
128            // candidate list (e.g. operator hints + recent-peers cache for
129            // the same npub) get the merge they meant.
130            let node_addr = *identity.node_addr();
131            if !new_by_addr.contains_key(&node_addr) {
132                new_order.push(node_addr);
133            }
134            new_by_addr.insert(node_addr, peer);
135        }
136
137        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138            .config
139            .peers()
140            .iter()
141            .filter_map(|pc| {
142                PeerIdentity::from_npub(&pc.npub)
143                    .ok()
144                    .map(|id| (*id.node_addr(), pc.clone()))
145            })
146            .collect();
147
148        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
153        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();
154
155        let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157        for node_addr in &removed {
158            if self.retry_pending.remove(node_addr).is_some() {
159                debug!(
160                    peer = %self.peer_display_name(node_addr),
161                    "Dropping retry entry for peer removed from runtime peer list"
162                );
163            }
164            self.peer_aliases.remove(node_addr);
165            self.set_discovery_fallback_transit_allowed(*node_addr, false);
166            outcome.removed += 1;
167        }
168
169        let mut auto_connect_refresh_configs = Vec::new();
170        for node_addr in &kept {
171            let new_pc = &new_by_addr[node_addr];
172            let current_pc = &current_by_addr[node_addr];
173            if new_pc.addresses != current_pc.addresses
174                || new_pc.alias != current_pc.alias
175                || new_pc.connect_policy != current_pc.connect_policy
176                || new_pc.auto_reconnect != current_pc.auto_reconnect
177                || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178            {
179                outcome.updated += 1;
180                self.set_discovery_fallback_transit_allowed(
181                    *node_addr,
182                    new_pc.discovery_fallback_transit,
183                );
184                if let Some(state) = self.retry_pending.get_mut(node_addr) {
185                    state.peer_config = new_pc.clone();
186                    state.retry_after_ms = Self::now_ms();
187                }
188                if let Some(alias) = new_pc.alias.clone() {
189                    self.peer_aliases.insert(*node_addr, alias);
190                }
191                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192                    auto_connect_refresh_configs.push(new_pc.clone());
193                }
194            } else {
195                outcome.unchanged += 1;
196                self.set_discovery_fallback_transit_allowed(
197                    *node_addr,
198                    new_pc.discovery_fallback_transit,
199                );
200                if let Some(state) = self.retry_pending.get_mut(node_addr) {
201                    state.peer_config = new_pc.clone();
202                }
203                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
204                    auto_connect_refresh_configs.push(new_pc.clone());
205                }
206            }
207        }
208
209        let added_configs: Vec<crate::config::PeerConfig> = new_order
210            .iter()
211            .filter(|addr| added.contains(addr))
212            .map(|addr| new_by_addr[addr].clone())
213            .collect();
214
215        // Replace the live config peer list before initiating connections so
216        // any helper that consults `self.config.peers()` during the dial
217        // (alias lookup, retry-state seeding) sees the new entries.
218        self.config.peers = new_order
219            .iter()
220            .filter_map(|addr| new_by_addr.get(addr).cloned())
221            .collect();
222        self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
223
224        for peer_config in added_configs {
225            outcome.added += 1;
226            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
227                continue;
228            };
229            let name = peer_config
230                .alias
231                .clone()
232                .unwrap_or_else(|| identity.short_npub());
233            self.peer_aliases.insert(*identity.node_addr(), name);
234            self.set_discovery_fallback_transit_allowed(
235                *identity.node_addr(),
236                peer_config.discovery_fallback_transit,
237            );
238            self.register_identity(*identity.node_addr(), identity.pubkey_full());
239
240            if !peer_config.is_auto_connect() {
241                continue;
242            }
243
244            match self
245                .try_auto_connect_graph_session(&peer_config, identity)
246                .await
247            {
248                Ok(true) => continue,
249                Ok(false) => {}
250                Err(err) => {
251                    debug!(
252                        npub = %peer_config.npub,
253                        error = %err,
254                        "Existing FIPS graph did not warm newly added peer"
255                    );
256                }
257            }
258
259            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
260                warn!(
261                    npub = %peer_config.npub,
262                    error = %e,
263                    "Failed to initiate connection for newly added peer"
264                );
265                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
266                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
267                }
268                if matches!(e, crate::node::NodeError::NoTransportForType(_))
269                    && let Some(bootstrap) = self.nostr_discovery.clone()
270                {
271                    bootstrap
272                        .request_advert_stale_check(peer_config.npub.clone())
273                        .await;
274                }
275            }
276        }
277
278        for peer_config in auto_connect_refresh_configs {
279            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
280                continue;
281            };
282            let node_addr = *peer_identity.node_addr();
283
284            if self.peers.contains_key(&node_addr) {
285                match self
286                    .initiate_active_peer_alternative_connection(&peer_config)
287                    .await
288                {
289                    Ok(attempted) => {
290                        if attempted {
291                            debug!(
292                                peer = %self.peer_display_name(&node_addr),
293                                "Started non-disruptive alternate-path handshake for active peer"
294                            );
295                        }
296                    }
297                    Err(e) => {
298                        debug!(
299                            npub = %peer_config.npub,
300                            error = %e,
301                            "Active peer alternate-path refresh did not start"
302                        );
303                    }
304                }
305                continue;
306            }
307
308            match self
309                .try_auto_connect_graph_session(&peer_config, peer_identity)
310                .await
311            {
312                Ok(true) => continue,
313                Ok(false) => {}
314                Err(err) => {
315                    debug!(
316                        npub = %peer_config.npub,
317                        error = %err,
318                        "Existing FIPS graph did not warm refreshed peer"
319                    );
320                }
321            }
322
323            match self.initiate_peer_connection(&peer_config).await {
324                Ok(()) => {
325                    let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
326                    if let Some(state) = self.retry_pending.get_mut(&node_addr) {
327                        state.peer_config = peer_config;
328                        state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
329                    }
330                }
331                Err(e) => {
332                    debug!(
333                        npub = %peer_config.npub,
334                        error = %e,
335                        "Refreshed peer addresses did not initiate a direct connection"
336                    );
337                    self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
338                }
339            }
340        }
341
342        self.warm_auto_connect_graph_sessions().await;
343
344        Ok(outcome)
345    }
346
347    pub(super) async fn initiate_peer_connections(&mut self) {
348        // Build display name map from all configured peers (alias or short npub),
349        // and pre-seed the identity cache from each peer's npub so that TUN packets
350        // addressed to a configured peer can be dispatched (and trigger session
351        // initiation) immediately on startup — without waiting for the link-layer
352        // handshake to complete first.
353        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
354            .config
355            .peers()
356            .iter()
357            .filter_map(|pc| {
358                PeerIdentity::from_npub(&pc.npub)
359                    .ok()
360                    .map(|id| (id, pc.alias.clone()))
361            })
362            .collect();
363
364        for (identity, alias) in peer_identities {
365            let name = alias.unwrap_or_else(|| identity.short_npub());
366            self.peer_aliases.insert(*identity.node_addr(), name);
367            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
368            // but will be corrected to the real value when the peer is promoted
369            // after a successful Noise handshake.
370            self.register_identity(*identity.node_addr(), identity.pubkey_full());
371        }
372
373        // Collect peer configs to avoid borrow conflicts
374        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
375
376        if peer_configs.is_empty() {
377            debug!("No static peers configured");
378            return;
379        }
380
381        debug!(
382            count = peer_configs.len(),
383            "Initiating static peer connections"
384        );
385
386        for peer_config in peer_configs {
387            let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
388                Ok(identity) => identity,
389                Err(_) => continue,
390            };
391            match self
392                .try_auto_connect_graph_session(&peer_config, peer_identity)
393                .await
394            {
395                Ok(true) => continue,
396                Ok(false) => {}
397                Err(err) => {
398                    debug!(
399                        npub = %peer_config.npub,
400                        error = %err,
401                        "Existing FIPS graph did not warm auto-connect peer"
402                    );
403                }
404            }
405            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
406                warn!(
407                    npub = %peer_config.npub,
408                    alias = ?peer_config.alias,
409                    error = %e,
410                    "Failed to initiate peer connection"
411                );
412                // Schedule a retry so transient address-resolution failures
413                // (e.g. cached endpoints stale, NAT rebinds, all addresses
414                // currently unreachable) recover without a daemon restart.
415                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
416                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
417                }
418                // No-transport failures most often mean the cached overlay
419                // advert is pointing at a dead post-NAT-rebind address. The
420                // advert cache is read-only inside fetch_advert, so retries
421                // would loop on the same dead address until expiry. Force a
422                // re-fetch so the next retry tick picks up fresh endpoints.
423                if matches!(e, crate::node::NodeError::NoTransportForType(_))
424                    && let Some(bootstrap) = self.nostr_discovery.clone()
425                {
426                    bootstrap
427                        .request_advert_stale_check(peer_config.npub.clone())
428                        .await;
429                }
430            }
431        }
432
433        self.warm_auto_connect_graph_sessions().await;
434    }
435
436    pub(in crate::node) async fn try_auto_connect_graph_session(
437        &mut self,
438        peer_config: &PeerConfig,
439        peer_identity: PeerIdentity,
440    ) -> Result<bool, NodeError> {
441        if !peer_config.is_auto_connect() {
442            return Ok(false);
443        }
444
445        let peer_node_addr = *peer_identity.node_addr();
446        if self
447            .peers
448            .get(&peer_node_addr)
449            .is_some_and(|peer| peer.can_send())
450        {
451            return Ok(false);
452        }
453        if self.auto_connect_should_race_direct_path(peer_config) {
454            return Ok(false);
455        }
456        if self
457            .sessions
458            .get(&peer_node_addr)
459            .is_some_and(|entry| entry.is_established() || entry.is_initiating())
460        {
461            return Ok(true);
462        }
463        if self.find_next_hop(&peer_node_addr).is_none() {
464            return Ok(false);
465        }
466
467        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
468        match self
469            .initiate_session(peer_node_addr, peer_identity.pubkey_full())
470            .await
471        {
472            Ok(()) => {
473                debug!(
474                    peer = %self.peer_display_name(&peer_node_addr),
475                    "Warmed auto-connect peer session over existing FIPS graph"
476                );
477                Ok(true)
478            }
479            Err(NodeError::SendFailed { node_addr, reason })
480                if node_addr == peer_node_addr && reason == "no route to destination" =>
481            {
482                self.maybe_initiate_lookup(&peer_node_addr).await;
483                Ok(false)
484            }
485            Err(err) => Err(err),
486        }
487    }
488
489    fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
490        !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
491    }
492
493    /// Initiate a connection to a single peer.
494    ///
495    /// Creates a link, starts the Noise handshake, and sends the first message.
496    pub(super) async fn initiate_peer_connection(
497        &mut self,
498        peer_config: &crate::config::PeerConfig,
499    ) -> Result<(), NodeError> {
500        self.initiate_peer_connection_inner(peer_config).await
501    }
502
503    /// Initiate a connection from the retry path. Identical to
504    /// [`initiate_peer_connection`] today — both paths fan out across every
505    /// known address (explicit priority first, then freshness) in a single
506    /// pass. The two entry points stay separate so callers can be distinguished
507    /// in tracing.
508    pub(super) async fn initiate_peer_retry_connection(
509        &mut self,
510        peer_config: &crate::config::PeerConfig,
511    ) -> Result<(), NodeError> {
512        self.initiate_peer_connection_inner(peer_config).await
513    }
514
515    pub(in crate::node) async fn initiate_active_peer_alternative_connection(
516        &mut self,
517        peer_config: &crate::config::PeerConfig,
518    ) -> Result<bool, NodeError> {
519        self.initiate_active_peer_alternative_connection_inner(peer_config, false)
520            .await
521    }
522
523    pub(in crate::node) async fn initiate_active_peer_direct_refresh_connection(
524        &mut self,
525        peer_config: &crate::config::PeerConfig,
526    ) -> Result<bool, NodeError> {
527        self.initiate_active_peer_alternative_connection_inner(peer_config, true)
528            .await
529    }
530
531    async fn initiate_active_peer_alternative_connection_inner(
532        &mut self,
533        peer_config: &crate::config::PeerConfig,
534        allow_same_path_refresh: bool,
535    ) -> Result<bool, NodeError> {
536        let peer_identity =
537            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
538                npub: peer_config.npub.clone(),
539                reason: e.to_string(),
540            })?;
541        let peer_node_addr = *peer_identity.node_addr();
542
543        if !self.peers.contains_key(&peer_node_addr) {
544            self.initiate_peer_connection(peer_config).await?;
545            return Ok(true);
546        }
547
548        // Keep the current link live and race fresh concrete candidates.
549        // Cross-connection resolution still decides which replacement link
550        // wins if both peers try the same upgrade; the important part here is
551        // that a stale path does not depend on the remote peer receiving our
552        // hint first before either side attempts the better address.
553        self.try_active_peer_alternative_addresses(
554            peer_config,
555            peer_identity,
556            allow_same_path_refresh,
557        )
558        .await
559    }
560
561    async fn initiate_peer_connection_inner(
562        &mut self,
563        peer_config: &crate::config::PeerConfig,
564    ) -> Result<(), NodeError> {
565        // Parse the peer's npub to get their identity
566        let peer_identity =
567            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
568                npub: peer_config.npub.clone(),
569                reason: e.to_string(),
570            })?;
571
572        let peer_node_addr = *peer_identity.node_addr();
573
574        // Check if peer already exists (fully authenticated)
575        if self.peers.contains_key(&peer_node_addr) {
576            debug!(
577                npub = %peer_config.npub,
578                "Peer already exists, skipping"
579            );
580            return Ok(());
581        }
582
583        self.try_peer_addresses(peer_config, peer_identity, true)
584            .await
585    }
586
587    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
588        self.connections.values().any(|conn| {
589            conn.expected_identity()
590                .map(|id| id.node_addr() == peer_node_addr)
591                .unwrap_or(false)
592        })
593    }
594
595    fn is_connecting_to_peer_on_path(
596        &self,
597        peer_node_addr: &NodeAddr,
598        transport_id: TransportId,
599        remote_addr: &TransportAddr,
600    ) -> bool {
601        self.connections.values().any(|conn| {
602            conn.expected_identity()
603                .map(|id| id.node_addr() == peer_node_addr)
604                .unwrap_or(false)
605                && conn.transport_id() == Some(transport_id)
606                && conn.source_addr() == Some(remote_addr)
607        }) || self.pending_connects.iter().any(|pending| {
608            pending.peer_identity.node_addr() == peer_node_addr
609                && pending.transport_id == transport_id
610                && &pending.remote_addr == remote_addr
611        })
612    }
613
614    pub(in crate::node) fn should_warm_auto_connect_session(
615        &self,
616        peer_node_addr: &NodeAddr,
617    ) -> bool {
618        if self
619            .peers
620            .get(peer_node_addr)
621            .is_some_and(|peer| peer.can_send())
622            || self
623                .sessions
624                .get(peer_node_addr)
625                .is_some_and(|entry| entry.is_established())
626        {
627            return false;
628        }
629
630        self.config.peers().iter().any(|peer| {
631            peer.is_auto_connect()
632                && PeerIdentity::from_npub(&peer.npub)
633                    .map(|identity| identity.node_addr() == peer_node_addr)
634                    .unwrap_or(false)
635        })
636    }
637
638    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
639        if !self.peers.values().any(|peer| peer.can_send()) {
640            return 0;
641        }
642
643        let mut budget = self.graph_session_warmup_budget();
644        if budget == 0 {
645            return 0;
646        }
647
648        let peer_identities: Vec<_> = self
649            .config
650            .auto_connect_peers()
651            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
652            .collect();
653
654        let mut warmed = 0;
655        for identity in peer_identities {
656            if budget == 0 {
657                break;
658            }
659
660            let peer_node_addr = *identity.node_addr();
661            if peer_node_addr == *self.identity.node_addr()
662                || !self.should_warm_auto_connect_session(&peer_node_addr)
663                || self
664                    .sessions
665                    .get(&peer_node_addr)
666                    .is_some_and(|entry| entry.is_initiating())
667            {
668                continue;
669            }
670
671            self.register_identity(peer_node_addr, identity.pubkey_full());
672
673            if self.find_next_hop(&peer_node_addr).is_some() {
674                match self
675                    .initiate_session(peer_node_addr, identity.pubkey_full())
676                    .await
677                {
678                    Ok(()) => {
679                        warmed += 1;
680                        budget = budget.saturating_sub(1);
681                        debug!(
682                            peer = %self.peer_display_name(&peer_node_addr),
683                            "Warmed auto-connect peer session over existing FIPS graph"
684                        );
685                    }
686                    Err(NodeError::SendFailed { node_addr, reason })
687                        if node_addr == peer_node_addr && reason == "no route to destination" =>
688                    {
689                        self.maybe_initiate_lookup(&peer_node_addr).await;
690                        warmed += 1;
691                        budget = budget.saturating_sub(1);
692                    }
693                    Err(err) => {
694                        debug!(
695                            peer = %self.peer_display_name(&peer_node_addr),
696                            error = %err,
697                            "Failed to warm auto-connect peer session"
698                        );
699                    }
700                }
701            } else {
702                self.maybe_initiate_lookup(&peer_node_addr).await;
703                warmed += 1;
704                budget = budget.saturating_sub(1);
705            }
706        }
707
708        warmed
709    }
710
711    pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
712        let max_destinations = self.config.node.session.pending_max_destinations;
713        if max_destinations == 0 {
714            return 0;
715        }
716
717        let pending_sessions = self
718            .sessions
719            .values()
720            .filter(|entry| !entry.is_established())
721            .count();
722        let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
723        max_destinations
724            .saturating_sub(pending_total)
725            .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
726    }
727
728    fn outbound_handshake_slots(&self) -> usize {
729        let used = self
730            .connections
731            .len()
732            .saturating_add(self.pending_connects.len());
733        if self.max_connections == 0 {
734            usize::MAX
735        } else {
736            self.max_connections.saturating_sub(used)
737        }
738    }
739
740    fn outbound_link_slots(&self) -> usize {
741        if self.max_links == 0 {
742            usize::MAX
743        } else {
744            self.max_links.saturating_sub(self.links.len())
745        }
746    }
747
748    fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
749        if !self.peers.contains_key(peer_node_addr)
750            && self.max_peers > 0
751            && self.peers.len() >= self.max_peers
752        {
753            return 0;
754        }
755
756        let in_flight_for_peer = self
757            .connections
758            .values()
759            .filter(|conn| {
760                conn.expected_identity()
761                    .map(|id| id.node_addr() == peer_node_addr)
762                    .unwrap_or(false)
763            })
764            .count()
765            .saturating_add(
766                self.pending_connects
767                    .iter()
768                    .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
769                    .count(),
770            );
771
772        self.outbound_handshake_slots()
773            .min(self.outbound_link_slots())
774            .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
775    }
776
    /// Free one in-flight outbound handshake slot for `peer_node_addr` so that
    /// `candidate`, a configured path, can be dialed instead.
    ///
    /// A connection is a victim only if all of the following hold:
    /// - it is in `self.connections` and its expected identity is this peer;
    /// - it has both a transport id and a source address;
    /// - it is not on the same `(transport, addr)` path as `candidate`;
    /// - its configured path priority value is strictly greater than the
    ///   candidate's. A larger value means lower priority here, and paths
    ///   without a configured priority rank below every configured one.
    ///
    /// Among eligible victims, the one with the largest priority value wins.
    /// Ties go to the smallest `started_at()`, presumably the oldest attempt.
    /// Pending transport connects (`self.pending_connects`) are never
    /// considered; only handshaking connections are.
    ///
    /// Returns `true` if a victim was torn down (connection, `pending_outbound`
    /// entry, session index, link, and possibly its bootstrap transport).
    /// Returns `false` in three cases: the candidate cannot be resolved to a
    /// transport, it has no configured priority, or no victim qualifies.
    fn reclaim_lower_priority_inflight_candidate_for_peer(
        &mut self,
        peer_node_addr: &NodeAddr,
        candidate: &PeerAddress,
    ) -> bool {
        // One past the largest `u8` priority: unconfigured paths always sort
        // as the lowest priority (largest value).
        const UNKNOWN_PATH_PRIORITY: u16 = u8::MAX as u16 + 1;

        let Some((candidate_transport_id, candidate_addr)) =
            self.resolve_peer_address_for_match(candidate)
        else {
            return false;
        };
        // Only configured paths are allowed to displace other attempts.
        let Some(candidate_priority) =
            self.configured_path_priority(peer_node_addr, candidate_transport_id, &candidate_addr)
        else {
            return false;
        };
        let candidate_priority = u16::from(candidate_priority);

        let victim = self
            .connections
            .iter()
            .filter_map(|(link_id, conn)| {
                let identity = conn.expected_identity()?;
                if identity.node_addr() != peer_node_addr {
                    return None;
                }
                let transport_id = conn.transport_id()?;
                let remote_addr = conn.source_addr()?;
                // Never reclaim an attempt on the very path we want to dial.
                if transport_id == candidate_transport_id && remote_addr == &candidate_addr {
                    return None;
                }
                let priority = self
                    .configured_path_priority(peer_node_addr, transport_id, remote_addr)
                    .map(u16::from)
                    .unwrap_or(UNKNOWN_PATH_PRIORITY);
                (priority > candidate_priority).then_some((
                    *link_id,
                    priority,
                    conn.started_at(),
                    transport_id,
                    remote_addr.clone(),
                ))
            })
            // Worst priority first; among equals, the smallest start time.
            .max_by_key(|(_, priority, started_at, _, _)| {
                (*priority, std::cmp::Reverse(*started_at))
            });

        let Some((link_id, victim_priority, _, victim_transport_id, victim_addr)) = victim else {
            return false;
        };

        let Some(conn) = self.connections.remove(&link_id) else {
            return false;
        };
        // Undo the msg2 dispatch registration and release the session index.
        if let Some(idx) = conn.our_index()
            && let Some(transport_id) = conn.transport_id()
        {
            self.pending_outbound.remove(&(transport_id, idx.as_u32()));
            let _ = self.index_allocator.free(idx);
        }
        self.remove_link(&link_id);
        self.cleanup_bootstrap_transport_if_unused(victim_transport_id);

        debug!(
            peer = %self.peer_display_name(peer_node_addr),
            candidate_transport_id = %candidate_transport_id,
            candidate_addr = %candidate_addr,
            candidate_priority,
            victim_link_id = %link_id,
            victim_transport_id = %victim_transport_id,
            victim_addr = %victim_addr,
            victim_priority,
            "Reclaimed lower-priority in-flight candidate slot for configured direct path"
        );

        true
    }
855
856    fn discovery_connect_budget(&self) -> usize {
857        self.outbound_handshake_slots()
858            .min(self.outbound_link_slots())
859            .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
860    }
861
862    /// Find a UDP transport whose bound socket can send to `remote_addr`.
863    ///
864    /// LAN discovery can surface both IPv4 and IPv6 addresses for the same
865    /// service. A wildcard IPv4 socket cannot send to an IPv6 link-local
866    /// target, and vice versa, so callers must choose by socket family rather
867    /// than by transport type alone.
868    fn find_udp_transport_for_remote_addr(
869        &self,
870        remote_addr: SocketAddr,
871    ) -> Option<(TransportId, SocketAddr)> {
872        self.transports
873            .iter()
874            .filter(|(id, handle)| {
875                handle.transport_type().name == "udp"
876                    && handle.is_operational()
877                    && !self.bootstrap_transports.contains(id)
878            })
879            .filter_map(|(id, handle)| {
880                let local_addr = handle.local_addr()?;
881                socket_addr_families_compatible(local_addr, remote_addr)
882                    .then_some((*id, local_addr))
883            })
884            .min_by_key(|(id, _)| id.as_u32())
885    }
886
887    pub(super) fn transport_discovery_candidate(
888        &self,
889        discovered_transport_id: TransportId,
890        discovered_addr: TransportAddr,
891    ) -> Option<(TransportId, TransportAddr, &'static str)> {
892        let transport = self.transports.get(&discovered_transport_id)?;
893        let transport_name = transport.transport_type().name;
894
895        if transport_name != "udp" {
896            return Some((discovered_transport_id, discovered_addr, transport_name));
897        }
898
899        let Some(remote_socket_addr) = discovered_addr
900            .as_str()
901            .and_then(|addr| addr.parse::<SocketAddr>().ok())
902        else {
903            if self.bootstrap_transports.contains(&discovered_transport_id) {
904                debug!(
905                    transport_id = %discovered_transport_id,
906                    remote_addr = %discovered_addr,
907                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
908                );
909                return None;
910            }
911            return Some((discovered_transport_id, discovered_addr, transport_name));
912        };
913
914        let Some((transport_id, local_addr)) =
915            self.find_udp_transport_for_remote_addr(remote_socket_addr)
916        else {
917            debug!(
918                transport_id = %discovered_transport_id,
919                remote_addr = %discovered_addr,
920                "transport discovery: skip UDP peer with no compatible local socket"
921            );
922            return None;
923        };
924
925        if transport_id != discovered_transport_id {
926            debug!(
927                discovered_transport_id = %discovered_transport_id,
928                selected_transport_id = %transport_id,
929                local_addr = %local_addr,
930                remote_addr = %remote_socket_addr,
931                "transport discovery: selected compatible UDP transport"
932            );
933        }
934
935        Some((
936            transport_id,
937            TransportAddr::from_socket_addr(remote_socket_addr),
938            transport_name,
939        ))
940    }
941
942    fn peer_address_string_for_transport_candidate(
943        &self,
944        transport_id: TransportId,
945        transport_name: &str,
946        remote_addr: &TransportAddr,
947    ) -> String {
948        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
949        let _ = (transport_id, transport_name);
950
951        #[cfg(any(target_os = "linux", target_os = "macos"))]
952        if transport_name == "ethernet"
953            && remote_addr.as_bytes().len() == 6
954            && let Some(interface) = self
955                .transports
956                .get(&transport_id)
957                .and_then(|transport| transport.interface_name())
958        {
959            let mut mac = [0u8; 6];
960            mac.copy_from_slice(remote_addr.as_bytes());
961            return format!(
962                "{interface}/{}",
963                crate::transport::ethernet::format_mac(&mac)
964            );
965        }
966
967        remote_addr.to_string()
968    }
969
970    fn resolve_peer_address_for_match(
971        &self,
972        candidate: &PeerAddress,
973    ) -> Option<(TransportId, TransportAddr)> {
974        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
975            return None;
976        }
977
978        if candidate.transport == "ethernet" {
979            return self.resolve_ethernet_addr(&candidate.addr).ok();
980        }
981
982        if candidate.transport == "ble" {
983            #[cfg(bluer_available)]
984            {
985                return self.resolve_ble_addr(&candidate.addr).ok();
986            }
987            #[cfg(not(bluer_available))]
988            {
989                return None;
990            }
991        }
992
993        let transport_id = if candidate.transport == "udp"
994            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
995        {
996            self.find_udp_transport_for_remote_addr(remote_socket_addr)
997                .map(|(id, _)| id)?
998        } else {
999            self.find_transport_for_type(&candidate.transport)?
1000        };
1001
1002        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
1003    }
1004
1005    /// Initiate a connection to a peer on a specific transport and address.
1006    ///
1007    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
1008    /// the Noise IK handshake, sends msg1, and registers the connection for
1009    /// msg2 dispatch.
1010    ///
1011    /// For connection-oriented transports (TCP, Tor): allocates a link and
1012    /// starts a non-blocking transport connect. The handshake is deferred
1013    /// until the transport connection is established — the tick handler
1014    /// polls `connection_state()` and initiates the handshake when ready.
1015    pub(super) async fn initiate_connection(
1016        &mut self,
1017        transport_id: TransportId,
1018        remote_addr: TransportAddr,
1019        peer_identity: PeerIdentity,
1020    ) -> Result<(), NodeError> {
1021        let peer_node_addr = *peer_identity.node_addr();
1022
1023        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1024            debug!(
1025                peer = %self.peer_display_name(&peer_node_addr),
1026                transport_id = %transport_id,
1027                remote_addr = %remote_addr,
1028                "Connection already in progress for candidate path"
1029            );
1030            return Ok(());
1031        }
1032
1033        if self.outbound_handshake_slots() == 0 {
1034            return Err(NodeError::MaxConnectionsExceeded {
1035                max: self.max_connections,
1036            });
1037        }
1038
1039        if self.outbound_link_slots() == 0 {
1040            return Err(NodeError::MaxLinksExceeded {
1041                max: self.max_links,
1042            });
1043        }
1044
1045        if !self.peers.contains_key(&peer_node_addr)
1046            && self.max_peers > 0
1047            && self.peers.len() >= self.max_peers
1048        {
1049            return Err(NodeError::MaxPeersExceeded {
1050                max: self.max_peers,
1051            });
1052        }
1053
1054        self.authorize_peer(
1055            &peer_identity,
1056            PeerAclContext::OutboundConnect,
1057            transport_id,
1058            &remote_addr,
1059        )?;
1060
1061        let is_connection_oriented = self
1062            .transports
1063            .get(&transport_id)
1064            .map(|t| t.transport_type().connection_oriented)
1065            .unwrap_or(false);
1066
1067        // Allocate link ID and create link
1068        let link_id = self.allocate_link_id();
1069
1070        let link = if is_connection_oriented {
1071            Link::new(
1072                link_id,
1073                transport_id,
1074                remote_addr.clone(),
1075                LinkDirection::Outbound,
1076                Duration::from_millis(self.config.node.base_rtt_ms),
1077            )
1078        } else {
1079            Link::connectionless(
1080                link_id,
1081                transport_id,
1082                remote_addr.clone(),
1083                LinkDirection::Outbound,
1084                Duration::from_millis(self.config.node.base_rtt_ms),
1085            )
1086        };
1087
1088        self.links.insert(link_id, link);
1089
1090        // Add reverse lookup for packet dispatch
1091        self.addr_to_link
1092            .insert((transport_id, remote_addr.clone()), link_id);
1093
1094        if is_connection_oriented {
1095            // Connection-oriented: start non-blocking connect, defer handshake
1096            if let Some(transport) = self.transports.get(&transport_id) {
1097                match transport.connect(&remote_addr).await {
1098                    Ok(()) => {
1099                        debug!(
1100                            peer = %self.peer_display_name(&peer_node_addr),
1101                            transport_id = %transport_id,
1102                            remote_addr = %remote_addr,
1103                            link_id = %link_id,
1104                            "Transport connect initiated (non-blocking)"
1105                        );
1106                        self.pending_connects.push(super::PendingConnect {
1107                            link_id,
1108                            transport_id,
1109                            remote_addr,
1110                            peer_identity,
1111                        });
1112                    }
1113                    Err(e) => {
1114                        // Clean up link
1115                        self.links.remove(&link_id);
1116                        self.addr_to_link.remove(&(transport_id, remote_addr));
1117                        return Err(NodeError::from_transport_error(e));
1118                    }
1119                }
1120            }
1121            Ok(())
1122        } else {
1123            // Connectionless: proceed with immediate handshake
1124            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
1125                .await
1126        }
1127    }
1128
    /// Start the Noise handshake on a link and send msg1.
    ///
    /// Called immediately for connectionless transports, or after the
    /// transport connection is established for connection-oriented transports.
    ///
    /// Expects the caller to have already registered `link_id` in
    /// `self.links` and `(transport_id, remote_addr)` in `self.addr_to_link`;
    /// the failure paths below remove both entries.
    ///
    /// On success, the outbound `PeerConnection` is stored in
    /// `self.connections` and indexed in `self.pending_outbound` by
    /// `(transport_id, our_index)`. The wire msg1 is kept on the connection for
    /// resends, with the first resend scheduled
    /// `handshake_resend_interval_ms` from now.
    ///
    /// # Errors
    ///
    /// - `IndexAllocationFailed` when no session index is available.
    /// - `HandshakeFailed` when the Noise state machine rejects msg1
    ///   creation.
    /// - A transport error when the first send fails, or when the transport
    ///   is no longer registered.
    ///
    /// Each error path rolls back everything this function (and the caller's
    /// link registration) set up.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Create connection in handshake phase (outbound knows expected identity)
        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Allocate a session index for this handshake
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Clean up the link we just created
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        // Start the Noise handshake and get message 1
        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Clean up the index and link
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        // Set index and transport info on the connection
        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Store msg1 for resend and schedule first resend
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Track in pending_outbound for msg2 dispatch. This is registered
        // before the send so that a reply can be routed back to this link.
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send the wire format handshake message. If the very first send fails
        // synchronously (for example an IPv6 candidate on an IPv4-only UDP
        // socket), undo this candidate so the caller can try the next address
        // in the same dial pass.
        // `None` here means the transport is no longer registered.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        match send_result {
            Some(send_result) => {
                // Report both success and failure to the per-peer send tracker.
                self.note_local_send_outcome(&peer_node_addr, &send_result);
                match send_result {
                    Ok(bytes) => {
                        debug!(
                            link_id = %link_id,
                            our_index = %our_index,
                            bytes,
                            "Sent Noise handshake message 1 (wire format)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            link_id = %link_id,
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            our_index = %our_index,
                            error = %e,
                            "Failed to send handshake message"
                        );
                        // Full rollback: dispatch entry, connection, link,
                        // reverse lookup, and session index.
                        self.pending_outbound
                            .remove(&(transport_id, our_index.as_u32()));
                        self.connections.remove(&link_id);
                        self.links.remove(&link_id);
                        self.addr_to_link
                            .remove(&(transport_id, remote_addr.clone()));
                        let _ = self.index_allocator.free(our_index);
                        return Err(NodeError::from_transport_error(e));
                    }
                }
            }
            None => {
                // Same full rollback as a failed send.
                self.pending_outbound
                    .remove(&(transport_id, our_index.as_u32()));
                self.connections.remove(&link_id);
                self.links.remove(&link_id);
                self.addr_to_link
                    .remove(&(transport_id, remote_addr.clone()));
                let _ = self.index_allocator.free(our_index);
                return Err(NodeError::TransportError(format!(
                    "transport {transport_id} disappeared before first handshake send"
                )));
            }
        }

        Ok(())
    }
1253
1254    /// Poll all transports for discovered peers and auto-connect.
1255    ///
1256    /// Called from the tick handler. Iterates operational transports,
1257    /// drains their discovery buffers, and initiates connections to
1258    /// newly discovered peers (if auto_connect is enabled).
1259    pub(super) async fn poll_transport_discovery(&mut self) {
1260        // Collect discoveries first to avoid borrow conflict with self
1261        let mut to_connect = Vec::new();
1262        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1263        let mut connect_budget = self.discovery_connect_budget();
1264        let mut skipped_budget = 0usize;
1265
1266        for transport in self.transports.values() {
1267            if !transport.is_operational() {
1268                continue;
1269            }
1270            if !transport.auto_connect() {
1271                // Still drain the buffer so it doesn't grow unbounded
1272                let _ = transport.discover();
1273                continue;
1274            }
1275            let discovered = match transport.discover() {
1276                Ok(peers) => peers,
1277                Err(_) => continue,
1278            };
1279            for peer in discovered {
1280                let discovered_transport_id = peer.transport_id;
1281                let pubkey = match peer.pubkey_hint {
1282                    Some(pk) => pk,
1283                    None => continue,
1284                };
1285                let identity = PeerIdentity::from_pubkey(pubkey);
1286                let node_addr = *identity.node_addr();
1287
1288                // Skip self
1289                if node_addr == *self.identity.node_addr() {
1290                    continue;
1291                }
1292
1293                let Some((candidate_transport_id, remote_addr, transport_name)) =
1294                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1295                else {
1296                    continue;
1297                };
1298
1299                if self.peers.contains_key(&node_addr) {
1300                    let candidate = PeerAddress::new(
1301                        transport_name,
1302                        self.peer_address_string_for_transport_candidate(
1303                            candidate_transport_id,
1304                            transport_name,
1305                            &remote_addr,
1306                        ),
1307                    );
1308                    if self.active_peer_candidate_is_fresh_enough_to_skip(
1309                        &node_addr,
1310                        std::slice::from_ref(&candidate),
1311                    ) {
1312                        continue;
1313                    }
1314                    if self.is_connecting_to_peer_on_path(
1315                        &node_addr,
1316                        candidate_transport_id,
1317                        &remote_addr,
1318                    ) {
1319                        continue;
1320                    }
1321                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1322                    if connect_budget == 0
1323                        || self
1324                            .path_candidate_attempt_budget(&node_addr)
1325                            .saturating_sub(queued_for_peer)
1326                            == 0
1327                    {
1328                        skipped_budget = skipped_budget.saturating_add(1);
1329                        continue;
1330                    }
1331                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
1332                    *queued_per_peer.entry(node_addr).or_default() += 1;
1333                    connect_budget = connect_budget.saturating_sub(1);
1334                    continue;
1335                }
1336
1337                if self.is_connecting_to_peer_on_path(
1338                    &node_addr,
1339                    candidate_transport_id,
1340                    &remote_addr,
1341                ) {
1342                    continue;
1343                }
1344
1345                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1346                if connect_budget == 0
1347                    || self
1348                        .path_candidate_attempt_budget(&node_addr)
1349                        .saturating_sub(queued_for_peer)
1350                        == 0
1351                {
1352                    skipped_budget = skipped_budget.saturating_add(1);
1353                    continue;
1354                }
1355                to_connect.push((candidate_transport_id, remote_addr, identity, false));
1356                *queued_per_peer.entry(node_addr).or_default() += 1;
1357                connect_budget = connect_budget.saturating_sub(1);
1358            }
1359        }
1360
1361        if skipped_budget > 0 {
1362            debug!(
1363                skipped = skipped_budget,
1364                queued = to_connect.len(),
1365                "Transport discovery connect budget exhausted"
1366            );
1367        }
1368
1369        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1370            info!(
1371                peer = %self.peer_display_name(identity.node_addr()),
1372                transport_id = %transport_id,
1373                remote_addr = %remote_addr,
1374                active_refresh,
1375                "Auto-connecting to discovered peer"
1376            );
1377            if let Err(e) = self
1378                .initiate_connection(transport_id, remote_addr, identity)
1379                .await
1380            {
1381                warn!(error = %e, "Failed to auto-connect to discovered peer");
1382            }
1383        }
1384    }
1385
1386    pub(super) async fn poll_nostr_discovery(&mut self) {
1387        let Some(bootstrap) = self.nostr_discovery.clone() else {
1388            return;
1389        };
1390
1391        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
1392        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
1393
1394        self.drain_nostr_mesh_signals(&bootstrap).await;
1395
1396        for event in bootstrap.drain_events().await {
1397            match event {
1398                BootstrapEvent::Established { traversal } => {
1399                    let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
1400                        .ok()
1401                        .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
1402                    let admission_allowed = if active_refresh {
1403                        self.outbound_direct_refresh_admission_check()
1404                    } else {
1405                        self.outbound_admission_check()
1406                    };
1407                    if !admission_allowed {
1408                        debug!(
1409                            peer_npub = %traversal.peer_npub,
1410                            peers = self.peers.len(),
1411                            max_peers = self.max_peers,
1412                            active_refresh,
1413                            "Dropping established NAT traversal: at capacity"
1414                        );
1415                        continue;
1416                    }
1417                    let peer_npub = traversal.peer_npub.clone();
1418                    match self.adopt_established_traversal(traversal).await {
1419                        Ok(_) => {
1420                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
1421                        }
1422                        Err(err) => {
1423                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
1424                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
1425                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
1426                            }
1427                        }
1428                    }
1429                }
1430                BootstrapEvent::Failed {
1431                    peer_config,
1432                    reason,
1433                } => {
1434                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
1435                        Ok(identity) => identity,
1436                        Err(_) => continue,
1437                    };
1438                    let node_addr = *peer_identity.node_addr();
1439                    let now_ms = Self::now_ms();
1440                    if self.peers.contains_key(&node_addr) {
1441                        if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
1442                            let decision =
1443                                bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1444                            if decision.should_warn {
1445                                warn!(
1446                                    npub = %peer_config.npub,
1447                                    error = %reason,
1448                                    consecutive_failures = decision.consecutive_failures,
1449                                    cooldown_secs = decision
1450                                        .cooldown_until_ms
1451                                        .map(|t| t.saturating_sub(now_ms) / 1000),
1452                                    "Direct-path NAT traversal upgrade failed"
1453                                );
1454                            } else {
1455                                debug!(
1456                                    npub = %peer_config.npub,
1457                                    error = %reason,
1458                                    consecutive_failures = decision.consecutive_failures,
1459                                    "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
1460                                );
1461                            }
1462                            if decision.crossed_threshold {
1463                                bootstrap
1464                                    .request_advert_stale_check(peer_config.npub.clone())
1465                                    .await;
1466                            }
1467                            self.schedule_link_dead_reprobe(node_addr, now_ms);
1468                        } else {
1469                            debug!(
1470                                npub = %peer_config.npub,
1471                                error = %reason,
1472                                "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
1473                            );
1474                        }
1475                        continue;
1476                    }
1477                    if self.is_connecting_to_peer(&node_addr) {
1478                        debug!(
1479                            npub = %peer_config.npub,
1480                            error = %reason,
1481                            "Ignoring failed NAT traversal while peer handshake is already in progress"
1482                        );
1483                        continue;
1484                    }
1485
1486                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1487                    if decision.should_warn {
1488                        warn!(
1489                            npub = %peer_config.npub,
1490                            error = %reason,
1491                            consecutive_failures = decision.consecutive_failures,
1492                            cooldown_secs = decision
1493                                .cooldown_until_ms
1494                                .map(|t| t.saturating_sub(now_ms) / 1000),
1495                            "NAT traversal failed"
1496                        );
1497                    } else {
1498                        debug!(
1499                            npub = %peer_config.npub,
1500                            error = %reason,
1501                            consecutive_failures = decision.consecutive_failures,
1502                            "NAT traversal failed (suppressed by warn-rate-limit)"
1503                        );
1504                    }
1505
1506                    // B6: stale-advert eviction on the streak-threshold
1507                    // crossing. Fire-and-forget; the outcome is logged so
1508                    // operators can see when peers get cleaned up.
1509                    if decision.crossed_threshold {
1510                        bootstrap
1511                            .request_advert_stale_check(peer_config.npub.clone())
1512                            .await;
1513                    }
1514
1515                    if self
1516                        .try_peer_addresses(&peer_config, peer_identity, false)
1517                        .await
1518                        .is_ok()
1519                    {
1520                        continue;
1521                    }
1522
1523                    self.schedule_retry(node_addr, now_ms);
1524                    if self.nostr_cooldown_applies_to_peer_config(&peer_config)
1525                        && let Some(cooldown_until_ms) = decision.cooldown_until_ms
1526                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
1527                    {
1528                        // Push the next retry past the cooldown so the
1529                        // open-discovery sweep doesn't re-enqueue and the
1530                        // per-attempt backoff doesn't fire sooner.
1531                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
1532                    }
1533                }
1534            }
1535        }
1536
1537        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
1538            .await;
1539        self.queue_open_discovery_retries(&bootstrap).await;
1540        self.queue_active_fallback_direct_retries(&bootstrap);
1541
1542        // Advert refresh can touch STUN/public-endpoint discovery on some
1543        // configs. Drain traversal events and queue direct retries first so a
1544        // slow refresh cannot delay path recovery work already waiting on us.
1545        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
1546            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
1547        }
1548    }
1549
1550    async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1551        let mut deferred = Vec::new();
1552
1553        for signal in bootstrap.drain_mesh_signals().await {
1554            let (peer_npub, msg_type, payload) = match &signal {
1555                MeshTraversalSignal::Offer { peer_npub, offer } => {
1556                    let payload = match serde_json::to_vec(&offer) {
1557                        Ok(payload) => payload,
1558                        Err(error) => {
1559                            debug!(
1560                                peer = %peer_npub,
1561                                error = %error,
1562                                "Failed to encode mesh traversal offer"
1563                            );
1564                            continue;
1565                        }
1566                    };
1567                    (
1568                        peer_npub.clone(),
1569                        SessionMessageType::TraversalOffer.to_byte(),
1570                        payload,
1571                    )
1572                }
1573                MeshTraversalSignal::Answer { peer_npub, answer } => {
1574                    let payload = match serde_json::to_vec(&answer) {
1575                        Ok(payload) => payload,
1576                        Err(error) => {
1577                            debug!(
1578                                peer = %peer_npub,
1579                                error = %error,
1580                                "Failed to encode mesh traversal answer"
1581                            );
1582                            continue;
1583                        }
1584                    };
1585                    (
1586                        peer_npub.clone(),
1587                        SessionMessageType::TraversalAnswer.to_byte(),
1588                        payload,
1589                    )
1590                }
1591            };
1592
1593            let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1594                Ok(identity) => identity,
1595                Err(error) => {
1596                    debug!(
1597                        peer = %peer_npub,
1598                        error = %error,
1599                        "Cannot send mesh traversal signal to invalid peer npub"
1600                    );
1601                    continue;
1602                }
1603            };
1604            let peer_addr = *peer_identity.node_addr();
1605            match self
1606                .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1607                .await
1608            {
1609                MeshSignalSessionAction::Send => {}
1610                MeshSignalSessionAction::Defer => {
1611                    deferred.push(signal);
1612                    continue;
1613                }
1614                MeshSignalSessionAction::Drop => continue,
1615            }
1616
1617            if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1618                debug!(
1619                    peer = %self.peer_display_name(&peer_addr),
1620                    error = %error,
1621                    "Failed to send mesh traversal signal"
1622                );
1623            }
1624        }
1625
1626        for signal in deferred {
1627            bootstrap.requeue_mesh_signal(signal);
1628        }
1629    }
1630
1631    async fn mesh_signal_session_action(
1632        &mut self,
1633        peer_addr: NodeAddr,
1634        peer_pubkey: PublicKey,
1635    ) -> MeshSignalSessionAction {
1636        if let Some(entry) = self.sessions.get(&peer_addr) {
1637            if entry.is_established() {
1638                return MeshSignalSessionAction::Send;
1639            }
1640            if entry.is_initiating() || entry.is_awaiting_msg3() {
1641                debug!(
1642                    peer = %self.peer_display_name(&peer_addr),
1643                    "Deferring mesh traversal signal until end-to-end session is established"
1644                );
1645                return MeshSignalSessionAction::Defer;
1646            }
1647        }
1648
1649        if self.find_next_hop(&peer_addr).is_none() {
1650            debug!(
1651                peer = %self.peer_display_name(&peer_addr),
1652                "Cannot warm mesh traversal signal session without a FIPS route"
1653            );
1654            self.maybe_initiate_lookup(&peer_addr).await;
1655            return MeshSignalSessionAction::Drop;
1656        }
1657
1658        self.register_identity(peer_addr, peer_pubkey);
1659        match self.initiate_session(peer_addr, peer_pubkey).await {
1660            Ok(()) => {
1661                debug!(
1662                    peer = %self.peer_display_name(&peer_addr),
1663                    "Warming end-to-end session for mesh traversal signal"
1664                );
1665                MeshSignalSessionAction::Defer
1666            }
1667            Err(NodeError::SendFailed { node_addr, reason })
1668                if node_addr == peer_addr && reason == "no route to destination" =>
1669            {
1670                debug!(
1671                    peer = %self.peer_display_name(&peer_addr),
1672                    "Cannot warm mesh traversal signal session without a FIPS route"
1673                );
1674                self.maybe_initiate_lookup(&peer_addr).await;
1675                MeshSignalSessionAction::Drop
1676            }
1677            Err(error) => {
1678                debug!(
1679                    peer = %self.peer_display_name(&peer_addr),
1680                    error = %error,
1681                    "Failed to warm end-to-end session for mesh traversal signal"
1682                );
1683                MeshSignalSessionAction::Drop
1684            }
1685        }
1686    }
1687
1688    /// Resolve the LAN-only discovery scope. Applications with explicit
1689    /// connectivity config can set `node.discovery.lan.scope` without
1690    /// changing the public Nostr discovery `app` tag. The older fallback
1691    /// extracts a scope from the Nostr app tag used by default scoped
1692    /// discovery.
1693    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1694        if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1695            let scope = scope.trim();
1696            if !scope.is_empty() {
1697                return Some(scope.to_string());
1698            }
1699        }
1700
1701        let app = self.config.node.discovery.nostr.app.trim();
1702        if app.is_empty() {
1703            return None;
1704        }
1705        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1706            let scope = rest.trim();
1707            if scope.is_empty() {
1708                None
1709            } else {
1710                Some(scope.to_string())
1711            }
1712        } else {
1713            Some(app.to_string())
1714        }
1715    }
1716
1717    pub(super) fn start_local_instance_discovery(&mut self) {
1718        if !self.config.node.discovery.local.enabled {
1719            return;
1720        }
1721        let Some(scope) = self.lan_discovery_scope() else {
1722            debug!("local instance discovery not started: no discovery scope");
1723            return;
1724        };
1725        let now_ms = Self::now_ms();
1726        match crate::discovery::local::LocalInstanceRegistry::new(
1727            self.identity.npub(),
1728            scope,
1729            &self.config.node.discovery.local,
1730            now_ms,
1731        ) {
1732            Ok(registry) => {
1733                self.local_instance_registry = Some(registry);
1734                self.local_instance_started_at_ms = Some(now_ms);
1735                self.last_local_instance_publish_ms = None;
1736                self.last_local_instance_scan_ms = None;
1737                self.publish_local_instance_record(now_ms);
1738                info!("Same-host FIPS instance discovery enabled");
1739            }
1740            Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1741                debug!("same-host FIPS instance discovery disabled");
1742            }
1743            Err(err) => {
1744                debug!(error = %err, "same-host FIPS instance discovery not started");
1745            }
1746        }
1747    }
1748
1749    fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1750        let mut contacts = Vec::new();
1751        for handle in self.transports.values() {
1752            if !handle.is_operational() || !handle.accept_connections() {
1753                continue;
1754            }
1755            let transport = handle.transport_type().name;
1756            if transport != "udp" && transport != "tcp" {
1757                continue;
1758            }
1759            let Some(local_addr) = handle.local_addr() else {
1760                continue;
1761            };
1762            let Some(contact) =
1763                crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1764            else {
1765                continue;
1766            };
1767            if contacts
1768                .iter()
1769                .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1770                    existing.transport == contact.transport && existing.addr == contact.addr
1771                })
1772            {
1773                continue;
1774            }
1775            contacts.push(contact);
1776        }
1777        contacts
1778    }
1779
1780    fn publish_local_instance_record(&mut self, now_ms: u64) {
1781        let Some(registry) = self.local_instance_registry.clone() else {
1782            return;
1783        };
1784        let contacts = self.local_instance_contacts();
1785        match registry.publish(contacts, now_ms) {
1786            Ok(()) => {
1787                self.last_local_instance_publish_ms = Some(now_ms);
1788            }
1789            Err(err) => {
1790                debug!(error = %err, "failed to publish same-host FIPS instance record");
1791            }
1792        }
1793    }
1794
1795    fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1796        if self.local_instance_registry.is_none() {
1797            return;
1798        }
1799        let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1800        let due = self
1801            .last_local_instance_publish_ms
1802            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1803            .unwrap_or(true);
1804        if due {
1805            self.publish_local_instance_record(now_ms);
1806        }
1807    }
1808
1809    fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1810        if self.local_instance_registry.is_none() {
1811            return false;
1812        }
1813        let cfg = &self.config.node.discovery.local;
1814        let interval_ms = if self
1815            .local_instance_started_at_ms
1816            .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1817            .unwrap_or(false)
1818        {
1819            cfg.startup_scan_interval_ms()
1820        } else {
1821            cfg.scan_interval_ms()
1822        };
1823        self.last_local_instance_scan_ms
1824            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1825            .unwrap_or(true)
1826    }
1827
1828    fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1829        if self.config.peers().iter().any(|peer| {
1830            PeerIdentity::from_npub(&peer.npub)
1831                .map(|configured| configured.node_addr() == identity.node_addr())
1832                .unwrap_or(false)
1833        }) {
1834            return true;
1835        }
1836        self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1837    }
1838
1839    fn local_instance_peer_addresses(
1840        &self,
1841        record: &crate::discovery::local::LocalInstanceRecord,
1842    ) -> Vec<PeerAddress> {
1843        let mut addresses = Vec::new();
1844        for contact in &record.contacts {
1845            if contact.transport != "udp" && contact.transport != "tcp" {
1846                continue;
1847            }
1848            let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1849                debug!(
1850                    npub = %record.npub,
1851                    transport = %contact.transport,
1852                    addr = %contact.addr,
1853                    "local instance discovery: skip non-socket contact"
1854                );
1855                continue;
1856            };
1857            if !socket_addr.ip().is_loopback() {
1858                debug!(
1859                    npub = %record.npub,
1860                    addr = %contact.addr,
1861                    "local instance discovery: skip non-loopback contact"
1862                );
1863                continue;
1864            }
1865            let address =
1866                PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1867                    .with_seen_at_ms(record.updated_at_ms);
1868            if addresses.iter().any(|existing: &PeerAddress| {
1869                existing.transport == address.transport && existing.addr == address.addr
1870            }) {
1871                continue;
1872            }
1873            addresses.push(address);
1874        }
1875        addresses
1876    }
1877
1878    /// Scan the same-host registry only on startup and then at a low cadence.
1879    /// This avoids a per-second filesystem poll while preserving the fast path
1880    /// for processes launched around the same time.
1881    pub(super) async fn poll_local_instance_discovery(&mut self) {
1882        let Some(registry) = self.local_instance_registry.clone() else {
1883            return;
1884        };
1885        let now_ms = Self::now_ms();
1886        self.maybe_publish_local_instance_record(now_ms);
1887        if !self.local_instance_scan_due(now_ms) {
1888            return;
1889        }
1890        self.last_local_instance_scan_ms = Some(now_ms);
1891
1892        let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1893        {
1894            Ok(records) => records,
1895            Err(err) => {
1896                debug!(error = %err, "same-host FIPS instance scan failed");
1897                return;
1898            }
1899        };
1900        if records.is_empty() {
1901            return;
1902        }
1903
1904        let mut connect_budget = self.discovery_connect_budget();
1905        let mut skipped_budget = 0usize;
1906        for record in records {
1907            let identity = match PeerIdentity::from_npub(&record.npub) {
1908                Ok(identity) => identity,
1909                Err(err) => {
1910                    debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1911                    continue;
1912                }
1913            };
1914            let peer_node_addr = *identity.node_addr();
1915            if peer_node_addr == *self.identity.node_addr() {
1916                continue;
1917            }
1918            if !self.local_instance_peer_allowed(&identity) {
1919                debug!(
1920                    npub = %identity.short_npub(),
1921                    "local instance discovery: skip unconfigured peer"
1922                );
1923                continue;
1924            }
1925
1926            let addresses = self.local_instance_peer_addresses(&record);
1927            if addresses.is_empty() {
1928                continue;
1929            }
1930
1931            if self.peers.contains_key(&peer_node_addr)
1932                && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1933            {
1934                continue;
1935            }
1936
1937            for address in addresses {
1938                let Some((transport_id, remote_addr)) =
1939                    self.resolve_peer_address_for_match(&address)
1940                else {
1941                    continue;
1942                };
1943                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1944                    continue;
1945                }
1946                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1947                    skipped_budget = skipped_budget.saturating_add(1);
1948                    continue;
1949                }
1950                info!(
1951                    npub = %identity.short_npub(),
1952                    transport = %address.transport,
1953                    addr = %address.addr,
1954                    "same-host FIPS instance discovery: initiating handshake"
1955                );
1956                if let Err(err) = self
1957                    .initiate_connection(transport_id, remote_addr, identity)
1958                    .await
1959                {
1960                    debug!(
1961                        npub = %record.npub,
1962                        error = %err,
1963                        "same-host FIPS instance discovery: failed to initiate connection"
1964                    );
1965                }
1966                connect_budget = connect_budget.saturating_sub(1);
1967            }
1968        }
1969        if skipped_budget > 0 {
1970            debug!(
1971                skipped = skipped_budget,
1972                "same-host FIPS instance discovery connect budget exhausted"
1973            );
1974        }
1975    }
1976
1977    /// Drain mDNS-discovered peers and initiate Noise XX handshakes. For
1978    /// active peers this is a non-disruptive alternate-path refresh: the
1979    /// current link stays live until a new handshake authenticates and
1980    /// promotes. The handshake itself is the authentication — a spoofed
1981    /// mDNS advert with someone else's npub fails the XX exchange and
1982    /// is dropped.
1983    pub(super) async fn poll_lan_discovery(&mut self) {
1984        let Some(runtime) = self.lan_discovery.clone() else {
1985            return;
1986        };
1987        let events = runtime.drain_events().await;
1988        if events.is_empty() {
1989            return;
1990        }
1991        let mut connect_budget = self.discovery_connect_budget();
1992        let mut skipped_budget = 0usize;
1993        for event in events {
1994            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1995            let Some((transport_id, local_addr)) =
1996                self.find_udp_transport_for_remote_addr(peer.addr)
1997            else {
1998                debug!(
1999                    addr = %peer.addr,
2000                    "lan: skip discovered peer with no compatible UDP transport"
2001                );
2002                continue;
2003            };
2004            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
2005                Ok(id) => id,
2006                Err(err) => {
2007                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
2008                    continue;
2009                }
2010            };
2011            let peer_node_addr = *identity.node_addr();
2012            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
2013            if self.peers.contains_key(&peer_node_addr) {
2014                let candidate = PeerAddress::new("udp", peer.addr.to_string());
2015                if self.active_peer_candidate_is_fresh_enough_to_skip(
2016                    &peer_node_addr,
2017                    std::slice::from_ref(&candidate),
2018                ) {
2019                    continue;
2020                }
2021                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
2022                    continue;
2023                }
2024                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
2025                    skipped_budget = skipped_budget.saturating_add(1);
2026                    continue;
2027                }
2028                info!(
2029                    npub = %identity.short_npub(),
2030                    addr = %peer.addr,
2031                    local_addr = %local_addr,
2032                    "lan: initiating alternate-path handshake to active peer"
2033                );
2034                if let Err(err) = self
2035                    .initiate_connection(transport_id, remote_addr, identity)
2036                    .await
2037                {
2038                    debug!(
2039                        npub = %peer.npub,
2040                        error = %err,
2041                        "lan: failed to initiate active peer alternate-path handshake"
2042                    );
2043                }
2044                connect_budget = connect_budget.saturating_sub(1);
2045                continue;
2046            }
2047            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
2048                continue;
2049            }
2050            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
2051                skipped_budget = skipped_budget.saturating_add(1);
2052                continue;
2053            }
2054            info!(
2055                npub = %identity.short_npub(),
2056                addr = %peer.addr,
2057                local_addr = %local_addr,
2058                "lan: initiating handshake to discovered peer"
2059            );
2060            if let Err(err) = self
2061                .initiate_connection(transport_id, remote_addr, identity)
2062                .await
2063            {
2064                debug!(
2065                    npub = %peer.npub,
2066                    error = %err,
2067                    "lan: failed to initiate connection to discovered peer"
2068                );
2069            }
2070            connect_budget = connect_budget.saturating_sub(1);
2071        }
2072        if skipped_budget > 0 {
2073            debug!(
2074                skipped = skipped_budget,
2075                "lan: discovery connect budget exhausted"
2076            );
2077        }
2078    }
2079
2080    /// Poll pending transport connects and initiate handshakes for ready ones.
2081    ///
2082    /// Called from the tick handler. For each pending connect, queries the
2083    /// transport's connection state. When a connection is established,
2084    /// marks the link as Connected and starts the Noise handshake.
2085    /// Failed connections are cleaned up and scheduled for retry.
2086    pub(super) async fn poll_pending_connects(&mut self) {
2087        if self.pending_connects.is_empty() {
2088            return;
2089        }
2090
2091        let mut completed = Vec::new();
2092
2093        for (i, pending) in self.pending_connects.iter().enumerate() {
2094            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
2095                transport.connection_state(&pending.remote_addr)
2096            } else {
2097                crate::transport::ConnectionState::Failed("transport removed".into())
2098            };
2099
2100            match state {
2101                crate::transport::ConnectionState::Connected => {
2102                    completed.push((i, true, None));
2103                }
2104                crate::transport::ConnectionState::Failed(reason) => {
2105                    completed.push((i, false, Some(reason)));
2106                }
2107                crate::transport::ConnectionState::Connecting => {
2108                    // Still in progress, check on next tick
2109                }
2110                crate::transport::ConnectionState::None => {
2111                    // Shouldn't happen — treat as failure
2112                    completed.push((i, false, Some("no connection attempt found".into())));
2113                }
2114            }
2115        }
2116
2117        // Process completions in reverse order to preserve indices
2118        for (i, success, reason) in completed.into_iter().rev() {
2119            let pending = self.pending_connects.remove(i);
2120
2121            if success {
2122                // Mark link as Connected
2123                if let Some(link) = self.links.get_mut(&pending.link_id) {
2124                    link.set_connected();
2125                }
2126
2127                debug!(
2128                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2129                    transport_id = %pending.transport_id,
2130                    remote_addr = %pending.remote_addr,
2131                    link_id = %pending.link_id,
2132                    "Transport connected, starting handshake"
2133                );
2134
2135                // Start the handshake now that the transport is connected
2136                if let Err(e) = self
2137                    .start_handshake(
2138                        pending.link_id,
2139                        pending.transport_id,
2140                        pending.remote_addr.clone(),
2141                        pending.peer_identity,
2142                    )
2143                    .await
2144                {
2145                    warn!(
2146                        link_id = %pending.link_id,
2147                        error = %e,
2148                        "Failed to start handshake after transport connect"
2149                    );
2150                    // Clean up link on handshake failure
2151                    self.remove_link(&pending.link_id);
2152                }
2153            } else {
2154                let reason = reason.unwrap_or_default();
2155                warn!(
2156                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2157                    transport_id = %pending.transport_id,
2158                    remote_addr = %pending.remote_addr,
2159                    link_id = %pending.link_id,
2160                    reason = %reason,
2161                    "Transport connect failed"
2162                );
2163
2164                // Clean up link and schedule retry
2165                self.remove_link(&pending.link_id);
2166                self.links.remove(&pending.link_id);
2167                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2168            }
2169        }
2170    }
2171
2172    // === State Transitions ===
2173
2174    /// Start the node.
2175    ///
2176    /// Initializes the TUN interface (if configured), spawns I/O threads,
2177    /// and transitions to the Running state.
2178    pub async fn start(&mut self) -> Result<(), NodeError> {
2179        node_start_debug_log("Node::start begin");
2180        if !self.state.can_start() {
2181            return Err(NodeError::AlreadyStarted);
2182        }
2183        self.state = NodeState::Starting;
2184        node_start_debug_log("Node::start state set to starting");
2185
2186        // Create packet channel for transport -> Node communication
2187        let packet_buffer_size = self.config.node.buffers.packet_channel;
2188        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
2189        self.packet_tx = Some(packet_tx.clone());
2190        self.packet_rx = Some(packet_rx);
2191        node_start_debug_log("Node::start packet channel created");
2192
2193        // Initialize transports first (before TUN, before Nostr discovery).
2194        node_start_debug_log("Node::start create transports begin");
2195        let transport_handles = self.create_transports(&packet_tx).await;
2196        node_start_debug_log(format!(
2197            "Node::start create transports complete count={}",
2198            transport_handles.len()
2199        ));
2200
2201        for mut handle in transport_handles {
2202            let transport_id = handle.transport_id();
2203            let transport_type = handle.transport_type().name;
2204            let name = handle.name().map(|s| s.to_string());
2205
2206            node_start_debug_log(format!(
2207                "Node::start transport start begin id={} type={} name={:?}",
2208                transport_id, transport_type, name
2209            ));
2210            match handle.start().await {
2211                Ok(()) => {
2212                    node_start_debug_log(format!(
2213                        "Node::start transport start ok id={} type={}",
2214                        transport_id, transport_type
2215                    ));
2216                    self.transports.insert(transport_id, handle);
2217                }
2218                Err(e) => {
2219                    node_start_debug_log(format!(
2220                        "Node::start transport start error id={} type={} error={}",
2221                        transport_id, transport_type, e
2222                    ));
2223                    if let Some(ref n) = name {
2224                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
2225                    } else {
2226                        warn!(transport_type, error = %e, "Transport failed to start");
2227                    }
2228                }
2229            }
2230        }
2231
2232        if !self.transports.is_empty() {
2233            info!(count = self.transports.len(), "Transports initialized");
2234        }
2235
2236        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
2237        // worker pools. **Unix only** — both pools issue direct
2238        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
2239        // raw file descriptors via `AsRawFd`, which is a unix-only
2240        // trait. On Windows the rx_loop's tokio-based send/recv
2241        // remain the canonical path; the perf overhaul lands its
2242        // gains on unix.
2243        //
2244        // Worker count defaults to the number of CPUs, overridable
2245        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
2246        // for debug / benchmarking. Hash-by-destination means a
2247        // single TCP flow pins to one worker (preserves wire
2248        // ordering); additional workers light up under multi-flow
2249        // / multi-peer load. See `node::encrypt_worker` /
2250        // `node::decrypt_worker` for full rationale.
2251        #[cfg(unix)]
2252        {
2253            if self.config.node.worker_pools_enabled {
2254                node_start_debug_log("Node::start worker pools begin");
2255                let cpu_default = std::thread::available_parallelism()
2256                    .map(|n| n.get())
2257                    .unwrap_or(1)
2258                    .max(1);
2259                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
2260                    .ok()
2261                    .and_then(|s| s.parse().ok())
2262                    .unwrap_or(cpu_default)
2263                    .max(1);
2264                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
2265                    encrypt_worker_count,
2266                ));
2267                info!(
2268                    workers = encrypt_worker_count,
2269                    "Spawned FMP-encrypt worker pool"
2270                );
2271
2272                // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
2273                // falls through to the in-line rx_loop decrypt path (the
2274                // "test-mode" branch in `handle_encrypted_frame`, which is
2275                // in fact a fully functional synchronous decrypt). Useful
2276                // as an A/B against the worker pipeline when chasing
2277                // scheduling/queueing regressions on the native macOS
2278                // path. Any non-zero value (env or default) spawns the
2279                // pool as before.
2280                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
2281                    .ok()
2282                    .and_then(|s| s.parse().ok())
2283                    .unwrap_or(cpu_default);
2284                if decrypt_worker_count == 0 {
2285                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
2286                } else {
2287                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
2288                        decrypt_worker_count,
2289                    ));
2290                    info!(
2291                        workers = decrypt_worker_count,
2292                        "Spawned FMP+FSP-decrypt worker pool"
2293                    );
2294                }
2295                node_start_debug_log("Node::start worker pools complete");
2296            } else {
2297                node_start_debug_log("Node::start worker pools disabled");
2298                info!("FIPS worker pools disabled; using in-line crypto/send path");
2299            }
2300        }
2301
2302        if self.config.node.discovery.nostr.enabled {
2303            node_start_debug_log("Node::start nostr discovery start begin");
2304            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
2305                .await
2306            {
2307                Ok(runtime) => {
2308                    node_start_debug_log("Node::start nostr discovery runtime created");
2309                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2310                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2311                    }
2312                    node_start_debug_log("Node::start nostr overlay advert refreshed");
2313                    self.nostr_discovery = Some(runtime);
2314                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2315                    info!("Nostr overlay discovery enabled");
2316                }
2317                Err(err) => {
2318                    node_start_debug_log(format!(
2319                        "Node::start nostr discovery start error error={}",
2320                        err
2321                    ));
2322                    warn!(error = %err, "Failed to start Nostr overlay discovery");
2323                }
2324            }
2325        }
2326
2327        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
2328        // when Nostr is disabled, since it gives us sub-second pairing
2329        // on the same link without any relay or NAT-traversal roundtrip.
2330        if self.config.node.discovery.lan.enabled {
2331            node_start_debug_log("Node::start lan discovery start begin");
2332            let advertised_udp_port = self
2333                .transports
2334                .values()
2335                .filter(|h| h.is_operational())
2336                .filter(|h| h.transport_type().name == "udp")
2337                .find_map(|h| h.local_addr().map(|addr| addr.port()))
2338                .unwrap_or(0);
2339            let scope = self.lan_discovery_scope();
2340            match crate::discovery::lan::LanDiscovery::start(
2341                &self.identity,
2342                scope,
2343                advertised_udp_port,
2344                self.config.node.discovery.lan.clone(),
2345            )
2346            .await
2347            {
2348                Ok(runtime) => {
2349                    node_start_debug_log("Node::start lan discovery start ok");
2350                    self.lan_discovery = Some(runtime);
2351                    info!("LAN mDNS discovery enabled");
2352                }
2353                Err(err) => {
2354                    node_start_debug_log(format!(
2355                        "Node::start lan discovery start error error={}",
2356                        err
2357                    ));
2358                    debug!(error = %err, "LAN mDNS discovery not started");
2359                }
2360            }
2361        }
2362
2363        self.start_local_instance_discovery();
2364        self.poll_local_instance_discovery().await;
2365
2366        // Connect to static peers before TUN is active
2367        // This allows handshake messages to be sent before we start accepting packets
2368        node_start_debug_log("Node::start initiate peer connections begin");
2369        self.initiate_peer_connections().await;
2370        node_start_debug_log("Node::start initiate peer connections complete");
2371
2372        // Initialize TUN interface last, after transports and peers are ready
2373        if self.config.tun.enabled {
2374            node_start_debug_log("Node::start tun init begin");
2375            let address = *self.identity.address();
2376            match TunDevice::create(&self.config.tun, address).await {
2377                Ok(device) => {
2378                    let mtu = device.mtu();
2379                    let name = device.name().to_string();
2380                    let our_addr = *device.address();
2381
2382                    info!("TUN device active:");
2383                    info!("     name: {}", name);
2384                    info!("  address: {}", device.address());
2385                    info!("      mtu: {}", mtu);
2386
2387                    // Calculate max MSS for TCP clamping
2388                    let effective_mtu = self.effective_ipv6_mtu();
2389                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
2390
2391                    info!("effective MTU: {} bytes", effective_mtu);
2392                    debug!("   max TCP MSS: {} bytes", max_mss);
2393
2394                    // On macOS, create a shutdown pipe. Writing to it unblocks the
2395                    // reader thread's select() loop without closing the TUN fd
2396                    // (which would cause a double-close when TunDevice drops).
2397                    #[cfg(target_os = "macos")]
2398                    let (shutdown_read_fd, shutdown_write_fd) = {
2399                        let mut fds = [0i32; 2];
2400                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2401                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2402                                "failed to create shutdown pipe".into(),
2403                            )));
2404                        }
2405                        (fds[0], fds[1])
2406                    };
2407
2408                    // Create writer (dups the fd for independent write access).
2409                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
2410                    // per-destination path MTU learned via discovery.
2411                    let (writer, tun_tx) =
2412                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2413
2414                    // Spawn writer thread
2415                    let writer_handle = thread::spawn(move || {
2416                        writer.run();
2417                    });
2418
2419                    // Clone tun_tx for the reader
2420                    let reader_tun_tx = tun_tx.clone();
2421
2422                    // Create outbound channel for TUN reader → Node
2423                    let tun_channel_size = self.config.node.buffers.tun_channel;
2424                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2425
2426                    // Spawn reader thread
2427                    let transport_mtu = self.transport_mtu();
2428                    let path_mtu_lookup = self.path_mtu_lookup.clone();
2429                    #[cfg(target_os = "macos")]
2430                    let reader_handle = thread::spawn(move || {
2431                        run_tun_reader(
2432                            device,
2433                            mtu,
2434                            our_addr,
2435                            reader_tun_tx,
2436                            outbound_tx,
2437                            transport_mtu,
2438                            path_mtu_lookup,
2439                            shutdown_read_fd,
2440                        );
2441                    });
2442                    #[cfg(not(target_os = "macos"))]
2443                    let reader_handle = thread::spawn(move || {
2444                        run_tun_reader(
2445                            device,
2446                            mtu,
2447                            our_addr,
2448                            reader_tun_tx,
2449                            outbound_tx,
2450                            transport_mtu,
2451                            path_mtu_lookup,
2452                        );
2453                    });
2454
2455                    self.tun_state = TunState::Active;
2456                    self.tun_name = Some(name);
2457                    self.tun_tx = Some(tun_tx);
2458                    self.tun_outbound_rx = Some(outbound_rx);
2459                    self.tun_reader_handle = Some(reader_handle);
2460                    self.tun_writer_handle = Some(writer_handle);
2461                    #[cfg(target_os = "macos")]
2462                    {
2463                        self.tun_shutdown_fd = Some(shutdown_write_fd);
2464                    }
2465                }
2466                Err(e) => {
2467                    self.tun_state = TunState::Failed;
2468                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
2469                }
2470            }
2471            node_start_debug_log("Node::start tun init complete");
2472        }
2473
2474        // Initialize DNS responder (independent of TUN).
2475        //
2476        // Default bind_addr is "::1" (IPv6 loopback). The shipped
2477        // fips-dns-setup configures systemd-resolved via a global
2478        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
2479        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
2480        // where self-destined traffic to fips0's address is attributed
2481        // to fips0 in PKTINFO and gets silently dropped by the
2482        // mesh-interface filter in src/upper/dns.rs.
2483        //
2484        // For mesh-reachable resolution (rare), set bind_addr: "::"
2485        // in fips.yaml. The mesh-interface filter remains active to
2486        // prevent hosts-file alias enumeration in that mode.
2487        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
2488        // 127.0.0.1 still reach us regardless of kernel sysctl
2489        // defaults — but only when bind is on a wildcard / IPv6 path.
2490        if self.config.dns.enabled {
2491            node_start_debug_log("Node::start dns init begin");
2492            let addr_str = self.config.dns.bind_addr();
2493            match addr_str.parse::<std::net::IpAddr>() {
2494                Ok(ip) => {
2495                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2496                    match Self::bind_dns_socket(bind) {
2497                        Ok(socket) => {
2498                            let dns_channel_size = self.config.node.buffers.dns_channel;
2499                            let (identity_tx, identity_rx) =
2500                                tokio::sync::mpsc::channel(dns_channel_size);
2501                            let dns_ttl = self.config.dns.ttl();
2502                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2503                                self.config.peers(),
2504                            );
2505                            let reloader = if self.config.node.system_files_enabled {
2506                                let hosts_path = std::path::PathBuf::from(
2507                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
2508                                );
2509                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2510                            } else {
2511                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2512                            };
2513                            // Resolve the TUN ifindex so the responder can
2514                            // drop queries arriving on the mesh interface
2515                            // (fips0). Without this, the `::` bind exposes
2516                            // /etc/fips/hosts alias probing to any mesh peer.
2517                            // When TUN isn't enabled or the name can't be
2518                            // resolved, `None` disables the filter (there
2519                            // is no mesh surface to defend anyway).
2520                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2521                            info!(
2522                                bind = %bind,
2523                                hosts = reloader.hosts().len(),
2524                                mesh_ifindex = ?mesh_ifindex,
2525                                "DNS responder started for .fips domain (auto-reload enabled)"
2526                            );
2527                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2528                                socket,
2529                                identity_tx,
2530                                dns_ttl,
2531                                reloader,
2532                                mesh_ifindex,
2533                            ));
2534                            self.dns_identity_rx = Some(identity_rx);
2535                            self.dns_task = Some(handle);
2536                        }
2537                        Err(e) => {
2538                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2539                        }
2540                    }
2541                }
2542                Err(e) => {
2543                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2544                }
2545            }
2546            node_start_debug_log("Node::start dns init complete");
2547        }
2548
2549        self.state = NodeState::Running;
2550        node_start_debug_log("Node::start running");
2551        info!("Node started:");
2552        info!("       state: {}", self.state);
2553        info!("  transports: {}", self.transports.len());
2554        info!(" connections: {}", self.connections.len());
2555        Ok(())
2556    }
2557
2558    /// Bind a UDP socket for the DNS responder.
2559    ///
2560    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
2561    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
2562    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
2563    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
2564    /// land on the same socket.
2565    ///
2566    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
2567    /// can learn the arrival interface per packet. The responder uses that
2568    /// to drop queries arriving on the mesh TUN, closing the hosts-file
2569    /// probing side-channel created by the `::` bind.
2570    fn bind_dns_socket(
2571        addr: std::net::SocketAddr,
2572    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2573        use socket2::{Domain, Protocol, Socket, Type};
2574        let domain = if addr.is_ipv4() {
2575            Domain::IPV4
2576        } else {
2577            Domain::IPV6
2578        };
2579        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2580        if addr.is_ipv6() {
2581            sock.set_only_v6(false)?;
2582            #[cfg(unix)]
2583            Self::set_recv_pktinfo_v6(&sock)?;
2584        }
2585        sock.set_nonblocking(true)?;
2586        sock.bind(&addr.into())?;
2587        tokio::net::UdpSocket::from_std(sock.into())
2588    }
2589
2590    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
2591    ///
2592    /// After this setsockopt, each `recvmsg()` call on the socket receives
2593    /// an `IPV6_PKTINFO` control message containing the arrival interface
2594    /// index, which the DNS responder uses for its mesh-interface filter.
2595    #[cfg(unix)]
2596    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2597        use std::os::fd::AsRawFd;
2598        let enable: libc::c_int = 1;
2599        let ret = unsafe {
2600            libc::setsockopt(
2601                sock.as_raw_fd(),
2602                libc::IPPROTO_IPV6,
2603                libc::IPV6_RECVPKTINFO,
2604                &enable as *const _ as *const libc::c_void,
2605                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2606            )
2607        };
2608        if ret < 0 {
2609            return Err(std::io::Error::last_os_error());
2610        }
2611        Ok(())
2612    }
2613
2614    /// Resolve the mesh TUN interface index by name.
2615    ///
2616    /// Returns `None` if the interface does not exist (e.g. TUN disabled
2617    /// or not yet created). A `None` result disables the DNS responder's
2618    /// mesh-interface filter — safe, because if there is no fips0 there
2619    /// is no mesh exposure to defend against.
2620    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2621        #[cfg(unix)]
2622        {
2623            let c_name = std::ffi::CString::new(name).ok()?;
2624            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2625            if idx == 0 { None } else { Some(idx) }
2626        }
2627        #[cfg(not(unix))]
2628        {
2629            let _ = name;
2630            None
2631        }
2632    }
2633
2634    /// Stop the node.
2635    ///
2636    /// Shuts down TUN interface, stops I/O threads, and transitions to
2637    /// the Stopped state.
2638    pub async fn stop(&mut self) -> Result<(), NodeError> {
2639        if !self.state.can_stop() {
2640            return Err(NodeError::NotStarted);
2641        }
2642        self.state = NodeState::Stopping;
2643        info!(state = %self.state, "Node stopping");
2644
2645        // Stop DNS responder
2646        if let Some(handle) = self.dns_task.take() {
2647            handle.abort();
2648            debug!("DNS responder stopped");
2649        }
2650
2651        // Send disconnect notifications to all active peers before closing transports
2652        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
2653            .await;
2654
2655        // Stop Nostr overlay discovery background work and withdraw any advert.
2656        if let Some(bootstrap) = self.nostr_discovery.take()
2657            && let Err(e) = bootstrap.shutdown().await
2658        {
2659            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
2660        }
2661
2662        // Tear down LAN mDNS responder + browser. Best-effort: the
2663        // OS will eventually time the advert out via its TTL even if
2664        // we don't get a clean unregister out before the daemon exits.
2665        if let Some(lan) = self.lan_discovery.take() {
2666            lan.shutdown().await;
2667        }
2668
2669        if let Some(registry) = self.local_instance_registry.take()
2670            && let Err(err) = registry.remove()
2671        {
2672            debug!(error = %err, "failed to remove same-host FIPS instance record");
2673        }
2674
2675        // Shutdown transports (they're packet producers)
2676        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
2677        for transport_id in transport_ids {
2678            if let Some(mut handle) = self.transports.remove(&transport_id) {
2679                let transport_type = handle.transport_type().name;
2680                match handle.stop().await {
2681                    Ok(()) => {
2682                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
2683                    }
2684                    Err(e) => {
2685                        warn!(
2686                            transport_id = %transport_id,
2687                            transport_type,
2688                            error = %e,
2689                            "Transport stop failed"
2690                        );
2691                    }
2692                }
2693            }
2694        }
2695
2696        // Drop packet channels
2697        self.packet_tx.take();
2698        self.packet_rx.take();
2699
2700        // Shutdown TUN interface
2701        if let Some(name) = self.tun_name.take() {
2702            info!(name = %name, "Shutting down TUN interface");
2703
2704            // Drop the tun_tx to signal the writer to stop
2705            self.tun_tx.take();
2706
2707            // Delete the interface (on Linux, causes reader to get EFAULT)
2708            if let Err(e) = shutdown_tun_interface(&name).await {
2709                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2710            }
2711
2712            // On macOS, signal the reader thread to exit by writing to the
2713            // shutdown pipe. The reader's select() will wake up and break.
2714            #[cfg(target_os = "macos")]
2715            if let Some(fd) = self.tun_shutdown_fd.take() {
2716                unsafe {
2717                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2718                    libc::close(fd);
2719                }
2720            }
2721
2722            // Wait for threads to finish
2723            if let Some(handle) = self.tun_reader_handle.take() {
2724                let _ = handle.join();
2725            }
2726            if let Some(handle) = self.tun_writer_handle.take() {
2727                let _ = handle.join();
2728            }
2729
2730            self.tun_state = TunState::Disabled;
2731        }
2732
2733        self.state = NodeState::Stopped;
2734        info!(state = %self.state, "Node stopped");
2735        Ok(())
2736    }
2737
2738    /// Send disconnect notifications to all active peers.
2739    ///
2740    /// Best-effort: send failures are logged and ignored since the transport
2741    /// may already be degraded. This runs before transports are shut down.
2742    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2743        let disconnect = Disconnect::new(reason);
2744        let plaintext = disconnect.encode();
2745
2746        // Collect node_addrs to avoid borrow conflict with send helper
2747        let peer_addrs: Vec<NodeAddr> = self
2748            .peers
2749            .iter()
2750            .filter(|(_, peer)| peer.can_send() && peer.has_session())
2751            .map(|(addr, _)| *addr)
2752            .collect();
2753
2754        if peer_addrs.is_empty() {
2755            debug!(
2756                total_peers = self.peers.len(),
2757                "No sendable peers for disconnect notification"
2758            );
2759            return;
2760        }
2761
2762        let mut sent = 0usize;
2763        for node_addr in &peer_addrs {
2764            match self
2765                .send_encrypted_link_message(node_addr, &plaintext)
2766                .await
2767            {
2768                Ok(()) => sent += 1,
2769                Err(e) => {
2770                    debug!(
2771                        peer = %self.peer_display_name(node_addr),
2772                        error = %e,
2773                        "Failed to send disconnect (transport may be down)"
2774                    );
2775                }
2776            }
2777        }
2778
2779        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2780    }
2781
2782    pub(in crate::node) fn static_peer_addresses(
2783        &self,
2784        peer_config: &PeerConfig,
2785    ) -> Vec<PeerAddress> {
2786        peer_config
2787            .addresses_by_priority()
2788            .into_iter()
2789            .cloned()
2790            .collect()
2791    }
2792
2793    async fn nostr_peer_fallback_addresses(
2794        &self,
2795        peer_config: &PeerConfig,
2796        existing: &[PeerAddress],
2797    ) -> Vec<PeerAddress> {
2798        if !self.config.node.discovery.nostr.enabled
2799            || self.config.node.discovery.nostr.policy
2800                == crate::config::NostrDiscoveryPolicy::Disabled
2801        {
2802            return Vec::new();
2803        }
2804
2805        let Some(bootstrap) = self.nostr_discovery.clone() else {
2806            return Vec::new();
2807        };
2808        if self.nostr_cooldown_applies_to_peer_config(peer_config)
2809            && bootstrap
2810                .cooldown_until(&peer_config.npub, Self::now_ms())
2811                .is_some()
2812        {
2813            debug!(
2814                npub = %peer_config.npub,
2815                "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2816            );
2817            return Vec::new();
2818        }
2819        let endpoints = match bootstrap
2820            .cached_advert_endpoints_for_peer(&peer_config.npub)
2821            .await
2822        {
2823            Some(endpoints) => endpoints,
2824            None => {
2825                debug!(
2826                    npub = %peer_config.npub,
2827                    "No cached Nostr advert endpoints for configured peer"
2828                );
2829                return Vec::new();
2830            }
2831        };
2832
2833        let mut fallback = Vec::new();
2834        let mut next_priority = existing
2835            .iter()
2836            .map(|addr| addr.priority)
2837            .max()
2838            .unwrap_or(100)
2839            .saturating_add(1);
2840        // Stamp every overlay-derived candidate with the current wall clock.
2841        // The dialer still honors explicit priority first; this timestamp is
2842        // only a recency tiebreaker within the same priority tier. We use a
2843        // single timestamp per fetch because all candidates in one advert are
2844        // equally fresh.
2845        let seen_at_ms = Self::now_ms();
2846        for endpoint in endpoints {
2847            let Some(candidate) =
2848                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2849            else {
2850                continue;
2851            };
2852            if existing
2853                .iter()
2854                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2855                || fallback.iter().any(|addr: &PeerAddress| {
2856                    addr.transport == candidate.transport && addr.addr == candidate.addr
2857                })
2858            {
2859                continue;
2860            }
2861            fallback.push(candidate);
2862            next_priority = next_priority.saturating_add(1);
2863        }
2864        fallback
2865    }
2866
2867    pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2868        if !self.config.node.discovery.nostr.enabled
2869            || self.config.node.discovery.nostr.policy
2870                == crate::config::NostrDiscoveryPolicy::Disabled
2871        {
2872            return false;
2873        }
2874        let Some(bootstrap) = self.nostr_discovery.clone() else {
2875            return false;
2876        };
2877        let now_ms = Self::now_ms();
2878        if self.nostr_cooldown_applies_to_peer_config(peer_config)
2879            && let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
2880        {
2881            debug!(
2882                npub = %peer_config.npub,
2883                cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2884                "Skipping Nostr traversal request while peer is in cooldown"
2885            );
2886            return false;
2887        }
2888        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2889        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2890        let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2891        let started = bootstrap
2892            .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2893            .await;
2894        if started {
2895            info!(
2896                npub = %peer_config.npub,
2897                mesh_signaling_allowed,
2898                "Started background UDP NAT traversal attempt"
2899            );
2900        } else {
2901            debug!(
2902                npub = %peer_config.npub,
2903                mesh_signaling_allowed,
2904                "Background UDP NAT traversal attempt already in progress"
2905            );
2906        }
2907        true
2908    }
2909
2910    fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
2911        !self.mesh_signaling_allowed_for_peer(peer_config)
2912    }
2913
2914    pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2915        &self,
2916        peer_config: &PeerConfig,
2917    ) -> bool {
2918        let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2919            return false;
2920        };
2921        let peer_addr = identity.node_addr();
2922        self.configured_peer(peer_addr).is_some()
2923    }
2924
2925    fn overlay_endpoint_to_peer_address(
2926        endpoint: &OverlayEndpointAdvert,
2927        priority: u8,
2928        seen_at_ms: u64,
2929    ) -> Option<PeerAddress> {
2930        let transport = match endpoint.transport {
2931            OverlayTransportKind::Udp => "udp",
2932            OverlayTransportKind::Tcp => "tcp",
2933            OverlayTransportKind::Tor => "tor",
2934            OverlayTransportKind::WebRtc => "webrtc",
2935        };
2936        Some(
2937            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2938                .with_seen_at_ms(seen_at_ms),
2939        )
2940    }
2941
    /// Tries to start connections to `peer_identity` over each entry of
    /// `addresses`, in the order given.
    ///
    /// Per address:
    /// - `udp` + `nat` (the `nat` part is matched case-insensitively) is a
    ///   placeholder, not a dialable address. It is ignored unless
    ///   `allow_bootstrap_nat` is set, in which case a Nostr NAT traversal is
    ///   requested via `request_nostr_bootstrap`.
    /// - `ethernet` and `ble` addresses are resolved by their resolver helpers;
    ///   BLE is only available on builds with `bluer_available`.
    /// - A `udp` address that parses as a `SocketAddr` is matched to a UDP
    ///   transport compatible with that remote address. Every other address
    ///   uses `find_transport_for_type` on its transport name.
    ///
    /// Addresses that fail to resolve or have no usable transport are logged at
    /// debug level and skipped.
    ///
    /// Concrete dials are limited by `path_candidate_attempt_budget`. When the
    /// budget is zero, `reclaim_lower_priority_inflight_candidate_for_peer` gets
    /// a chance to free a slot (presumably by abandoning a lower-priority
    /// in-flight attempt; confirm). If it cannot, the loop stops.
    ///
    /// # Errors
    ///
    /// Returns `Ok(())` if anything was attempted: a bootstrap request was
    /// started, a new connection was initiated, or an identical path was
    /// already in flight. Otherwise:
    /// - `NodeError::AccessDenied` from `initiate_connection` is returned
    ///   immediately, abandoning the remaining addresses (even if earlier
    ///   ones were attempted);
    /// - `NodeError::LocalRouteUnavailable` carries the first local-route
    ///   failure seen, if any;
    /// - `NodeError::NoTransportForType` is returned in all other cases.
    ///
    /// NOTE(review): if the budget is already exhausted on entry and no
    /// candidate can be reclaimed or matches an in-flight path, the result is
    /// `NoTransportForType` even though transports may exist. Confirm callers
    /// treat that as "busy" rather than "unreachable".
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        let mut attempted = false;
        // Only the first local-route failure is kept for the final error.
        let mut local_route_error = None;
        let peer_node_addr = *peer_identity.node_addr();
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // `udp:nat` is handed to Nostr-assisted traversal, not dialed here.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                // NOTE(review): `request_nostr_bootstrap` also returns false
                // while the peer is in cooldown, so this message may be
                // misleading in that case.
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the address to a concrete (transport, remote address) pair.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                // Exactly one of the two cfg-gated blocks below is compiled in.
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // A literal UDP socket address needs a UDP transport that can
                // reach that remote; everything else just needs any operational
                // transport of the named type.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // The same transport/remote path is already in flight: count it as
            // attempted instead of dialing it twice.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            // Out of parallel-candidate budget: if a lower-priority in-flight
            // candidate can be reclaimed for this address, re-read the budget.
            if concrete_budget == 0
                && self.reclaim_lower_priority_inflight_candidate_for_peer(&peer_node_addr, addr)
            {
                concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);
            }

            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // ACL rejection is final for this peer; do not try other addresses.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    if e.is_local_route_unavailable() && local_route_error.is_none() {
                        local_route_error = Some(e.to_string());
                    }
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        if let Some(error) = local_route_error {
            return Err(NodeError::LocalRouteUnavailable(error));
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
3094
3095    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
3096        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
3097            .await;
3098    }
3099
3100    pub(in crate::node) fn queue_active_fallback_direct_retries(
3101        &mut self,
3102        _bootstrap: &std::sync::Arc<NostrDiscovery>,
3103    ) {
3104        let now_ms = Self::now_ms();
3105        let peer_configs = self
3106            .config
3107            .auto_connect_peers()
3108            .cloned()
3109            .collect::<Vec<_>>();
3110
3111        for peer_config in peer_configs {
3112            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
3113                continue;
3114            };
3115            let node_addr = *peer_identity.node_addr();
3116
3117            if self.retry_pending.contains_key(&node_addr)
3118                || !self.peers.contains_key(&node_addr)
3119                || self.is_connecting_to_peer(&node_addr)
3120                || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
3121            {
3122                continue;
3123            }
3124
3125            let mut state = super::retry::RetryState::new(peer_config.clone());
3126            state.reconnect = true;
3127            state.retry_after_ms = now_ms;
3128            self.retry_pending.insert(node_addr, state);
3129
3130            debug!(
3131                peer = %self.peer_display_name(&node_addr),
3132                "Queued direct-path retry for active fallback peer"
3133            );
3134        }
3135    }
3136
3137    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
3138    /// queues retries for non-configured, not-yet-connected peers.
3139    ///
3140    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
3141    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
3142    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
3143    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
3144    ///
3145    /// `caller` is a short label included in log lines so per-tick and
3146    /// startup sweeps are distinguishable in operator-facing logs.
3147    pub(in crate::node) async fn run_open_discovery_sweep(
3148        &mut self,
3149        bootstrap: &std::sync::Arc<NostrDiscovery>,
3150        max_age_secs: Option<u64>,
3151        caller: &'static str,
3152    ) {
3153        if !self.config.node.discovery.nostr.enabled
3154            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3155        {
3156            return;
3157        }
3158
3159        let configured_npubs = self
3160            .config
3161            .peers()
3162            .iter()
3163            .map(|peer| peer.npub.clone())
3164            .collect::<HashSet<_>>();
3165        let now_ms = Self::now_ms();
3166        let now_secs = now_ms / 1000;
3167        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
3168        if enqueue_budget == 0 {
3169            debug!(
3170                caller = %caller,
3171                "open-discovery sweep: enqueue budget is 0, skipping"
3172            );
3173            return;
3174        }
3175
3176        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
3177        let cached_count = candidates.len();
3178        let mut enqueued = 0usize;
3179        let mut skipped_age = 0usize;
3180        let mut skipped_configured = 0usize;
3181        let mut skipped_self = 0usize;
3182        let mut skipped_connected = 0usize;
3183        let mut skipped_retry_pending = 0usize;
3184        let mut skipped_connecting = 0usize;
3185        let mut skipped_no_endpoints = 0usize;
3186        let mut skipped_invalid_npub = 0usize;
3187        let mut skipped_cooldown = 0usize;
3188
3189        for (npub, endpoints, created_at_secs) in candidates {
3190            if enqueue_budget == 0 {
3191                break;
3192            }
3193
3194            if let Some(max_age) = max_age_secs
3195                && now_secs.saturating_sub(created_at_secs) > max_age
3196            {
3197                skipped_age = skipped_age.saturating_add(1);
3198                continue;
3199            }
3200
3201            if configured_npubs.contains(&npub) {
3202                // Configured peers don't go through the open-discovery
3203                // enqueue path — their `PeerConfig` is already in
3204                // `self.config.peers()`, so the regular retry queue is
3205                // what drives their reconnect. But on cold start with
3206                // NAT'd peers, every initial `initiate_peer_connection`
3207                // fails (no overlay data yet, static cache hints empty
3208                // or stale), each pushes the peer into `retry_pending`
3209                // with exponential backoff (5/10/20/40/80s), and by the
3210                // time the next backoff slot fires the Nostr advert is
3211                // already cached — we just don't act on it for ~80s.
3212                //
3213                // The arrival of an advert (which this sweep sees) means
3214                // we now have a path to dial. If the peer's retry is
3215                // scheduled in the future, pull it forward to "now" so
3216                // the next `process_pending_retries` tick fires it
3217                // immediately. The retry path (`initiate_peer_retry_
3218                // connection` → `try_peer_addresses`) then refetches
3219                // the advert and dials it — no behavioral change
3220                // beyond schedule timing.
3221                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
3222                    let configured_addr = *identity.node_addr();
3223                    if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3224                        skipped_cooldown = skipped_cooldown.saturating_add(1);
3225                        skipped_configured = skipped_configured.saturating_add(1);
3226                        continue;
3227                    }
3228                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
3229                        && state.retry_after_ms > now_ms
3230                    {
3231                        state.retry_after_ms = now_ms;
3232                        debug!(
3233                            caller = %caller,
3234                            peer = %self.peer_display_name(&configured_addr),
3235                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
3236                            "Expediting configured-peer retry after fresh overlay advert"
3237                        );
3238                    }
3239                }
3240                skipped_configured = skipped_configured.saturating_add(1);
3241                continue;
3242            }
3243
3244            let peer_identity = match PeerIdentity::from_npub(&npub) {
3245                Ok(identity) => identity,
3246                Err(_) => {
3247                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
3248                    continue;
3249                }
3250            };
3251            let node_addr = *peer_identity.node_addr();
3252            if node_addr == *self.identity.node_addr() {
3253                skipped_self = skipped_self.saturating_add(1);
3254                continue;
3255            }
3256            if self.peers.contains_key(&node_addr) {
3257                skipped_connected = skipped_connected.saturating_add(1);
3258                continue;
3259            }
3260            if self.retry_pending.contains_key(&node_addr) {
3261                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
3262                continue;
3263            }
3264            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3265                skipped_cooldown = skipped_cooldown.saturating_add(1);
3266                continue;
3267            }
3268            let connecting = self.connections.values().any(|conn| {
3269                conn.expected_identity()
3270                    .map(|id| id.node_addr() == &node_addr)
3271                    .unwrap_or(false)
3272            });
3273            if connecting {
3274                skipped_connecting = skipped_connecting.saturating_add(1);
3275                continue;
3276            }
3277
3278            let mut addresses = Vec::new();
3279            let mut priority = 120u8;
3280            let seen_at_ms = Self::now_ms();
3281            for endpoint in endpoints {
3282                let Some(candidate) =
3283                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
3284                else {
3285                    continue;
3286                };
3287                if addresses.iter().any(|existing: &PeerAddress| {
3288                    existing.transport == candidate.transport && existing.addr == candidate.addr
3289                }) {
3290                    continue;
3291                }
3292                addresses.push(candidate);
3293                priority = priority.saturating_add(1);
3294            }
3295            if addresses.is_empty() {
3296                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
3297                continue;
3298            }
3299
3300            self.peer_aliases
3301                .entry(node_addr)
3302                .or_insert_with(|| peer_identity.short_npub());
3303            self.register_identity(node_addr, peer_identity.pubkey_full());
3304
3305            let mut state = super::retry::RetryState::new(PeerConfig {
3306                npub: npub.clone(),
3307                alias: None,
3308                addresses,
3309                connect_policy: ConnectPolicy::AutoConnect,
3310                auto_reconnect: true,
3311                discovery_fallback_transit: false,
3312            });
3313            state.reconnect = false;
3314            state.retry_after_ms = now_ms;
3315            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
3316            self.retry_pending.insert(node_addr, state);
3317            info!(
3318                caller = %caller,
3319                peer = %peer_identity.short_npub(),
3320                advert_age_secs = now_secs.saturating_sub(created_at_secs),
3321                "open-discovery sweep: queued retry for cached advert"
3322            );
3323            enqueue_budget = enqueue_budget.saturating_sub(1);
3324            enqueued = enqueued.saturating_add(1);
3325        }
3326
3327        // Always log a one-line summary on the startup sweep so operators
3328        // can verify it ran. Per-tick sweeps are noisier; only summarize
3329        // when something happened.
3330        let total_skipped = skipped_age
3331            + skipped_configured
3332            + skipped_self
3333            + skipped_connected
3334            + skipped_retry_pending
3335            + skipped_connecting
3336            + skipped_no_endpoints
3337            + skipped_invalid_npub
3338            + skipped_cooldown;
3339        let should_summarize = caller == "startup" || enqueued > 0;
3340        if should_summarize {
3341            info!(
3342                caller = %caller,
3343                cached = cached_count,
3344                queued = enqueued,
3345                skipped_age = skipped_age,
3346                skipped_configured = skipped_configured,
3347                skipped_self = skipped_self,
3348                skipped_connected = skipped_connected,
3349                skipped_retry_pending = skipped_retry_pending,
3350                skipped_connecting = skipped_connecting,
3351                skipped_no_endpoints = skipped_no_endpoints,
3352                skipped_invalid_npub = skipped_invalid_npub,
3353                skipped_cooldown = skipped_cooldown,
3354                skipped_total = total_skipped,
3355                "open-discovery sweep complete"
3356            );
3357        }
3358    }
3359
3360    /// One-shot startup sweep: runs once after the configured settle
3361    /// delay, iterating the cached overlay adverts and queueing retries
3362    /// for any peer with a recent enough advert that we haven't already
3363    /// configured statically or established a link to.
3364    ///
3365    /// Gated identically to [`run_open_discovery_sweep`]: requires
3366    /// `node.discovery.nostr.enabled` and `policy == open`.
3367    async fn maybe_run_startup_open_discovery_sweep(
3368        &mut self,
3369        bootstrap: &std::sync::Arc<NostrDiscovery>,
3370    ) {
3371        if self.startup_open_discovery_sweep_done {
3372            return;
3373        }
3374        if !self.config.node.discovery.nostr.enabled
3375            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3376        {
3377            // Mark done so we don't keep re-checking on every tick.
3378            self.startup_open_discovery_sweep_done = true;
3379            return;
3380        }
3381        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3382            return;
3383        };
3384        let now_ms = Self::now_ms();
3385        let delay_ms = self
3386            .config
3387            .node
3388            .discovery
3389            .nostr
3390            .startup_sweep_delay_secs
3391            .saturating_mul(1000);
3392        if now_ms < started_at_ms.saturating_add(delay_ms) {
3393            return;
3394        }
3395
3396        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3397        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3398            .await;
3399        self.startup_open_discovery_sweep_done = true;
3400    }
3401
3402    fn available_outbound_slots(&self) -> usize {
3403        let connection_used = self
3404            .connections
3405            .len()
3406            .saturating_add(self.pending_connects.len());
3407        let connection_slots = if self.max_connections == 0 {
3408            usize::MAX
3409        } else {
3410            self.max_connections.saturating_sub(connection_used)
3411        };
3412
3413        let peer_slots = if self.max_peers == 0 {
3414            usize::MAX
3415        } else {
3416            self.max_peers.saturating_sub(self.peers.len())
3417        };
3418
3419        let link_slots = if self.max_links == 0 {
3420            usize::MAX
3421        } else {
3422            self.max_links.saturating_sub(self.links.len())
3423        };
3424
3425        connection_slots.min(peer_slots).min(link_slots)
3426    }
3427
3428    pub(in crate::node) fn open_discovery_enqueue_budget(
3429        &self,
3430        configured_npubs: &HashSet<String>,
3431    ) -> usize {
3432        let current_open_discovery_active = self
3433            .peers
3434            .values()
3435            .filter(|peer| !configured_npubs.contains(&peer.npub()))
3436            .count();
3437        let current_open_discovery_pending = self
3438            .retry_pending
3439            .values()
3440            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3441            .count();
3442
3443        let cap_remaining = self
3444            .config
3445            .node
3446            .discovery
3447            .nostr
3448            .open_discovery_max_pending
3449            .saturating_sub(current_open_discovery_active)
3450            .saturating_sub(current_open_discovery_pending);
3451
3452        cap_remaining.min(self.available_outbound_slots())
3453    }
3454
3455    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3456        now_ms.saturating_add(
3457            self.config
3458                .node
3459                .discovery
3460                .nostr
3461                .advert_ttl_secs
3462                .saturating_mul(1000)
3463                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3464        )
3465    }
3466
    /// Builds this node's Nostr overlay advert from its operational transports.
    ///
    /// Returns `None` when Nostr discovery is disabled or when no transport
    /// produced an endpoint. A transport is only considered if it:
    /// - is operational;
    /// - has a matching config entry;
    /// - has `advertise_on_nostr()` set.
    ///
    /// Endpoints per transport type:
    /// - `udp`: for `public` transports, the explicit external address, else a
    ///   routable bind address, else a STUN-learned public address (via
    ///   `bootstrap.learn_public_udp_addr`, keyed by transport id and bind
    ///   port). Non-public UDP transports advertise the `nat` placeholder,
    ///   which the dialing side turns into a Nostr NAT traversal request.
    /// - `webrtc`: this node's full public key, hex-encoded.
    /// - `tcp`: the explicit external address, else a routable bind address.
    /// - `tor`: the onion address joined with the configured advertised port.
    ///
    /// Other transport types are not advertised.
    ///
    /// `signal_relays` (from `dm_relays`) and `stun_servers` are only included
    /// when at least one `udp:nat` or `webrtc` endpoint was emitted.
    ///
    /// NOTE(review): each non-public UDP transport pushes its own `nat`
    /// entry, so several such transports yield duplicate endpoints. Consumers
    /// appear to dedupe them, but confirm.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // These flags decide whether signaling relays / STUN servers are
        // attached to the advert.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Precedence:
                        // 1. operator-supplied `external_addr` (skips STUN)
                        // 2. non-wildcard *public* `local_addr` (operator
                        //    bound to a specific public IP directly)
                        // 3. STUN auto-discovery against ephemeral socket
                        //    (also taken when bind is wildcard *or* private —
                        //    a private bind is not peer-reachable, so we
                        //    must publish the public reflexive instead)
                        // 4. loud warn + omit endpoint
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                            or private and STUN observation failed; \
                                            advertising no UDP endpoint. Either set \
                                            transports.udp.external_addr, bind to a \
                                            specific *public* IP, or ensure \
                                            node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                // No local address reported: nothing to advertise.
                                None => {}
                            }
                        }
                    } else {
                        // Not publicly reachable: advertise the `nat`
                        // placeholder so peers request a traversal instead.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // The WebRTC "address" is this node's hex-encoded full pubkey.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Precedence:
                    // 1. operator-supplied `external_addr` (only path that
                    //    works on cloud-NAT setups where the public IP is
                    //    not on a host interface).
                    // 2. non-wildcard *public* `local_addr` (operator bound
                    //    to a specific public IP directly).
                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
                    //
                    // A wildcard *or* private bind is never advertised as-is
                    // — peers off-LAN can't reach a private bind, and there
                    // is no TCP STUN to discover a public reflexive.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                    or private IP and no transports.tcp.external_addr set; \
                                    advertising no TCP endpoint. Either set external_addr \
                                    to the public IP (recommended for cloud 1:1-NAT setups) \
                                    or bind explicitly to the public IP"
                                );
                            }
                            // No local address reported: nothing to advertise.
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Only advertise once the transport reports an onion address.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                // Other transport types are never advertised on Nostr.
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        // Relays and STUN servers are only useful to peers that must signal
        // a NAT traversal or a WebRTC session with us.
        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3645
3646    async fn refresh_overlay_advert(
3647        &self,
3648        bootstrap: &std::sync::Arc<NostrDiscovery>,
3649    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3650        let advert = self.build_overlay_advert(bootstrap).await;
3651        bootstrap.update_local_advert(advert).await
3652    }
3653
3654    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3655        match (&self.config.transports.udp, transport_name) {
3656            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3657            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3658            _ => None,
3659        }
3660    }
3661
3662    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3663        match (&self.config.transports.tcp, transport_name) {
3664            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3665            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3666            _ => None,
3667        }
3668    }
3669
3670    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3671        match (&self.config.transports.tor, transport_name) {
3672            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3673            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3674            _ => None,
3675        }
3676    }
3677
3678    fn lookup_webrtc_config(
3679        &self,
3680        transport_name: Option<&str>,
3681    ) -> Option<&crate::config::WebRtcConfig> {
3682        match (&self.config.transports.webrtc, transport_name) {
3683            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3684            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3685            _ => None,
3686        }
3687    }
3688
3689    pub(in crate::node) async fn try_peer_addresses(
3690        &mut self,
3691        peer_config: &PeerConfig,
3692        peer_identity: PeerIdentity,
3693        allow_bootstrap_nat: bool,
3694    ) -> Result<(), NodeError> {
3695        let peer_node_addr = *peer_identity.node_addr();
3696        if self.peers.contains_key(&peer_node_addr) {
3697            debug!(
3698                npub = %peer_config.npub,
3699                "Peer already exists, skipping address attempts"
3700            );
3701            return Ok(());
3702        }
3703
3704        let candidates = self.peer_address_candidates(peer_config).await;
3705
3706        if candidates.is_empty() {
3707            if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3708                return Ok(());
3709            }
3710            return Err(NodeError::NoTransportForType(format!(
3711                "no addresses known for {}",
3712                peer_config.npub
3713            )));
3714        }
3715
3716        if self
3717            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3718            .await
3719            .is_ok()
3720        {
3721            if allow_bootstrap_nat {
3722                self.request_nostr_bootstrap(peer_config).await;
3723            }
3724            return Ok(());
3725        }
3726
3727        if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3728            return Ok(());
3729        }
3730
3731        Err(NodeError::NoTransportForType(format!(
3732            "no operational transport for any of {}'s addresses",
3733            peer_config.npub
3734        )))
3735    }
3736
    /// Try addresses other than the path an already-active peer is using.
    /// When warranted, the current path itself is re-dialed, and a Nostr
    /// bootstrap may be requested as well.
    ///
    /// A "same-path refresh" is needed when `allow_same_path_refresh` is set
    /// and one of these holds:
    /// - the active path is stale (`active_peer_needs_same_path_refresh`), or
    /// - the active peer cannot currently send.
    ///
    /// In that case the peer's current UDP address, if it has one, is added
    /// to the candidate list. Current-path candidates are then kept instead
    /// of being filtered out.
    ///
    /// Returns:
    /// - `Ok(true)` when a direct attempt succeeded, or a Nostr bootstrap
    ///   request returned `true` on one of the paths below.
    /// - `Ok(false)` when every candidate matches the current path (and no
    ///   refresh is needed) and no Nostr request was made or queued.
    /// - `Err` when no candidates are known at all and Nostr was not queued,
    ///   or when the direct attempt failed and no separate Nostr request
    ///   was queued.
    async fn try_active_peer_alternative_addresses(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_same_path_refresh: bool,
    ) -> Result<bool, NodeError> {
        let peer_node_addr = *peer_identity.node_addr();
        let mut candidates = self.peer_address_candidates(peer_config).await;
        // Re-dialing the current path is only considered when the caller
        // allows it and the path looks stale or unusable.
        let same_path_refresh_needed = allow_same_path_refresh
            && (self.active_peer_needs_same_path_refresh(&peer_node_addr)
                || self
                    .peers
                    .get(&peer_node_addr)
                    .is_some_and(|peer| !peer.can_send()));
        // Add the current UDP address (if any) unless it is already a
        // candidate, then re-sort so it lands in its priority tier.
        if same_path_refresh_needed
            && let Some(candidate) = self.active_peer_current_udp_candidate(&peer_node_addr)
            && !candidates.iter().any(|existing| {
                existing.transport == candidate.transport && existing.addr == candidate.addr
            })
        {
            candidates.push(candidate);
            Self::sort_peer_address_candidates(&mut candidates);
        }
        let should_try_nostr =
            self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);

        if candidates.is_empty() {
            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
                return Ok(true);
            }
            return Err(NodeError::NoTransportForType(format!(
                "no addresses known for {}",
                peer_config.npub
            )));
        }

        // Drop candidates that match the path already in use, unless we are
        // deliberately refreshing that same path.
        let alternatives: Vec<_> = candidates
            .into_iter()
            .filter(|addr| {
                same_path_refresh_needed
                    || !self.active_peer_matches_candidate(&peer_node_addr, addr)
            })
            .collect();

        if alternatives.is_empty() {
            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
                return Ok(true);
            }
            return Ok(false);
        }

        // A `udp`/`nat` placeholder in the list is dialed through
        // `attempt_peer_address_list` with bootstrap allowed (`true` below).
        // NOTE(review): presumably that already requests Nostr traversal,
        // so a separate request is only made when no such placeholder
        // exists. Confirm against `attempt_peer_address_list`.
        let needs_separate_nostr_attempt = should_try_nostr
            && !alternatives
                .iter()
                .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
        let address_result = self
            .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
            .await;
        // Issued after the direct attempt regardless of its outcome.
        let nostr_attempted =
            needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;

        match address_result {
            Ok(()) => Ok(true),
            // A queued Nostr traversal turns a direct failure into "attempt in progress".
            Err(err) if nostr_attempted => {
                debug!(
                    npub = %peer_config.npub,
                    error = %err,
                    "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
                );
                Ok(true)
            }
            Err(err) => Err(err),
        }
    }
3811
3812    async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3813        // Merge every candidate from every source we have for this peer.
3814        // Explicitly configured addresses keep first shot, then freshly
3815        // fetched overlay adverts are appended as fallback candidates. This
3816        // lets native peers try known LAN/nvpn/static UDP routes before
3817        // slower WebRTC/Nostr-discovered paths, while still racing every
3818        // concrete candidate that fits in the per-peer budget.
3819        let static_addresses = self.static_peer_addresses(peer_config);
3820        let overlay_addresses = self
3821            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3822            .await;
3823
3824        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3825        for addr in overlay_addresses.into_iter().chain(static_addresses) {
3826            if !candidates.iter().any(|existing: &PeerAddress| {
3827                existing.transport == addr.transport && existing.addr == addr.addr
3828            }) {
3829                candidates.push(addr);
3830            }
3831        }
3832
3833        Self::sort_peer_address_candidates(&mut candidates);
3834
3835        candidates
3836    }
3837
3838    fn sort_peer_address_candidates(candidates: &mut [PeerAddress]) {
3839        // Stable sort: explicit priority is the contract, and freshness only
3840        // breaks ties inside one priority tier. Overlay-discovered endpoints
3841        // are assigned lower priority than configured/static hints when both
3842        // exist, so operator-provided LAN routes keep first shot without
3843        // dropping fresh overlay candidates from the race.
3844        candidates.sort_by(|a, b| {
3845            if a.priority != b.priority {
3846                return a.priority.cmp(&b.priority);
3847            }
3848            match (a.seen_at_ms, b.seen_at_ms) {
3849                (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3850                (Some(_), None) => std::cmp::Ordering::Less,
3851                (None, Some(_)) => std::cmp::Ordering::Greater,
3852                (None, None) => std::cmp::Ordering::Equal,
3853            }
3854        });
3855    }
3856
3857    fn active_peer_matches_any_candidate(
3858        &self,
3859        peer_node_addr: &NodeAddr,
3860        candidates: &[PeerAddress],
3861    ) -> bool {
3862        candidates
3863            .iter()
3864            .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3865    }
3866
3867    pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3868        &self,
3869        peer_node_addr: &NodeAddr,
3870        candidates: &[PeerAddress],
3871    ) -> bool {
3872        if !self
3873            .peers
3874            .get(peer_node_addr)
3875            .is_some_and(|peer| peer.can_send())
3876        {
3877            return false;
3878        }
3879        if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3880            return false;
3881        }
3882        !self.active_peer_needs_same_path_refresh(peer_node_addr)
3883    }
3884
3885    pub(in crate::node) fn active_peer_should_keep_direct_retry(
3886        &self,
3887        peer_node_addr: &NodeAddr,
3888        peer_config: &PeerConfig,
3889    ) -> bool {
3890        let Some(peer) = self.peers.get(peer_node_addr) else {
3891            return false;
3892        };
3893
3894        let static_addresses = self.static_peer_addresses(peer_config);
3895        if !static_addresses.is_empty() {
3896            return !self
3897                .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3898        }
3899
3900        if peer_config.npub.is_empty() {
3901            return false;
3902        }
3903
3904        if !self.config.node.discovery.nostr.enabled
3905            || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3906        {
3907            return false;
3908        }
3909
3910        let Some(transport_id) = peer.transport_id() else {
3911            return true;
3912        };
3913
3914        if self.bootstrap_transports.contains(&transport_id) {
3915            return self.active_peer_needs_same_path_refresh(peer_node_addr);
3916        }
3917
3918        let Some(transport) = self.transports.get(&transport_id) else {
3919            return true;
3920        };
3921
3922        if transport.transport_type().name != "udp" {
3923            return true;
3924        }
3925
3926        self.active_peer_needs_same_path_refresh(peer_node_addr)
3927    }
3928
3929    pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3930        &mut self,
3931        peer_node_addr: &NodeAddr,
3932    ) {
3933        let keep_retry = self
3934            .retry_pending
3935            .get(peer_node_addr)
3936            .map(|state| state.peer_config.clone())
3937            .is_some_and(|peer_config| {
3938                self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3939            });
3940
3941        if !keep_retry {
3942            self.retry_pending.remove(peer_node_addr);
3943        }
3944    }
3945
3946    fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3947        let Some(peer) = self.peers.get(peer_node_addr) else {
3948            return false;
3949        };
3950        let stale_after_ms = self
3951            .config
3952            .node
3953            .heartbeat_interval_secs
3954            .saturating_mul(1000)
3955            .max(1000);
3956        peer.idle_time(Self::now_ms()) > stale_after_ms
3957    }
3958
3959    pub(in crate::node) fn active_peer_current_udp_candidate(
3960        &self,
3961        peer_node_addr: &NodeAddr,
3962    ) -> Option<PeerAddress> {
3963        let peer = self.peers.get(peer_node_addr)?;
3964        let current_addr = peer.current_addr()?;
3965        if let Some(transport_id) = peer.transport_id() {
3966            if let Some(transport) = self.transports.get(&transport_id) {
3967                if transport.transport_type().name != "udp" {
3968                    return None;
3969                }
3970            } else if !self
3971                .transports
3972                .values()
3973                .any(|transport| transport.transport_type().name == "udp")
3974            {
3975                return None;
3976            }
3977        } else if !self
3978            .transports
3979            .values()
3980            .any(|transport| transport.transport_type().name == "udp")
3981        {
3982            return None;
3983        }
3984        let socket_addr = current_addr.as_str()?.parse::<SocketAddr>().ok()?;
3985
3986        Some(
3987            PeerAddress::with_priority("udp", socket_addr.to_string(), 240)
3988                .with_seen_at_ms(Self::now_ms()),
3989        )
3990    }
3991
3992    pub(in crate::node) fn active_peer_matches_candidate(
3993        &self,
3994        peer_node_addr: &NodeAddr,
3995        candidate: &PeerAddress,
3996    ) -> bool {
3997        let Some(peer) = self.peers.get(peer_node_addr) else {
3998            return false;
3999        };
4000        let Some(current_addr) = peer.current_addr() else {
4001            return false;
4002        };
4003        if let Some(peer_transport_id) = peer.transport_id()
4004            && let Some((candidate_transport_id, candidate_addr)) =
4005                self.resolve_peer_address_for_match(candidate)
4006        {
4007            return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
4008        }
4009        if peer
4010            .transport_id()
4011            .map(|id| self.bootstrap_transports.contains(&id))
4012            .unwrap_or(false)
4013        {
4014            return false;
4015        }
4016        let current_addr = current_addr.to_string();
4017        let current_transport = peer
4018            .transport_id()
4019            .and_then(|id| self.transports.get(&id))
4020            .map(|transport| transport.transport_type().name);
4021
4022        candidate.addr == current_addr
4023            && current_transport
4024                .map(|transport| transport == candidate.transport)
4025                .unwrap_or(true)
4026    }
4027
4028    fn configured_path_priority(
4029        &self,
4030        peer_node_addr: &NodeAddr,
4031        transport_id: TransportId,
4032        remote_addr: &TransportAddr,
4033    ) -> Option<u8> {
4034        self.configured_peer(peer_node_addr)?
4035            .addresses
4036            .iter()
4037            .filter_map(|candidate| {
4038                let (candidate_transport_id, candidate_addr) =
4039                    self.resolve_peer_address_for_match(candidate)?;
4040                (candidate_transport_id == transport_id && &candidate_addr == remote_addr)
4041                    .then_some(candidate.priority)
4042            })
4043            .min()
4044    }
4045
4046    pub(in crate::node) fn alternate_path_priority_allows_replace(
4047        &self,
4048        peer_node_addr: &NodeAddr,
4049        candidate_transport_id: TransportId,
4050        candidate_addr: &TransportAddr,
4051    ) -> bool {
4052        const UNKNOWN_PATH_PRIORITY: u16 = u8::MAX as u16 + 1;
4053
4054        let Some(peer) = self.peers.get(peer_node_addr) else {
4055            return true;
4056        };
4057        let Some(current_transport_id) = peer.transport_id() else {
4058            return true;
4059        };
4060        let Some(current_addr) = peer.current_addr() else {
4061            return true;
4062        };
4063
4064        let current_priority = self
4065            .configured_path_priority(peer_node_addr, current_transport_id, current_addr)
4066            .map(u16::from)
4067            .unwrap_or(UNKNOWN_PATH_PRIORITY);
4068        let candidate_priority = self
4069            .configured_path_priority(peer_node_addr, candidate_transport_id, candidate_addr)
4070            .map(u16::from)
4071            .unwrap_or(UNKNOWN_PATH_PRIORITY);
4072
4073        if candidate_priority <= current_priority {
4074            return true;
4075        }
4076
4077        debug!(
4078            peer = %self.peer_display_name(peer_node_addr),
4079            current_transport_id = %current_transport_id,
4080            current_addr = %current_addr,
4081            current_priority,
4082            candidate_transport_id = %candidate_transport_id,
4083            candidate_addr = %candidate_addr,
4084            candidate_priority,
4085            "Suppressing lower-priority alternate path while current path remains healthy"
4086        );
4087        false
4088    }
4089
4090    pub(in crate::node) fn authenticated_packet_path_allows_bookkeeping(
4091        &mut self,
4092        peer_node_addr: &NodeAddr,
4093        candidate_transport_id: TransportId,
4094        candidate_addr: &TransportAddr,
4095        now_ms: u64,
4096    ) -> bool {
4097        let Some(peer) = self.peers.get(peer_node_addr) else {
4098            return true;
4099        };
4100
4101        if peer.transport_id() == Some(candidate_transport_id)
4102            && peer.current_addr() == Some(candidate_addr)
4103        {
4104            return true;
4105        }
4106
4107        let current_path_sendable = peer.can_send();
4108        if !current_path_sendable
4109            || self.session_direct_path_blocks_direct_payload(peer_node_addr, now_ms)
4110        {
4111            return true;
4112        }
4113
4114        self.alternate_path_priority_allows_replace(
4115            peer_node_addr,
4116            candidate_transport_id,
4117            candidate_addr,
4118        )
4119    }
4120
4121    pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
4122        &self,
4123        peer_node_addr: &NodeAddr,
4124        peer_config: &PeerConfig,
4125    ) -> bool {
4126        peer_config.addresses.iter().any(|addr| {
4127            addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
4128        })
4129    }
4130
4131    pub(in crate::node) fn active_peer_uses_traversal_path(
4132        &self,
4133        peer_node_addr: &NodeAddr,
4134        peer_config: &PeerConfig,
4135    ) -> bool {
4136        let via_bootstrap_transport = self
4137            .peers
4138            .get(peer_node_addr)
4139            .and_then(|peer| peer.transport_id())
4140            .map(|id| self.bootstrap_transports.contains(&id))
4141            .unwrap_or(false);
4142
4143        via_bootstrap_transport
4144            || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
4145    }
4146
4147    // === Control API methods ===
4148
4149    /// Connect to a peer via the control API.
4150    ///
4151    /// Creates an ephemeral peer connection (not persisted to config, no
4152    /// auto-reconnect). Reuses the same connection path as auto-connect
4153    /// peers. Returns JSON data on success or an error message.
4154    pub(crate) async fn api_connect(
4155        &mut self,
4156        npub: &str,
4157        address: &str,
4158        transport: &str,
4159    ) -> Result<serde_json::Value, String> {
4160        let peer_config = PeerConfig {
4161            npub: npub.to_string(),
4162            alias: None,
4163            addresses: vec![PeerAddress::new(transport, address)],
4164            connect_policy: ConnectPolicy::Manual,
4165            auto_reconnect: false,
4166            discovery_fallback_transit: true,
4167        };
4168
4169        // Pre-seed identity cache (same as initiate_peer_connections does)
4170        if let Ok(identity) = PeerIdentity::from_npub(npub) {
4171            self.peer_aliases
4172                .insert(*identity.node_addr(), identity.short_npub());
4173            self.register_identity(*identity.node_addr(), identity.pubkey_full());
4174        }
4175
4176        self.initiate_peer_connection(&peer_config)
4177            .await
4178            .map(|()| {
4179                info!(
4180                    npub = %npub,
4181                    address = %address,
4182                    transport = %transport,
4183                    "API connect initiated"
4184                );
4185                serde_json::json!({
4186                    "npub": npub,
4187                    "address": address,
4188                    "transport": transport,
4189                })
4190            })
4191            .map_err(|e| e.to_string())
4192    }
4193
4194    /// Disconnect a peer via the control API.
4195    ///
4196    /// Removes the peer and suppresses auto-reconnect.
4197    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
4198        let peer_identity =
4199            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
4200        let node_addr = *peer_identity.node_addr();
4201
4202        if !self.peers.contains_key(&node_addr) {
4203            return Err(format!("peer not found: {npub}"));
4204        }
4205
4206        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
4207        self.remove_active_peer(&node_addr);
4208
4209        // Suppress any pending auto-reconnect
4210        self.retry_pending.remove(&node_addr);
4211
4212        info!(npub = %npub, "API disconnect completed");
4213
4214        Ok(serde_json::json!({
4215            "npub": npub,
4216            "disconnected": true,
4217        }))
4218    }
4219
4220    /// Adopt an already-established UDP traversal and start the normal FIPS
4221    /// Noise handshake over it.
4222    ///
4223    /// This is intended for integration with an external rendezvous runtime
4224    /// that has already completed relay signaling, STUN observation, and UDP
4225    /// hole punching. After handoff, the adopted socket is owned by FIPS.
4226    pub async fn adopt_established_traversal(
4227        &mut self,
4228        traversal: EstablishedTraversal,
4229    ) -> Result<BootstrapHandoffResult, NodeError> {
4230        debug!(
4231            peer_npub = %traversal.peer_npub,
4232            session_id = %traversal.session_id,
4233            remote_addr = %traversal.remote_addr,
4234            "adopting established traversal socket"
4235        );
4236
4237        if !self.state.is_operational() {
4238            return Err(NodeError::NotStarted);
4239        }
4240
4241        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
4242        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
4243            NodeError::InvalidPeerNpub {
4244                npub: traversal.peer_npub.clone(),
4245                reason: e.to_string(),
4246            }
4247        })?;
4248        let peer_node_addr = *peer_identity.node_addr();
4249        if self.peers.contains_key(&peer_node_addr) {
4250            debug!(
4251                peer_npub = %traversal.peer_npub,
4252                "Adopting NAT traversal handoff as alternate path for already-connected peer"
4253            );
4254        }
4255
4256        self.peer_aliases
4257            .insert(peer_node_addr, peer_identity.short_npub());
4258        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
4259
4260        let transport_id = self.allocate_transport_id();
4261        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
4262        // (and accept_connections / advertise flags) from the operator's
4263        // configured [transports.udp] when the bootstrap runtime doesn't
4264        // pass an explicit override. Lookup tries `transport_name` first
4265        // (covers the `Named` multi-listener variant) and falls back to the
4266        // unnamed `Single` listener, so single- and named-listener configs
4267        // both inherit cleanly.
4268        //
4269        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
4270        // only value guaranteed to survive arbitrary middlebox paths.
4271        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
4272        // initially attempt that MTU and may black-hole on tighter paths
4273        // until reactive `MtuExceeded` recovery kicks in. Operators who
4274        // raise the primary MTU based on known-clean topology accept that
4275        // tradeoff; the silent drop on a too-low default was strictly
4276        // worse for the common case where the primary MTU is reachable.
4277        //
4278        // Bind / external address fields are cleared since the socket is
4279        // already bound.
4280        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
4281            let mut cfg = self
4282                .lookup_udp_config(traversal.transport_name.as_deref())
4283                .or_else(|| self.lookup_udp_config(None))
4284                .cloned()
4285                .unwrap_or_default();
4286            cfg.bind_addr = None;
4287            cfg.external_addr = None;
4288            cfg
4289        });
4290        let mut transport = crate::transport::udp::UdpTransport::new(
4291            transport_id,
4292            traversal.transport_name.clone(),
4293            inherited_config,
4294            packet_tx,
4295        );
4296
4297        transport
4298            .adopt_socket_async(traversal.socket)
4299            .await
4300            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
4301
4302        let local_addr = transport.local_addr().ok_or_else(|| {
4303            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
4304        })?;
4305
4306        self.transports.insert(
4307            transport_id,
4308            crate::transport::TransportHandle::Udp(transport),
4309        );
4310        self.bootstrap_transports.insert(transport_id);
4311        self.bootstrap_transport_npubs
4312            .insert(transport_id, traversal.peer_npub.clone());
4313
4314        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
4315        if let Err(err) = self
4316            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
4317            .await
4318        {
4319            self.bootstrap_transports.remove(&transport_id);
4320            self.bootstrap_transport_npubs.remove(&transport_id);
4321            if let Some(mut handle) = self.transports.remove(&transport_id) {
4322                let _ = handle.stop().await;
4323            }
4324            return Err(err);
4325        }
4326
4327        info!(
4328            peer = %self.peer_display_name(&peer_node_addr),
4329            transport_id = %transport_id,
4330            local_addr = %local_addr,
4331            remote_addr = %traversal.remote_addr,
4332            session_id = %traversal.session_id,
4333            "adopted NAT traversal socket; handshake initiated"
4334        );
4335
4336        Ok(BootstrapHandoffResult {
4337            transport_id,
4338            local_addr,
4339            remote_addr: traversal.remote_addr,
4340            peer_node_addr,
4341            session_id: traversal.session_id,
4342        })
4343    }
4344}