Skip to main content

fips_core/node/
lifecycle.rs

1//! Node lifecycle management: start, stop, and peer connection initiation.
2
3use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, MeshTraversalSignal, NostrDiscovery,
7    OverlayAdvert, OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason, SessionMessageType};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use secp256k1::PublicKey;
18use std::collections::{HashMap, HashSet};
19use std::net::{IpAddr, SocketAddr};
20use std::thread;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Action to take for a mesh signal with respect to the current session.
///
/// NOTE(review): the call sites are outside this view; the per-variant
/// meanings below are inferred from the variant names — confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MeshSignalSessionAction {
    /// Send the signal now.
    Send,
    /// Hold the signal for a later attempt.
    Defer,
    /// Discard the signal.
    Drop,
}
30
/// Append a timestamped line to `nvpn-fips-endpoint-debug.log` in the
/// system temp directory (debug builds only).
///
/// Open and write failures are deliberately ignored: this is a best-effort
/// diagnostic aid and must never affect the caller.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let opened = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(log_path);
    let Ok(mut file) = opened else {
        return;
    };
    let timestamp = std::time::SystemTime::now();
    let _ = writeln!(file, "{:?} {}", timestamp, message.as_ref());
}

/// Release-build stand-in for the debug log helper: the message is discarded.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
51
/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN.
///
/// IPv4 coverage: RFC 1918 private ranges, loopback, link-local,
/// unspecified, multicast, limited broadcast, documentation (RFC 5737),
/// benchmarking (198.18.0.0/15, RFC 2544) and CGNAT shared space
/// (100.64.0.0/10, RFC 6598).
///
/// IPv6 coverage: loopback, unspecified, unique-local (fc00::/7),
/// multicast, link-local (fe80::/10) and documentation (2001:db8::/32).
/// IPv4-mapped addresses (`::ffff:a.b.c.d`, as reported by dual-stack
/// sockets) are judged by their embedded IPv4 address, so
/// `::ffff:192.168.1.10` is rejected just like `192.168.1.10`.
///
/// We never publish these as the peer's primary `runtime_endpoint`; an
/// off-LAN consumer can't route to them, and latching one in onto a slow
/// overlay-relay fallback is the original bug this guard exists to prevent.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [a, b, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the
                // public internet; behaves like an extra NAT layer.
                || (a == 100 && (b & 0xc0) == 64)
                // 198.18.0.0/15 — benchmarking, RFC 2544. Checked by hand
                // because `Ipv4Addr::is_benchmarking` is not stable.
                || (a == 198 && (b & 0xfe) == 18)
        }
        IpAddr::V6(v6) => {
            // Only `::ffff:0:0/96` is unwrapped; the deprecated
            // IPv4-compatible form would misread `::1` as `0.0.0.1`.
            if let Some(mapped) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(mapped));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // IPv6 link-local: fe80::/10
                || (segments[0] & 0xffc0) == 0xfe80
                // IPv6 documentation: 2001:db8::/32 (RFC 3849)
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
83
/// True when `local` and `remote` belong to the same IP family
/// (IPv4 with IPv4, IPv6 with IPv6).
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    local.is_ipv4() == remote.is_ipv4()
}
90
/// NOTE(review): not referenced in this part of the file; the name suggests
/// it scales an open-discovery lifetime into a retry window — confirm at
/// the use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;

/// Per-peer cap on in-flight outbound handshakes (connections plus pending
/// connects) consulted by `path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;

/// Upper bound on the budget returned by `graph_session_warmup_budget`.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;

/// Upper bound on the budget returned by `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
95
96impl Node {
97    /// Initiate connections to configured static peers.
98    ///
99    /// For each peer configured with AutoConnect policy, creates a link and
100    /// peer entry, then starts the Noise handshake by sending the first message.
101    /// Replace the runtime peer list. Newly added auto-connect peers get
102    /// `initiate_peer_connection` immediately; removed peers are dropped from
103    /// the retry queue (the regular liveness timeout reaps any active session
104    /// — we don't proactively disconnect, since the same npub might be on its
105    /// way back via a fresh advert). Existing auto-connect entries with new
106    /// direct addresses are dialed immediately, and retry state is refreshed
107    /// so later attempts also use the latest hints.
108    pub(super) async fn update_peers(
109        &mut self,
110        new_peers: Vec<crate::config::PeerConfig>,
111    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
112        use std::collections::{HashMap, HashSet};
113
114        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
115            HashMap::with_capacity(new_peers.len());
116        let mut new_order = Vec::with_capacity(new_peers.len());
117        for peer in new_peers {
118            let identity = match PeerIdentity::from_npub(&peer.npub) {
119                Ok(id) => id,
120                Err(e) => {
121                    return Err(crate::node::NodeError::InvalidPeerNpub {
122                        npub: peer.npub.clone(),
123                        reason: e.to_string(),
124                    });
125                }
126            };
127            // Last-write-wins on duplicates so callers passing a multi-source
128            // candidate list (e.g. operator hints + recent-peers cache for
129            // the same npub) get the merge they meant.
130            let node_addr = *identity.node_addr();
131            if !new_by_addr.contains_key(&node_addr) {
132                new_order.push(node_addr);
133            }
134            new_by_addr.insert(node_addr, peer);
135        }
136
137        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
138            .config
139            .peers()
140            .iter()
141            .filter_map(|pc| {
142                PeerIdentity::from_npub(&pc.npub)
143                    .ok()
144                    .map(|id| (*id.node_addr(), pc.clone()))
145            })
146            .collect();
147
148        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
149        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
150
151        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
152        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
153        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();
154
155        let mut outcome = crate::node::UpdatePeersOutcome::default();
156
157        for node_addr in &removed {
158            if self.retry_pending.remove(node_addr).is_some() {
159                debug!(
160                    peer = %self.peer_display_name(node_addr),
161                    "Dropping retry entry for peer removed from runtime peer list"
162                );
163            }
164            self.peer_aliases.remove(node_addr);
165            self.set_discovery_fallback_transit_allowed(*node_addr, false);
166            outcome.removed += 1;
167        }
168
169        let mut auto_connect_refresh_configs = Vec::new();
170        for node_addr in &kept {
171            let new_pc = &new_by_addr[node_addr];
172            let current_pc = &current_by_addr[node_addr];
173            if new_pc.addresses != current_pc.addresses
174                || new_pc.alias != current_pc.alias
175                || new_pc.connect_policy != current_pc.connect_policy
176                || new_pc.auto_reconnect != current_pc.auto_reconnect
177                || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
178            {
179                outcome.updated += 1;
180                self.set_discovery_fallback_transit_allowed(
181                    *node_addr,
182                    new_pc.discovery_fallback_transit,
183                );
184                if let Some(state) = self.retry_pending.get_mut(node_addr) {
185                    state.peer_config = new_pc.clone();
186                    state.retry_after_ms = Self::now_ms();
187                }
188                if let Some(alias) = new_pc.alias.clone() {
189                    self.peer_aliases.insert(*node_addr, alias);
190                }
191                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
192                    auto_connect_refresh_configs.push(new_pc.clone());
193                }
194            } else {
195                outcome.unchanged += 1;
196                self.set_discovery_fallback_transit_allowed(
197                    *node_addr,
198                    new_pc.discovery_fallback_transit,
199                );
200                if let Some(state) = self.retry_pending.get_mut(node_addr) {
201                    state.peer_config = new_pc.clone();
202                }
203                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
204                    auto_connect_refresh_configs.push(new_pc.clone());
205                }
206            }
207        }
208
209        let added_configs: Vec<crate::config::PeerConfig> = new_order
210            .iter()
211            .filter(|addr| added.contains(addr))
212            .map(|addr| new_by_addr[addr].clone())
213            .collect();
214
215        // Replace the live config peer list before initiating connections so
216        // any helper that consults `self.config.peers()` during the dial
217        // (alias lookup, retry-state seeding) sees the new entries.
218        self.config.peers = new_order
219            .iter()
220            .filter_map(|addr| new_by_addr.get(addr).cloned())
221            .collect();
222        self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
223
224        for peer_config in added_configs {
225            outcome.added += 1;
226            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
227                continue;
228            };
229            let name = peer_config
230                .alias
231                .clone()
232                .unwrap_or_else(|| identity.short_npub());
233            self.peer_aliases.insert(*identity.node_addr(), name);
234            self.set_discovery_fallback_transit_allowed(
235                *identity.node_addr(),
236                peer_config.discovery_fallback_transit,
237            );
238            self.register_identity(*identity.node_addr(), identity.pubkey_full());
239
240            if !peer_config.is_auto_connect() {
241                continue;
242            }
243
244            match self
245                .try_auto_connect_graph_session(&peer_config, identity)
246                .await
247            {
248                Ok(true) => continue,
249                Ok(false) => {}
250                Err(err) => {
251                    debug!(
252                        npub = %peer_config.npub,
253                        error = %err,
254                        "Existing FIPS graph did not warm newly added peer"
255                    );
256                }
257            }
258
259            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
260                warn!(
261                    npub = %peer_config.npub,
262                    error = %e,
263                    "Failed to initiate connection for newly added peer"
264                );
265                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
266                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
267                }
268                if matches!(e, crate::node::NodeError::NoTransportForType(_))
269                    && let Some(bootstrap) = self.nostr_discovery.clone()
270                {
271                    bootstrap
272                        .request_advert_stale_check(peer_config.npub.clone())
273                        .await;
274                }
275            }
276        }
277
278        for peer_config in auto_connect_refresh_configs {
279            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
280                continue;
281            };
282            let node_addr = *peer_identity.node_addr();
283
284            if self.peers.contains_key(&node_addr) {
285                match self
286                    .initiate_active_peer_alternative_connection(&peer_config)
287                    .await
288                {
289                    Ok(attempted) => {
290                        if attempted {
291                            debug!(
292                                peer = %self.peer_display_name(&node_addr),
293                                "Started non-disruptive alternate-path handshake for active peer"
294                            );
295                        }
296                    }
297                    Err(e) => {
298                        debug!(
299                            npub = %peer_config.npub,
300                            error = %e,
301                            "Active peer alternate-path refresh did not start"
302                        );
303                    }
304                }
305                continue;
306            }
307
308            match self
309                .try_auto_connect_graph_session(&peer_config, peer_identity)
310                .await
311            {
312                Ok(true) => continue,
313                Ok(false) => {}
314                Err(err) => {
315                    debug!(
316                        npub = %peer_config.npub,
317                        error = %err,
318                        "Existing FIPS graph did not warm refreshed peer"
319                    );
320                }
321            }
322
323            match self.initiate_peer_connection(&peer_config).await {
324                Ok(()) => {
325                    let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
326                    if let Some(state) = self.retry_pending.get_mut(&node_addr) {
327                        state.peer_config = peer_config;
328                        state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
329                    }
330                }
331                Err(e) => {
332                    debug!(
333                        npub = %peer_config.npub,
334                        error = %e,
335                        "Refreshed peer addresses did not initiate a direct connection"
336                    );
337                    self.schedule_retry_after_error(node_addr, Self::now_ms(), &e);
338                }
339            }
340        }
341
342        self.warm_auto_connect_graph_sessions().await;
343
344        Ok(outcome)
345    }
346
347    pub(super) async fn initiate_peer_connections(&mut self) {
348        // Build display name map from all configured peers (alias or short npub),
349        // and pre-seed the identity cache from each peer's npub so that TUN packets
350        // addressed to a configured peer can be dispatched (and trigger session
351        // initiation) immediately on startup — without waiting for the link-layer
352        // handshake to complete first.
353        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
354            .config
355            .peers()
356            .iter()
357            .filter_map(|pc| {
358                PeerIdentity::from_npub(&pc.npub)
359                    .ok()
360                    .map(|id| (id, pc.alias.clone()))
361            })
362            .collect();
363
364        for (identity, alias) in peer_identities {
365            let name = alias.unwrap_or_else(|| identity.short_npub());
366            self.peer_aliases.insert(*identity.node_addr(), name);
367            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
368            // but will be corrected to the real value when the peer is promoted
369            // after a successful Noise handshake.
370            self.register_identity(*identity.node_addr(), identity.pubkey_full());
371        }
372
373        // Collect peer configs to avoid borrow conflicts
374        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
375
376        if peer_configs.is_empty() {
377            debug!("No static peers configured");
378            return;
379        }
380
381        debug!(
382            count = peer_configs.len(),
383            "Initiating static peer connections"
384        );
385
386        for peer_config in peer_configs {
387            let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
388                Ok(identity) => identity,
389                Err(_) => continue,
390            };
391            match self
392                .try_auto_connect_graph_session(&peer_config, peer_identity)
393                .await
394            {
395                Ok(true) => continue,
396                Ok(false) => {}
397                Err(err) => {
398                    debug!(
399                        npub = %peer_config.npub,
400                        error = %err,
401                        "Existing FIPS graph did not warm auto-connect peer"
402                    );
403                }
404            }
405            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
406                warn!(
407                    npub = %peer_config.npub,
408                    alias = ?peer_config.alias,
409                    error = %e,
410                    "Failed to initiate peer connection"
411                );
412                // Schedule a retry so transient address-resolution failures
413                // (e.g. cached endpoints stale, NAT rebinds, all addresses
414                // currently unreachable) recover without a daemon restart.
415                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
416                    self.schedule_retry_after_error(*peer_identity.node_addr(), Self::now_ms(), &e);
417                }
418                // No-transport failures most often mean the cached overlay
419                // advert is pointing at a dead post-NAT-rebind address. The
420                // advert cache is read-only inside fetch_advert, so retries
421                // would loop on the same dead address until expiry. Force a
422                // re-fetch so the next retry tick picks up fresh endpoints.
423                if matches!(e, crate::node::NodeError::NoTransportForType(_))
424                    && let Some(bootstrap) = self.nostr_discovery.clone()
425                {
426                    bootstrap
427                        .request_advert_stale_check(peer_config.npub.clone())
428                        .await;
429                }
430            }
431        }
432
433        self.warm_auto_connect_graph_sessions().await;
434    }
435
436    pub(in crate::node) async fn try_auto_connect_graph_session(
437        &mut self,
438        peer_config: &PeerConfig,
439        peer_identity: PeerIdentity,
440    ) -> Result<bool, NodeError> {
441        if !peer_config.is_auto_connect() {
442            return Ok(false);
443        }
444
445        let peer_node_addr = *peer_identity.node_addr();
446        if self
447            .peers
448            .get(&peer_node_addr)
449            .is_some_and(|peer| peer.can_send())
450        {
451            return Ok(false);
452        }
453        if self.auto_connect_should_race_direct_path(peer_config) {
454            return Ok(false);
455        }
456        if self
457            .sessions
458            .get(&peer_node_addr)
459            .is_some_and(|entry| entry.is_established() || entry.is_initiating())
460        {
461            return Ok(true);
462        }
463        if self.find_next_hop(&peer_node_addr).is_none() {
464            return Ok(false);
465        }
466
467        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
468        match self
469            .initiate_session(peer_node_addr, peer_identity.pubkey_full())
470            .await
471        {
472            Ok(()) => {
473                debug!(
474                    peer = %self.peer_display_name(&peer_node_addr),
475                    "Warmed auto-connect peer session over existing FIPS graph"
476                );
477                Ok(true)
478            }
479            Err(NodeError::SendFailed { node_addr, reason })
480                if node_addr == peer_node_addr && reason == "no route to destination" =>
481            {
482                self.maybe_initiate_lookup(&peer_node_addr).await;
483                Ok(false)
484            }
485            Err(err) => Err(err),
486        }
487    }
488
489    fn auto_connect_should_race_direct_path(&self, peer_config: &PeerConfig) -> bool {
490        !peer_config.addresses.is_empty() || self.config.node.discovery.nostr.enabled
491    }
492
493    /// Initiate a connection to a single peer.
494    ///
495    /// Creates a link, starts the Noise handshake, and sends the first message.
496    pub(super) async fn initiate_peer_connection(
497        &mut self,
498        peer_config: &crate::config::PeerConfig,
499    ) -> Result<(), NodeError> {
500        self.initiate_peer_connection_inner(peer_config).await
501    }
502
503    /// Initiate a connection from the retry path. Identical to
504    /// [`initiate_peer_connection`] today — both paths fan out across every
505    /// known address (explicit priority first, then freshness) in a single
506    /// pass. The two entry points stay separate so callers can be distinguished
507    /// in tracing.
508    pub(super) async fn initiate_peer_retry_connection(
509        &mut self,
510        peer_config: &crate::config::PeerConfig,
511    ) -> Result<(), NodeError> {
512        self.initiate_peer_connection_inner(peer_config).await
513    }
514
515    pub(in crate::node) async fn initiate_active_peer_alternative_connection(
516        &mut self,
517        peer_config: &crate::config::PeerConfig,
518    ) -> Result<bool, NodeError> {
519        self.initiate_active_peer_alternative_connection_inner(peer_config, false)
520            .await
521    }
522
523    pub(in crate::node) async fn initiate_active_peer_direct_refresh_connection(
524        &mut self,
525        peer_config: &crate::config::PeerConfig,
526    ) -> Result<bool, NodeError> {
527        self.initiate_active_peer_alternative_connection_inner(peer_config, true)
528            .await
529    }
530
531    async fn initiate_active_peer_alternative_connection_inner(
532        &mut self,
533        peer_config: &crate::config::PeerConfig,
534        allow_same_path_refresh: bool,
535    ) -> Result<bool, NodeError> {
536        let peer_identity =
537            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
538                npub: peer_config.npub.clone(),
539                reason: e.to_string(),
540            })?;
541        let peer_node_addr = *peer_identity.node_addr();
542
543        if !self.peers.contains_key(&peer_node_addr) {
544            self.initiate_peer_connection(peer_config).await?;
545            return Ok(true);
546        }
547
548        // Keep the current link live and race fresh concrete candidates.
549        // Cross-connection resolution still decides which replacement link
550        // wins if both peers try the same upgrade; the important part here is
551        // that a stale path does not depend on the remote peer receiving our
552        // hint first before either side attempts the better address.
553        self.try_active_peer_alternative_addresses(
554            peer_config,
555            peer_identity,
556            allow_same_path_refresh,
557        )
558        .await
559    }
560
561    async fn initiate_peer_connection_inner(
562        &mut self,
563        peer_config: &crate::config::PeerConfig,
564    ) -> Result<(), NodeError> {
565        // Parse the peer's npub to get their identity
566        let peer_identity =
567            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
568                npub: peer_config.npub.clone(),
569                reason: e.to_string(),
570            })?;
571
572        let peer_node_addr = *peer_identity.node_addr();
573
574        // Check if peer already exists (fully authenticated)
575        if self.peers.contains_key(&peer_node_addr) {
576            debug!(
577                npub = %peer_config.npub,
578                "Peer already exists, skipping"
579            );
580            return Ok(());
581        }
582
583        self.try_peer_addresses(peer_config, peer_identity, true)
584            .await
585    }
586
587    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
588        self.connections.values().any(|conn| {
589            conn.expected_identity()
590                .map(|id| id.node_addr() == peer_node_addr)
591                .unwrap_or(false)
592        })
593    }
594
595    fn is_connecting_to_peer_on_path(
596        &self,
597        peer_node_addr: &NodeAddr,
598        transport_id: TransportId,
599        remote_addr: &TransportAddr,
600    ) -> bool {
601        self.connections.values().any(|conn| {
602            conn.expected_identity()
603                .map(|id| id.node_addr() == peer_node_addr)
604                .unwrap_or(false)
605                && conn.transport_id() == Some(transport_id)
606                && conn.source_addr() == Some(remote_addr)
607        }) || self.pending_connects.iter().any(|pending| {
608            pending.peer_identity.node_addr() == peer_node_addr
609                && pending.transport_id == transport_id
610                && &pending.remote_addr == remote_addr
611        })
612    }
613
614    pub(in crate::node) fn should_warm_auto_connect_session(
615        &self,
616        peer_node_addr: &NodeAddr,
617    ) -> bool {
618        if self
619            .peers
620            .get(peer_node_addr)
621            .is_some_and(|peer| peer.can_send())
622            || self
623                .sessions
624                .get(peer_node_addr)
625                .is_some_and(|entry| entry.is_established())
626        {
627            return false;
628        }
629
630        self.config.peers().iter().any(|peer| {
631            peer.is_auto_connect()
632                && PeerIdentity::from_npub(&peer.npub)
633                    .map(|identity| identity.node_addr() == peer_node_addr)
634                    .unwrap_or(false)
635        })
636    }
637
638    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
639        if !self.peers.values().any(|peer| peer.can_send()) {
640            return 0;
641        }
642
643        let mut budget = self.graph_session_warmup_budget();
644        if budget == 0 {
645            return 0;
646        }
647
648        let peer_identities: Vec<_> = self
649            .config
650            .auto_connect_peers()
651            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
652            .collect();
653
654        let mut warmed = 0;
655        for identity in peer_identities {
656            if budget == 0 {
657                break;
658            }
659
660            let peer_node_addr = *identity.node_addr();
661            if peer_node_addr == *self.identity.node_addr()
662                || !self.should_warm_auto_connect_session(&peer_node_addr)
663                || self
664                    .sessions
665                    .get(&peer_node_addr)
666                    .is_some_and(|entry| entry.is_initiating())
667            {
668                continue;
669            }
670
671            self.register_identity(peer_node_addr, identity.pubkey_full());
672
673            if self.find_next_hop(&peer_node_addr).is_some() {
674                match self
675                    .initiate_session(peer_node_addr, identity.pubkey_full())
676                    .await
677                {
678                    Ok(()) => {
679                        warmed += 1;
680                        budget = budget.saturating_sub(1);
681                        debug!(
682                            peer = %self.peer_display_name(&peer_node_addr),
683                            "Warmed auto-connect peer session over existing FIPS graph"
684                        );
685                    }
686                    Err(NodeError::SendFailed { node_addr, reason })
687                        if node_addr == peer_node_addr && reason == "no route to destination" =>
688                    {
689                        self.maybe_initiate_lookup(&peer_node_addr).await;
690                        warmed += 1;
691                        budget = budget.saturating_sub(1);
692                    }
693                    Err(err) => {
694                        debug!(
695                            peer = %self.peer_display_name(&peer_node_addr),
696                            error = %err,
697                            "Failed to warm auto-connect peer session"
698                        );
699                    }
700                }
701            } else {
702                self.maybe_initiate_lookup(&peer_node_addr).await;
703                warmed += 1;
704                budget = budget.saturating_sub(1);
705            }
706        }
707
708        warmed
709    }
710
711    pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
712        let max_destinations = self.config.node.session.pending_max_destinations;
713        if max_destinations == 0 {
714            return 0;
715        }
716
717        let pending_sessions = self
718            .sessions
719            .values()
720            .filter(|entry| !entry.is_established())
721            .count();
722        let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
723        max_destinations
724            .saturating_sub(pending_total)
725            .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
726    }
727
728    fn outbound_handshake_slots(&self) -> usize {
729        let used = self
730            .connections
731            .len()
732            .saturating_add(self.pending_connects.len());
733        if self.max_connections == 0 {
734            usize::MAX
735        } else {
736            self.max_connections.saturating_sub(used)
737        }
738    }
739
740    fn outbound_link_slots(&self) -> usize {
741        if self.max_links == 0 {
742            usize::MAX
743        } else {
744            self.max_links.saturating_sub(self.links.len())
745        }
746    }
747
748    fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
749        if !self.peers.contains_key(peer_node_addr)
750            && self.max_peers > 0
751            && self.peers.len() >= self.max_peers
752        {
753            return 0;
754        }
755
756        let in_flight_for_peer = self
757            .connections
758            .values()
759            .filter(|conn| {
760                conn.expected_identity()
761                    .map(|id| id.node_addr() == peer_node_addr)
762                    .unwrap_or(false)
763            })
764            .count()
765            .saturating_add(
766                self.pending_connects
767                    .iter()
768                    .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
769                    .count(),
770            );
771
772        self.outbound_handshake_slots()
773            .min(self.outbound_link_slots())
774            .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
775    }
776
777    fn discovery_connect_budget(&self) -> usize {
778        self.outbound_handshake_slots()
779            .min(self.outbound_link_slots())
780            .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
781    }
782
783    /// Find a UDP transport whose bound socket can send to `remote_addr`.
784    ///
785    /// LAN discovery can surface both IPv4 and IPv6 addresses for the same
786    /// service. A wildcard IPv4 socket cannot send to an IPv6 link-local
787    /// target, and vice versa, so callers must choose by socket family rather
788    /// than by transport type alone.
789    fn find_udp_transport_for_remote_addr(
790        &self,
791        remote_addr: SocketAddr,
792    ) -> Option<(TransportId, SocketAddr)> {
793        self.transports
794            .iter()
795            .filter(|(id, handle)| {
796                handle.transport_type().name == "udp"
797                    && handle.is_operational()
798                    && !self.bootstrap_transports.contains(id)
799            })
800            .filter_map(|(id, handle)| {
801                let local_addr = handle.local_addr()?;
802                socket_addr_families_compatible(local_addr, remote_addr)
803                    .then_some((*id, local_addr))
804            })
805            .min_by_key(|(id, _)| id.as_u32())
806    }
807
808    pub(super) fn transport_discovery_candidate(
809        &self,
810        discovered_transport_id: TransportId,
811        discovered_addr: TransportAddr,
812    ) -> Option<(TransportId, TransportAddr, &'static str)> {
813        let transport = self.transports.get(&discovered_transport_id)?;
814        let transport_name = transport.transport_type().name;
815
816        if transport_name != "udp" {
817            return Some((discovered_transport_id, discovered_addr, transport_name));
818        }
819
820        let Some(remote_socket_addr) = discovered_addr
821            .as_str()
822            .and_then(|addr| addr.parse::<SocketAddr>().ok())
823        else {
824            if self.bootstrap_transports.contains(&discovered_transport_id) {
825                debug!(
826                    transport_id = %discovered_transport_id,
827                    remote_addr = %discovered_addr,
828                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
829                );
830                return None;
831            }
832            return Some((discovered_transport_id, discovered_addr, transport_name));
833        };
834
835        let Some((transport_id, local_addr)) =
836            self.find_udp_transport_for_remote_addr(remote_socket_addr)
837        else {
838            debug!(
839                transport_id = %discovered_transport_id,
840                remote_addr = %discovered_addr,
841                "transport discovery: skip UDP peer with no compatible local socket"
842            );
843            return None;
844        };
845
846        if transport_id != discovered_transport_id {
847            debug!(
848                discovered_transport_id = %discovered_transport_id,
849                selected_transport_id = %transport_id,
850                local_addr = %local_addr,
851                remote_addr = %remote_socket_addr,
852                "transport discovery: selected compatible UDP transport"
853            );
854        }
855
856        Some((
857            transport_id,
858            TransportAddr::from_socket_addr(remote_socket_addr),
859            transport_name,
860        ))
861    }
862
863    fn peer_address_string_for_transport_candidate(
864        &self,
865        transport_id: TransportId,
866        transport_name: &str,
867        remote_addr: &TransportAddr,
868    ) -> String {
869        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
870        let _ = (transport_id, transport_name);
871
872        #[cfg(any(target_os = "linux", target_os = "macos"))]
873        if transport_name == "ethernet"
874            && remote_addr.as_bytes().len() == 6
875            && let Some(interface) = self
876                .transports
877                .get(&transport_id)
878                .and_then(|transport| transport.interface_name())
879        {
880            let mut mac = [0u8; 6];
881            mac.copy_from_slice(remote_addr.as_bytes());
882            return format!(
883                "{interface}/{}",
884                crate::transport::ethernet::format_mac(&mac)
885            );
886        }
887
888        remote_addr.to_string()
889    }
890
891    fn resolve_peer_address_for_match(
892        &self,
893        candidate: &PeerAddress,
894    ) -> Option<(TransportId, TransportAddr)> {
895        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
896            return None;
897        }
898
899        if candidate.transport == "ethernet" {
900            return self.resolve_ethernet_addr(&candidate.addr).ok();
901        }
902
903        if candidate.transport == "ble" {
904            #[cfg(bluer_available)]
905            {
906                return self.resolve_ble_addr(&candidate.addr).ok();
907            }
908            #[cfg(not(bluer_available))]
909            {
910                return None;
911            }
912        }
913
914        let transport_id = if candidate.transport == "udp"
915            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
916        {
917            self.find_udp_transport_for_remote_addr(remote_socket_addr)
918                .map(|(id, _)| id)?
919        } else {
920            self.find_transport_for_type(&candidate.transport)?
921        };
922
923        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
924    }
925
926    /// Initiate a connection to a peer on a specific transport and address.
927    ///
928    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
929    /// the Noise IK handshake, sends msg1, and registers the connection for
930    /// msg2 dispatch.
931    ///
932    /// For connection-oriented transports (TCP, Tor): allocates a link and
933    /// starts a non-blocking transport connect. The handshake is deferred
934    /// until the transport connection is established — the tick handler
935    /// polls `connection_state()` and initiates the handshake when ready.
936    pub(super) async fn initiate_connection(
937        &mut self,
938        transport_id: TransportId,
939        remote_addr: TransportAddr,
940        peer_identity: PeerIdentity,
941    ) -> Result<(), NodeError> {
942        let peer_node_addr = *peer_identity.node_addr();
943
944        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
945            debug!(
946                peer = %self.peer_display_name(&peer_node_addr),
947                transport_id = %transport_id,
948                remote_addr = %remote_addr,
949                "Connection already in progress for candidate path"
950            );
951            return Ok(());
952        }
953
954        if self.outbound_handshake_slots() == 0 {
955            return Err(NodeError::MaxConnectionsExceeded {
956                max: self.max_connections,
957            });
958        }
959
960        if self.outbound_link_slots() == 0 {
961            return Err(NodeError::MaxLinksExceeded {
962                max: self.max_links,
963            });
964        }
965
966        if !self.peers.contains_key(&peer_node_addr)
967            && self.max_peers > 0
968            && self.peers.len() >= self.max_peers
969        {
970            return Err(NodeError::MaxPeersExceeded {
971                max: self.max_peers,
972            });
973        }
974
975        self.authorize_peer(
976            &peer_identity,
977            PeerAclContext::OutboundConnect,
978            transport_id,
979            &remote_addr,
980        )?;
981
982        let is_connection_oriented = self
983            .transports
984            .get(&transport_id)
985            .map(|t| t.transport_type().connection_oriented)
986            .unwrap_or(false);
987
988        // Allocate link ID and create link
989        let link_id = self.allocate_link_id();
990
991        let link = if is_connection_oriented {
992            Link::new(
993                link_id,
994                transport_id,
995                remote_addr.clone(),
996                LinkDirection::Outbound,
997                Duration::from_millis(self.config.node.base_rtt_ms),
998            )
999        } else {
1000            Link::connectionless(
1001                link_id,
1002                transport_id,
1003                remote_addr.clone(),
1004                LinkDirection::Outbound,
1005                Duration::from_millis(self.config.node.base_rtt_ms),
1006            )
1007        };
1008
1009        self.links.insert(link_id, link);
1010
1011        // Add reverse lookup for packet dispatch
1012        self.addr_to_link
1013            .insert((transport_id, remote_addr.clone()), link_id);
1014
1015        if is_connection_oriented {
1016            // Connection-oriented: start non-blocking connect, defer handshake
1017            if let Some(transport) = self.transports.get(&transport_id) {
1018                match transport.connect(&remote_addr).await {
1019                    Ok(()) => {
1020                        debug!(
1021                            peer = %self.peer_display_name(&peer_node_addr),
1022                            transport_id = %transport_id,
1023                            remote_addr = %remote_addr,
1024                            link_id = %link_id,
1025                            "Transport connect initiated (non-blocking)"
1026                        );
1027                        self.pending_connects.push(super::PendingConnect {
1028                            link_id,
1029                            transport_id,
1030                            remote_addr,
1031                            peer_identity,
1032                        });
1033                    }
1034                    Err(e) => {
1035                        // Clean up link
1036                        self.links.remove(&link_id);
1037                        self.addr_to_link.remove(&(transport_id, remote_addr));
1038                        return Err(NodeError::from_transport_error(e));
1039                    }
1040                }
1041            }
1042            Ok(())
1043        } else {
1044            // Connectionless: proceed with immediate handshake
1045            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
1046                .await
1047        }
1048    }
1049
    /// Start the Noise handshake on a link and send msg1.
    ///
    /// Called immediately for connectionless transports, or after the
    /// transport connection is established for connection-oriented transports.
    ///
    /// Expects `link_id` to already be present in `self.links` and keyed in
    /// `self.addr_to_link` under `(transport_id, remote_addr)`;
    /// `initiate_connection` inserts both before calling this. On success the
    /// new outbound `PeerConnection` is stored in `self.connections` and
    /// registered in `self.pending_outbound` under
    /// `(transport_id, our_index)`, so the msg2 reply can be routed back to it.
    ///
    /// # Errors
    ///
    /// - `IndexAllocationFailed` if no session index is available.
    /// - `HandshakeFailed` if the Noise state machine refuses to produce msg1.
    /// - The transport's error if the first msg1 send fails.
    /// - `TransportError` if the transport is gone by the time msg1 is sent.
    ///
    /// Every error path removes the link and its `addr_to_link` entry. Paths
    /// that got far enough also release the session index and drop the
    /// `connections` / `pending_outbound` entries. This lets the caller move on
    /// to another candidate address.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Create connection in handshake phase (outbound knows expected identity)
        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Allocate a session index for this handshake
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Clean up the link we just created
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        // Start the Noise handshake and get message 1
        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Clean up the index and link
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        // Set index and transport info on the connection
        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Store msg1 for resend and schedule first resend
        // (first resend is due `handshake_resend_interval_ms` after now).
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Track in pending_outbound for msg2 dispatch
        // (registered before the send so the state is complete once msg1 is out).
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send the wire format handshake message. If the very first send fails
        // synchronously (for example an IPv6 candidate on an IPv4-only UDP
        // socket), undo this candidate so the caller can try the next address
        // in the same dial pass.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        match send_result {
            Some(send_result) => {
                // Record the send outcome (success or failure) before acting on it.
                self.note_local_send_outcome(&send_result);
                match send_result {
                    Ok(bytes) => {
                        debug!(
                            link_id = %link_id,
                            our_index = %our_index,
                            bytes,
                            "Sent Noise handshake message 1 (wire format)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            link_id = %link_id,
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            our_index = %our_index,
                            error = %e,
                            "Failed to send handshake message"
                        );
                        // Full unwind: msg2 routing, connection, link,
                        // reverse lookup, and session index.
                        self.pending_outbound
                            .remove(&(transport_id, our_index.as_u32()));
                        self.connections.remove(&link_id);
                        self.links.remove(&link_id);
                        self.addr_to_link
                            .remove(&(transport_id, remote_addr.clone()));
                        let _ = self.index_allocator.free(our_index);
                        return Err(NodeError::from_transport_error(e));
                    }
                }
            }
            None => {
                // The transport vanished after the caller chose it; unwind
                // the same state as a failed send.
                self.pending_outbound
                    .remove(&(transport_id, our_index.as_u32()));
                self.connections.remove(&link_id);
                self.links.remove(&link_id);
                self.addr_to_link
                    .remove(&(transport_id, remote_addr.clone()));
                let _ = self.index_allocator.free(our_index);
                return Err(NodeError::TransportError(format!(
                    "transport {transport_id} disappeared before first handshake send"
                )));
            }
        }

        Ok(())
    }
1174
1175    /// Poll all transports for discovered peers and auto-connect.
1176    ///
1177    /// Called from the tick handler. Iterates operational transports,
1178    /// drains their discovery buffers, and initiates connections to
1179    /// newly discovered peers (if auto_connect is enabled).
1180    pub(super) async fn poll_transport_discovery(&mut self) {
1181        // Collect discoveries first to avoid borrow conflict with self
1182        let mut to_connect = Vec::new();
1183        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1184        let mut connect_budget = self.discovery_connect_budget();
1185        let mut skipped_budget = 0usize;
1186
1187        for transport in self.transports.values() {
1188            if !transport.is_operational() {
1189                continue;
1190            }
1191            if !transport.auto_connect() {
1192                // Still drain the buffer so it doesn't grow unbounded
1193                let _ = transport.discover();
1194                continue;
1195            }
1196            let discovered = match transport.discover() {
1197                Ok(peers) => peers,
1198                Err(_) => continue,
1199            };
1200            for peer in discovered {
1201                let discovered_transport_id = peer.transport_id;
1202                let pubkey = match peer.pubkey_hint {
1203                    Some(pk) => pk,
1204                    None => continue,
1205                };
1206                let identity = PeerIdentity::from_pubkey(pubkey);
1207                let node_addr = *identity.node_addr();
1208
1209                // Skip self
1210                if node_addr == *self.identity.node_addr() {
1211                    continue;
1212                }
1213
1214                let Some((candidate_transport_id, remote_addr, transport_name)) =
1215                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1216                else {
1217                    continue;
1218                };
1219
1220                if self.peers.contains_key(&node_addr) {
1221                    let candidate = PeerAddress::new(
1222                        transport_name,
1223                        self.peer_address_string_for_transport_candidate(
1224                            candidate_transport_id,
1225                            transport_name,
1226                            &remote_addr,
1227                        ),
1228                    );
1229                    if self.active_peer_candidate_is_fresh_enough_to_skip(
1230                        &node_addr,
1231                        std::slice::from_ref(&candidate),
1232                    ) {
1233                        continue;
1234                    }
1235                    if self.is_connecting_to_peer_on_path(
1236                        &node_addr,
1237                        candidate_transport_id,
1238                        &remote_addr,
1239                    ) {
1240                        continue;
1241                    }
1242                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1243                    if connect_budget == 0
1244                        || self
1245                            .path_candidate_attempt_budget(&node_addr)
1246                            .saturating_sub(queued_for_peer)
1247                            == 0
1248                    {
1249                        skipped_budget = skipped_budget.saturating_add(1);
1250                        continue;
1251                    }
1252                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
1253                    *queued_per_peer.entry(node_addr).or_default() += 1;
1254                    connect_budget = connect_budget.saturating_sub(1);
1255                    continue;
1256                }
1257
1258                if self.is_connecting_to_peer_on_path(
1259                    &node_addr,
1260                    candidate_transport_id,
1261                    &remote_addr,
1262                ) {
1263                    continue;
1264                }
1265
1266                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1267                if connect_budget == 0
1268                    || self
1269                        .path_candidate_attempt_budget(&node_addr)
1270                        .saturating_sub(queued_for_peer)
1271                        == 0
1272                {
1273                    skipped_budget = skipped_budget.saturating_add(1);
1274                    continue;
1275                }
1276                to_connect.push((candidate_transport_id, remote_addr, identity, false));
1277                *queued_per_peer.entry(node_addr).or_default() += 1;
1278                connect_budget = connect_budget.saturating_sub(1);
1279            }
1280        }
1281
1282        if skipped_budget > 0 {
1283            debug!(
1284                skipped = skipped_budget,
1285                queued = to_connect.len(),
1286                "Transport discovery connect budget exhausted"
1287            );
1288        }
1289
1290        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1291            info!(
1292                peer = %self.peer_display_name(identity.node_addr()),
1293                transport_id = %transport_id,
1294                remote_addr = %remote_addr,
1295                active_refresh,
1296                "Auto-connecting to discovered peer"
1297            );
1298            if let Err(e) = self
1299                .initiate_connection(transport_id, remote_addr, identity)
1300                .await
1301            {
1302                warn!(error = %e, "Failed to auto-connect to discovered peer");
1303            }
1304        }
1305    }
1306
    /// Process Nostr bootstrap state once and act on it.
    ///
    /// Does nothing when `nostr_discovery` is not configured. Otherwise, in
    /// order:
    /// 1. Push the current outbound / direct-refresh admission checks into the
    ///    bootstrap.
    /// 2. Relay queued mesh traversal signals (`drain_nostr_mesh_signals`).
    /// 3. Handle each drained `BootstrapEvent`:
    ///    - `Established`: check admission, then adopt the traversal socket.
    ///      If adoption fails, schedule a retry.
    ///    - `Failed`: update failure-streak bookkeeping, run an optional
    ///      stale-advert check, then try fallback addresses or schedule a
    ///      retry.
    /// 4. Run the startup open-discovery sweep and queue open-discovery and
    ///    active-fallback direct retries.
    /// 5. Refresh the local overlay advert, last.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());

        self.drain_nostr_mesh_signals(&bootstrap).await;

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    // Peers already in the peer table are treated as a
                    // direct-path refresh and gated by the refresh admission
                    // check; everyone else uses the general outbound check.
                    let active_refresh = PeerIdentity::from_npub(&traversal.peer_npub)
                        .ok()
                        .is_some_and(|identity| self.peers.contains_key(identity.node_addr()));
                    let admission_allowed = if active_refresh {
                        self.outbound_direct_refresh_admission_check()
                    } else {
                        self.outbound_admission_check()
                    };
                    if !admission_allowed {
                        debug!(
                            peer_npub = %traversal.peer_npub,
                            peers = self.peers.len(),
                            max_peers = self.max_peers,
                            active_refresh,
                            "Dropping established NAT traversal: at capacity"
                        );
                        continue;
                    }
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    // An npub that does not parse cannot be mapped to a node
                    // address; nothing further can be done for it.
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    let now_ms = Self::now_ms();
                    // Already-connected peer: this was a direct-path upgrade
                    // attempt. Count the failure and re-probe only when
                    // `active_peer_should_keep_direct_retry` says the direct
                    // retry is still wanted; never fall through to dialing.
                    if self.peers.contains_key(&node_addr) {
                        if self.active_peer_should_keep_direct_retry(&node_addr, &peer_config) {
                            let decision =
                                bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                            if decision.should_warn {
                                warn!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    cooldown_secs = decision
                                        .cooldown_until_ms
                                        .map(|t| t.saturating_sub(now_ms) / 1000),
                                    "Direct-path NAT traversal upgrade failed"
                                );
                            } else {
                                debug!(
                                    npub = %peer_config.npub,
                                    error = %reason,
                                    consecutive_failures = decision.consecutive_failures,
                                    "Direct-path NAT traversal upgrade failed (suppressed by warn-rate-limit)"
                                );
                            }
                            if decision.crossed_threshold {
                                bootstrap
                                    .request_advert_stale_check(peer_config.npub.clone())
                                    .await;
                            }
                            self.schedule_link_dead_reprobe(node_addr, now_ms);
                        } else {
                            debug!(
                                npub = %peer_config.npub,
                                error = %reason,
                                "Ignoring failed NAT traversal for already-connected peer on fresh direct path"
                            );
                        }
                        continue;
                    }
                    // A handshake for this peer is already running; leave the
                    // outcome to that attempt rather than recording a failure.
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // B6: stale-advert eviction on the streak-threshold
                    // crossing. Fire-and-forget; the outcome is logged so
                    // operators can see when peers get cleaned up.
                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Fall back to the peer config's addresses; a retry is
                    // only scheduled when `try_peer_addresses` fails.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    if self.nostr_cooldown_applies_to_peer_config(&peer_config)
                        && let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        // Push the next retry past the cooldown so the
                        // open-discovery sweep doesn't re-enqueue and the
                        // per-attempt backoff doesn't fire sooner.
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
        self.queue_active_fallback_direct_retries(&bootstrap);

        // Advert refresh can touch STUN/public-endpoint discovery on some
        // configs. Drain traversal events and queue direct retries first so a
        // slow refresh cannot delay path recovery work already waiting on us.
        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }
    }
1470
1471    async fn drain_nostr_mesh_signals(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1472        let mut deferred = Vec::new();
1473
1474        for signal in bootstrap.drain_mesh_signals().await {
1475            let (peer_npub, msg_type, payload) = match &signal {
1476                MeshTraversalSignal::Offer { peer_npub, offer } => {
1477                    let payload = match serde_json::to_vec(&offer) {
1478                        Ok(payload) => payload,
1479                        Err(error) => {
1480                            debug!(
1481                                peer = %peer_npub,
1482                                error = %error,
1483                                "Failed to encode mesh traversal offer"
1484                            );
1485                            continue;
1486                        }
1487                    };
1488                    (
1489                        peer_npub.clone(),
1490                        SessionMessageType::TraversalOffer.to_byte(),
1491                        payload,
1492                    )
1493                }
1494                MeshTraversalSignal::Answer { peer_npub, answer } => {
1495                    let payload = match serde_json::to_vec(&answer) {
1496                        Ok(payload) => payload,
1497                        Err(error) => {
1498                            debug!(
1499                                peer = %peer_npub,
1500                                error = %error,
1501                                "Failed to encode mesh traversal answer"
1502                            );
1503                            continue;
1504                        }
1505                    };
1506                    (
1507                        peer_npub.clone(),
1508                        SessionMessageType::TraversalAnswer.to_byte(),
1509                        payload,
1510                    )
1511                }
1512            };
1513
1514            let peer_identity = match PeerIdentity::from_npub(&peer_npub) {
1515                Ok(identity) => identity,
1516                Err(error) => {
1517                    debug!(
1518                        peer = %peer_npub,
1519                        error = %error,
1520                        "Cannot send mesh traversal signal to invalid peer npub"
1521                    );
1522                    continue;
1523                }
1524            };
1525            let peer_addr = *peer_identity.node_addr();
1526            match self
1527                .mesh_signal_session_action(peer_addr, peer_identity.pubkey_full())
1528                .await
1529            {
1530                MeshSignalSessionAction::Send => {}
1531                MeshSignalSessionAction::Defer => {
1532                    deferred.push(signal);
1533                    continue;
1534                }
1535                MeshSignalSessionAction::Drop => continue,
1536            }
1537
1538            if let Err(error) = self.send_session_msg(&peer_addr, msg_type, &payload).await {
1539                debug!(
1540                    peer = %self.peer_display_name(&peer_addr),
1541                    error = %error,
1542                    "Failed to send mesh traversal signal"
1543                );
1544            }
1545        }
1546
1547        for signal in deferred {
1548            bootstrap.requeue_mesh_signal(signal);
1549        }
1550    }
1551
1552    async fn mesh_signal_session_action(
1553        &mut self,
1554        peer_addr: NodeAddr,
1555        peer_pubkey: PublicKey,
1556    ) -> MeshSignalSessionAction {
1557        if let Some(entry) = self.sessions.get(&peer_addr) {
1558            if entry.is_established() {
1559                return MeshSignalSessionAction::Send;
1560            }
1561            if entry.is_initiating() || entry.is_awaiting_msg3() {
1562                debug!(
1563                    peer = %self.peer_display_name(&peer_addr),
1564                    "Deferring mesh traversal signal until end-to-end session is established"
1565                );
1566                return MeshSignalSessionAction::Defer;
1567            }
1568        }
1569
1570        if self.find_next_hop(&peer_addr).is_none() {
1571            debug!(
1572                peer = %self.peer_display_name(&peer_addr),
1573                "Cannot warm mesh traversal signal session without a FIPS route"
1574            );
1575            self.maybe_initiate_lookup(&peer_addr).await;
1576            return MeshSignalSessionAction::Drop;
1577        }
1578
1579        self.register_identity(peer_addr, peer_pubkey);
1580        match self.initiate_session(peer_addr, peer_pubkey).await {
1581            Ok(()) => {
1582                debug!(
1583                    peer = %self.peer_display_name(&peer_addr),
1584                    "Warming end-to-end session for mesh traversal signal"
1585                );
1586                MeshSignalSessionAction::Defer
1587            }
1588            Err(NodeError::SendFailed { node_addr, reason })
1589                if node_addr == peer_addr && reason == "no route to destination" =>
1590            {
1591                debug!(
1592                    peer = %self.peer_display_name(&peer_addr),
1593                    "Cannot warm mesh traversal signal session without a FIPS route"
1594                );
1595                self.maybe_initiate_lookup(&peer_addr).await;
1596                MeshSignalSessionAction::Drop
1597            }
1598            Err(error) => {
1599                debug!(
1600                    peer = %self.peer_display_name(&peer_addr),
1601                    error = %error,
1602                    "Failed to warm end-to-end session for mesh traversal signal"
1603                );
1604                MeshSignalSessionAction::Drop
1605            }
1606        }
1607    }
1608
1609    /// Resolve the LAN-only discovery scope. Applications with explicit
1610    /// connectivity config can set `node.discovery.lan.scope` without
1611    /// changing the public Nostr discovery `app` tag. The older fallback
1612    /// extracts a scope from the Nostr app tag used by default scoped
1613    /// discovery.
1614    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1615        if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1616            let scope = scope.trim();
1617            if !scope.is_empty() {
1618                return Some(scope.to_string());
1619            }
1620        }
1621
1622        let app = self.config.node.discovery.nostr.app.trim();
1623        if app.is_empty() {
1624            return None;
1625        }
1626        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1627            let scope = rest.trim();
1628            if scope.is_empty() {
1629                None
1630            } else {
1631                Some(scope.to_string())
1632            }
1633        } else {
1634            Some(app.to_string())
1635        }
1636    }
1637
1638    pub(super) fn start_local_instance_discovery(&mut self) {
1639        if !self.config.node.discovery.local.enabled {
1640            return;
1641        }
1642        let Some(scope) = self.lan_discovery_scope() else {
1643            debug!("local instance discovery not started: no discovery scope");
1644            return;
1645        };
1646        let now_ms = Self::now_ms();
1647        match crate::discovery::local::LocalInstanceRegistry::new(
1648            self.identity.npub(),
1649            scope,
1650            &self.config.node.discovery.local,
1651            now_ms,
1652        ) {
1653            Ok(registry) => {
1654                self.local_instance_registry = Some(registry);
1655                self.local_instance_started_at_ms = Some(now_ms);
1656                self.last_local_instance_publish_ms = None;
1657                self.last_local_instance_scan_ms = None;
1658                self.publish_local_instance_record(now_ms);
1659                info!("Same-host FIPS instance discovery enabled");
1660            }
1661            Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1662                debug!("same-host FIPS instance discovery disabled");
1663            }
1664            Err(err) => {
1665                debug!(error = %err, "same-host FIPS instance discovery not started");
1666            }
1667        }
1668    }
1669
1670    fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1671        let mut contacts = Vec::new();
1672        for handle in self.transports.values() {
1673            if !handle.is_operational() || !handle.accept_connections() {
1674                continue;
1675            }
1676            let transport = handle.transport_type().name;
1677            if transport != "udp" && transport != "tcp" {
1678                continue;
1679            }
1680            let Some(local_addr) = handle.local_addr() else {
1681                continue;
1682            };
1683            let Some(contact) =
1684                crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1685            else {
1686                continue;
1687            };
1688            if contacts
1689                .iter()
1690                .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1691                    existing.transport == contact.transport && existing.addr == contact.addr
1692                })
1693            {
1694                continue;
1695            }
1696            contacts.push(contact);
1697        }
1698        contacts
1699    }
1700
1701    fn publish_local_instance_record(&mut self, now_ms: u64) {
1702        let Some(registry) = self.local_instance_registry.clone() else {
1703            return;
1704        };
1705        let contacts = self.local_instance_contacts();
1706        match registry.publish(contacts, now_ms) {
1707            Ok(()) => {
1708                self.last_local_instance_publish_ms = Some(now_ms);
1709            }
1710            Err(err) => {
1711                debug!(error = %err, "failed to publish same-host FIPS instance record");
1712            }
1713        }
1714    }
1715
1716    fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1717        if self.local_instance_registry.is_none() {
1718            return;
1719        }
1720        let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1721        let due = self
1722            .last_local_instance_publish_ms
1723            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1724            .unwrap_or(true);
1725        if due {
1726            self.publish_local_instance_record(now_ms);
1727        }
1728    }
1729
1730    fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1731        if self.local_instance_registry.is_none() {
1732            return false;
1733        }
1734        let cfg = &self.config.node.discovery.local;
1735        let interval_ms = if self
1736            .local_instance_started_at_ms
1737            .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1738            .unwrap_or(false)
1739        {
1740            cfg.startup_scan_interval_ms()
1741        } else {
1742            cfg.scan_interval_ms()
1743        };
1744        self.last_local_instance_scan_ms
1745            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1746            .unwrap_or(true)
1747    }
1748
1749    fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1750        if self.config.peers().iter().any(|peer| {
1751            PeerIdentity::from_npub(&peer.npub)
1752                .map(|configured| configured.node_addr() == identity.node_addr())
1753                .unwrap_or(false)
1754        }) {
1755            return true;
1756        }
1757        self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1758    }
1759
1760    fn local_instance_peer_addresses(
1761        &self,
1762        record: &crate::discovery::local::LocalInstanceRecord,
1763    ) -> Vec<PeerAddress> {
1764        let mut addresses = Vec::new();
1765        for contact in &record.contacts {
1766            if contact.transport != "udp" && contact.transport != "tcp" {
1767                continue;
1768            }
1769            let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1770                debug!(
1771                    npub = %record.npub,
1772                    transport = %contact.transport,
1773                    addr = %contact.addr,
1774                    "local instance discovery: skip non-socket contact"
1775                );
1776                continue;
1777            };
1778            if !socket_addr.ip().is_loopback() {
1779                debug!(
1780                    npub = %record.npub,
1781                    addr = %contact.addr,
1782                    "local instance discovery: skip non-loopback contact"
1783                );
1784                continue;
1785            }
1786            let address =
1787                PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1788                    .with_seen_at_ms(record.updated_at_ms);
1789            if addresses.iter().any(|existing: &PeerAddress| {
1790                existing.transport == address.transport && existing.addr == address.addr
1791            }) {
1792                continue;
1793            }
1794            addresses.push(address);
1795        }
1796        addresses
1797    }
1798
    /// Scan the same-host registry only on startup and then at a low cadence.
    /// This avoids a per-second filesystem poll while preserving the fast path
    /// for processes launched around the same time.
    ///
    /// Each call may republish this node's own record (see
    /// `maybe_publish_local_instance_record`). When a scan is due, it reads
    /// records not older than `stale_after_ms` and initiates handshakes to
    /// allowed peers over their loopback udp/tcp contacts. Attempts are
    /// limited by the shared discovery connect budget and by
    /// `path_candidate_attempt_budget` for the peer. Candidates skipped for
    /// lack of budget are summarized in one debug line at the end.
    pub(super) async fn poll_local_instance_discovery(&mut self) {
        // Cloned so the registry stays usable across the `&mut self` calls below.
        let Some(registry) = self.local_instance_registry.clone() else {
            return;
        };
        let now_ms = Self::now_ms();
        self.maybe_publish_local_instance_record(now_ms);
        if !self.local_instance_scan_due(now_ms) {
            return;
        }
        // Stamped before scanning, so a failed scan still waits a full
        // interval before the next attempt.
        self.last_local_instance_scan_ms = Some(now_ms);

        let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
        {
            Ok(records) => records,
            Err(err) => {
                debug!(error = %err, "same-host FIPS instance scan failed");
                return;
            }
        };
        if records.is_empty() {
            return;
        }

        let mut connect_budget = self.discovery_connect_budget();
        let mut skipped_budget = 0usize;
        for record in records {
            let identity = match PeerIdentity::from_npub(&record.npub) {
                Ok(identity) => identity,
                Err(err) => {
                    debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
                    continue;
                }
            };
            let peer_node_addr = *identity.node_addr();
            // The registry also contains this node's own record; never dial ourselves.
            if peer_node_addr == *self.identity.node_addr() {
                continue;
            }
            if !self.local_instance_peer_allowed(&identity) {
                debug!(
                    npub = %identity.short_npub(),
                    "local instance discovery: skip unconfigured peer"
                );
                continue;
            }

            let addresses = self.local_instance_peer_addresses(&record);
            if addresses.is_empty() {
                continue;
            }

            // For already-active peers, let the freshness check decide whether
            // these candidates are worth an alternate-path attempt.
            if self.peers.contains_key(&peer_node_addr)
                && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
            {
                continue;
            }

            // Every resolvable address may get its own attempt, subject to the
            // global and per-peer budgets checked below.
            for address in addresses {
                let Some((transport_id, remote_addr)) =
                    self.resolve_peer_address_for_match(&address)
                else {
                    continue;
                };
                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                    continue;
                }
                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
                    skipped_budget = skipped_budget.saturating_add(1);
                    continue;
                }
                info!(
                    npub = %identity.short_npub(),
                    transport = %address.transport,
                    addr = %address.addr,
                    "same-host FIPS instance discovery: initiating handshake"
                );
                if let Err(err) = self
                    .initiate_connection(transport_id, remote_addr, identity)
                    .await
                {
                    debug!(
                        npub = %record.npub,
                        error = %err,
                        "same-host FIPS instance discovery: failed to initiate connection"
                    );
                }
                // Budget is consumed whether or not initiation succeeded.
                connect_budget = connect_budget.saturating_sub(1);
            }
        }
        if skipped_budget > 0 {
            debug!(
                skipped = skipped_budget,
                "same-host FIPS instance discovery connect budget exhausted"
            );
        }
    }
1897
1898    /// Drain mDNS-discovered peers and initiate Noise XX handshakes. For
1899    /// active peers this is a non-disruptive alternate-path refresh: the
1900    /// current link stays live until a new handshake authenticates and
1901    /// promotes. The handshake itself is the authentication — a spoofed
1902    /// mDNS advert with someone else's npub fails the XX exchange and
1903    /// is dropped.
1904    pub(super) async fn poll_lan_discovery(&mut self) {
1905        let Some(runtime) = self.lan_discovery.clone() else {
1906            return;
1907        };
1908        let events = runtime.drain_events().await;
1909        if events.is_empty() {
1910            return;
1911        }
1912        let mut connect_budget = self.discovery_connect_budget();
1913        let mut skipped_budget = 0usize;
1914        for event in events {
1915            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1916            let Some((transport_id, local_addr)) =
1917                self.find_udp_transport_for_remote_addr(peer.addr)
1918            else {
1919                debug!(
1920                    addr = %peer.addr,
1921                    "lan: skip discovered peer with no compatible UDP transport"
1922                );
1923                continue;
1924            };
1925            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1926                Ok(id) => id,
1927                Err(err) => {
1928                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1929                    continue;
1930                }
1931            };
1932            let peer_node_addr = *identity.node_addr();
1933            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1934            if self.peers.contains_key(&peer_node_addr) {
1935                let candidate = PeerAddress::new("udp", peer.addr.to_string());
1936                if self.active_peer_candidate_is_fresh_enough_to_skip(
1937                    &peer_node_addr,
1938                    std::slice::from_ref(&candidate),
1939                ) {
1940                    continue;
1941                }
1942                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1943                    continue;
1944                }
1945                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1946                    skipped_budget = skipped_budget.saturating_add(1);
1947                    continue;
1948                }
1949                info!(
1950                    npub = %identity.short_npub(),
1951                    addr = %peer.addr,
1952                    local_addr = %local_addr,
1953                    "lan: initiating alternate-path handshake to active peer"
1954                );
1955                if let Err(err) = self
1956                    .initiate_connection(transport_id, remote_addr, identity)
1957                    .await
1958                {
1959                    debug!(
1960                        npub = %peer.npub,
1961                        error = %err,
1962                        "lan: failed to initiate active peer alternate-path handshake"
1963                    );
1964                }
1965                connect_budget = connect_budget.saturating_sub(1);
1966                continue;
1967            }
1968            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1969                continue;
1970            }
1971            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1972                skipped_budget = skipped_budget.saturating_add(1);
1973                continue;
1974            }
1975            info!(
1976                npub = %identity.short_npub(),
1977                addr = %peer.addr,
1978                local_addr = %local_addr,
1979                "lan: initiating handshake to discovered peer"
1980            );
1981            if let Err(err) = self
1982                .initiate_connection(transport_id, remote_addr, identity)
1983                .await
1984            {
1985                debug!(
1986                    npub = %peer.npub,
1987                    error = %err,
1988                    "lan: failed to initiate connection to discovered peer"
1989                );
1990            }
1991            connect_budget = connect_budget.saturating_sub(1);
1992        }
1993        if skipped_budget > 0 {
1994            debug!(
1995                skipped = skipped_budget,
1996                "lan: discovery connect budget exhausted"
1997            );
1998        }
1999    }
2000
2001    /// Poll pending transport connects and initiate handshakes for ready ones.
2002    ///
2003    /// Called from the tick handler. For each pending connect, queries the
2004    /// transport's connection state. When a connection is established,
2005    /// marks the link as Connected and starts the Noise handshake.
2006    /// Failed connections are cleaned up and scheduled for retry.
2007    pub(super) async fn poll_pending_connects(&mut self) {
2008        if self.pending_connects.is_empty() {
2009            return;
2010        }
2011
2012        let mut completed = Vec::new();
2013
2014        for (i, pending) in self.pending_connects.iter().enumerate() {
2015            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
2016                transport.connection_state(&pending.remote_addr)
2017            } else {
2018                crate::transport::ConnectionState::Failed("transport removed".into())
2019            };
2020
2021            match state {
2022                crate::transport::ConnectionState::Connected => {
2023                    completed.push((i, true, None));
2024                }
2025                crate::transport::ConnectionState::Failed(reason) => {
2026                    completed.push((i, false, Some(reason)));
2027                }
2028                crate::transport::ConnectionState::Connecting => {
2029                    // Still in progress, check on next tick
2030                }
2031                crate::transport::ConnectionState::None => {
2032                    // Shouldn't happen — treat as failure
2033                    completed.push((i, false, Some("no connection attempt found".into())));
2034                }
2035            }
2036        }
2037
2038        // Process completions in reverse order to preserve indices
2039        for (i, success, reason) in completed.into_iter().rev() {
2040            let pending = self.pending_connects.remove(i);
2041
2042            if success {
2043                // Mark link as Connected
2044                if let Some(link) = self.links.get_mut(&pending.link_id) {
2045                    link.set_connected();
2046                }
2047
2048                debug!(
2049                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2050                    transport_id = %pending.transport_id,
2051                    remote_addr = %pending.remote_addr,
2052                    link_id = %pending.link_id,
2053                    "Transport connected, starting handshake"
2054                );
2055
2056                // Start the handshake now that the transport is connected
2057                if let Err(e) = self
2058                    .start_handshake(
2059                        pending.link_id,
2060                        pending.transport_id,
2061                        pending.remote_addr.clone(),
2062                        pending.peer_identity,
2063                    )
2064                    .await
2065                {
2066                    warn!(
2067                        link_id = %pending.link_id,
2068                        error = %e,
2069                        "Failed to start handshake after transport connect"
2070                    );
2071                    // Clean up link on handshake failure
2072                    self.remove_link(&pending.link_id);
2073                }
2074            } else {
2075                let reason = reason.unwrap_or_default();
2076                warn!(
2077                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
2078                    transport_id = %pending.transport_id,
2079                    remote_addr = %pending.remote_addr,
2080                    link_id = %pending.link_id,
2081                    reason = %reason,
2082                    "Transport connect failed"
2083                );
2084
2085                // Clean up link and schedule retry
2086                self.remove_link(&pending.link_id);
2087                self.links.remove(&pending.link_id);
2088                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
2089            }
2090        }
2091    }
2092
2093    // === State Transitions ===
2094
2095    /// Start the node.
2096    ///
2097    /// Initializes the TUN interface (if configured), spawns I/O threads,
2098    /// and transitions to the Running state.
2099    pub async fn start(&mut self) -> Result<(), NodeError> {
2100        node_start_debug_log("Node::start begin");
2101        if !self.state.can_start() {
2102            return Err(NodeError::AlreadyStarted);
2103        }
2104        self.state = NodeState::Starting;
2105        node_start_debug_log("Node::start state set to starting");
2106
2107        // Create packet channel for transport -> Node communication
2108        let packet_buffer_size = self.config.node.buffers.packet_channel;
2109        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
2110        self.packet_tx = Some(packet_tx.clone());
2111        self.packet_rx = Some(packet_rx);
2112        node_start_debug_log("Node::start packet channel created");
2113
2114        // Initialize transports first (before TUN, before Nostr discovery).
2115        node_start_debug_log("Node::start create transports begin");
2116        let transport_handles = self.create_transports(&packet_tx).await;
2117        node_start_debug_log(format!(
2118            "Node::start create transports complete count={}",
2119            transport_handles.len()
2120        ));
2121
2122        for mut handle in transport_handles {
2123            let transport_id = handle.transport_id();
2124            let transport_type = handle.transport_type().name;
2125            let name = handle.name().map(|s| s.to_string());
2126
2127            node_start_debug_log(format!(
2128                "Node::start transport start begin id={} type={} name={:?}",
2129                transport_id, transport_type, name
2130            ));
2131            match handle.start().await {
2132                Ok(()) => {
2133                    node_start_debug_log(format!(
2134                        "Node::start transport start ok id={} type={}",
2135                        transport_id, transport_type
2136                    ));
2137                    self.transports.insert(transport_id, handle);
2138                }
2139                Err(e) => {
2140                    node_start_debug_log(format!(
2141                        "Node::start transport start error id={} type={} error={}",
2142                        transport_id, transport_type, e
2143                    ));
2144                    if let Some(ref n) = name {
2145                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
2146                    } else {
2147                        warn!(transport_type, error = %e, "Transport failed to start");
2148                    }
2149                }
2150            }
2151        }
2152
2153        if !self.transports.is_empty() {
2154            info!(count = self.transports.len(), "Transports initialized");
2155        }
2156
2157        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
2158        // worker pools. **Unix only** — both pools issue direct
2159        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
2160        // raw file descriptors via `AsRawFd`, which is a unix-only
2161        // trait. On Windows the rx_loop's tokio-based send/recv
2162        // remain the canonical path; the perf overhaul lands its
2163        // gains on unix.
2164        //
2165        // Worker count defaults to the number of CPUs, overridable
2166        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
2167        // for debug / benchmarking. Hash-by-destination means a
2168        // single TCP flow pins to one worker (preserves wire
2169        // ordering); additional workers light up under multi-flow
2170        // / multi-peer load. See `node::encrypt_worker` /
2171        // `node::decrypt_worker` for full rationale.
2172        #[cfg(unix)]
2173        {
2174            if self.config.node.worker_pools_enabled {
2175                node_start_debug_log("Node::start worker pools begin");
2176                let cpu_default = std::thread::available_parallelism()
2177                    .map(|n| n.get())
2178                    .unwrap_or(1)
2179                    .max(1);
2180                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
2181                    .ok()
2182                    .and_then(|s| s.parse().ok())
2183                    .unwrap_or(cpu_default)
2184                    .max(1);
2185                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
2186                    encrypt_worker_count,
2187                ));
2188                info!(
2189                    workers = encrypt_worker_count,
2190                    "Spawned FMP-encrypt worker pool"
2191                );
2192
2193                // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
2194                // falls through to the in-line rx_loop decrypt path (the
2195                // "test-mode" branch in `handle_encrypted_frame`, which is
2196                // in fact a fully functional synchronous decrypt). Useful
2197                // as an A/B against the worker pipeline when chasing
2198                // scheduling/queueing regressions on the native macOS
2199                // path. Any non-zero value (env or default) spawns the
2200                // pool as before.
2201                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
2202                    .ok()
2203                    .and_then(|s| s.parse().ok())
2204                    .unwrap_or(cpu_default);
2205                if decrypt_worker_count == 0 {
2206                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
2207                } else {
2208                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
2209                        decrypt_worker_count,
2210                    ));
2211                    info!(
2212                        workers = decrypt_worker_count,
2213                        "Spawned FMP+FSP-decrypt worker pool"
2214                    );
2215                }
2216                node_start_debug_log("Node::start worker pools complete");
2217            } else {
2218                node_start_debug_log("Node::start worker pools disabled");
2219                info!("FIPS worker pools disabled; using in-line crypto/send path");
2220            }
2221        }
2222
2223        if self.config.node.discovery.nostr.enabled {
2224            node_start_debug_log("Node::start nostr discovery start begin");
2225            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
2226                .await
2227            {
2228                Ok(runtime) => {
2229                    node_start_debug_log("Node::start nostr discovery runtime created");
2230                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2231                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2232                    }
2233                    node_start_debug_log("Node::start nostr overlay advert refreshed");
2234                    self.nostr_discovery = Some(runtime);
2235                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2236                    info!("Nostr overlay discovery enabled");
2237                }
2238                Err(err) => {
2239                    node_start_debug_log(format!(
2240                        "Node::start nostr discovery start error error={}",
2241                        err
2242                    ));
2243                    warn!(error = %err, "Failed to start Nostr overlay discovery");
2244                }
2245            }
2246        }
2247
2248        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
2249        // when Nostr is disabled, since it gives us sub-second pairing
2250        // on the same link without any relay or NAT-traversal roundtrip.
2251        if self.config.node.discovery.lan.enabled {
2252            node_start_debug_log("Node::start lan discovery start begin");
2253            let advertised_udp_port = self
2254                .transports
2255                .values()
2256                .filter(|h| h.is_operational())
2257                .filter(|h| h.transport_type().name == "udp")
2258                .find_map(|h| h.local_addr().map(|addr| addr.port()))
2259                .unwrap_or(0);
2260            let scope = self.lan_discovery_scope();
2261            match crate::discovery::lan::LanDiscovery::start(
2262                &self.identity,
2263                scope,
2264                advertised_udp_port,
2265                self.config.node.discovery.lan.clone(),
2266            )
2267            .await
2268            {
2269                Ok(runtime) => {
2270                    node_start_debug_log("Node::start lan discovery start ok");
2271                    self.lan_discovery = Some(runtime);
2272                    info!("LAN mDNS discovery enabled");
2273                }
2274                Err(err) => {
2275                    node_start_debug_log(format!(
2276                        "Node::start lan discovery start error error={}",
2277                        err
2278                    ));
2279                    debug!(error = %err, "LAN mDNS discovery not started");
2280                }
2281            }
2282        }
2283
2284        self.start_local_instance_discovery();
2285        self.poll_local_instance_discovery().await;
2286
2287        // Connect to static peers before TUN is active
2288        // This allows handshake messages to be sent before we start accepting packets
2289        node_start_debug_log("Node::start initiate peer connections begin");
2290        self.initiate_peer_connections().await;
2291        node_start_debug_log("Node::start initiate peer connections complete");
2292
2293        // Initialize TUN interface last, after transports and peers are ready
2294        if self.config.tun.enabled {
2295            node_start_debug_log("Node::start tun init begin");
2296            let address = *self.identity.address();
2297            match TunDevice::create(&self.config.tun, address).await {
2298                Ok(device) => {
2299                    let mtu = device.mtu();
2300                    let name = device.name().to_string();
2301                    let our_addr = *device.address();
2302
2303                    info!("TUN device active:");
2304                    info!("     name: {}", name);
2305                    info!("  address: {}", device.address());
2306                    info!("      mtu: {}", mtu);
2307
2308                    // Calculate max MSS for TCP clamping
2309                    let effective_mtu = self.effective_ipv6_mtu();
2310                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
2311
2312                    info!("effective MTU: {} bytes", effective_mtu);
2313                    debug!("   max TCP MSS: {} bytes", max_mss);
2314
2315                    // On macOS, create a shutdown pipe. Writing to it unblocks the
2316                    // reader thread's select() loop without closing the TUN fd
2317                    // (which would cause a double-close when TunDevice drops).
2318                    #[cfg(target_os = "macos")]
2319                    let (shutdown_read_fd, shutdown_write_fd) = {
2320                        let mut fds = [0i32; 2];
2321                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2322                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2323                                "failed to create shutdown pipe".into(),
2324                            )));
2325                        }
2326                        (fds[0], fds[1])
2327                    };
2328
2329                    // Create writer (dups the fd for independent write access).
2330                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
2331                    // per-destination path MTU learned via discovery.
2332                    let (writer, tun_tx) =
2333                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2334
2335                    // Spawn writer thread
2336                    let writer_handle = thread::spawn(move || {
2337                        writer.run();
2338                    });
2339
2340                    // Clone tun_tx for the reader
2341                    let reader_tun_tx = tun_tx.clone();
2342
2343                    // Create outbound channel for TUN reader → Node
2344                    let tun_channel_size = self.config.node.buffers.tun_channel;
2345                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2346
2347                    // Spawn reader thread
2348                    let transport_mtu = self.transport_mtu();
2349                    let path_mtu_lookup = self.path_mtu_lookup.clone();
2350                    #[cfg(target_os = "macos")]
2351                    let reader_handle = thread::spawn(move || {
2352                        run_tun_reader(
2353                            device,
2354                            mtu,
2355                            our_addr,
2356                            reader_tun_tx,
2357                            outbound_tx,
2358                            transport_mtu,
2359                            path_mtu_lookup,
2360                            shutdown_read_fd,
2361                        );
2362                    });
2363                    #[cfg(not(target_os = "macos"))]
2364                    let reader_handle = thread::spawn(move || {
2365                        run_tun_reader(
2366                            device,
2367                            mtu,
2368                            our_addr,
2369                            reader_tun_tx,
2370                            outbound_tx,
2371                            transport_mtu,
2372                            path_mtu_lookup,
2373                        );
2374                    });
2375
2376                    self.tun_state = TunState::Active;
2377                    self.tun_name = Some(name);
2378                    self.tun_tx = Some(tun_tx);
2379                    self.tun_outbound_rx = Some(outbound_rx);
2380                    self.tun_reader_handle = Some(reader_handle);
2381                    self.tun_writer_handle = Some(writer_handle);
2382                    #[cfg(target_os = "macos")]
2383                    {
2384                        self.tun_shutdown_fd = Some(shutdown_write_fd);
2385                    }
2386                }
2387                Err(e) => {
2388                    self.tun_state = TunState::Failed;
2389                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
2390                }
2391            }
2392            node_start_debug_log("Node::start tun init complete");
2393        }
2394
2395        // Initialize DNS responder (independent of TUN).
2396        //
2397        // Default bind_addr is "::1" (IPv6 loopback). The shipped
2398        // fips-dns-setup configures systemd-resolved via a global
2399        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
2400        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
2401        // where self-destined traffic to fips0's address is attributed
2402        // to fips0 in PKTINFO and gets silently dropped by the
2403        // mesh-interface filter in src/upper/dns.rs.
2404        //
2405        // For mesh-reachable resolution (rare), set bind_addr: "::"
2406        // in fips.yaml. The mesh-interface filter remains active to
2407        // prevent hosts-file alias enumeration in that mode.
2408        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
2409        // 127.0.0.1 still reach us regardless of kernel sysctl
2410        // defaults — but only when bind is on a wildcard / IPv6 path.
2411        if self.config.dns.enabled {
2412            node_start_debug_log("Node::start dns init begin");
2413            let addr_str = self.config.dns.bind_addr();
2414            match addr_str.parse::<std::net::IpAddr>() {
2415                Ok(ip) => {
2416                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2417                    match Self::bind_dns_socket(bind) {
2418                        Ok(socket) => {
2419                            let dns_channel_size = self.config.node.buffers.dns_channel;
2420                            let (identity_tx, identity_rx) =
2421                                tokio::sync::mpsc::channel(dns_channel_size);
2422                            let dns_ttl = self.config.dns.ttl();
2423                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2424                                self.config.peers(),
2425                            );
2426                            let reloader = if self.config.node.system_files_enabled {
2427                                let hosts_path = std::path::PathBuf::from(
2428                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
2429                                );
2430                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2431                            } else {
2432                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2433                            };
2434                            // Resolve the TUN ifindex so the responder can
2435                            // drop queries arriving on the mesh interface
2436                            // (fips0). Without this, the `::` bind exposes
2437                            // /etc/fips/hosts alias probing to any mesh peer.
2438                            // When TUN isn't enabled or the name can't be
2439                            // resolved, `None` disables the filter (there
2440                            // is no mesh surface to defend anyway).
2441                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2442                            info!(
2443                                bind = %bind,
2444                                hosts = reloader.hosts().len(),
2445                                mesh_ifindex = ?mesh_ifindex,
2446                                "DNS responder started for .fips domain (auto-reload enabled)"
2447                            );
2448                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2449                                socket,
2450                                identity_tx,
2451                                dns_ttl,
2452                                reloader,
2453                                mesh_ifindex,
2454                            ));
2455                            self.dns_identity_rx = Some(identity_rx);
2456                            self.dns_task = Some(handle);
2457                        }
2458                        Err(e) => {
2459                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2460                        }
2461                    }
2462                }
2463                Err(e) => {
2464                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2465                }
2466            }
2467            node_start_debug_log("Node::start dns init complete");
2468        }
2469
2470        self.state = NodeState::Running;
2471        node_start_debug_log("Node::start running");
2472        info!("Node started:");
2473        info!("       state: {}", self.state);
2474        info!("  transports: {}", self.transports.len());
2475        info!(" connections: {}", self.connections.len());
2476        Ok(())
2477    }
2478
2479    /// Bind a UDP socket for the DNS responder.
2480    ///
2481    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
2482    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
2483    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
2484    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
2485    /// land on the same socket.
2486    ///
2487    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
2488    /// can learn the arrival interface per packet. The responder uses that
2489    /// to drop queries arriving on the mesh TUN, closing the hosts-file
2490    /// probing side-channel created by the `::` bind.
2491    fn bind_dns_socket(
2492        addr: std::net::SocketAddr,
2493    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2494        use socket2::{Domain, Protocol, Socket, Type};
2495        let domain = if addr.is_ipv4() {
2496            Domain::IPV4
2497        } else {
2498            Domain::IPV6
2499        };
2500        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2501        if addr.is_ipv6() {
2502            sock.set_only_v6(false)?;
2503            #[cfg(unix)]
2504            Self::set_recv_pktinfo_v6(&sock)?;
2505        }
2506        sock.set_nonblocking(true)?;
2507        sock.bind(&addr.into())?;
2508        tokio::net::UdpSocket::from_std(sock.into())
2509    }
2510
2511    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
2512    ///
2513    /// After this setsockopt, each `recvmsg()` call on the socket receives
2514    /// an `IPV6_PKTINFO` control message containing the arrival interface
2515    /// index, which the DNS responder uses for its mesh-interface filter.
2516    #[cfg(unix)]
2517    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2518        use std::os::fd::AsRawFd;
2519        let enable: libc::c_int = 1;
2520        let ret = unsafe {
2521            libc::setsockopt(
2522                sock.as_raw_fd(),
2523                libc::IPPROTO_IPV6,
2524                libc::IPV6_RECVPKTINFO,
2525                &enable as *const _ as *const libc::c_void,
2526                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2527            )
2528        };
2529        if ret < 0 {
2530            return Err(std::io::Error::last_os_error());
2531        }
2532        Ok(())
2533    }
2534
2535    /// Resolve the mesh TUN interface index by name.
2536    ///
2537    /// Returns `None` if the interface does not exist (e.g. TUN disabled
2538    /// or not yet created). A `None` result disables the DNS responder's
2539    /// mesh-interface filter — safe, because if there is no fips0 there
2540    /// is no mesh exposure to defend against.
2541    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2542        #[cfg(unix)]
2543        {
2544            let c_name = std::ffi::CString::new(name).ok()?;
2545            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2546            if idx == 0 { None } else { Some(idx) }
2547        }
2548        #[cfg(not(unix))]
2549        {
2550            let _ = name;
2551            None
2552        }
2553    }
2554
2555    /// Stop the node.
2556    ///
2557    /// Shuts down TUN interface, stops I/O threads, and transitions to
2558    /// the Stopped state.
2559    pub async fn stop(&mut self) -> Result<(), NodeError> {
2560        if !self.state.can_stop() {
2561            return Err(NodeError::NotStarted);
2562        }
2563        self.state = NodeState::Stopping;
2564        info!(state = %self.state, "Node stopping");
2565
2566        // Stop DNS responder
2567        if let Some(handle) = self.dns_task.take() {
2568            handle.abort();
2569            debug!("DNS responder stopped");
2570        }
2571
2572        // Send disconnect notifications to all active peers before closing transports
2573        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
2574            .await;
2575
2576        // Stop Nostr overlay discovery background work and withdraw any advert.
2577        if let Some(bootstrap) = self.nostr_discovery.take()
2578            && let Err(e) = bootstrap.shutdown().await
2579        {
2580            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
2581        }
2582
2583        // Tear down LAN mDNS responder + browser. Best-effort: the
2584        // OS will eventually time the advert out via its TTL even if
2585        // we don't get a clean unregister out before the daemon exits.
2586        if let Some(lan) = self.lan_discovery.take() {
2587            lan.shutdown().await;
2588        }
2589
2590        if let Some(registry) = self.local_instance_registry.take()
2591            && let Err(err) = registry.remove()
2592        {
2593            debug!(error = %err, "failed to remove same-host FIPS instance record");
2594        }
2595
2596        // Shutdown transports (they're packet producers)
2597        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
2598        for transport_id in transport_ids {
2599            if let Some(mut handle) = self.transports.remove(&transport_id) {
2600                let transport_type = handle.transport_type().name;
2601                match handle.stop().await {
2602                    Ok(()) => {
2603                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
2604                    }
2605                    Err(e) => {
2606                        warn!(
2607                            transport_id = %transport_id,
2608                            transport_type,
2609                            error = %e,
2610                            "Transport stop failed"
2611                        );
2612                    }
2613                }
2614            }
2615        }
2616
2617        // Drop packet channels
2618        self.packet_tx.take();
2619        self.packet_rx.take();
2620
2621        // Shutdown TUN interface
2622        if let Some(name) = self.tun_name.take() {
2623            info!(name = %name, "Shutting down TUN interface");
2624
2625            // Drop the tun_tx to signal the writer to stop
2626            self.tun_tx.take();
2627
2628            // Delete the interface (on Linux, causes reader to get EFAULT)
2629            if let Err(e) = shutdown_tun_interface(&name).await {
2630                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2631            }
2632
2633            // On macOS, signal the reader thread to exit by writing to the
2634            // shutdown pipe. The reader's select() will wake up and break.
2635            #[cfg(target_os = "macos")]
2636            if let Some(fd) = self.tun_shutdown_fd.take() {
2637                unsafe {
2638                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2639                    libc::close(fd);
2640                }
2641            }
2642
2643            // Wait for threads to finish
2644            if let Some(handle) = self.tun_reader_handle.take() {
2645                let _ = handle.join();
2646            }
2647            if let Some(handle) = self.tun_writer_handle.take() {
2648                let _ = handle.join();
2649            }
2650
2651            self.tun_state = TunState::Disabled;
2652        }
2653
2654        self.state = NodeState::Stopped;
2655        info!(state = %self.state, "Node stopped");
2656        Ok(())
2657    }
2658
2659    /// Send disconnect notifications to all active peers.
2660    ///
2661    /// Best-effort: send failures are logged and ignored since the transport
2662    /// may already be degraded. This runs before transports are shut down.
2663    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2664        let disconnect = Disconnect::new(reason);
2665        let plaintext = disconnect.encode();
2666
2667        // Collect node_addrs to avoid borrow conflict with send helper
2668        let peer_addrs: Vec<NodeAddr> = self
2669            .peers
2670            .iter()
2671            .filter(|(_, peer)| peer.can_send() && peer.has_session())
2672            .map(|(addr, _)| *addr)
2673            .collect();
2674
2675        if peer_addrs.is_empty() {
2676            debug!(
2677                total_peers = self.peers.len(),
2678                "No sendable peers for disconnect notification"
2679            );
2680            return;
2681        }
2682
2683        let mut sent = 0usize;
2684        for node_addr in &peer_addrs {
2685            match self
2686                .send_encrypted_link_message(node_addr, &plaintext)
2687                .await
2688            {
2689                Ok(()) => sent += 1,
2690                Err(e) => {
2691                    debug!(
2692                        peer = %self.peer_display_name(node_addr),
2693                        error = %e,
2694                        "Failed to send disconnect (transport may be down)"
2695                    );
2696                }
2697            }
2698        }
2699
2700        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2701    }
2702
2703    pub(in crate::node) fn static_peer_addresses(
2704        &self,
2705        peer_config: &PeerConfig,
2706    ) -> Vec<PeerAddress> {
2707        peer_config
2708            .addresses_by_priority()
2709            .into_iter()
2710            .cloned()
2711            .collect()
2712    }
2713
2714    async fn nostr_peer_fallback_addresses(
2715        &self,
2716        peer_config: &PeerConfig,
2717        existing: &[PeerAddress],
2718    ) -> Vec<PeerAddress> {
2719        if !self.config.node.discovery.nostr.enabled
2720            || self.config.node.discovery.nostr.policy
2721                == crate::config::NostrDiscoveryPolicy::Disabled
2722        {
2723            return Vec::new();
2724        }
2725
2726        let Some(bootstrap) = self.nostr_discovery.clone() else {
2727            return Vec::new();
2728        };
2729        if self.nostr_cooldown_applies_to_peer_config(peer_config)
2730            && bootstrap
2731                .cooldown_until(&peer_config.npub, Self::now_ms())
2732                .is_some()
2733        {
2734            debug!(
2735                npub = %peer_config.npub,
2736                "Skipping cached Nostr fallback endpoints while peer is in traversal cooldown"
2737            );
2738            return Vec::new();
2739        }
2740        let endpoints = match bootstrap
2741            .cached_advert_endpoints_for_peer(&peer_config.npub)
2742            .await
2743        {
2744            Some(endpoints) => endpoints,
2745            None => {
2746                debug!(
2747                    npub = %peer_config.npub,
2748                    "No cached Nostr advert endpoints for configured peer"
2749                );
2750                return Vec::new();
2751            }
2752        };
2753
2754        let mut fallback = Vec::new();
2755        let mut next_priority = existing
2756            .iter()
2757            .map(|addr| addr.priority)
2758            .max()
2759            .unwrap_or(100)
2760            .saturating_add(1);
2761        // Stamp every overlay-derived candidate with the current wall clock.
2762        // The dialer still honors explicit priority first; this timestamp is
2763        // only a recency tiebreaker within the same priority tier. We use a
2764        // single timestamp per fetch because all candidates in one advert are
2765        // equally fresh.
2766        let seen_at_ms = Self::now_ms();
2767        for endpoint in endpoints {
2768            let Some(candidate) =
2769                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2770            else {
2771                continue;
2772            };
2773            if existing
2774                .iter()
2775                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2776                || fallback.iter().any(|addr: &PeerAddress| {
2777                    addr.transport == candidate.transport && addr.addr == candidate.addr
2778                })
2779            {
2780                continue;
2781            }
2782            fallback.push(candidate);
2783            next_priority = next_priority.saturating_add(1);
2784        }
2785        fallback
2786    }
2787
2788    pub(in crate::node) async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2789        if !self.config.node.discovery.nostr.enabled
2790            || self.config.node.discovery.nostr.policy
2791                == crate::config::NostrDiscoveryPolicy::Disabled
2792        {
2793            return false;
2794        }
2795        let Some(bootstrap) = self.nostr_discovery.clone() else {
2796            return false;
2797        };
2798        let now_ms = Self::now_ms();
2799        if self.nostr_cooldown_applies_to_peer_config(peer_config)
2800            && let Some(cooldown_until_ms) = bootstrap.cooldown_until(&peer_config.npub, now_ms)
2801        {
2802            debug!(
2803                npub = %peer_config.npub,
2804                cooldown_secs = cooldown_until_ms.saturating_sub(now_ms) / 1000,
2805                "Skipping Nostr traversal request while peer is in cooldown"
2806            );
2807            return false;
2808        }
2809        bootstrap.set_outbound_admission(self.open_discovery_outbound_admission_check());
2810        bootstrap.set_direct_refresh_admission(self.outbound_direct_refresh_admission_check());
2811        let mesh_signaling_allowed = self.mesh_signaling_allowed_for_peer(peer_config);
2812        let started = bootstrap
2813            .request_connect_with_mesh_signaling(peer_config.clone(), mesh_signaling_allowed)
2814            .await;
2815        if started {
2816            info!(
2817                npub = %peer_config.npub,
2818                mesh_signaling_allowed,
2819                "Started background UDP NAT traversal attempt"
2820            );
2821        } else {
2822            debug!(
2823                npub = %peer_config.npub,
2824                mesh_signaling_allowed,
2825                "Background UDP NAT traversal attempt already in progress"
2826            );
2827        }
2828        true
2829    }
2830
2831    fn nostr_cooldown_applies_to_peer_config(&self, peer_config: &PeerConfig) -> bool {
2832        !self.mesh_signaling_allowed_for_peer(peer_config)
2833    }
2834
2835    pub(in crate::node) fn mesh_signaling_allowed_for_peer(
2836        &self,
2837        peer_config: &PeerConfig,
2838    ) -> bool {
2839        let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
2840            return false;
2841        };
2842        let peer_addr = identity.node_addr();
2843        self.configured_peer(peer_addr).is_some()
2844    }
2845
2846    fn overlay_endpoint_to_peer_address(
2847        endpoint: &OverlayEndpointAdvert,
2848        priority: u8,
2849        seen_at_ms: u64,
2850    ) -> Option<PeerAddress> {
2851        let transport = match endpoint.transport {
2852            OverlayTransportKind::Udp => "udp",
2853            OverlayTransportKind::Tcp => "tcp",
2854            OverlayTransportKind::Tor => "tor",
2855            OverlayTransportKind::WebRtc => "webrtc",
2856        };
2857        Some(
2858            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2859                .with_seen_at_ms(seen_at_ms),
2860        )
2861    }
2862
2863    async fn attempt_peer_address_list(
2864        &mut self,
2865        peer_config: &PeerConfig,
2866        peer_identity: PeerIdentity,
2867        allow_bootstrap_nat: bool,
2868        addresses: &[PeerAddress],
2869    ) -> Result<(), NodeError> {
2870        let mut attempted = false;
2871        let mut local_route_error = None;
2872        let peer_node_addr = *peer_identity.node_addr();
2873        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);
2874
2875        for addr in addresses {
2876            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
2877                if !allow_bootstrap_nat {
2878                    continue;
2879                }
2880                if self.request_nostr_bootstrap(peer_config).await {
2881                    attempted = true;
2882                    continue;
2883                }
2884                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
2885                continue;
2886            }
2887
2888            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
2889                match self.resolve_ethernet_addr(&addr.addr) {
2890                    Ok(result) => result,
2891                    Err(e) => {
2892                        debug!(
2893                            transport = %addr.transport,
2894                            addr = %addr.addr,
2895                            error = %e,
2896                            "Failed to resolve Ethernet address"
2897                        );
2898                        continue;
2899                    }
2900                }
2901            } else if addr.transport == "ble" {
2902                #[cfg(bluer_available)]
2903                {
2904                    match self.resolve_ble_addr(&addr.addr) {
2905                        Ok(result) => result,
2906                        Err(e) => {
2907                            debug!(
2908                                transport = %addr.transport,
2909                                addr = %addr.addr,
2910                                error = %e,
2911                                "Failed to resolve BLE address"
2912                            );
2913                            continue;
2914                        }
2915                    }
2916                }
2917                #[cfg(not(bluer_available))]
2918                {
2919                    debug!(transport = %addr.transport, "BLE transport not available on this build");
2920                    continue;
2921                }
2922            } else {
2923                let tid = if addr.transport == "udp"
2924                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
2925                {
2926                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
2927                        Some((id, _)) => id,
2928                        None => {
2929                            debug!(
2930                                transport = %addr.transport,
2931                                addr = %addr.addr,
2932                                "No compatible operational UDP transport for address"
2933                            );
2934                            continue;
2935                        }
2936                    }
2937                } else {
2938                    match self.find_transport_for_type(&addr.transport) {
2939                        Some(id) => id,
2940                        None => {
2941                            debug!(
2942                                transport = %addr.transport,
2943                                addr = %addr.addr,
2944                                "No operational transport for address type"
2945                            );
2946                            continue;
2947                        }
2948                    }
2949                };
2950                (tid, TransportAddr::from_string(&addr.addr))
2951            };
2952
2953            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
2954                attempted = true;
2955                debug!(
2956                    npub = %peer_config.npub,
2957                    transport_id = %transport_id,
2958                    remote_addr = %remote_addr,
2959                    "Skipping duplicate in-flight candidate path"
2960                );
2961                continue;
2962            }
2963
2964            if concrete_budget == 0 {
2965                debug!(
2966                    npub = %peer_config.npub,
2967                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
2968                    "Path candidate race budget exhausted"
2969                );
2970                break;
2971            }
2972
2973            match self
2974                .initiate_connection(transport_id, remote_addr, peer_identity)
2975                .await
2976            {
2977                Ok(()) => {
2978                    attempted = true;
2979                    concrete_budget = concrete_budget.saturating_sub(1);
2980                }
2981                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
2982                Err(e) => {
2983                    if e.is_local_route_unavailable() && local_route_error.is_none() {
2984                        local_route_error = Some(e.to_string());
2985                    }
2986                    debug!(
2987                        npub = %peer_config.npub,
2988                        transport_id = %transport_id,
2989                        error = %e,
2990                        "Connection attempt failed, trying next address"
2991                    );
2992                }
2993            }
2994        }
2995
2996        if attempted {
2997            return Ok(());
2998        }
2999
3000        if let Some(error) = local_route_error {
3001            return Err(NodeError::LocalRouteUnavailable(error));
3002        }
3003
3004        Err(NodeError::NoTransportForType(format!(
3005            "no operational transport for any of {}'s addresses",
3006            peer_config.npub
3007        )))
3008    }
3009
3010    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
3011        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
3012            .await;
3013    }
3014
3015    pub(in crate::node) fn queue_active_fallback_direct_retries(
3016        &mut self,
3017        _bootstrap: &std::sync::Arc<NostrDiscovery>,
3018    ) {
3019        let now_ms = Self::now_ms();
3020        let peer_configs = self
3021            .config
3022            .auto_connect_peers()
3023            .cloned()
3024            .collect::<Vec<_>>();
3025
3026        for peer_config in peer_configs {
3027            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
3028                continue;
3029            };
3030            let node_addr = *peer_identity.node_addr();
3031
3032            if self.retry_pending.contains_key(&node_addr)
3033                || !self.peers.contains_key(&node_addr)
3034                || self.is_connecting_to_peer(&node_addr)
3035                || !self.active_peer_should_keep_direct_retry(&node_addr, &peer_config)
3036            {
3037                continue;
3038            }
3039
3040            let mut state = super::retry::RetryState::new(peer_config.clone());
3041            state.reconnect = true;
3042            state.retry_after_ms = now_ms;
3043            self.retry_pending.insert(node_addr, state);
3044
3045            debug!(
3046                peer = %self.peer_display_name(&node_addr),
3047                "Queued direct-path retry for active fallback peer"
3048            );
3049        }
3050    }
3051
3052    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
3053    /// queues retries for non-configured, not-yet-connected peers.
3054    ///
3055    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
3056    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
3057    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
3058    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
3059    ///
3060    /// `caller` is a short label included in log lines so per-tick and
3061    /// startup sweeps are distinguishable in operator-facing logs.
3062    pub(in crate::node) async fn run_open_discovery_sweep(
3063        &mut self,
3064        bootstrap: &std::sync::Arc<NostrDiscovery>,
3065        max_age_secs: Option<u64>,
3066        caller: &'static str,
3067    ) {
3068        if !self.config.node.discovery.nostr.enabled
3069            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3070        {
3071            return;
3072        }
3073
3074        let configured_npubs = self
3075            .config
3076            .peers()
3077            .iter()
3078            .map(|peer| peer.npub.clone())
3079            .collect::<HashSet<_>>();
3080        let now_ms = Self::now_ms();
3081        let now_secs = now_ms / 1000;
3082        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
3083        if enqueue_budget == 0 {
3084            debug!(
3085                caller = %caller,
3086                "open-discovery sweep: enqueue budget is 0, skipping"
3087            );
3088            return;
3089        }
3090
3091        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
3092        let cached_count = candidates.len();
3093        let mut enqueued = 0usize;
3094        let mut skipped_age = 0usize;
3095        let mut skipped_configured = 0usize;
3096        let mut skipped_self = 0usize;
3097        let mut skipped_connected = 0usize;
3098        let mut skipped_retry_pending = 0usize;
3099        let mut skipped_connecting = 0usize;
3100        let mut skipped_no_endpoints = 0usize;
3101        let mut skipped_invalid_npub = 0usize;
3102        let mut skipped_cooldown = 0usize;
3103
3104        for (npub, endpoints, created_at_secs) in candidates {
3105            if enqueue_budget == 0 {
3106                break;
3107            }
3108
3109            if let Some(max_age) = max_age_secs
3110                && now_secs.saturating_sub(created_at_secs) > max_age
3111            {
3112                skipped_age = skipped_age.saturating_add(1);
3113                continue;
3114            }
3115
3116            if configured_npubs.contains(&npub) {
3117                // Configured peers don't go through the open-discovery
3118                // enqueue path — their `PeerConfig` is already in
3119                // `self.config.peers()`, so the regular retry queue is
3120                // what drives their reconnect. But on cold start with
3121                // NAT'd peers, every initial `initiate_peer_connection`
3122                // fails (no overlay data yet, static cache hints empty
3123                // or stale), each pushes the peer into `retry_pending`
3124                // with exponential backoff (5/10/20/40/80s), and by the
3125                // time the next backoff slot fires the Nostr advert is
3126                // already cached — we just don't act on it for ~80s.
3127                //
3128                // The arrival of an advert (which this sweep sees) means
3129                // we now have a path to dial. If the peer's retry is
3130                // scheduled in the future, pull it forward to "now" so
3131                // the next `process_pending_retries` tick fires it
3132                // immediately. The retry path (`initiate_peer_retry_
3133                // connection` → `try_peer_addresses`) then refetches
3134                // the advert and dials it — no behavioral change
3135                // beyond schedule timing.
3136                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
3137                    let configured_addr = *identity.node_addr();
3138                    if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3139                        skipped_cooldown = skipped_cooldown.saturating_add(1);
3140                        skipped_configured = skipped_configured.saturating_add(1);
3141                        continue;
3142                    }
3143                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
3144                        && state.retry_after_ms > now_ms
3145                    {
3146                        state.retry_after_ms = now_ms;
3147                        debug!(
3148                            caller = %caller,
3149                            peer = %self.peer_display_name(&configured_addr),
3150                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
3151                            "Expediting configured-peer retry after fresh overlay advert"
3152                        );
3153                    }
3154                }
3155                skipped_configured = skipped_configured.saturating_add(1);
3156                continue;
3157            }
3158
3159            let peer_identity = match PeerIdentity::from_npub(&npub) {
3160                Ok(identity) => identity,
3161                Err(_) => {
3162                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
3163                    continue;
3164                }
3165            };
3166            let node_addr = *peer_identity.node_addr();
3167            if node_addr == *self.identity.node_addr() {
3168                skipped_self = skipped_self.saturating_add(1);
3169                continue;
3170            }
3171            if self.peers.contains_key(&node_addr) {
3172                skipped_connected = skipped_connected.saturating_add(1);
3173                continue;
3174            }
3175            if self.retry_pending.contains_key(&node_addr) {
3176                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
3177                continue;
3178            }
3179            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
3180                skipped_cooldown = skipped_cooldown.saturating_add(1);
3181                continue;
3182            }
3183            let connecting = self.connections.values().any(|conn| {
3184                conn.expected_identity()
3185                    .map(|id| id.node_addr() == &node_addr)
3186                    .unwrap_or(false)
3187            });
3188            if connecting {
3189                skipped_connecting = skipped_connecting.saturating_add(1);
3190                continue;
3191            }
3192
3193            let mut addresses = Vec::new();
3194            let mut priority = 120u8;
3195            let seen_at_ms = Self::now_ms();
3196            for endpoint in endpoints {
3197                let Some(candidate) =
3198                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
3199                else {
3200                    continue;
3201                };
3202                if addresses.iter().any(|existing: &PeerAddress| {
3203                    existing.transport == candidate.transport && existing.addr == candidate.addr
3204                }) {
3205                    continue;
3206                }
3207                addresses.push(candidate);
3208                priority = priority.saturating_add(1);
3209            }
3210            if addresses.is_empty() {
3211                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
3212                continue;
3213            }
3214
3215            self.peer_aliases
3216                .entry(node_addr)
3217                .or_insert_with(|| peer_identity.short_npub());
3218            self.register_identity(node_addr, peer_identity.pubkey_full());
3219
3220            let mut state = super::retry::RetryState::new(PeerConfig {
3221                npub: npub.clone(),
3222                alias: None,
3223                addresses,
3224                connect_policy: ConnectPolicy::AutoConnect,
3225                auto_reconnect: true,
3226                discovery_fallback_transit: false,
3227            });
3228            state.reconnect = false;
3229            state.retry_after_ms = now_ms;
3230            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
3231            self.retry_pending.insert(node_addr, state);
3232            info!(
3233                caller = %caller,
3234                peer = %peer_identity.short_npub(),
3235                advert_age_secs = now_secs.saturating_sub(created_at_secs),
3236                "open-discovery sweep: queued retry for cached advert"
3237            );
3238            enqueue_budget = enqueue_budget.saturating_sub(1);
3239            enqueued = enqueued.saturating_add(1);
3240        }
3241
3242        // Always log a one-line summary on the startup sweep so operators
3243        // can verify it ran. Per-tick sweeps are noisier; only summarize
3244        // when something happened.
3245        let total_skipped = skipped_age
3246            + skipped_configured
3247            + skipped_self
3248            + skipped_connected
3249            + skipped_retry_pending
3250            + skipped_connecting
3251            + skipped_no_endpoints
3252            + skipped_invalid_npub
3253            + skipped_cooldown;
3254        let should_summarize = caller == "startup" || enqueued > 0;
3255        if should_summarize {
3256            info!(
3257                caller = %caller,
3258                cached = cached_count,
3259                queued = enqueued,
3260                skipped_age = skipped_age,
3261                skipped_configured = skipped_configured,
3262                skipped_self = skipped_self,
3263                skipped_connected = skipped_connected,
3264                skipped_retry_pending = skipped_retry_pending,
3265                skipped_connecting = skipped_connecting,
3266                skipped_no_endpoints = skipped_no_endpoints,
3267                skipped_invalid_npub = skipped_invalid_npub,
3268                skipped_cooldown = skipped_cooldown,
3269                skipped_total = total_skipped,
3270                "open-discovery sweep complete"
3271            );
3272        }
3273    }
3274
3275    /// One-shot startup sweep: runs once after the configured settle
3276    /// delay, iterating the cached overlay adverts and queueing retries
3277    /// for any peer with a recent enough advert that we haven't already
3278    /// configured statically or established a link to.
3279    ///
3280    /// Gated identically to [`run_open_discovery_sweep`]: requires
3281    /// `node.discovery.nostr.enabled` and `policy == open`.
3282    async fn maybe_run_startup_open_discovery_sweep(
3283        &mut self,
3284        bootstrap: &std::sync::Arc<NostrDiscovery>,
3285    ) {
3286        if self.startup_open_discovery_sweep_done {
3287            return;
3288        }
3289        if !self.config.node.discovery.nostr.enabled
3290            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
3291        {
3292            // Mark done so we don't keep re-checking on every tick.
3293            self.startup_open_discovery_sweep_done = true;
3294            return;
3295        }
3296        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
3297            return;
3298        };
3299        let now_ms = Self::now_ms();
3300        let delay_ms = self
3301            .config
3302            .node
3303            .discovery
3304            .nostr
3305            .startup_sweep_delay_secs
3306            .saturating_mul(1000);
3307        if now_ms < started_at_ms.saturating_add(delay_ms) {
3308            return;
3309        }
3310
3311        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
3312        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
3313            .await;
3314        self.startup_open_discovery_sweep_done = true;
3315    }
3316
3317    fn available_outbound_slots(&self) -> usize {
3318        let connection_used = self
3319            .connections
3320            .len()
3321            .saturating_add(self.pending_connects.len());
3322        let connection_slots = if self.max_connections == 0 {
3323            usize::MAX
3324        } else {
3325            self.max_connections.saturating_sub(connection_used)
3326        };
3327
3328        let peer_slots = if self.max_peers == 0 {
3329            usize::MAX
3330        } else {
3331            self.max_peers.saturating_sub(self.peers.len())
3332        };
3333
3334        let link_slots = if self.max_links == 0 {
3335            usize::MAX
3336        } else {
3337            self.max_links.saturating_sub(self.links.len())
3338        };
3339
3340        connection_slots.min(peer_slots).min(link_slots)
3341    }
3342
3343    pub(in crate::node) fn open_discovery_enqueue_budget(
3344        &self,
3345        configured_npubs: &HashSet<String>,
3346    ) -> usize {
3347        let current_open_discovery_active = self
3348            .peers
3349            .values()
3350            .filter(|peer| !configured_npubs.contains(&peer.npub()))
3351            .count();
3352        let current_open_discovery_pending = self
3353            .retry_pending
3354            .values()
3355            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3356            .count();
3357
3358        let cap_remaining = self
3359            .config
3360            .node
3361            .discovery
3362            .nostr
3363            .open_discovery_max_pending
3364            .saturating_sub(current_open_discovery_active)
3365            .saturating_sub(current_open_discovery_pending);
3366
3367        cap_remaining.min(self.available_outbound_slots())
3368    }
3369
3370    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3371        now_ms.saturating_add(
3372            self.config
3373                .node
3374                .discovery
3375                .nostr
3376                .advert_ttl_secs
3377                .saturating_mul(1000)
3378                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3379        )
3380    }
3381
3382    async fn build_overlay_advert(
3383        &self,
3384        bootstrap: &std::sync::Arc<NostrDiscovery>,
3385    ) -> Option<OverlayAdvert> {
3386        if !self.config.node.discovery.nostr.enabled {
3387            return None;
3388        }
3389
3390        let mut endpoints = Vec::new();
3391        let mut has_udp_nat = false;
3392        let mut has_webrtc = false;
3393
3394        for handle in self.transports.values() {
3395            if !handle.is_operational() {
3396                continue;
3397            }
3398
3399            match handle.transport_type().name {
3400                "udp" => {
3401                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
3402                        continue;
3403                    };
3404                    if !cfg.advertise_on_nostr() {
3405                        continue;
3406                    }
3407                    if cfg.is_public() {
3408                        // Precedence:
3409                        // 1. operator-supplied `external_addr` (skips STUN)
3410                        // 2. non-wildcard *public* `local_addr` (operator
3411                        //    bound to a specific public IP directly)
3412                        // 3. STUN auto-discovery against ephemeral socket
3413                        //    (also taken when bind is wildcard *or* private —
3414                        //    a private bind is not peer-reachable, so we
3415                        //    must publish the public reflexive instead)
3416                        // 4. loud warn + omit endpoint
3417                        if let Some(explicit) = cfg.external_advert_addr() {
3418                            endpoints.push(OverlayEndpointAdvert {
3419                                transport: OverlayTransportKind::Udp,
3420                                addr: explicit.to_string(),
3421                            });
3422                        } else {
3423                            match handle.local_addr() {
3424                                Some(addr)
3425                                    if !addr.ip().is_unspecified()
3426                                        && !is_unroutable_advert_ip(addr.ip()) =>
3427                                {
3428                                    endpoints.push(OverlayEndpointAdvert {
3429                                        transport: OverlayTransportKind::Udp,
3430                                        addr: addr.to_string(),
3431                                    });
3432                                }
3433                                Some(addr) => {
3434                                    let key = handle.transport_id().as_u32();
3435                                    let port = addr.port();
3436                                    if let Some(public) =
3437                                        bootstrap.learn_public_udp_addr(key, port).await
3438                                    {
3439                                        endpoints.push(OverlayEndpointAdvert {
3440                                            transport: OverlayTransportKind::Udp,
3441                                            addr: public.to_string(),
3442                                        });
3443                                    } else {
3444                                        warn!(
3445                                            transport_id = key,
3446                                            bind_addr = %addr,
3447                                            "advert: udp public=true but bind is wildcard \
3448                                            or private and STUN observation failed; \
3449                                            advertising no UDP endpoint. Either set \
3450                                            transports.udp.external_addr, bind to a \
3451                                            specific *public* IP, or ensure \
3452                                            node.discovery.nostr.stun_servers is reachable"
3453                                        );
3454                                    }
3455                                }
3456                                None => {}
3457                            }
3458                        }
3459                    } else {
3460                        endpoints.push(OverlayEndpointAdvert {
3461                            transport: OverlayTransportKind::Udp,
3462                            addr: "nat".to_string(),
3463                        });
3464                        has_udp_nat = true;
3465                    }
3466                }
3467                "webrtc" => {
3468                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
3469                        continue;
3470                    };
3471                    if !cfg.advertise_on_nostr() {
3472                        continue;
3473                    }
3474                    endpoints.push(OverlayEndpointAdvert {
3475                        transport: OverlayTransportKind::WebRtc,
3476                        addr: hex::encode(self.identity.pubkey_full().serialize()),
3477                    });
3478                    has_webrtc = true;
3479                }
3480                "tcp" => {
3481                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
3482                        continue;
3483                    };
3484                    if !cfg.advertise_on_nostr() {
3485                        continue;
3486                    }
3487                    // Precedence:
3488                    // 1. operator-supplied `external_addr` (only path that
3489                    //    works on cloud-NAT setups where the public IP is
3490                    //    not on a host interface).
3491                    // 2. non-wildcard *public* `local_addr` (operator bound
3492                    //    to a specific public IP directly).
3493                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
3494                    //
3495                    // A wildcard *or* private bind is never advertised as-is
3496                    // — peers off-LAN can't reach a private bind, and there
3497                    // is no TCP STUN to discover a public reflexive.
3498                    if let Some(explicit) = cfg.external_advert_addr() {
3499                        endpoints.push(OverlayEndpointAdvert {
3500                            transport: OverlayTransportKind::Tcp,
3501                            addr: explicit.to_string(),
3502                        });
3503                    } else {
3504                        match handle.local_addr() {
3505                            Some(addr)
3506                                if !addr.ip().is_unspecified()
3507                                    && !is_unroutable_advert_ip(addr.ip()) =>
3508                            {
3509                                endpoints.push(OverlayEndpointAdvert {
3510                                    transport: OverlayTransportKind::Tcp,
3511                                    addr: addr.to_string(),
3512                                });
3513                            }
3514                            Some(addr) => {
3515                                warn!(
3516                                    bind_addr = %addr,
3517                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
3518                                    or private IP and no transports.tcp.external_addr set; \
3519                                    advertising no TCP endpoint. Either set external_addr \
3520                                    to the public IP (recommended for cloud 1:1-NAT setups) \
3521                                    or bind explicitly to the public IP"
3522                                );
3523                            }
3524                            None => {}
3525                        }
3526                    }
3527                }
3528                "tor" => {
3529                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
3530                        continue;
3531                    };
3532                    if !cfg.advertise_on_nostr() {
3533                        continue;
3534                    }
3535                    if let Some(addr) = handle.onion_address() {
3536                        endpoints.push(OverlayEndpointAdvert {
3537                            transport: OverlayTransportKind::Tor,
3538                            addr: format!("{}:{}", addr, cfg.advertised_port()),
3539                        });
3540                    }
3541                }
3542                _ => {}
3543            }
3544        }
3545
3546        if endpoints.is_empty() {
3547            return None;
3548        }
3549
3550        Some(OverlayAdvert {
3551            identifier: ADVERT_IDENTIFIER.to_string(),
3552            version: ADVERT_VERSION,
3553            endpoints,
3554            signal_relays: (has_udp_nat || has_webrtc)
3555                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
3556            stun_servers: (has_udp_nat || has_webrtc)
3557                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
3558        })
3559    }
3560
3561    async fn refresh_overlay_advert(
3562        &self,
3563        bootstrap: &std::sync::Arc<NostrDiscovery>,
3564    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3565        let advert = self.build_overlay_advert(bootstrap).await;
3566        bootstrap.update_local_advert(advert).await
3567    }
3568
3569    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3570        match (&self.config.transports.udp, transport_name) {
3571            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3572            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3573            _ => None,
3574        }
3575    }
3576
3577    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3578        match (&self.config.transports.tcp, transport_name) {
3579            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3580            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3581            _ => None,
3582        }
3583    }
3584
3585    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3586        match (&self.config.transports.tor, transport_name) {
3587            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3588            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3589            _ => None,
3590        }
3591    }
3592
3593    fn lookup_webrtc_config(
3594        &self,
3595        transport_name: Option<&str>,
3596    ) -> Option<&crate::config::WebRtcConfig> {
3597        match (&self.config.transports.webrtc, transport_name) {
3598            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3599            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3600            _ => None,
3601        }
3602    }
3603
3604    pub(in crate::node) async fn try_peer_addresses(
3605        &mut self,
3606        peer_config: &PeerConfig,
3607        peer_identity: PeerIdentity,
3608        allow_bootstrap_nat: bool,
3609    ) -> Result<(), NodeError> {
3610        let peer_node_addr = *peer_identity.node_addr();
3611        if self.peers.contains_key(&peer_node_addr) {
3612            debug!(
3613                npub = %peer_config.npub,
3614                "Peer already exists, skipping address attempts"
3615            );
3616            return Ok(());
3617        }
3618
3619        let candidates = self.peer_address_candidates(peer_config).await;
3620
3621        if candidates.is_empty() {
3622            if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3623                return Ok(());
3624            }
3625            return Err(NodeError::NoTransportForType(format!(
3626                "no addresses known for {}",
3627                peer_config.npub
3628            )));
3629        }
3630
3631        if self
3632            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3633            .await
3634            .is_ok()
3635        {
3636            if allow_bootstrap_nat {
3637                self.request_nostr_bootstrap(peer_config).await;
3638            }
3639            return Ok(());
3640        }
3641
3642        if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3643            return Ok(());
3644        }
3645
3646        Err(NodeError::NoTransportForType(format!(
3647            "no operational transport for any of {}'s addresses",
3648            peer_config.npub
3649        )))
3650    }
3651
3652    async fn try_active_peer_alternative_addresses(
3653        &mut self,
3654        peer_config: &PeerConfig,
3655        peer_identity: PeerIdentity,
3656        allow_same_path_refresh: bool,
3657    ) -> Result<bool, NodeError> {
3658        let peer_node_addr = *peer_identity.node_addr();
3659        let mut candidates = self.peer_address_candidates(peer_config).await;
3660        let same_path_refresh_needed = allow_same_path_refresh
3661            && (self.active_peer_needs_same_path_refresh(&peer_node_addr)
3662                || self
3663                    .peers
3664                    .get(&peer_node_addr)
3665                    .is_some_and(|peer| !peer.can_send()));
3666        if same_path_refresh_needed
3667            && let Some(candidate) = self.active_peer_current_udp_candidate(&peer_node_addr)
3668            && !candidates.iter().any(|existing| {
3669                existing.transport == candidate.transport && existing.addr == candidate.addr
3670            })
3671        {
3672            candidates.push(candidate);
3673            Self::sort_peer_address_candidates(&mut candidates);
3674        }
3675        let should_try_nostr =
3676            self.active_peer_should_keep_direct_retry(&peer_node_addr, peer_config);
3677
3678        if candidates.is_empty() {
3679            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3680                return Ok(true);
3681            }
3682            return Err(NodeError::NoTransportForType(format!(
3683                "no addresses known for {}",
3684                peer_config.npub
3685            )));
3686        }
3687
3688        let alternatives: Vec<_> = candidates
3689            .into_iter()
3690            .filter(|addr| {
3691                same_path_refresh_needed
3692                    || !self.active_peer_matches_candidate(&peer_node_addr, addr)
3693            })
3694            .collect();
3695
3696        if alternatives.is_empty() {
3697            if should_try_nostr && self.request_nostr_bootstrap(peer_config).await {
3698                return Ok(true);
3699            }
3700            return Ok(false);
3701        }
3702
3703        let needs_separate_nostr_attempt = should_try_nostr
3704            && !alternatives
3705                .iter()
3706                .any(|addr| addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat"));
3707        let address_result = self
3708            .attempt_peer_address_list(peer_config, peer_identity, true, &alternatives)
3709            .await;
3710        let nostr_attempted =
3711            needs_separate_nostr_attempt && self.request_nostr_bootstrap(peer_config).await;
3712
3713        match address_result {
3714            Ok(()) => Ok(true),
3715            Err(err) if nostr_attempted => {
3716                debug!(
3717                    npub = %peer_config.npub,
3718                    error = %err,
3719                    "Static active-peer direct-path alternatives failed; Nostr traversal still queued"
3720                );
3721                Ok(true)
3722            }
3723            Err(err) => Err(err),
3724        }
3725    }
3726
3727    async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3728        // Merge every candidate from every source we have for this peer.
3729        // Explicitly configured addresses keep first shot, then freshly
3730        // fetched overlay adverts are appended as fallback candidates. This
3731        // lets native peers try known LAN/nvpn/static UDP routes before
3732        // slower WebRTC/Nostr-discovered paths, while still racing every
3733        // concrete candidate that fits in the per-peer budget.
3734        let static_addresses = self.static_peer_addresses(peer_config);
3735        let overlay_addresses = self
3736            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3737            .await;
3738
3739        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3740        for addr in overlay_addresses.into_iter().chain(static_addresses) {
3741            if !candidates.iter().any(|existing: &PeerAddress| {
3742                existing.transport == addr.transport && existing.addr == addr.addr
3743            }) {
3744                candidates.push(addr);
3745            }
3746        }
3747
3748        Self::sort_peer_address_candidates(&mut candidates);
3749
3750        candidates
3751    }
3752
3753    fn sort_peer_address_candidates(candidates: &mut [PeerAddress]) {
3754        // Stable sort: explicit priority is the contract, and freshness only
3755        // breaks ties inside one priority tier. Overlay-discovered endpoints
3756        // are assigned lower priority than configured/static hints when both
3757        // exist, so operator-provided LAN routes keep first shot without
3758        // dropping fresh overlay candidates from the race.
3759        candidates.sort_by(|a, b| {
3760            if a.priority != b.priority {
3761                return a.priority.cmp(&b.priority);
3762            }
3763            match (a.seen_at_ms, b.seen_at_ms) {
3764                (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3765                (Some(_), None) => std::cmp::Ordering::Less,
3766                (None, Some(_)) => std::cmp::Ordering::Greater,
3767                (None, None) => std::cmp::Ordering::Equal,
3768            }
3769        });
3770    }
3771
3772    fn active_peer_matches_any_candidate(
3773        &self,
3774        peer_node_addr: &NodeAddr,
3775        candidates: &[PeerAddress],
3776    ) -> bool {
3777        candidates
3778            .iter()
3779            .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3780    }
3781
3782    pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3783        &self,
3784        peer_node_addr: &NodeAddr,
3785        candidates: &[PeerAddress],
3786    ) -> bool {
3787        if !self
3788            .peers
3789            .get(peer_node_addr)
3790            .is_some_and(|peer| peer.can_send())
3791        {
3792            return false;
3793        }
3794        if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3795            return false;
3796        }
3797        !self.active_peer_needs_same_path_refresh(peer_node_addr)
3798    }
3799
3800    pub(in crate::node) fn active_peer_should_keep_direct_retry(
3801        &self,
3802        peer_node_addr: &NodeAddr,
3803        peer_config: &PeerConfig,
3804    ) -> bool {
3805        let Some(peer) = self.peers.get(peer_node_addr) else {
3806            return false;
3807        };
3808
3809        let static_addresses = self.static_peer_addresses(peer_config);
3810        if !static_addresses.is_empty() {
3811            return !self
3812                .active_peer_candidate_is_fresh_enough_to_skip(peer_node_addr, &static_addresses);
3813        }
3814
3815        if peer_config.npub.is_empty() {
3816            return false;
3817        }
3818
3819        if !self.config.node.discovery.nostr.enabled
3820            || self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Disabled
3821        {
3822            return false;
3823        }
3824
3825        let Some(transport_id) = peer.transport_id() else {
3826            return true;
3827        };
3828
3829        if self.bootstrap_transports.contains(&transport_id) {
3830            return self.active_peer_needs_same_path_refresh(peer_node_addr);
3831        }
3832
3833        let Some(transport) = self.transports.get(&transport_id) else {
3834            return true;
3835        };
3836
3837        if transport.transport_type().name != "udp" {
3838            return true;
3839        }
3840
3841        self.active_peer_needs_same_path_refresh(peer_node_addr)
3842    }
3843
3844    pub(in crate::node) fn clear_retry_unless_direct_refresh_needed(
3845        &mut self,
3846        peer_node_addr: &NodeAddr,
3847    ) {
3848        let keep_retry = self
3849            .retry_pending
3850            .get(peer_node_addr)
3851            .map(|state| state.peer_config.clone())
3852            .is_some_and(|peer_config| {
3853                self.active_peer_should_keep_direct_retry(peer_node_addr, &peer_config)
3854            });
3855
3856        if !keep_retry {
3857            self.retry_pending.remove(peer_node_addr);
3858        }
3859    }
3860
3861    fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3862        let Some(peer) = self.peers.get(peer_node_addr) else {
3863            return false;
3864        };
3865        let stale_after_ms = self
3866            .config
3867            .node
3868            .heartbeat_interval_secs
3869            .saturating_mul(1000)
3870            .max(1000);
3871        peer.idle_time(Self::now_ms()) > stale_after_ms
3872    }
3873
3874    pub(in crate::node) fn active_peer_current_udp_candidate(
3875        &self,
3876        peer_node_addr: &NodeAddr,
3877    ) -> Option<PeerAddress> {
3878        let peer = self.peers.get(peer_node_addr)?;
3879        let current_addr = peer.current_addr()?;
3880        if let Some(transport_id) = peer.transport_id() {
3881            if let Some(transport) = self.transports.get(&transport_id) {
3882                if transport.transport_type().name != "udp" {
3883                    return None;
3884                }
3885            } else if !self
3886                .transports
3887                .values()
3888                .any(|transport| transport.transport_type().name == "udp")
3889            {
3890                return None;
3891            }
3892        } else if !self
3893            .transports
3894            .values()
3895            .any(|transport| transport.transport_type().name == "udp")
3896        {
3897            return None;
3898        }
3899        let socket_addr = current_addr.as_str()?.parse::<SocketAddr>().ok()?;
3900
3901        Some(
3902            PeerAddress::with_priority("udp", socket_addr.to_string(), 240)
3903                .with_seen_at_ms(Self::now_ms()),
3904        )
3905    }
3906
3907    pub(in crate::node) fn active_peer_matches_candidate(
3908        &self,
3909        peer_node_addr: &NodeAddr,
3910        candidate: &PeerAddress,
3911    ) -> bool {
3912        let Some(peer) = self.peers.get(peer_node_addr) else {
3913            return false;
3914        };
3915        let Some(current_addr) = peer.current_addr() else {
3916            return false;
3917        };
3918        if let Some(peer_transport_id) = peer.transport_id()
3919            && let Some((candidate_transport_id, candidate_addr)) =
3920                self.resolve_peer_address_for_match(candidate)
3921        {
3922            return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3923        }
3924        if peer
3925            .transport_id()
3926            .map(|id| self.bootstrap_transports.contains(&id))
3927            .unwrap_or(false)
3928        {
3929            return false;
3930        }
3931        let current_addr = current_addr.to_string();
3932        let current_transport = peer
3933            .transport_id()
3934            .and_then(|id| self.transports.get(&id))
3935            .map(|transport| transport.transport_type().name);
3936
3937        candidate.addr == current_addr
3938            && current_transport
3939                .map(|transport| transport == candidate.transport)
3940                .unwrap_or(true)
3941    }
3942
3943    fn configured_path_priority(
3944        &self,
3945        peer_node_addr: &NodeAddr,
3946        transport_id: TransportId,
3947        remote_addr: &TransportAddr,
3948    ) -> Option<u8> {
3949        self.configured_peer(peer_node_addr)?
3950            .addresses
3951            .iter()
3952            .filter_map(|candidate| {
3953                let (candidate_transport_id, candidate_addr) =
3954                    self.resolve_peer_address_for_match(candidate)?;
3955                (candidate_transport_id == transport_id && &candidate_addr == remote_addr)
3956                    .then_some(candidate.priority)
3957            })
3958            .min()
3959    }
3960
3961    pub(in crate::node) fn alternate_path_priority_allows_replace(
3962        &self,
3963        peer_node_addr: &NodeAddr,
3964        candidate_transport_id: TransportId,
3965        candidate_addr: &TransportAddr,
3966    ) -> bool {
3967        const UNKNOWN_PATH_PRIORITY: u16 = u8::MAX as u16 + 1;
3968
3969        let Some(peer) = self.peers.get(peer_node_addr) else {
3970            return true;
3971        };
3972        let Some(current_transport_id) = peer.transport_id() else {
3973            return true;
3974        };
3975        let Some(current_addr) = peer.current_addr() else {
3976            return true;
3977        };
3978
3979        let current_priority = self
3980            .configured_path_priority(peer_node_addr, current_transport_id, current_addr)
3981            .map(u16::from)
3982            .unwrap_or(UNKNOWN_PATH_PRIORITY);
3983        let candidate_priority = self
3984            .configured_path_priority(peer_node_addr, candidate_transport_id, candidate_addr)
3985            .map(u16::from)
3986            .unwrap_or(UNKNOWN_PATH_PRIORITY);
3987
3988        if candidate_priority <= current_priority {
3989            return true;
3990        }
3991
3992        debug!(
3993            peer = %self.peer_display_name(peer_node_addr),
3994            current_transport_id = %current_transport_id,
3995            current_addr = %current_addr,
3996            current_priority,
3997            candidate_transport_id = %candidate_transport_id,
3998            candidate_addr = %candidate_addr,
3999            candidate_priority,
4000            "Suppressing lower-priority alternate path while current path remains healthy"
4001        );
4002        false
4003    }
4004
4005    pub(in crate::node) fn authenticated_packet_path_allows_bookkeeping(
4006        &mut self,
4007        peer_node_addr: &NodeAddr,
4008        candidate_transport_id: TransportId,
4009        candidate_addr: &TransportAddr,
4010        now_ms: u64,
4011    ) -> bool {
4012        let Some(peer) = self.peers.get(peer_node_addr) else {
4013            return true;
4014        };
4015
4016        if peer.transport_id() == Some(candidate_transport_id)
4017            && peer.current_addr() == Some(candidate_addr)
4018        {
4019            return true;
4020        }
4021
4022        if !peer.can_send() || self.session_direct_path_is_degraded(peer_node_addr, now_ms) {
4023            return true;
4024        }
4025
4026        self.alternate_path_priority_allows_replace(
4027            peer_node_addr,
4028            candidate_transport_id,
4029            candidate_addr,
4030        )
4031    }
4032
4033    pub(in crate::node) fn active_peer_uses_recent_endpoint_path(
4034        &self,
4035        peer_node_addr: &NodeAddr,
4036        peer_config: &PeerConfig,
4037    ) -> bool {
4038        peer_config.addresses.iter().any(|addr| {
4039            addr.seen_at_ms.is_some() && self.active_peer_matches_candidate(peer_node_addr, addr)
4040        })
4041    }
4042
4043    pub(in crate::node) fn active_peer_uses_traversal_path(
4044        &self,
4045        peer_node_addr: &NodeAddr,
4046        peer_config: &PeerConfig,
4047    ) -> bool {
4048        let via_bootstrap_transport = self
4049            .peers
4050            .get(peer_node_addr)
4051            .and_then(|peer| peer.transport_id())
4052            .map(|id| self.bootstrap_transports.contains(&id))
4053            .unwrap_or(false);
4054
4055        via_bootstrap_transport
4056            || self.active_peer_uses_recent_endpoint_path(peer_node_addr, peer_config)
4057    }
4058
4059    // === Control API methods ===
4060
4061    /// Connect to a peer via the control API.
4062    ///
4063    /// Creates an ephemeral peer connection (not persisted to config, no
4064    /// auto-reconnect). Reuses the same connection path as auto-connect
4065    /// peers. Returns JSON data on success or an error message.
4066    pub(crate) async fn api_connect(
4067        &mut self,
4068        npub: &str,
4069        address: &str,
4070        transport: &str,
4071    ) -> Result<serde_json::Value, String> {
4072        let peer_config = PeerConfig {
4073            npub: npub.to_string(),
4074            alias: None,
4075            addresses: vec![PeerAddress::new(transport, address)],
4076            connect_policy: ConnectPolicy::Manual,
4077            auto_reconnect: false,
4078            discovery_fallback_transit: true,
4079        };
4080
4081        // Pre-seed identity cache (same as initiate_peer_connections does)
4082        if let Ok(identity) = PeerIdentity::from_npub(npub) {
4083            self.peer_aliases
4084                .insert(*identity.node_addr(), identity.short_npub());
4085            self.register_identity(*identity.node_addr(), identity.pubkey_full());
4086        }
4087
4088        self.initiate_peer_connection(&peer_config)
4089            .await
4090            .map(|()| {
4091                info!(
4092                    npub = %npub,
4093                    address = %address,
4094                    transport = %transport,
4095                    "API connect initiated"
4096                );
4097                serde_json::json!({
4098                    "npub": npub,
4099                    "address": address,
4100                    "transport": transport,
4101                })
4102            })
4103            .map_err(|e| e.to_string())
4104    }
4105
4106    /// Disconnect a peer via the control API.
4107    ///
4108    /// Removes the peer and suppresses auto-reconnect.
4109    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
4110        let peer_identity =
4111            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
4112        let node_addr = *peer_identity.node_addr();
4113
4114        if !self.peers.contains_key(&node_addr) {
4115            return Err(format!("peer not found: {npub}"));
4116        }
4117
4118        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
4119        self.remove_active_peer(&node_addr);
4120
4121        // Suppress any pending auto-reconnect
4122        self.retry_pending.remove(&node_addr);
4123
4124        info!(npub = %npub, "API disconnect completed");
4125
4126        Ok(serde_json::json!({
4127            "npub": npub,
4128            "disconnected": true,
4129        }))
4130    }
4131
4132    /// Adopt an already-established UDP traversal and start the normal FIPS
4133    /// Noise handshake over it.
4134    ///
4135    /// This is intended for integration with an external rendezvous runtime
4136    /// that has already completed relay signaling, STUN observation, and UDP
4137    /// hole punching. After handoff, the adopted socket is owned by FIPS.
4138    pub async fn adopt_established_traversal(
4139        &mut self,
4140        traversal: EstablishedTraversal,
4141    ) -> Result<BootstrapHandoffResult, NodeError> {
4142        debug!(
4143            peer_npub = %traversal.peer_npub,
4144            session_id = %traversal.session_id,
4145            remote_addr = %traversal.remote_addr,
4146            "adopting established traversal socket"
4147        );
4148
4149        if !self.state.is_operational() {
4150            return Err(NodeError::NotStarted);
4151        }
4152
4153        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
4154        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
4155            NodeError::InvalidPeerNpub {
4156                npub: traversal.peer_npub.clone(),
4157                reason: e.to_string(),
4158            }
4159        })?;
4160        let peer_node_addr = *peer_identity.node_addr();
4161        if self.peers.contains_key(&peer_node_addr) {
4162            debug!(
4163                peer_npub = %traversal.peer_npub,
4164                "Adopting NAT traversal handoff as alternate path for already-connected peer"
4165            );
4166        }
4167
4168        self.peer_aliases
4169            .insert(peer_node_addr, peer_identity.short_npub());
4170        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
4171
4172        let transport_id = self.allocate_transport_id();
4173        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
4174        // (and accept_connections / advertise flags) from the operator's
4175        // configured [transports.udp] when the bootstrap runtime doesn't
4176        // pass an explicit override. Lookup tries `transport_name` first
4177        // (covers the `Named` multi-listener variant) and falls back to the
4178        // unnamed `Single` listener, so single- and named-listener configs
4179        // both inherit cleanly.
4180        //
4181        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
4182        // only value guaranteed to survive arbitrary middlebox paths.
4183        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
4184        // initially attempt that MTU and may black-hole on tighter paths
4185        // until reactive `MtuExceeded` recovery kicks in. Operators who
4186        // raise the primary MTU based on known-clean topology accept that
4187        // tradeoff; the silent drop on a too-low default was strictly
4188        // worse for the common case where the primary MTU is reachable.
4189        //
4190        // Bind / external address fields are cleared since the socket is
4191        // already bound.
4192        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
4193            let mut cfg = self
4194                .lookup_udp_config(traversal.transport_name.as_deref())
4195                .or_else(|| self.lookup_udp_config(None))
4196                .cloned()
4197                .unwrap_or_default();
4198            cfg.bind_addr = None;
4199            cfg.external_addr = None;
4200            cfg
4201        });
4202        let mut transport = crate::transport::udp::UdpTransport::new(
4203            transport_id,
4204            traversal.transport_name.clone(),
4205            inherited_config,
4206            packet_tx,
4207        );
4208
4209        transport
4210            .adopt_socket_async(traversal.socket)
4211            .await
4212            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
4213
4214        let local_addr = transport.local_addr().ok_or_else(|| {
4215            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
4216        })?;
4217
4218        self.transports.insert(
4219            transport_id,
4220            crate::transport::TransportHandle::Udp(transport),
4221        );
4222        self.bootstrap_transports.insert(transport_id);
4223        self.bootstrap_transport_npubs
4224            .insert(transport_id, traversal.peer_npub.clone());
4225
4226        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
4227        if let Err(err) = self
4228            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
4229            .await
4230        {
4231            self.bootstrap_transports.remove(&transport_id);
4232            self.bootstrap_transport_npubs.remove(&transport_id);
4233            if let Some(mut handle) = self.transports.remove(&transport_id) {
4234                let _ = handle.stop().await;
4235            }
4236            return Err(err);
4237        }
4238
4239        info!(
4240            peer = %self.peer_display_name(&peer_node_addr),
4241            transport_id = %transport_id,
4242            local_addr = %local_addr,
4243            remote_addr = %traversal.remote_addr,
4244            session_id = %traversal.session_id,
4245            "adopted NAT traversal socket; handshake initiated"
4246        );
4247
4248        Ok(BootstrapHandoffResult {
4249            transport_id,
4250            local_addr,
4251            remote_addr: traversal.remote_addr,
4252            peer_node_addr,
4253            session_id: traversal.session_id,
4254        })
4255    }
4256}