Skip to main content

fips_core/node/
lifecycle.rs

1//! Node lifecycle management: start, stop, and peer connection initiation.
2
3use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7    OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::{HashMap, HashSet};
18use std::net::{IpAddr, SocketAddr};
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Debug-build diagnostic sink.
///
/// Appends one line, `"<SystemTime debug> <message>"`, to
/// `nvpn-fips-endpoint-debug.log` in the OS temp directory. The file is
/// created on first use. Failures to open or write it are deliberately
/// ignored: this is best-effort tracing, never a reason to fail.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::fs::OpenOptions;
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let mut options = OpenOptions::new();
    options.create(true).append(true);
    let Ok(mut sink) = options.open(log_path) else {
        return;
    };
    let stamp = std::time::SystemTime::now();
    let _ = writeln!(sink, "{:?} {}", stamp, message.as_ref());
}

/// Release builds compile the diagnostic sink away to a no-op.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {
    // Intentionally empty: diagnostics are only persisted in debug builds.
}
43
/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN.
///
/// IPv4 coverage:
/// - RFC 1918 private ranges, loopback and link-local.
/// - Unspecified and the rest of "this network" (0.0.0.0/8).
/// - Multicast and limited broadcast.
/// - Documentation (TEST-NET-1/2/3).
/// - Benchmarking (198.18.0.0/15, RFC 2544).
/// - CGNAT shared address space (100.64.0.0/10, RFC 6598).
///
/// IPv6 coverage:
/// - Loopback, unspecified, unique-local (fc00::/7) and multicast.
/// - Link-local (fe80::/10) and documentation (2001:db8::/32).
/// - IPv4-mapped addresses (`::ffff:a.b.c.d`). These are classified by the
///   embedded IPv4 address, so a mapped RFC 1918 or CGNAT address is not
///   mistaken for a public one.
///
/// We never publish these as the peer's primary `runtime_endpoint`. An
/// off-LAN consumer can't route to them, and latching onto one (and then
/// falling back to a slow overlay relay) is the original bug this guard
/// exists to prevent.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [a, b, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 0.0.0.0/8 — "this network" (RFC 1122); never a valid
                // destination for a remote peer.
                || a == 0
                // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the
                // public internet; behaves like an extra NAT layer.
                || (a == 100 && (b & 0xc0) == 64)
                // 198.18.0.0/15 — benchmarking, RFC 2544.
                || (a == 198 && (b & 0xfe) == 18)
        }
        IpAddr::V6(v6) => {
            // An IPv4-mapped address is only as routable as the IPv4
            // address it carries; without this, `::ffff:10.0.0.1` would
            // slip through as "routable".
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(v4));
            }
            let segments = v6.segments();
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // IPv6 link-local: fe80::/10
                || (segments[0] & 0xffc0) == 0xfe80
                // IPv6 documentation: 2001:db8::/32 (RFC 3849)
                || (segments[0] == 0x2001 && segments[1] == 0x0db8)
        }
    }
}
75
/// Returns whether a socket bound at `local` can address `remote`.
///
/// This holds when both belong to the same IP family: IPv4 with IPv4,
/// IPv6 with IPv6. Ports and scope ids are irrelevant here.
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // `SocketAddr` has exactly two variants, so "same family" reduces to
    // "both IPv4 or both not IPv4".
    local.is_ipv4() == remote.is_ipv4()
}
82
/// NOTE(review): not referenced in the visible part of this file. From the
/// name it presumably scales a discovery/advert lifetime to bound how long
/// open-discovery retries stay eligible — confirm at the use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
/// Cap on path-candidate attempts in flight for a single peer, counting
/// entries in `connections` plus `pending_connects`. Consumed by
/// `path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Upper bound on the budget returned by `graph_session_warmup_budget`
/// (presumably evaluated once per maintenance tick).
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Upper bound on the budget returned by `discovery_connect_budget`
/// (presumably evaluated once per maintenance tick).
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
87
88impl Node {
89    /// Initiate connections to configured static peers.
90    ///
91    /// For each peer configured with AutoConnect policy, creates a link and
92    /// peer entry, then starts the Noise handshake by sending the first message.
93    /// Replace the runtime peer list. Newly added auto-connect peers get
94    /// `initiate_peer_connection` immediately; removed peers are dropped from
95    /// the retry queue (the regular liveness timeout reaps any active session
96    /// — we don't proactively disconnect, since the same npub might be on its
97    /// way back via a fresh advert). Existing auto-connect entries with new
98    /// direct addresses are dialed immediately, and retry state is refreshed
99    /// so later attempts also use the latest hints.
100    pub(super) async fn update_peers(
101        &mut self,
102        new_peers: Vec<crate::config::PeerConfig>,
103    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
104        use std::collections::{HashMap, HashSet};
105
106        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
107            HashMap::with_capacity(new_peers.len());
108        let mut new_order = Vec::with_capacity(new_peers.len());
109        for peer in new_peers {
110            let identity = match PeerIdentity::from_npub(&peer.npub) {
111                Ok(id) => id,
112                Err(e) => {
113                    return Err(crate::node::NodeError::InvalidPeerNpub {
114                        npub: peer.npub.clone(),
115                        reason: e.to_string(),
116                    });
117                }
118            };
119            // Last-write-wins on duplicates so callers passing a multi-source
120            // candidate list (e.g. operator hints + recent-peers cache for
121            // the same npub) get the merge they meant.
122            let node_addr = *identity.node_addr();
123            if !new_by_addr.contains_key(&node_addr) {
124                new_order.push(node_addr);
125            }
126            new_by_addr.insert(node_addr, peer);
127        }
128
129        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
130            .config
131            .peers()
132            .iter()
133            .filter_map(|pc| {
134                PeerIdentity::from_npub(&pc.npub)
135                    .ok()
136                    .map(|id| (*id.node_addr(), pc.clone()))
137            })
138            .collect();
139
140        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
141        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
142
143        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
144        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
145        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();
146
147        let mut outcome = crate::node::UpdatePeersOutcome::default();
148
149        for node_addr in &removed {
150            if self.retry_pending.remove(node_addr).is_some() {
151                debug!(
152                    peer = %self.peer_display_name(node_addr),
153                    "Dropping retry entry for peer removed from runtime peer list"
154                );
155            }
156            self.peer_aliases.remove(node_addr);
157            self.set_discovery_fallback_transit_allowed(*node_addr, false);
158            outcome.removed += 1;
159        }
160
161        let mut auto_connect_refresh_configs = Vec::new();
162        for node_addr in &kept {
163            let new_pc = &new_by_addr[node_addr];
164            let current_pc = &current_by_addr[node_addr];
165            if new_pc.addresses != current_pc.addresses
166                || new_pc.alias != current_pc.alias
167                || new_pc.connect_policy != current_pc.connect_policy
168                || new_pc.auto_reconnect != current_pc.auto_reconnect
169                || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
170            {
171                outcome.updated += 1;
172                self.set_discovery_fallback_transit_allowed(
173                    *node_addr,
174                    new_pc.discovery_fallback_transit,
175                );
176                if let Some(state) = self.retry_pending.get_mut(node_addr) {
177                    state.peer_config = new_pc.clone();
178                    state.retry_after_ms = Self::now_ms();
179                }
180                if let Some(alias) = new_pc.alias.clone() {
181                    self.peer_aliases.insert(*node_addr, alias);
182                }
183                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
184                    auto_connect_refresh_configs.push(new_pc.clone());
185                }
186            } else {
187                outcome.unchanged += 1;
188                self.set_discovery_fallback_transit_allowed(
189                    *node_addr,
190                    new_pc.discovery_fallback_transit,
191                );
192                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
193                    auto_connect_refresh_configs.push(new_pc.clone());
194                }
195            }
196        }
197
198        let added_configs: Vec<crate::config::PeerConfig> = new_order
199            .iter()
200            .filter(|addr| added.contains(addr))
201            .map(|addr| new_by_addr[addr].clone())
202            .collect();
203
204        // Replace the live config peer list before initiating connections so
205        // any helper that consults `self.config.peers()` during the dial
206        // (alias lookup, retry-state seeding) sees the new entries.
207        self.config.peers = new_order
208            .iter()
209            .filter_map(|addr| new_by_addr.get(addr).cloned())
210            .collect();
211
212        for peer_config in added_configs {
213            outcome.added += 1;
214            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
215                continue;
216            };
217            let name = peer_config
218                .alias
219                .clone()
220                .unwrap_or_else(|| identity.short_npub());
221            self.peer_aliases.insert(*identity.node_addr(), name);
222            self.set_discovery_fallback_transit_allowed(
223                *identity.node_addr(),
224                peer_config.discovery_fallback_transit,
225            );
226            self.register_identity(*identity.node_addr(), identity.pubkey_full());
227
228            if !peer_config.is_auto_connect() {
229                continue;
230            }
231
232            match self
233                .try_auto_connect_graph_session(&peer_config, identity)
234                .await
235            {
236                Ok(true) => continue,
237                Ok(false) => {}
238                Err(err) => {
239                    debug!(
240                        npub = %peer_config.npub,
241                        error = %err,
242                        "Existing FIPS graph did not warm newly added peer"
243                    );
244                }
245            }
246
247            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
248                warn!(
249                    npub = %peer_config.npub,
250                    error = %e,
251                    "Failed to initiate connection for newly added peer"
252                );
253                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
254                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
255                }
256                if matches!(e, crate::node::NodeError::NoTransportForType(_))
257                    && let Some(bootstrap) = self.nostr_discovery.clone()
258                {
259                    bootstrap
260                        .request_advert_stale_check(peer_config.npub.clone())
261                        .await;
262                }
263            }
264        }
265
266        for peer_config in auto_connect_refresh_configs {
267            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
268                continue;
269            };
270            let node_addr = *peer_identity.node_addr();
271
272            if self.peers.contains_key(&node_addr) {
273                match self
274                    .initiate_active_peer_alternative_connection(&peer_config)
275                    .await
276                {
277                    Ok(attempted) => {
278                        if attempted {
279                            debug!(
280                                peer = %self.peer_display_name(&node_addr),
281                                "Started non-disruptive alternate-path handshake for active peer"
282                            );
283                        }
284                    }
285                    Err(e) => {
286                        debug!(
287                            npub = %peer_config.npub,
288                            error = %e,
289                            "Active peer alternate-path refresh did not start"
290                        );
291                    }
292                }
293                continue;
294            }
295
296            match self
297                .try_auto_connect_graph_session(&peer_config, peer_identity)
298                .await
299            {
300                Ok(true) => continue,
301                Ok(false) => {}
302                Err(err) => {
303                    debug!(
304                        npub = %peer_config.npub,
305                        error = %err,
306                        "Existing FIPS graph did not warm refreshed peer"
307                    );
308                }
309            }
310
311            match self.initiate_peer_connection(&peer_config).await {
312                Ok(()) => {
313                    let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
314                    if let Some(state) = self.retry_pending.get_mut(&node_addr) {
315                        state.peer_config = peer_config;
316                        state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
317                    }
318                }
319                Err(e) => {
320                    debug!(
321                        npub = %peer_config.npub,
322                        error = %e,
323                        "Refreshed peer addresses did not initiate a direct connection"
324                    );
325                    self.schedule_retry(node_addr, Self::now_ms());
326                }
327            }
328        }
329
330        self.warm_auto_connect_graph_sessions().await;
331
332        Ok(outcome)
333    }
334
335    pub(super) async fn initiate_peer_connections(&mut self) {
336        // Build display name map from all configured peers (alias or short npub),
337        // and pre-seed the identity cache from each peer's npub so that TUN packets
338        // addressed to a configured peer can be dispatched (and trigger session
339        // initiation) immediately on startup — without waiting for the link-layer
340        // handshake to complete first.
341        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
342            .config
343            .peers()
344            .iter()
345            .filter_map(|pc| {
346                PeerIdentity::from_npub(&pc.npub)
347                    .ok()
348                    .map(|id| (id, pc.alias.clone()))
349            })
350            .collect();
351
352        for (identity, alias) in peer_identities {
353            let name = alias.unwrap_or_else(|| identity.short_npub());
354            self.peer_aliases.insert(*identity.node_addr(), name);
355            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
356            // but will be corrected to the real value when the peer is promoted
357            // after a successful Noise handshake.
358            self.register_identity(*identity.node_addr(), identity.pubkey_full());
359        }
360
361        // Collect peer configs to avoid borrow conflicts
362        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
363
364        if peer_configs.is_empty() {
365            debug!("No static peers configured");
366            return;
367        }
368
369        debug!(
370            count = peer_configs.len(),
371            "Initiating static peer connections"
372        );
373
374        for peer_config in peer_configs {
375            let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
376                Ok(identity) => identity,
377                Err(_) => continue,
378            };
379            match self
380                .try_auto_connect_graph_session(&peer_config, peer_identity)
381                .await
382            {
383                Ok(true) => continue,
384                Ok(false) => {}
385                Err(err) => {
386                    debug!(
387                        npub = %peer_config.npub,
388                        error = %err,
389                        "Existing FIPS graph did not warm auto-connect peer"
390                    );
391                }
392            }
393            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
394                warn!(
395                    npub = %peer_config.npub,
396                    alias = ?peer_config.alias,
397                    error = %e,
398                    "Failed to initiate peer connection"
399                );
400                // Schedule a retry so transient address-resolution failures
401                // (e.g. cached endpoints stale, NAT rebinds, all addresses
402                // currently unreachable) recover without a daemon restart.
403                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
404                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
405                }
406                // No-transport failures most often mean the cached overlay
407                // advert is pointing at a dead post-NAT-rebind address. The
408                // advert cache is read-only inside fetch_advert, so retries
409                // would loop on the same dead address until expiry. Force a
410                // re-fetch so the next retry tick picks up fresh endpoints.
411                if matches!(e, crate::node::NodeError::NoTransportForType(_))
412                    && let Some(bootstrap) = self.nostr_discovery.clone()
413                {
414                    bootstrap
415                        .request_advert_stale_check(peer_config.npub.clone())
416                        .await;
417                }
418            }
419        }
420
421        self.warm_auto_connect_graph_sessions().await;
422    }
423
424    pub(in crate::node) async fn try_auto_connect_graph_session(
425        &mut self,
426        peer_config: &PeerConfig,
427        peer_identity: PeerIdentity,
428    ) -> Result<bool, NodeError> {
429        if !peer_config.is_auto_connect() {
430            return Ok(false);
431        }
432
433        let peer_node_addr = *peer_identity.node_addr();
434        if self.peers.contains_key(&peer_node_addr) {
435            return Ok(false);
436        }
437        if self
438            .sessions
439            .get(&peer_node_addr)
440            .is_some_and(|entry| entry.is_established() || entry.is_initiating())
441        {
442            return Ok(true);
443        }
444        if self.find_next_hop(&peer_node_addr).is_none() {
445            return Ok(false);
446        }
447
448        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
449        match self
450            .initiate_session(peer_node_addr, peer_identity.pubkey_full())
451            .await
452        {
453            Ok(()) => {
454                debug!(
455                    peer = %self.peer_display_name(&peer_node_addr),
456                    "Warmed auto-connect peer session over existing FIPS graph"
457                );
458                Ok(true)
459            }
460            Err(NodeError::SendFailed { node_addr, reason })
461                if node_addr == peer_node_addr && reason == "no route to destination" =>
462            {
463                self.maybe_initiate_lookup(&peer_node_addr).await;
464                Ok(false)
465            }
466            Err(err) => Err(err),
467        }
468    }
469
470    /// Initiate a connection to a single peer.
471    ///
472    /// Creates a link, starts the Noise handshake, and sends the first message.
473    pub(super) async fn initiate_peer_connection(
474        &mut self,
475        peer_config: &crate::config::PeerConfig,
476    ) -> Result<(), NodeError> {
477        self.initiate_peer_connection_inner(peer_config).await
478    }
479
480    /// Initiate a connection from the retry path. Identical to
481    /// [`initiate_peer_connection`] today — both paths fan out across every
482    /// known address (overlay-fresh first, then operator/cache hints) in a
483    /// single pass. The two entry points stay separate so callers can be
484    /// distinguished in tracing.
485    pub(super) async fn initiate_peer_retry_connection(
486        &mut self,
487        peer_config: &crate::config::PeerConfig,
488    ) -> Result<(), NodeError> {
489        self.initiate_peer_connection_inner(peer_config).await
490    }
491
492    async fn initiate_active_peer_alternative_connection(
493        &mut self,
494        peer_config: &crate::config::PeerConfig,
495    ) -> Result<bool, NodeError> {
496        let peer_identity =
497            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
498                npub: peer_config.npub.clone(),
499                reason: e.to_string(),
500            })?;
501        let peer_node_addr = *peer_identity.node_addr();
502
503        if !self.peers.contains_key(&peer_node_addr) {
504            self.initiate_peer_connection(peer_config).await?;
505            return Ok(true);
506        }
507
508        // Keep the current link live and race fresh concrete candidates.
509        // Cross-connection resolution still decides which replacement link
510        // wins if both peers try the same upgrade; the important part here is
511        // that a stale path does not depend on the remote peer receiving our
512        // hint first before either side attempts the better address.
513        self.try_active_peer_alternative_addresses(peer_config, peer_identity)
514            .await
515    }
516
517    async fn initiate_peer_connection_inner(
518        &mut self,
519        peer_config: &crate::config::PeerConfig,
520    ) -> Result<(), NodeError> {
521        // Parse the peer's npub to get their identity
522        let peer_identity =
523            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
524                npub: peer_config.npub.clone(),
525                reason: e.to_string(),
526            })?;
527
528        let peer_node_addr = *peer_identity.node_addr();
529
530        // Check if peer already exists (fully authenticated)
531        if self.peers.contains_key(&peer_node_addr) {
532            debug!(
533                npub = %peer_config.npub,
534                "Peer already exists, skipping"
535            );
536            return Ok(());
537        }
538
539        self.try_peer_addresses(peer_config, peer_identity, true)
540            .await
541    }
542
543    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
544        self.connections.values().any(|conn| {
545            conn.expected_identity()
546                .map(|id| id.node_addr() == peer_node_addr)
547                .unwrap_or(false)
548        })
549    }
550
551    fn is_connecting_to_peer_on_path(
552        &self,
553        peer_node_addr: &NodeAddr,
554        transport_id: TransportId,
555        remote_addr: &TransportAddr,
556    ) -> bool {
557        self.connections.values().any(|conn| {
558            conn.expected_identity()
559                .map(|id| id.node_addr() == peer_node_addr)
560                .unwrap_or(false)
561                && conn.transport_id() == Some(transport_id)
562                && conn.source_addr() == Some(remote_addr)
563        }) || self.pending_connects.iter().any(|pending| {
564            pending.peer_identity.node_addr() == peer_node_addr
565                && pending.transport_id == transport_id
566                && &pending.remote_addr == remote_addr
567        })
568    }
569
570    pub(in crate::node) fn should_warm_auto_connect_session(
571        &self,
572        peer_node_addr: &NodeAddr,
573    ) -> bool {
574        if self.peers.contains_key(peer_node_addr)
575            || self
576                .sessions
577                .get(peer_node_addr)
578                .is_some_and(|entry| entry.is_established())
579        {
580            return false;
581        }
582
583        self.config.peers().iter().any(|peer| {
584            peer.is_auto_connect()
585                && PeerIdentity::from_npub(&peer.npub)
586                    .map(|identity| identity.node_addr() == peer_node_addr)
587                    .unwrap_or(false)
588        })
589    }
590
591    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
592        if !self.peers.values().any(|peer| peer.can_send()) {
593            return 0;
594        }
595
596        let mut budget = self.graph_session_warmup_budget();
597        if budget == 0 {
598            return 0;
599        }
600
601        let peer_identities: Vec<_> = self
602            .config
603            .auto_connect_peers()
604            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
605            .collect();
606
607        let mut warmed = 0;
608        for identity in peer_identities {
609            if budget == 0 {
610                break;
611            }
612
613            let peer_node_addr = *identity.node_addr();
614            if peer_node_addr == *self.identity.node_addr()
615                || !self.should_warm_auto_connect_session(&peer_node_addr)
616                || self
617                    .sessions
618                    .get(&peer_node_addr)
619                    .is_some_and(|entry| entry.is_initiating())
620            {
621                continue;
622            }
623
624            self.register_identity(peer_node_addr, identity.pubkey_full());
625
626            if self.find_next_hop(&peer_node_addr).is_some() {
627                match self
628                    .initiate_session(peer_node_addr, identity.pubkey_full())
629                    .await
630                {
631                    Ok(()) => {
632                        warmed += 1;
633                        budget = budget.saturating_sub(1);
634                        debug!(
635                            peer = %self.peer_display_name(&peer_node_addr),
636                            "Warmed auto-connect peer session over existing FIPS graph"
637                        );
638                    }
639                    Err(NodeError::SendFailed { node_addr, reason })
640                        if node_addr == peer_node_addr && reason == "no route to destination" =>
641                    {
642                        self.maybe_initiate_lookup(&peer_node_addr).await;
643                        warmed += 1;
644                        budget = budget.saturating_sub(1);
645                    }
646                    Err(err) => {
647                        debug!(
648                            peer = %self.peer_display_name(&peer_node_addr),
649                            error = %err,
650                            "Failed to warm auto-connect peer session"
651                        );
652                    }
653                }
654            } else {
655                self.maybe_initiate_lookup(&peer_node_addr).await;
656                warmed += 1;
657                budget = budget.saturating_sub(1);
658            }
659        }
660
661        warmed
662    }
663
664    pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
665        let max_destinations = self.config.node.session.pending_max_destinations;
666        if max_destinations == 0 {
667            return 0;
668        }
669
670        let pending_sessions = self
671            .sessions
672            .values()
673            .filter(|entry| !entry.is_established())
674            .count();
675        let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
676        max_destinations
677            .saturating_sub(pending_total)
678            .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
679    }
680
681    fn outbound_handshake_slots(&self) -> usize {
682        let used = self
683            .connections
684            .len()
685            .saturating_add(self.pending_connects.len());
686        if self.max_connections == 0 {
687            usize::MAX
688        } else {
689            self.max_connections.saturating_sub(used)
690        }
691    }
692
693    fn outbound_link_slots(&self) -> usize {
694        if self.max_links == 0 {
695            usize::MAX
696        } else {
697            self.max_links.saturating_sub(self.links.len())
698        }
699    }
700
701    fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
702        if !self.peers.contains_key(peer_node_addr)
703            && self.max_peers > 0
704            && self.peers.len() >= self.max_peers
705        {
706            return 0;
707        }
708
709        let in_flight_for_peer = self
710            .connections
711            .values()
712            .filter(|conn| {
713                conn.expected_identity()
714                    .map(|id| id.node_addr() == peer_node_addr)
715                    .unwrap_or(false)
716            })
717            .count()
718            .saturating_add(
719                self.pending_connects
720                    .iter()
721                    .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
722                    .count(),
723            );
724
725        self.outbound_handshake_slots()
726            .min(self.outbound_link_slots())
727            .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
728    }
729
730    fn discovery_connect_budget(&self) -> usize {
731        self.outbound_handshake_slots()
732            .min(self.outbound_link_slots())
733            .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
734    }
735
736    /// Find a UDP transport whose bound socket can send to `remote_addr`.
737    ///
738    /// LAN discovery can surface both IPv4 and IPv6 addresses for the same
739    /// service. A wildcard IPv4 socket cannot send to an IPv6 link-local
740    /// target, and vice versa, so callers must choose by socket family rather
741    /// than by transport type alone.
742    fn find_udp_transport_for_remote_addr(
743        &self,
744        remote_addr: SocketAddr,
745    ) -> Option<(TransportId, SocketAddr)> {
746        self.transports
747            .iter()
748            .filter(|(id, handle)| {
749                handle.transport_type().name == "udp"
750                    && handle.is_operational()
751                    && !self.bootstrap_transports.contains(id)
752            })
753            .filter_map(|(id, handle)| {
754                let local_addr = handle.local_addr()?;
755                socket_addr_families_compatible(local_addr, remote_addr)
756                    .then_some((*id, local_addr))
757            })
758            .min_by_key(|(id, _)| id.as_u32())
759    }
760
761    pub(super) fn transport_discovery_candidate(
762        &self,
763        discovered_transport_id: TransportId,
764        discovered_addr: TransportAddr,
765    ) -> Option<(TransportId, TransportAddr, &'static str)> {
766        let transport = self.transports.get(&discovered_transport_id)?;
767        let transport_name = transport.transport_type().name;
768
769        if transport_name != "udp" {
770            return Some((discovered_transport_id, discovered_addr, transport_name));
771        }
772
773        let Some(remote_socket_addr) = discovered_addr
774            .as_str()
775            .and_then(|addr| addr.parse::<SocketAddr>().ok())
776        else {
777            if self.bootstrap_transports.contains(&discovered_transport_id) {
778                debug!(
779                    transport_id = %discovered_transport_id,
780                    remote_addr = %discovered_addr,
781                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
782                );
783                return None;
784            }
785            return Some((discovered_transport_id, discovered_addr, transport_name));
786        };
787
788        let Some((transport_id, local_addr)) =
789            self.find_udp_transport_for_remote_addr(remote_socket_addr)
790        else {
791            debug!(
792                transport_id = %discovered_transport_id,
793                remote_addr = %discovered_addr,
794                "transport discovery: skip UDP peer with no compatible local socket"
795            );
796            return None;
797        };
798
799        if transport_id != discovered_transport_id {
800            debug!(
801                discovered_transport_id = %discovered_transport_id,
802                selected_transport_id = %transport_id,
803                local_addr = %local_addr,
804                remote_addr = %remote_socket_addr,
805                "transport discovery: selected compatible UDP transport"
806            );
807        }
808
809        Some((
810            transport_id,
811            TransportAddr::from_socket_addr(remote_socket_addr),
812            transport_name,
813        ))
814    }
815
816    fn peer_address_string_for_transport_candidate(
817        &self,
818        transport_id: TransportId,
819        transport_name: &str,
820        remote_addr: &TransportAddr,
821    ) -> String {
822        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
823        let _ = (transport_id, transport_name);
824
825        #[cfg(any(target_os = "linux", target_os = "macos"))]
826        if transport_name == "ethernet"
827            && remote_addr.as_bytes().len() == 6
828            && let Some(interface) = self
829                .transports
830                .get(&transport_id)
831                .and_then(|transport| transport.interface_name())
832        {
833            let mut mac = [0u8; 6];
834            mac.copy_from_slice(remote_addr.as_bytes());
835            return format!(
836                "{interface}/{}",
837                crate::transport::ethernet::format_mac(&mac)
838            );
839        }
840
841        remote_addr.to_string()
842    }
843
844    fn resolve_peer_address_for_match(
845        &self,
846        candidate: &PeerAddress,
847    ) -> Option<(TransportId, TransportAddr)> {
848        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
849            return None;
850        }
851
852        if candidate.transport == "ethernet" {
853            return self.resolve_ethernet_addr(&candidate.addr).ok();
854        }
855
856        if candidate.transport == "ble" {
857            #[cfg(bluer_available)]
858            {
859                return self.resolve_ble_addr(&candidate.addr).ok();
860            }
861            #[cfg(not(bluer_available))]
862            {
863                return None;
864            }
865        }
866
867        let transport_id = if candidate.transport == "udp"
868            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
869        {
870            self.find_udp_transport_for_remote_addr(remote_socket_addr)
871                .map(|(id, _)| id)?
872        } else {
873            self.find_transport_for_type(&candidate.transport)?
874        };
875
876        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
877    }
878
879    /// Initiate a connection to a peer on a specific transport and address.
880    ///
881    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
882    /// the Noise IK handshake, sends msg1, and registers the connection for
883    /// msg2 dispatch.
884    ///
885    /// For connection-oriented transports (TCP, Tor): allocates a link and
886    /// starts a non-blocking transport connect. The handshake is deferred
887    /// until the transport connection is established — the tick handler
888    /// polls `connection_state()` and initiates the handshake when ready.
889    pub(super) async fn initiate_connection(
890        &mut self,
891        transport_id: TransportId,
892        remote_addr: TransportAddr,
893        peer_identity: PeerIdentity,
894    ) -> Result<(), NodeError> {
895        let peer_node_addr = *peer_identity.node_addr();
896
897        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
898            debug!(
899                peer = %self.peer_display_name(&peer_node_addr),
900                transport_id = %transport_id,
901                remote_addr = %remote_addr,
902                "Connection already in progress for candidate path"
903            );
904            return Ok(());
905        }
906
907        if self.outbound_handshake_slots() == 0 {
908            return Err(NodeError::MaxConnectionsExceeded {
909                max: self.max_connections,
910            });
911        }
912
913        if self.outbound_link_slots() == 0 {
914            return Err(NodeError::MaxLinksExceeded {
915                max: self.max_links,
916            });
917        }
918
919        if !self.peers.contains_key(&peer_node_addr)
920            && self.max_peers > 0
921            && self.peers.len() >= self.max_peers
922        {
923            return Err(NodeError::MaxPeersExceeded {
924                max: self.max_peers,
925            });
926        }
927
928        self.authorize_peer(
929            &peer_identity,
930            PeerAclContext::OutboundConnect,
931            transport_id,
932            &remote_addr,
933        )?;
934
935        let is_connection_oriented = self
936            .transports
937            .get(&transport_id)
938            .map(|t| t.transport_type().connection_oriented)
939            .unwrap_or(false);
940
941        // Allocate link ID and create link
942        let link_id = self.allocate_link_id();
943
944        let link = if is_connection_oriented {
945            Link::new(
946                link_id,
947                transport_id,
948                remote_addr.clone(),
949                LinkDirection::Outbound,
950                Duration::from_millis(self.config.node.base_rtt_ms),
951            )
952        } else {
953            Link::connectionless(
954                link_id,
955                transport_id,
956                remote_addr.clone(),
957                LinkDirection::Outbound,
958                Duration::from_millis(self.config.node.base_rtt_ms),
959            )
960        };
961
962        self.links.insert(link_id, link);
963
964        // Add reverse lookup for packet dispatch
965        self.addr_to_link
966            .insert((transport_id, remote_addr.clone()), link_id);
967
968        if is_connection_oriented {
969            // Connection-oriented: start non-blocking connect, defer handshake
970            if let Some(transport) = self.transports.get(&transport_id) {
971                match transport.connect(&remote_addr).await {
972                    Ok(()) => {
973                        debug!(
974                            peer = %self.peer_display_name(&peer_node_addr),
975                            transport_id = %transport_id,
976                            remote_addr = %remote_addr,
977                            link_id = %link_id,
978                            "Transport connect initiated (non-blocking)"
979                        );
980                        self.pending_connects.push(super::PendingConnect {
981                            link_id,
982                            transport_id,
983                            remote_addr,
984                            peer_identity,
985                        });
986                    }
987                    Err(e) => {
988                        // Clean up link
989                        self.links.remove(&link_id);
990                        self.addr_to_link.remove(&(transport_id, remote_addr));
991                        return Err(NodeError::TransportError(e.to_string()));
992                    }
993                }
994            }
995            Ok(())
996        } else {
997            // Connectionless: proceed with immediate handshake
998            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
999                .await
1000        }
1001    }
1002
    /// Start the Noise handshake on a link and send msg1.
    ///
    /// Called immediately for connectionless transports, or after the
    /// transport connection is established for connection-oriented transports.
    ///
    /// Expects the caller to have already inserted `link_id` into `links`
    /// and `(transport_id, remote_addr)` into `addr_to_link`. Every error path
    /// below removes both again.
    ///
    /// On success, the outbound `PeerConnection` is stored in `connections`,
    /// `(transport_id, our_index)` is registered in `pending_outbound` for
    /// msg2 dispatch, and the wire msg1 is kept on the connection for resends.
    ///
    /// # Errors
    ///
    /// - `IndexAllocationFailed` if no session index can be allocated.
    /// - `HandshakeFailed` if the Noise state machine refuses to produce msg1.
    ///   The allocated index is freed.
    /// - `TransportError` if the transport is gone or the first send fails.
    ///   The index, the pending_outbound entry and the connection are removed.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Create connection in handshake phase (outbound knows expected identity)
        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Allocate a session index for this handshake
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Clean up the link the caller allocated for this attempt
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        // Start the Noise handshake and get message 1
        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Clean up the index and link
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        // Set index and transport info on the connection
        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Store msg1 for resend and schedule first resend
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Track in pending_outbound for msg2 dispatch. Registered before the
        // send below; both failure arms undo this registration.
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send the wire format handshake message. If the very first send fails
        // synchronously (for example an IPv6 candidate on an IPv4-only UDP
        // socket), undo this candidate so the caller can try the next address
        // in the same dial pass.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        match send_result {
            Some(send_result) => {
                // Report the outcome (success or error) before acting on it.
                self.note_local_send_outcome(&send_result);
                match send_result {
                    Ok(bytes) => {
                        debug!(
                            link_id = %link_id,
                            our_index = %our_index,
                            bytes,
                            "Sent Noise handshake message 1 (wire format)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            link_id = %link_id,
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            our_index = %our_index,
                            error = %e,
                            "Failed to send handshake message"
                        );
                        // Roll back everything registered for this attempt.
                        self.pending_outbound
                            .remove(&(transport_id, our_index.as_u32()));
                        self.connections.remove(&link_id);
                        self.links.remove(&link_id);
                        self.addr_to_link
                            .remove(&(transport_id, remote_addr.clone()));
                        let _ = self.index_allocator.free(our_index);
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            None => {
                // Transport vanished: same rollback as a failed send, but no
                // send outcome is recorded.
                self.pending_outbound
                    .remove(&(transport_id, our_index.as_u32()));
                self.connections.remove(&link_id);
                self.links.remove(&link_id);
                self.addr_to_link
                    .remove(&(transport_id, remote_addr.clone()));
                let _ = self.index_allocator.free(our_index);
                return Err(NodeError::TransportError(format!(
                    "transport {transport_id} disappeared before first handshake send"
                )));
            }
        }

        Ok(())
    }
1127
1128    /// Poll all transports for discovered peers and auto-connect.
1129    ///
1130    /// Called from the tick handler. Iterates operational transports,
1131    /// drains their discovery buffers, and initiates connections to
1132    /// newly discovered peers (if auto_connect is enabled).
1133    pub(super) async fn poll_transport_discovery(&mut self) {
1134        // Collect discoveries first to avoid borrow conflict with self
1135        let mut to_connect = Vec::new();
1136        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1137        let mut connect_budget = self.discovery_connect_budget();
1138        let mut skipped_budget = 0usize;
1139
1140        for transport in self.transports.values() {
1141            if !transport.is_operational() {
1142                continue;
1143            }
1144            if !transport.auto_connect() {
1145                // Still drain the buffer so it doesn't grow unbounded
1146                let _ = transport.discover();
1147                continue;
1148            }
1149            let discovered = match transport.discover() {
1150                Ok(peers) => peers,
1151                Err(_) => continue,
1152            };
1153            for peer in discovered {
1154                let discovered_transport_id = peer.transport_id;
1155                let pubkey = match peer.pubkey_hint {
1156                    Some(pk) => pk,
1157                    None => continue,
1158                };
1159                let identity = PeerIdentity::from_pubkey(pubkey);
1160                let node_addr = *identity.node_addr();
1161
1162                // Skip self
1163                if node_addr == *self.identity.node_addr() {
1164                    continue;
1165                }
1166
1167                let Some((candidate_transport_id, remote_addr, transport_name)) =
1168                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1169                else {
1170                    continue;
1171                };
1172
1173                if self.peers.contains_key(&node_addr) {
1174                    let candidate = PeerAddress::new(
1175                        transport_name,
1176                        self.peer_address_string_for_transport_candidate(
1177                            candidate_transport_id,
1178                            transport_name,
1179                            &remote_addr,
1180                        ),
1181                    );
1182                    if self.active_peer_candidate_is_fresh_enough_to_skip(
1183                        &node_addr,
1184                        std::slice::from_ref(&candidate),
1185                    ) {
1186                        continue;
1187                    }
1188                    if self.is_connecting_to_peer_on_path(
1189                        &node_addr,
1190                        candidate_transport_id,
1191                        &remote_addr,
1192                    ) {
1193                        continue;
1194                    }
1195                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1196                    if connect_budget == 0
1197                        || self
1198                            .path_candidate_attempt_budget(&node_addr)
1199                            .saturating_sub(queued_for_peer)
1200                            == 0
1201                    {
1202                        skipped_budget = skipped_budget.saturating_add(1);
1203                        continue;
1204                    }
1205                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
1206                    *queued_per_peer.entry(node_addr).or_default() += 1;
1207                    connect_budget = connect_budget.saturating_sub(1);
1208                    continue;
1209                }
1210
1211                if self.is_connecting_to_peer_on_path(
1212                    &node_addr,
1213                    candidate_transport_id,
1214                    &remote_addr,
1215                ) {
1216                    continue;
1217                }
1218
1219                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1220                if connect_budget == 0
1221                    || self
1222                        .path_candidate_attempt_budget(&node_addr)
1223                        .saturating_sub(queued_for_peer)
1224                        == 0
1225                {
1226                    skipped_budget = skipped_budget.saturating_add(1);
1227                    continue;
1228                }
1229                to_connect.push((candidate_transport_id, remote_addr, identity, false));
1230                *queued_per_peer.entry(node_addr).or_default() += 1;
1231                connect_budget = connect_budget.saturating_sub(1);
1232            }
1233        }
1234
1235        if skipped_budget > 0 {
1236            debug!(
1237                skipped = skipped_budget,
1238                queued = to_connect.len(),
1239                "Transport discovery connect budget exhausted"
1240            );
1241        }
1242
1243        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1244            info!(
1245                peer = %self.peer_display_name(identity.node_addr()),
1246                transport_id = %transport_id,
1247                remote_addr = %remote_addr,
1248                active_refresh,
1249                "Auto-connecting to discovered peer"
1250            );
1251            if let Err(e) = self
1252                .initiate_connection(transport_id, remote_addr, identity)
1253                .await
1254            {
1255                warn!(error = %e, "Failed to auto-connect to discovered peer");
1256            }
1257        }
1258    }
1259
    /// Drive one tick of Nostr-based discovery and NAT traversal.
    ///
    /// Does nothing when Nostr discovery is not configured
    /// (`self.nostr_discovery` is `None`). Otherwise, in order:
    /// 1. Pushes the current outbound-admission verdict to the discovery
    ///    service.
    /// 2. Refreshes this node's overlay advert. Failures are only debug-logged.
    /// 3. Drains bootstrap events:
    ///    - `Established`: adopts the traversal socket unless outbound
    ///      admission is closed. On adoption failure, a retry is scheduled for
    ///      the peer when its npub parses.
    ///    - `Failed`: ignored if the peer is already connected or a handshake
    ///      is in progress. Otherwise the failure streak is recorded, a stale
    ///      advert check is requested when the streak threshold is crossed,
    ///      and the peer's configured addresses are tried via
    ///      `try_peer_addresses`. Only if that also fails is a retry
    ///      scheduled, pushed past any cooldown.
    /// 4. Runs the startup open-discovery sweep (if due) and queues
    ///    open-discovery retries.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        bootstrap.set_outbound_admission(self.outbound_admission_check());

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    // Re-check admission per event: earlier adoptions in this
                    // same drain may have consumed the remaining capacity.
                    if !self.outbound_admission_check() {
                        debug!(
                            peer_npub = %traversal.peer_npub,
                            peers = self.peers.len(),
                            max_peers = self.max_peers,
                            "Dropping established NAT traversal: at capacity"
                        );
                        continue;
                    }
                    // Cloned because `traversal` is moved into the adopt call
                    // but the npub is still needed for logging and retry.
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    // Events for unparsable npubs are dropped silently.
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    // Record the failure; the returned decision drives log
                    // level, stale-advert eviction and the retry cooldown.
                    let now_ms = Self::now_ms();
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // B6: stale-advert eviction on the streak-threshold
                    // crossing. Fire-and-forget; the outcome is logged so
                    // operators can see when peers get cleaned up.
                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Fall back to the peer's configured addresses; a
                    // successful attempt means no retry needs scheduling.
                    // NOTE(review): meaning of the `false` flag is defined by
                    // try_peer_addresses — confirm there.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        // Push the next retry past the cooldown so the
                        // open-discovery sweep doesn't re-enqueue and the
                        // per-attempt backoff doesn't fire sooner.
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }
1377
1378    /// Resolve the LAN-only discovery scope. Applications with explicit
1379    /// connectivity config can set `node.discovery.lan.scope` without
1380    /// changing the public Nostr discovery `app` tag. The older fallback
1381    /// extracts a scope from the Nostr app tag used by default scoped
1382    /// discovery.
1383    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1384        if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1385            let scope = scope.trim();
1386            if !scope.is_empty() {
1387                return Some(scope.to_string());
1388            }
1389        }
1390
1391        let app = self.config.node.discovery.nostr.app.trim();
1392        if app.is_empty() {
1393            return None;
1394        }
1395        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1396            let scope = rest.trim();
1397            if scope.is_empty() {
1398                None
1399            } else {
1400                Some(scope.to_string())
1401            }
1402        } else {
1403            Some(app.to_string())
1404        }
1405    }
1406
1407    pub(super) fn start_local_instance_discovery(&mut self) {
1408        if !self.config.node.discovery.local.enabled {
1409            return;
1410        }
1411        let Some(scope) = self.lan_discovery_scope() else {
1412            debug!("local instance discovery not started: no discovery scope");
1413            return;
1414        };
1415        let now_ms = Self::now_ms();
1416        match crate::discovery::local::LocalInstanceRegistry::new(
1417            self.identity.npub(),
1418            scope,
1419            &self.config.node.discovery.local,
1420            now_ms,
1421        ) {
1422            Ok(registry) => {
1423                self.local_instance_registry = Some(registry);
1424                self.local_instance_started_at_ms = Some(now_ms);
1425                self.last_local_instance_publish_ms = None;
1426                self.last_local_instance_scan_ms = None;
1427                self.publish_local_instance_record(now_ms);
1428                info!("Same-host FIPS instance discovery enabled");
1429            }
1430            Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1431                debug!("same-host FIPS instance discovery disabled");
1432            }
1433            Err(err) => {
1434                debug!(error = %err, "same-host FIPS instance discovery not started");
1435            }
1436        }
1437    }
1438
1439    fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1440        let mut contacts = Vec::new();
1441        for handle in self.transports.values() {
1442            if !handle.is_operational() || !handle.accept_connections() {
1443                continue;
1444            }
1445            let transport = handle.transport_type().name;
1446            if transport != "udp" && transport != "tcp" {
1447                continue;
1448            }
1449            let Some(local_addr) = handle.local_addr() else {
1450                continue;
1451            };
1452            let Some(contact) =
1453                crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1454            else {
1455                continue;
1456            };
1457            if contacts
1458                .iter()
1459                .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1460                    existing.transport == contact.transport && existing.addr == contact.addr
1461                })
1462            {
1463                continue;
1464            }
1465            contacts.push(contact);
1466        }
1467        contacts
1468    }
1469
1470    fn publish_local_instance_record(&mut self, now_ms: u64) {
1471        let Some(registry) = self.local_instance_registry.clone() else {
1472            return;
1473        };
1474        let contacts = self.local_instance_contacts();
1475        match registry.publish(contacts, now_ms) {
1476            Ok(()) => {
1477                self.last_local_instance_publish_ms = Some(now_ms);
1478            }
1479            Err(err) => {
1480                debug!(error = %err, "failed to publish same-host FIPS instance record");
1481            }
1482        }
1483    }
1484
1485    fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1486        if self.local_instance_registry.is_none() {
1487            return;
1488        }
1489        let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1490        let due = self
1491            .last_local_instance_publish_ms
1492            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1493            .unwrap_or(true);
1494        if due {
1495            self.publish_local_instance_record(now_ms);
1496        }
1497    }
1498
1499    fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1500        if self.local_instance_registry.is_none() {
1501            return false;
1502        }
1503        let cfg = &self.config.node.discovery.local;
1504        let interval_ms = if self
1505            .local_instance_started_at_ms
1506            .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1507            .unwrap_or(false)
1508        {
1509            cfg.startup_scan_interval_ms()
1510        } else {
1511            cfg.scan_interval_ms()
1512        };
1513        self.last_local_instance_scan_ms
1514            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1515            .unwrap_or(true)
1516    }
1517
1518    fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1519        if self.config.peers().iter().any(|peer| {
1520            PeerIdentity::from_npub(&peer.npub)
1521                .map(|configured| configured.node_addr() == identity.node_addr())
1522                .unwrap_or(false)
1523        }) {
1524            return true;
1525        }
1526        self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1527    }
1528
1529    fn local_instance_peer_addresses(
1530        &self,
1531        record: &crate::discovery::local::LocalInstanceRecord,
1532    ) -> Vec<PeerAddress> {
1533        let mut addresses = Vec::new();
1534        for contact in &record.contacts {
1535            if contact.transport != "udp" && contact.transport != "tcp" {
1536                continue;
1537            }
1538            let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1539                debug!(
1540                    npub = %record.npub,
1541                    transport = %contact.transport,
1542                    addr = %contact.addr,
1543                    "local instance discovery: skip non-socket contact"
1544                );
1545                continue;
1546            };
1547            if !socket_addr.ip().is_loopback() {
1548                debug!(
1549                    npub = %record.npub,
1550                    addr = %contact.addr,
1551                    "local instance discovery: skip non-loopback contact"
1552                );
1553                continue;
1554            }
1555            let address =
1556                PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1557                    .with_seen_at_ms(record.updated_at_ms);
1558            if addresses.iter().any(|existing: &PeerAddress| {
1559                existing.transport == address.transport && existing.addr == address.addr
1560            }) {
1561                continue;
1562            }
1563            addresses.push(address);
1564        }
1565        addresses
1566    }
1567
1568    /// Scan the same-host registry only on startup and then at a low cadence.
1569    /// This avoids a per-second filesystem poll while preserving the fast path
1570    /// for processes launched around the same time.
1571    pub(super) async fn poll_local_instance_discovery(&mut self) {
1572        let Some(registry) = self.local_instance_registry.clone() else {
1573            return;
1574        };
1575        let now_ms = Self::now_ms();
1576        self.maybe_publish_local_instance_record(now_ms);
1577        if !self.local_instance_scan_due(now_ms) {
1578            return;
1579        }
1580        self.last_local_instance_scan_ms = Some(now_ms);
1581
1582        let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1583        {
1584            Ok(records) => records,
1585            Err(err) => {
1586                debug!(error = %err, "same-host FIPS instance scan failed");
1587                return;
1588            }
1589        };
1590        if records.is_empty() {
1591            return;
1592        }
1593
1594        let mut connect_budget = self.discovery_connect_budget();
1595        let mut skipped_budget = 0usize;
1596        for record in records {
1597            let identity = match PeerIdentity::from_npub(&record.npub) {
1598                Ok(identity) => identity,
1599                Err(err) => {
1600                    debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1601                    continue;
1602                }
1603            };
1604            let peer_node_addr = *identity.node_addr();
1605            if peer_node_addr == *self.identity.node_addr() {
1606                continue;
1607            }
1608            if !self.local_instance_peer_allowed(&identity) {
1609                debug!(
1610                    npub = %identity.short_npub(),
1611                    "local instance discovery: skip unconfigured peer"
1612                );
1613                continue;
1614            }
1615
1616            let addresses = self.local_instance_peer_addresses(&record);
1617            if addresses.is_empty() {
1618                continue;
1619            }
1620
1621            if self.peers.contains_key(&peer_node_addr)
1622                && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1623            {
1624                continue;
1625            }
1626
1627            for address in addresses {
1628                let Some((transport_id, remote_addr)) =
1629                    self.resolve_peer_address_for_match(&address)
1630                else {
1631                    continue;
1632                };
1633                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1634                    continue;
1635                }
1636                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1637                    skipped_budget = skipped_budget.saturating_add(1);
1638                    continue;
1639                }
1640                info!(
1641                    npub = %identity.short_npub(),
1642                    transport = %address.transport,
1643                    addr = %address.addr,
1644                    "same-host FIPS instance discovery: initiating handshake"
1645                );
1646                if let Err(err) = self
1647                    .initiate_connection(transport_id, remote_addr, identity)
1648                    .await
1649                {
1650                    debug!(
1651                        npub = %record.npub,
1652                        error = %err,
1653                        "same-host FIPS instance discovery: failed to initiate connection"
1654                    );
1655                }
1656                connect_budget = connect_budget.saturating_sub(1);
1657            }
1658        }
1659        if skipped_budget > 0 {
1660            debug!(
1661                skipped = skipped_budget,
1662                "same-host FIPS instance discovery connect budget exhausted"
1663            );
1664        }
1665    }
1666
1667    /// Drain mDNS-discovered peers and initiate Noise XX handshakes. For
1668    /// active peers this is a non-disruptive alternate-path refresh: the
1669    /// current link stays live until a new handshake authenticates and
1670    /// promotes. The handshake itself is the authentication — a spoofed
1671    /// mDNS advert with someone else's npub fails the XX exchange and
1672    /// is dropped.
1673    pub(super) async fn poll_lan_discovery(&mut self) {
1674        let Some(runtime) = self.lan_discovery.clone() else {
1675            return;
1676        };
1677        let events = runtime.drain_events().await;
1678        if events.is_empty() {
1679            return;
1680        }
1681        let mut connect_budget = self.discovery_connect_budget();
1682        let mut skipped_budget = 0usize;
1683        for event in events {
1684            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1685            let Some((transport_id, local_addr)) =
1686                self.find_udp_transport_for_remote_addr(peer.addr)
1687            else {
1688                debug!(
1689                    addr = %peer.addr,
1690                    "lan: skip discovered peer with no compatible UDP transport"
1691                );
1692                continue;
1693            };
1694            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1695                Ok(id) => id,
1696                Err(err) => {
1697                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1698                    continue;
1699                }
1700            };
1701            let peer_node_addr = *identity.node_addr();
1702            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1703            if self.peers.contains_key(&peer_node_addr) {
1704                let candidate = PeerAddress::new("udp", peer.addr.to_string());
1705                if self.active_peer_candidate_is_fresh_enough_to_skip(
1706                    &peer_node_addr,
1707                    std::slice::from_ref(&candidate),
1708                ) {
1709                    continue;
1710                }
1711                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1712                    continue;
1713                }
1714                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1715                    skipped_budget = skipped_budget.saturating_add(1);
1716                    continue;
1717                }
1718                info!(
1719                    npub = %identity.short_npub(),
1720                    addr = %peer.addr,
1721                    local_addr = %local_addr,
1722                    "lan: initiating alternate-path handshake to active peer"
1723                );
1724                if let Err(err) = self
1725                    .initiate_connection(transport_id, remote_addr, identity)
1726                    .await
1727                {
1728                    debug!(
1729                        npub = %peer.npub,
1730                        error = %err,
1731                        "lan: failed to initiate active peer alternate-path handshake"
1732                    );
1733                }
1734                connect_budget = connect_budget.saturating_sub(1);
1735                continue;
1736            }
1737            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1738                continue;
1739            }
1740            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1741                skipped_budget = skipped_budget.saturating_add(1);
1742                continue;
1743            }
1744            info!(
1745                npub = %identity.short_npub(),
1746                addr = %peer.addr,
1747                local_addr = %local_addr,
1748                "lan: initiating handshake to discovered peer"
1749            );
1750            if let Err(err) = self
1751                .initiate_connection(transport_id, remote_addr, identity)
1752                .await
1753            {
1754                debug!(
1755                    npub = %peer.npub,
1756                    error = %err,
1757                    "lan: failed to initiate connection to discovered peer"
1758                );
1759            }
1760            connect_budget = connect_budget.saturating_sub(1);
1761        }
1762        if skipped_budget > 0 {
1763            debug!(
1764                skipped = skipped_budget,
1765                "lan: discovery connect budget exhausted"
1766            );
1767        }
1768    }
1769
1770    /// Poll pending transport connects and initiate handshakes for ready ones.
1771    ///
1772    /// Called from the tick handler. For each pending connect, queries the
1773    /// transport's connection state. When a connection is established,
1774    /// marks the link as Connected and starts the Noise handshake.
1775    /// Failed connections are cleaned up and scheduled for retry.
1776    pub(super) async fn poll_pending_connects(&mut self) {
1777        if self.pending_connects.is_empty() {
1778            return;
1779        }
1780
1781        let mut completed = Vec::new();
1782
1783        for (i, pending) in self.pending_connects.iter().enumerate() {
1784            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
1785                transport.connection_state(&pending.remote_addr)
1786            } else {
1787                crate::transport::ConnectionState::Failed("transport removed".into())
1788            };
1789
1790            match state {
1791                crate::transport::ConnectionState::Connected => {
1792                    completed.push((i, true, None));
1793                }
1794                crate::transport::ConnectionState::Failed(reason) => {
1795                    completed.push((i, false, Some(reason)));
1796                }
1797                crate::transport::ConnectionState::Connecting => {
1798                    // Still in progress, check on next tick
1799                }
1800                crate::transport::ConnectionState::None => {
1801                    // Shouldn't happen — treat as failure
1802                    completed.push((i, false, Some("no connection attempt found".into())));
1803                }
1804            }
1805        }
1806
1807        // Process completions in reverse order to preserve indices
1808        for (i, success, reason) in completed.into_iter().rev() {
1809            let pending = self.pending_connects.remove(i);
1810
1811            if success {
1812                // Mark link as Connected
1813                if let Some(link) = self.links.get_mut(&pending.link_id) {
1814                    link.set_connected();
1815                }
1816
1817                debug!(
1818                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1819                    transport_id = %pending.transport_id,
1820                    remote_addr = %pending.remote_addr,
1821                    link_id = %pending.link_id,
1822                    "Transport connected, starting handshake"
1823                );
1824
1825                // Start the handshake now that the transport is connected
1826                if let Err(e) = self
1827                    .start_handshake(
1828                        pending.link_id,
1829                        pending.transport_id,
1830                        pending.remote_addr.clone(),
1831                        pending.peer_identity,
1832                    )
1833                    .await
1834                {
1835                    warn!(
1836                        link_id = %pending.link_id,
1837                        error = %e,
1838                        "Failed to start handshake after transport connect"
1839                    );
1840                    // Clean up link on handshake failure
1841                    self.remove_link(&pending.link_id);
1842                }
1843            } else {
1844                let reason = reason.unwrap_or_default();
1845                warn!(
1846                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1847                    transport_id = %pending.transport_id,
1848                    remote_addr = %pending.remote_addr,
1849                    link_id = %pending.link_id,
1850                    reason = %reason,
1851                    "Transport connect failed"
1852                );
1853
1854                // Clean up link and schedule retry
1855                self.remove_link(&pending.link_id);
1856                self.links.remove(&pending.link_id);
1857                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
1858            }
1859        }
1860    }
1861
1862    // === State Transitions ===
1863
1864    /// Start the node.
1865    ///
1866    /// Initializes the TUN interface (if configured), spawns I/O threads,
1867    /// and transitions to the Running state.
1868    pub async fn start(&mut self) -> Result<(), NodeError> {
1869        node_start_debug_log("Node::start begin");
1870        if !self.state.can_start() {
1871            return Err(NodeError::AlreadyStarted);
1872        }
1873        self.state = NodeState::Starting;
1874        node_start_debug_log("Node::start state set to starting");
1875
1876        // Create packet channel for transport -> Node communication
1877        let packet_buffer_size = self.config.node.buffers.packet_channel;
1878        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
1879        self.packet_tx = Some(packet_tx.clone());
1880        self.packet_rx = Some(packet_rx);
1881        node_start_debug_log("Node::start packet channel created");
1882
1883        // Initialize transports first (before TUN, before Nostr discovery).
1884        node_start_debug_log("Node::start create transports begin");
1885        let transport_handles = self.create_transports(&packet_tx).await;
1886        node_start_debug_log(format!(
1887            "Node::start create transports complete count={}",
1888            transport_handles.len()
1889        ));
1890
1891        for mut handle in transport_handles {
1892            let transport_id = handle.transport_id();
1893            let transport_type = handle.transport_type().name;
1894            let name = handle.name().map(|s| s.to_string());
1895
1896            node_start_debug_log(format!(
1897                "Node::start transport start begin id={} type={} name={:?}",
1898                transport_id, transport_type, name
1899            ));
1900            match handle.start().await {
1901                Ok(()) => {
1902                    node_start_debug_log(format!(
1903                        "Node::start transport start ok id={} type={}",
1904                        transport_id, transport_type
1905                    ));
1906                    self.transports.insert(transport_id, handle);
1907                }
1908                Err(e) => {
1909                    node_start_debug_log(format!(
1910                        "Node::start transport start error id={} type={} error={}",
1911                        transport_id, transport_type, e
1912                    ));
1913                    if let Some(ref n) = name {
1914                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
1915                    } else {
1916                        warn!(transport_type, error = %e, "Transport failed to start");
1917                    }
1918                }
1919            }
1920        }
1921
1922        if !self.transports.is_empty() {
1923            info!(count = self.transports.len(), "Transports initialized");
1924        }
1925
1926        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
1927        // worker pools. **Unix only** — both pools issue direct
1928        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
1929        // raw file descriptors via `AsRawFd`, which is a unix-only
1930        // trait. On Windows the rx_loop's tokio-based send/recv
1931        // remain the canonical path; the perf overhaul lands its
1932        // gains on unix.
1933        //
1934        // Worker count defaults to the number of CPUs, overridable
1935        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
1936        // for debug / benchmarking. Hash-by-destination means a
1937        // single TCP flow pins to one worker (preserves wire
1938        // ordering); additional workers light up under multi-flow
1939        // / multi-peer load. See `node::encrypt_worker` /
1940        // `node::decrypt_worker` for full rationale.
1941        #[cfg(unix)]
1942        {
1943            if self.config.node.worker_pools_enabled {
1944                node_start_debug_log("Node::start worker pools begin");
1945                let cpu_default = std::thread::available_parallelism()
1946                    .map(|n| n.get())
1947                    .unwrap_or(1)
1948                    .max(1);
1949                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
1950                    .ok()
1951                    .and_then(|s| s.parse().ok())
1952                    .unwrap_or(cpu_default)
1953                    .max(1);
1954                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
1955                    encrypt_worker_count,
1956                ));
1957                info!(
1958                    workers = encrypt_worker_count,
1959                    "Spawned FMP-encrypt worker pool"
1960                );
1961
1962                // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
1963                // falls through to the in-line rx_loop decrypt path (the
1964                // "test-mode" branch in `handle_encrypted_frame`, which is
1965                // in fact a fully functional synchronous decrypt). Useful
1966                // as an A/B against the worker pipeline when chasing
1967                // scheduling/queueing regressions on the native macOS
1968                // path. Any non-zero value (env or default) spawns the
1969                // pool as before.
1970                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
1971                    .ok()
1972                    .and_then(|s| s.parse().ok())
1973                    .unwrap_or(cpu_default);
1974                if decrypt_worker_count == 0 {
1975                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
1976                } else {
1977                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
1978                        decrypt_worker_count,
1979                    ));
1980                    info!(
1981                        workers = decrypt_worker_count,
1982                        "Spawned FMP+FSP-decrypt worker pool"
1983                    );
1984                }
1985                node_start_debug_log("Node::start worker pools complete");
1986            } else {
1987                node_start_debug_log("Node::start worker pools disabled");
1988                info!("FIPS worker pools disabled; using in-line crypto/send path");
1989            }
1990        }
1991
1992        if self.config.node.discovery.nostr.enabled {
1993            node_start_debug_log("Node::start nostr discovery start begin");
1994            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
1995                .await
1996            {
1997                Ok(runtime) => {
1998                    node_start_debug_log("Node::start nostr discovery runtime created");
1999                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2000                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2001                    }
2002                    node_start_debug_log("Node::start nostr overlay advert refreshed");
2003                    self.nostr_discovery = Some(runtime);
2004                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2005                    info!("Nostr overlay discovery enabled");
2006                }
2007                Err(err) => {
2008                    node_start_debug_log(format!(
2009                        "Node::start nostr discovery start error error={}",
2010                        err
2011                    ));
2012                    warn!(error = %err, "Failed to start Nostr overlay discovery");
2013                }
2014            }
2015        }
2016
2017        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
2018        // when Nostr is disabled, since it gives us sub-second pairing
2019        // on the same link without any relay or NAT-traversal roundtrip.
2020        if self.config.node.discovery.lan.enabled {
2021            node_start_debug_log("Node::start lan discovery start begin");
2022            let advertised_udp_port = self
2023                .transports
2024                .values()
2025                .filter(|h| h.is_operational())
2026                .filter(|h| h.transport_type().name == "udp")
2027                .find_map(|h| h.local_addr().map(|addr| addr.port()))
2028                .unwrap_or(0);
2029            let scope = self.lan_discovery_scope();
2030            match crate::discovery::lan::LanDiscovery::start(
2031                &self.identity,
2032                scope,
2033                advertised_udp_port,
2034                self.config.node.discovery.lan.clone(),
2035            )
2036            .await
2037            {
2038                Ok(runtime) => {
2039                    node_start_debug_log("Node::start lan discovery start ok");
2040                    self.lan_discovery = Some(runtime);
2041                    info!("LAN mDNS discovery enabled");
2042                }
2043                Err(err) => {
2044                    node_start_debug_log(format!(
2045                        "Node::start lan discovery start error error={}",
2046                        err
2047                    ));
2048                    debug!(error = %err, "LAN mDNS discovery not started");
2049                }
2050            }
2051        }
2052
2053        self.start_local_instance_discovery();
2054        self.poll_local_instance_discovery().await;
2055
2056        // Connect to static peers before TUN is active
2057        // This allows handshake messages to be sent before we start accepting packets
2058        node_start_debug_log("Node::start initiate peer connections begin");
2059        self.initiate_peer_connections().await;
2060        node_start_debug_log("Node::start initiate peer connections complete");
2061
2062        // Initialize TUN interface last, after transports and peers are ready
2063        if self.config.tun.enabled {
2064            node_start_debug_log("Node::start tun init begin");
2065            let address = *self.identity.address();
2066            match TunDevice::create(&self.config.tun, address).await {
2067                Ok(device) => {
2068                    let mtu = device.mtu();
2069                    let name = device.name().to_string();
2070                    let our_addr = *device.address();
2071
2072                    info!("TUN device active:");
2073                    info!("     name: {}", name);
2074                    info!("  address: {}", device.address());
2075                    info!("      mtu: {}", mtu);
2076
2077                    // Calculate max MSS for TCP clamping
2078                    let effective_mtu = self.effective_ipv6_mtu();
2079                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
2080
2081                    info!("effective MTU: {} bytes", effective_mtu);
2082                    debug!("   max TCP MSS: {} bytes", max_mss);
2083
2084                    // On macOS, create a shutdown pipe. Writing to it unblocks the
2085                    // reader thread's select() loop without closing the TUN fd
2086                    // (which would cause a double-close when TunDevice drops).
2087                    #[cfg(target_os = "macos")]
2088                    let (shutdown_read_fd, shutdown_write_fd) = {
2089                        let mut fds = [0i32; 2];
2090                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2091                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2092                                "failed to create shutdown pipe".into(),
2093                            )));
2094                        }
2095                        (fds[0], fds[1])
2096                    };
2097
2098                    // Create writer (dups the fd for independent write access).
2099                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
2100                    // per-destination path MTU learned via discovery.
2101                    let (writer, tun_tx) =
2102                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2103
2104                    // Spawn writer thread
2105                    let writer_handle = thread::spawn(move || {
2106                        writer.run();
2107                    });
2108
2109                    // Clone tun_tx for the reader
2110                    let reader_tun_tx = tun_tx.clone();
2111
2112                    // Create outbound channel for TUN reader → Node
2113                    let tun_channel_size = self.config.node.buffers.tun_channel;
2114                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2115
2116                    // Spawn reader thread
2117                    let transport_mtu = self.transport_mtu();
2118                    let path_mtu_lookup = self.path_mtu_lookup.clone();
2119                    #[cfg(target_os = "macos")]
2120                    let reader_handle = thread::spawn(move || {
2121                        run_tun_reader(
2122                            device,
2123                            mtu,
2124                            our_addr,
2125                            reader_tun_tx,
2126                            outbound_tx,
2127                            transport_mtu,
2128                            path_mtu_lookup,
2129                            shutdown_read_fd,
2130                        );
2131                    });
2132                    #[cfg(not(target_os = "macos"))]
2133                    let reader_handle = thread::spawn(move || {
2134                        run_tun_reader(
2135                            device,
2136                            mtu,
2137                            our_addr,
2138                            reader_tun_tx,
2139                            outbound_tx,
2140                            transport_mtu,
2141                            path_mtu_lookup,
2142                        );
2143                    });
2144
2145                    self.tun_state = TunState::Active;
2146                    self.tun_name = Some(name);
2147                    self.tun_tx = Some(tun_tx);
2148                    self.tun_outbound_rx = Some(outbound_rx);
2149                    self.tun_reader_handle = Some(reader_handle);
2150                    self.tun_writer_handle = Some(writer_handle);
2151                    #[cfg(target_os = "macos")]
2152                    {
2153                        self.tun_shutdown_fd = Some(shutdown_write_fd);
2154                    }
2155                }
2156                Err(e) => {
2157                    self.tun_state = TunState::Failed;
2158                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
2159                }
2160            }
2161            node_start_debug_log("Node::start tun init complete");
2162        }
2163
2164        // Initialize DNS responder (independent of TUN).
2165        //
2166        // Default bind_addr is "::1" (IPv6 loopback). The shipped
2167        // fips-dns-setup configures systemd-resolved via a global
2168        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
2169        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
2170        // where self-destined traffic to fips0's address is attributed
2171        // to fips0 in PKTINFO and gets silently dropped by the
2172        // mesh-interface filter in src/upper/dns.rs.
2173        //
2174        // For mesh-reachable resolution (rare), set bind_addr: "::"
2175        // in fips.yaml. The mesh-interface filter remains active to
2176        // prevent hosts-file alias enumeration in that mode.
2177        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
2178        // 127.0.0.1 still reach us regardless of kernel sysctl
2179        // defaults — but only when bind is on a wildcard / IPv6 path.
2180        if self.config.dns.enabled {
2181            node_start_debug_log("Node::start dns init begin");
2182            let addr_str = self.config.dns.bind_addr();
2183            match addr_str.parse::<std::net::IpAddr>() {
2184                Ok(ip) => {
2185                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2186                    match Self::bind_dns_socket(bind) {
2187                        Ok(socket) => {
2188                            let dns_channel_size = self.config.node.buffers.dns_channel;
2189                            let (identity_tx, identity_rx) =
2190                                tokio::sync::mpsc::channel(dns_channel_size);
2191                            let dns_ttl = self.config.dns.ttl();
2192                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2193                                self.config.peers(),
2194                            );
2195                            let reloader = if self.config.node.system_files_enabled {
2196                                let hosts_path = std::path::PathBuf::from(
2197                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
2198                                );
2199                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2200                            } else {
2201                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2202                            };
2203                            // Resolve the TUN ifindex so the responder can
2204                            // drop queries arriving on the mesh interface
2205                            // (fips0). Without this, the `::` bind exposes
2206                            // /etc/fips/hosts alias probing to any mesh peer.
2207                            // When TUN isn't enabled or the name can't be
2208                            // resolved, `None` disables the filter (there
2209                            // is no mesh surface to defend anyway).
2210                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2211                            info!(
2212                                bind = %bind,
2213                                hosts = reloader.hosts().len(),
2214                                mesh_ifindex = ?mesh_ifindex,
2215                                "DNS responder started for .fips domain (auto-reload enabled)"
2216                            );
2217                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2218                                socket,
2219                                identity_tx,
2220                                dns_ttl,
2221                                reloader,
2222                                mesh_ifindex,
2223                            ));
2224                            self.dns_identity_rx = Some(identity_rx);
2225                            self.dns_task = Some(handle);
2226                        }
2227                        Err(e) => {
2228                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2229                        }
2230                    }
2231                }
2232                Err(e) => {
2233                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2234                }
2235            }
2236            node_start_debug_log("Node::start dns init complete");
2237        }
2238
2239        self.state = NodeState::Running;
2240        node_start_debug_log("Node::start running");
2241        info!("Node started:");
2242        info!("       state: {}", self.state);
2243        info!("  transports: {}", self.transports.len());
2244        info!(" connections: {}", self.connections.len());
2245        Ok(())
2246    }
2247
2248    /// Bind a UDP socket for the DNS responder.
2249    ///
2250    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
2251    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
2252    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
2253    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
2254    /// land on the same socket.
2255    ///
2256    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
2257    /// can learn the arrival interface per packet. The responder uses that
2258    /// to drop queries arriving on the mesh TUN, closing the hosts-file
2259    /// probing side-channel created by the `::` bind.
2260    fn bind_dns_socket(
2261        addr: std::net::SocketAddr,
2262    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2263        use socket2::{Domain, Protocol, Socket, Type};
2264        let domain = if addr.is_ipv4() {
2265            Domain::IPV4
2266        } else {
2267            Domain::IPV6
2268        };
2269        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2270        if addr.is_ipv6() {
2271            sock.set_only_v6(false)?;
2272            #[cfg(unix)]
2273            Self::set_recv_pktinfo_v6(&sock)?;
2274        }
2275        sock.set_nonblocking(true)?;
2276        sock.bind(&addr.into())?;
2277        tokio::net::UdpSocket::from_std(sock.into())
2278    }
2279
2280    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
2281    ///
2282    /// After this setsockopt, each `recvmsg()` call on the socket receives
2283    /// an `IPV6_PKTINFO` control message containing the arrival interface
2284    /// index, which the DNS responder uses for its mesh-interface filter.
2285    #[cfg(unix)]
2286    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2287        use std::os::fd::AsRawFd;
2288        let enable: libc::c_int = 1;
2289        let ret = unsafe {
2290            libc::setsockopt(
2291                sock.as_raw_fd(),
2292                libc::IPPROTO_IPV6,
2293                libc::IPV6_RECVPKTINFO,
2294                &enable as *const _ as *const libc::c_void,
2295                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2296            )
2297        };
2298        if ret < 0 {
2299            return Err(std::io::Error::last_os_error());
2300        }
2301        Ok(())
2302    }
2303
2304    /// Resolve the mesh TUN interface index by name.
2305    ///
2306    /// Returns `None` if the interface does not exist (e.g. TUN disabled
2307    /// or not yet created). A `None` result disables the DNS responder's
2308    /// mesh-interface filter — safe, because if there is no fips0 there
2309    /// is no mesh exposure to defend against.
2310    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2311        #[cfg(unix)]
2312        {
2313            let c_name = std::ffi::CString::new(name).ok()?;
2314            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2315            if idx == 0 { None } else { Some(idx) }
2316        }
2317        #[cfg(not(unix))]
2318        {
2319            let _ = name;
2320            None
2321        }
2322    }
2323
2324    /// Stop the node.
2325    ///
2326    /// Shuts down TUN interface, stops I/O threads, and transitions to
2327    /// the Stopped state.
2328    pub async fn stop(&mut self) -> Result<(), NodeError> {
2329        if !self.state.can_stop() {
2330            return Err(NodeError::NotStarted);
2331        }
2332        self.state = NodeState::Stopping;
2333        info!(state = %self.state, "Node stopping");
2334
2335        // Stop DNS responder
2336        if let Some(handle) = self.dns_task.take() {
2337            handle.abort();
2338            debug!("DNS responder stopped");
2339        }
2340
2341        // Send disconnect notifications to all active peers before closing transports
2342        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
2343            .await;
2344
2345        // Stop Nostr overlay discovery background work and withdraw any advert.
2346        if let Some(bootstrap) = self.nostr_discovery.take()
2347            && let Err(e) = bootstrap.shutdown().await
2348        {
2349            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
2350        }
2351
2352        // Tear down LAN mDNS responder + browser. Best-effort: the
2353        // OS will eventually time the advert out via its TTL even if
2354        // we don't get a clean unregister out before the daemon exits.
2355        if let Some(lan) = self.lan_discovery.take() {
2356            lan.shutdown().await;
2357        }
2358
2359        if let Some(registry) = self.local_instance_registry.take()
2360            && let Err(err) = registry.remove()
2361        {
2362            debug!(error = %err, "failed to remove same-host FIPS instance record");
2363        }
2364
2365        // Shutdown transports (they're packet producers)
2366        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
2367        for transport_id in transport_ids {
2368            if let Some(mut handle) = self.transports.remove(&transport_id) {
2369                let transport_type = handle.transport_type().name;
2370                match handle.stop().await {
2371                    Ok(()) => {
2372                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
2373                    }
2374                    Err(e) => {
2375                        warn!(
2376                            transport_id = %transport_id,
2377                            transport_type,
2378                            error = %e,
2379                            "Transport stop failed"
2380                        );
2381                    }
2382                }
2383            }
2384        }
2385
2386        // Drop packet channels
2387        self.packet_tx.take();
2388        self.packet_rx.take();
2389
2390        // Shutdown TUN interface
2391        if let Some(name) = self.tun_name.take() {
2392            info!(name = %name, "Shutting down TUN interface");
2393
2394            // Drop the tun_tx to signal the writer to stop
2395            self.tun_tx.take();
2396
2397            // Delete the interface (on Linux, causes reader to get EFAULT)
2398            if let Err(e) = shutdown_tun_interface(&name).await {
2399                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2400            }
2401
2402            // On macOS, signal the reader thread to exit by writing to the
2403            // shutdown pipe. The reader's select() will wake up and break.
2404            #[cfg(target_os = "macos")]
2405            if let Some(fd) = self.tun_shutdown_fd.take() {
2406                unsafe {
2407                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2408                    libc::close(fd);
2409                }
2410            }
2411
2412            // Wait for threads to finish
2413            if let Some(handle) = self.tun_reader_handle.take() {
2414                let _ = handle.join();
2415            }
2416            if let Some(handle) = self.tun_writer_handle.take() {
2417                let _ = handle.join();
2418            }
2419
2420            self.tun_state = TunState::Disabled;
2421        }
2422
2423        self.state = NodeState::Stopped;
2424        info!(state = %self.state, "Node stopped");
2425        Ok(())
2426    }
2427
2428    /// Send disconnect notifications to all active peers.
2429    ///
2430    /// Best-effort: send failures are logged and ignored since the transport
2431    /// may already be degraded. This runs before transports are shut down.
2432    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2433        let disconnect = Disconnect::new(reason);
2434        let plaintext = disconnect.encode();
2435
2436        // Collect node_addrs to avoid borrow conflict with send helper
2437        let peer_addrs: Vec<NodeAddr> = self
2438            .peers
2439            .iter()
2440            .filter(|(_, peer)| peer.can_send() && peer.has_session())
2441            .map(|(addr, _)| *addr)
2442            .collect();
2443
2444        if peer_addrs.is_empty() {
2445            debug!(
2446                total_peers = self.peers.len(),
2447                "No sendable peers for disconnect notification"
2448            );
2449            return;
2450        }
2451
2452        let mut sent = 0usize;
2453        for node_addr in &peer_addrs {
2454            match self
2455                .send_encrypted_link_message(node_addr, &plaintext)
2456                .await
2457            {
2458                Ok(()) => sent += 1,
2459                Err(e) => {
2460                    debug!(
2461                        peer = %self.peer_display_name(node_addr),
2462                        error = %e,
2463                        "Failed to send disconnect (transport may be down)"
2464                    );
2465                }
2466            }
2467        }
2468
2469        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2470    }
2471
2472    fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2473        peer_config
2474            .addresses_by_priority()
2475            .into_iter()
2476            .cloned()
2477            .collect()
2478    }
2479
2480    async fn nostr_peer_fallback_addresses(
2481        &self,
2482        peer_config: &PeerConfig,
2483        existing: &[PeerAddress],
2484    ) -> Vec<PeerAddress> {
2485        if !self.config.node.discovery.nostr.enabled
2486            || self.config.node.discovery.nostr.policy
2487                == crate::config::NostrDiscoveryPolicy::Disabled
2488        {
2489            return Vec::new();
2490        }
2491
2492        let Some(bootstrap) = self.nostr_discovery.clone() else {
2493            return Vec::new();
2494        };
2495        let endpoints = match bootstrap
2496            .cached_advert_endpoints_for_peer(&peer_config.npub)
2497            .await
2498        {
2499            Some(endpoints) => endpoints,
2500            None => {
2501                debug!(
2502                    npub = %peer_config.npub,
2503                    "No cached Nostr advert endpoints for configured peer"
2504                );
2505                return Vec::new();
2506            }
2507        };
2508
2509        let mut fallback = Vec::new();
2510        let mut next_priority = existing
2511            .iter()
2512            .map(|addr| addr.priority)
2513            .max()
2514            .unwrap_or(100)
2515            .saturating_add(1);
2516        // Stamp every overlay-derived candidate with the current wall clock
2517        // so the dialer ranks it ahead of unstamped operator hints. We use a
2518        // single timestamp per fetch (rather than per-endpoint) because all
2519        // candidates in a single advert are equally fresh.
2520        let seen_at_ms = Self::now_ms();
2521        for endpoint in endpoints {
2522            let Some(candidate) =
2523                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2524            else {
2525                continue;
2526            };
2527            if existing
2528                .iter()
2529                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2530                || fallback.iter().any(|addr: &PeerAddress| {
2531                    addr.transport == candidate.transport && addr.addr == candidate.addr
2532                })
2533            {
2534                continue;
2535            }
2536            fallback.push(candidate);
2537            next_priority = next_priority.saturating_add(1);
2538        }
2539        fallback
2540    }
2541
2542    async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2543        if !self.config.node.discovery.nostr.enabled
2544            || self.config.node.discovery.nostr.policy
2545                == crate::config::NostrDiscoveryPolicy::Disabled
2546        {
2547            return false;
2548        }
2549        let Some(bootstrap) = self.nostr_discovery.clone() else {
2550            return false;
2551        };
2552        bootstrap.request_connect(peer_config.clone()).await;
2553        info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
2554        true
2555    }
2556
2557    fn overlay_endpoint_to_peer_address(
2558        endpoint: &OverlayEndpointAdvert,
2559        priority: u8,
2560        seen_at_ms: u64,
2561    ) -> Option<PeerAddress> {
2562        let transport = match endpoint.transport {
2563            OverlayTransportKind::Udp => "udp",
2564            OverlayTransportKind::Tcp => "tcp",
2565            OverlayTransportKind::Tor => "tor",
2566            OverlayTransportKind::WebRtc => "webrtc",
2567        };
2568        Some(
2569            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2570                .with_seen_at_ms(seen_at_ms),
2571        )
2572    }
2573
    /// Try to start connections to `peer_identity` over `addresses`, in order.
    ///
    /// For each entry:
    /// - Overlay-stamped candidates (`seen_at_ms` set) are skipped once any
    ///   earlier candidate has been attempted.
    /// - `udp:nat` is not dialed. It requests a background Nostr
    ///   NAT-traversal attempt, and only when nothing has been attempted yet
    ///   and `allow_bootstrap_nat` is set.
    /// - The local transport is resolved as follows:
    ///   - Ethernet and BLE addresses use their dedicated resolvers (BLE only
    ///     when built with `bluer_available`).
    ///   - Literal UDP socket addresses use
    ///     `find_udp_transport_for_remote_addr`.
    ///   - Everything else uses `find_transport_for_type`.
    ///   
    ///   Unresolvable entries are skipped.
    /// - A connection already in progress on the same path counts as
    ///   attempted without dialing again.
    /// - Otherwise the address is dialed via `initiate_connection`. The loop
    ///   stops once the per-peer budget from `path_candidate_attempt_budget`
    ///   has been consumed by successful initiations.
    ///
    /// # Errors
    ///
    /// - Returns `NodeError::AccessDenied` immediately if `initiate_connection`
    ///   reports it.
    /// - Returns `NodeError::NoTransportForType` when nothing was attempted.
    ///   This also covers the case where every dial failed with some other
    ///   error.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        // Set once any candidate is in flight (dialed, bootstrap requested,
        // or already connecting on the same path).
        let mut attempted = false;
        let peer_node_addr = *peer_identity.node_addr();
        // Caps how many concrete dials this call may start; only successful
        // `initiate_connection` calls decrement it.
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // Overlay-derived candidates carry a `seen_at_ms` stamp; they are
            // held back once an earlier candidate is already in flight.
            if attempted && addr.seen_at_ms.is_some() {
                debug!(
                    npub = %peer_config.npub,
                    transport = %addr.transport,
                    addr = %addr.addr,
                    "Skipping overlay fallback because a configured direct candidate is already in flight"
                );
                continue;
            }

            // `udp:nat` is a sentinel rather than a dialable address: it asks
            // the Nostr runtime for a background NAT-traversal attempt.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if attempted {
                    debug!(
                        npub = %peer_config.npub,
                        "Skipping Nostr NAT fallback because a configured direct candidate is already in flight"
                    );
                    continue;
                }
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the local transport and the remote address form to dial.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // Literal UDP socket addresses need a UDP transport compatible
                // with the remote address (per
                // `find_udp_transport_for_remote_addr`); anything else is
                // looked up by transport type name.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // A connection on this exact path is already in progress: count it
            // as attempted without dialing it a second time.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            // Budget is checked only after the duplicate-path check, so
            // in-flight paths are still recognized once the budget is spent.
            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // Access denial aborts the whole list; other errors fall
                // through to the next candidate.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        // Nothing was attempted: every entry was skipped, unresolvable, or
        // failed to initiate.
        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
2729
2730    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2731        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2732            .await;
2733    }
2734
2735    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
2736    /// queues retries for non-configured, not-yet-connected peers.
2737    ///
2738    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
2739    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
2740    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
2741    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
2742    ///
2743    /// `caller` is a short label included in log lines so per-tick and
2744    /// startup sweeps are distinguishable in operator-facing logs.
2745    pub(in crate::node) async fn run_open_discovery_sweep(
2746        &mut self,
2747        bootstrap: &std::sync::Arc<NostrDiscovery>,
2748        max_age_secs: Option<u64>,
2749        caller: &'static str,
2750    ) {
2751        if !self.config.node.discovery.nostr.enabled
2752            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2753        {
2754            return;
2755        }
2756
2757        let configured_npubs = self
2758            .config
2759            .peers()
2760            .iter()
2761            .map(|peer| peer.npub.clone())
2762            .collect::<HashSet<_>>();
2763        let now_ms = Self::now_ms();
2764        let now_secs = now_ms / 1000;
2765        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
2766        if enqueue_budget == 0 {
2767            debug!(
2768                caller = %caller,
2769                "open-discovery sweep: enqueue budget is 0, skipping"
2770            );
2771            return;
2772        }
2773
2774        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
2775        let cached_count = candidates.len();
2776        let mut enqueued = 0usize;
2777        let mut skipped_age = 0usize;
2778        let mut skipped_configured = 0usize;
2779        let mut skipped_self = 0usize;
2780        let mut skipped_connected = 0usize;
2781        let mut skipped_retry_pending = 0usize;
2782        let mut skipped_connecting = 0usize;
2783        let mut skipped_no_endpoints = 0usize;
2784        let mut skipped_invalid_npub = 0usize;
2785        let mut skipped_cooldown = 0usize;
2786
2787        for (npub, endpoints, created_at_secs) in candidates {
2788            if enqueue_budget == 0 {
2789                break;
2790            }
2791
2792            if let Some(max_age) = max_age_secs
2793                && now_secs.saturating_sub(created_at_secs) > max_age
2794            {
2795                skipped_age = skipped_age.saturating_add(1);
2796                continue;
2797            }
2798
2799            if configured_npubs.contains(&npub) {
2800                // Configured peers don't go through the open-discovery
2801                // enqueue path — their `PeerConfig` is already in
2802                // `self.config.peers()`, so the regular retry queue is
2803                // what drives their reconnect. But on cold start with
2804                // NAT'd peers, every initial `initiate_peer_connection`
2805                // fails (no overlay data yet, static cache hints empty
2806                // or stale), each pushes the peer into `retry_pending`
2807                // with exponential backoff (5/10/20/40/80s), and by the
2808                // time the next backoff slot fires the Nostr advert is
2809                // already cached — we just don't act on it for ~80s.
2810                //
2811                // The arrival of an advert (which this sweep sees) means
2812                // we now have a path to dial. If the peer's retry is
2813                // scheduled in the future, pull it forward to "now" so
2814                // the next `process_pending_retries` tick fires it
2815                // immediately. The retry path (`initiate_peer_retry_
2816                // connection` → `try_peer_addresses`) then refetches
2817                // the advert and dials it — no behavioral change
2818                // beyond schedule timing.
2819                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
2820                    let configured_addr = *identity.node_addr();
2821                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
2822                        && state.retry_after_ms > now_ms
2823                    {
2824                        state.retry_after_ms = now_ms;
2825                        debug!(
2826                            caller = %caller,
2827                            peer = %self.peer_display_name(&configured_addr),
2828                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
2829                            "Expediting configured-peer retry after fresh overlay advert"
2830                        );
2831                    }
2832                }
2833                skipped_configured = skipped_configured.saturating_add(1);
2834                continue;
2835            }
2836
2837            let peer_identity = match PeerIdentity::from_npub(&npub) {
2838                Ok(identity) => identity,
2839                Err(_) => {
2840                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
2841                    continue;
2842                }
2843            };
2844            let node_addr = *peer_identity.node_addr();
2845            if node_addr == *self.identity.node_addr() {
2846                skipped_self = skipped_self.saturating_add(1);
2847                continue;
2848            }
2849            if self.peers.contains_key(&node_addr) {
2850                skipped_connected = skipped_connected.saturating_add(1);
2851                continue;
2852            }
2853            if self.retry_pending.contains_key(&node_addr) {
2854                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
2855                continue;
2856            }
2857            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
2858                skipped_cooldown = skipped_cooldown.saturating_add(1);
2859                continue;
2860            }
2861            let connecting = self.connections.values().any(|conn| {
2862                conn.expected_identity()
2863                    .map(|id| id.node_addr() == &node_addr)
2864                    .unwrap_or(false)
2865            });
2866            if connecting {
2867                skipped_connecting = skipped_connecting.saturating_add(1);
2868                continue;
2869            }
2870
2871            let mut addresses = Vec::new();
2872            let mut priority = 120u8;
2873            let seen_at_ms = Self::now_ms();
2874            for endpoint in endpoints {
2875                let Some(candidate) =
2876                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
2877                else {
2878                    continue;
2879                };
2880                if addresses.iter().any(|existing: &PeerAddress| {
2881                    existing.transport == candidate.transport && existing.addr == candidate.addr
2882                }) {
2883                    continue;
2884                }
2885                addresses.push(candidate);
2886                priority = priority.saturating_add(1);
2887            }
2888            if addresses.is_empty() {
2889                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
2890                continue;
2891            }
2892
2893            self.peer_aliases
2894                .entry(node_addr)
2895                .or_insert_with(|| peer_identity.short_npub());
2896            self.register_identity(node_addr, peer_identity.pubkey_full());
2897
2898            let mut state = super::retry::RetryState::new(PeerConfig {
2899                npub: npub.clone(),
2900                alias: None,
2901                addresses,
2902                connect_policy: ConnectPolicy::AutoConnect,
2903                auto_reconnect: true,
2904                discovery_fallback_transit: false,
2905            });
2906            state.reconnect = false;
2907            state.retry_after_ms = now_ms;
2908            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
2909            self.retry_pending.insert(node_addr, state);
2910            info!(
2911                caller = %caller,
2912                peer = %peer_identity.short_npub(),
2913                advert_age_secs = now_secs.saturating_sub(created_at_secs),
2914                "open-discovery sweep: queued retry for cached advert"
2915            );
2916            enqueue_budget = enqueue_budget.saturating_sub(1);
2917            enqueued = enqueued.saturating_add(1);
2918        }
2919
2920        // Always log a one-line summary on the startup sweep so operators
2921        // can verify it ran. Per-tick sweeps are noisier; only summarize
2922        // when something happened.
2923        let total_skipped = skipped_age
2924            + skipped_configured
2925            + skipped_self
2926            + skipped_connected
2927            + skipped_retry_pending
2928            + skipped_connecting
2929            + skipped_no_endpoints
2930            + skipped_invalid_npub
2931            + skipped_cooldown;
2932        let should_summarize = caller == "startup" || enqueued > 0;
2933        if should_summarize {
2934            info!(
2935                caller = %caller,
2936                cached = cached_count,
2937                queued = enqueued,
2938                skipped_age = skipped_age,
2939                skipped_configured = skipped_configured,
2940                skipped_self = skipped_self,
2941                skipped_connected = skipped_connected,
2942                skipped_retry_pending = skipped_retry_pending,
2943                skipped_connecting = skipped_connecting,
2944                skipped_no_endpoints = skipped_no_endpoints,
2945                skipped_invalid_npub = skipped_invalid_npub,
2946                skipped_cooldown = skipped_cooldown,
2947                skipped_total = total_skipped,
2948                "open-discovery sweep complete"
2949            );
2950        }
2951    }
2952
2953    /// One-shot startup sweep: runs once after the configured settle
2954    /// delay, iterating the cached overlay adverts and queueing retries
2955    /// for any peer with a recent enough advert that we haven't already
2956    /// configured statically or established a link to.
2957    ///
2958    /// Gated identically to [`run_open_discovery_sweep`]: requires
2959    /// `node.discovery.nostr.enabled` and `policy == open`.
2960    async fn maybe_run_startup_open_discovery_sweep(
2961        &mut self,
2962        bootstrap: &std::sync::Arc<NostrDiscovery>,
2963    ) {
2964        if self.startup_open_discovery_sweep_done {
2965            return;
2966        }
2967        if !self.config.node.discovery.nostr.enabled
2968            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2969        {
2970            // Mark done so we don't keep re-checking on every tick.
2971            self.startup_open_discovery_sweep_done = true;
2972            return;
2973        }
2974        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
2975            return;
2976        };
2977        let now_ms = Self::now_ms();
2978        let delay_ms = self
2979            .config
2980            .node
2981            .discovery
2982            .nostr
2983            .startup_sweep_delay_secs
2984            .saturating_mul(1000);
2985        if now_ms < started_at_ms.saturating_add(delay_ms) {
2986            return;
2987        }
2988
2989        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
2990        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
2991            .await;
2992        self.startup_open_discovery_sweep_done = true;
2993    }
2994
2995    fn available_outbound_slots(&self) -> usize {
2996        let connection_used = self
2997            .connections
2998            .len()
2999            .saturating_add(self.pending_connects.len());
3000        let connection_slots = if self.max_connections == 0 {
3001            usize::MAX
3002        } else {
3003            self.max_connections.saturating_sub(connection_used)
3004        };
3005
3006        let peer_slots = if self.max_peers == 0 {
3007            usize::MAX
3008        } else {
3009            self.max_peers.saturating_sub(self.peers.len())
3010        };
3011
3012        connection_slots.min(peer_slots)
3013    }
3014
3015    fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
3016        let current_open_discovery_pending = self
3017            .retry_pending
3018            .values()
3019            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3020            .count();
3021
3022        let cap_remaining = self
3023            .config
3024            .node
3025            .discovery
3026            .nostr
3027            .open_discovery_max_pending
3028            .saturating_sub(current_open_discovery_pending);
3029
3030        cap_remaining.min(self.available_outbound_slots())
3031    }
3032
3033    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3034        now_ms.saturating_add(
3035            self.config
3036                .node
3037                .discovery
3038                .nostr
3039                .advert_ttl_secs
3040                .saturating_mul(1000)
3041                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3042        )
3043    }
3044
    /// Builds this node's overlay advert from its currently operational
    /// transports.
    ///
    /// Returns `None` when Nostr discovery is disabled or when no transport
    /// produced an endpoint. A transport is skipped if it is not operational,
    /// has no matching config entry, or has `advertise_on_nostr()` false.
    /// Endpoints are appended in `self.transports.values()` order.
    ///
    /// What each transport type contributes:
    /// - `udp`, public: the explicit external address; else a routable bind
    ///   address; else a public address learned through `bootstrap` (STUN).
    ///   If none of these is available, a warning is logged and no UDP
    ///   endpoint is added.
    /// - `udp`, not public: the literal `"nat"` placeholder.
    /// - `webrtc`: this node's hex-encoded full public key.
    /// - `tcp`: the explicit external address, else a routable bind address.
    ///   There is no STUN fallback, so otherwise a warning is logged.
    /// - `tor`: `<onion>:<advertised_port>`, once an onion address is known.
    ///
    /// `signal_relays` (copied from `dm_relays`) and `stun_servers` are
    /// attached only when a non-public UDP or a WebRTC endpoint was advertised.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Set when an advertised endpoint needs signal relays / STUN servers
        // included in the advert (see the final `OverlayAdvert` construction).
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Precedence:
                        // 1. operator-supplied `external_addr` (skips STUN)
                        // 2. non-wildcard *public* `local_addr` (operator
                        //    bound to a specific public IP directly)
                        // 3. STUN auto-discovery against ephemeral socket
                        //    (also taken when bind is wildcard *or* private —
                        //    a private bind is not peer-reachable, so we
                        //    must publish the public reflexive instead)
                        // 4. loud warn + omit endpoint
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                Some(addr) => {
                                    // Wildcard or unroutable bind: ask `bootstrap`
                                    // for the public address observed (via STUN)
                                    // for this transport id and bind port.
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                            or private and STUN observation failed; \
                                            advertising no UDP endpoint. Either set \
                                            transports.udp.external_addr, bind to a \
                                            specific *public* IP, or ensure \
                                            node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                // Transport reports no local address: nothing to publish.
                                None => {}
                            }
                        }
                    } else {
                        // Not public: publish the "nat" placeholder instead of an
                        // address and flag that signaling info must be attached.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // WebRTC endpoints are identified by this node's hex-encoded
                    // full public key rather than by a socket address.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Precedence:
                    // 1. operator-supplied `external_addr` (only path that
                    //    works on cloud-NAT setups where the public IP is
                    //    not on a host interface).
                    // 2. non-wildcard *public* `local_addr` (operator bound
                    //    to a specific public IP directly).
                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
                    //
                    // A wildcard *or* private bind is never advertised as-is
                    // — peers off-LAN can't reach a private bind, and there
                    // is no TCP STUN to discover a public reflexive.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                    or private IP and no transports.tcp.external_addr set; \
                                    advertising no TCP endpoint. Either set external_addr \
                                    to the public IP (recommended for cloud 1:1-NAT setups) \
                                    or bind explicitly to the public IP"
                                );
                            }
                            // Transport reports no local address: nothing to publish.
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Only advertised once an onion address exists; the port
                    // comes from config, not from the handle.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                // Any other transport type is never advertised.
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            // Relay / STUN lists are only included when some endpoint needs them.
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3223
3224    async fn refresh_overlay_advert(
3225        &self,
3226        bootstrap: &std::sync::Arc<NostrDiscovery>,
3227    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3228        let advert = self.build_overlay_advert(bootstrap).await;
3229        bootstrap.update_local_advert(advert).await
3230    }
3231
3232    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3233        match (&self.config.transports.udp, transport_name) {
3234            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3235            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3236            _ => None,
3237        }
3238    }
3239
3240    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3241        match (&self.config.transports.tcp, transport_name) {
3242            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3243            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3244            _ => None,
3245        }
3246    }
3247
3248    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3249        match (&self.config.transports.tor, transport_name) {
3250            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3251            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3252            _ => None,
3253        }
3254    }
3255
3256    fn lookup_webrtc_config(
3257        &self,
3258        transport_name: Option<&str>,
3259    ) -> Option<&crate::config::WebRtcConfig> {
3260        match (&self.config.transports.webrtc, transport_name) {
3261            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3262            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3263            _ => None,
3264        }
3265    }
3266
3267    pub(in crate::node) async fn try_peer_addresses(
3268        &mut self,
3269        peer_config: &PeerConfig,
3270        peer_identity: PeerIdentity,
3271        allow_bootstrap_nat: bool,
3272    ) -> Result<(), NodeError> {
3273        let peer_node_addr = *peer_identity.node_addr();
3274        if self.peers.contains_key(&peer_node_addr) {
3275            debug!(
3276                npub = %peer_config.npub,
3277                "Peer already exists, skipping address attempts"
3278            );
3279            return Ok(());
3280        }
3281
3282        let candidates = self.peer_address_candidates(peer_config).await;
3283
3284        if candidates.is_empty() {
3285            if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3286                return Ok(());
3287            }
3288            return Err(NodeError::NoTransportForType(format!(
3289                "no addresses known for {}",
3290                peer_config.npub
3291            )));
3292        }
3293
3294        if self
3295            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3296            .await
3297            .is_ok()
3298        {
3299            return Ok(());
3300        }
3301
3302        Err(NodeError::NoTransportForType(format!(
3303            "no operational transport for any of {}'s addresses",
3304            peer_config.npub
3305        )))
3306    }
3307
3308    async fn try_active_peer_alternative_addresses(
3309        &mut self,
3310        peer_config: &PeerConfig,
3311        peer_identity: PeerIdentity,
3312    ) -> Result<bool, NodeError> {
3313        let peer_node_addr = *peer_identity.node_addr();
3314        let candidates = self.peer_address_candidates(peer_config).await;
3315
3316        if candidates.is_empty() {
3317            return Err(NodeError::NoTransportForType(format!(
3318                "no addresses known for {}",
3319                peer_config.npub
3320            )));
3321        }
3322
3323        let alternatives: Vec<_> = candidates
3324            .into_iter()
3325            .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
3326            .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
3327            .collect();
3328
3329        if alternatives.is_empty() {
3330            return Err(NodeError::NoTransportForType(format!(
3331                "no concrete alternate addresses known for {}",
3332                peer_config.npub
3333            )));
3334        }
3335
3336        self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
3337            .await?;
3338        Ok(true)
3339    }
3340
3341    async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3342        // Merge every candidate from every source we have for this peer.
3343        // Explicitly configured addresses keep first shot, then freshly
3344        // fetched overlay adverts are appended as fallback candidates. This
3345        // lets native peers try known LAN/nvpn/static UDP routes before
3346        // slower WebRTC/Nostr-discovered paths, while still racing every
3347        // concrete candidate that fits in the per-peer budget.
3348        let static_addresses = self.static_peer_addresses(peer_config);
3349        let overlay_addresses = self
3350            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3351            .await;
3352
3353        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3354        for addr in overlay_addresses.into_iter().chain(static_addresses) {
3355            if !candidates.iter().any(|existing: &PeerAddress| {
3356                existing.transport == addr.transport && existing.addr == addr.addr
3357            }) {
3358                candidates.push(addr);
3359            }
3360        }
3361
3362        // Stable sort: explicit priority first, with recency only breaking
3363        // ties inside a source tier. `nostr_peer_fallback_addresses` assigns
3364        // overlay addresses priorities after the static max, so a fresh
3365        // overlay advert cannot leapfrog an operator-provided direct hint.
3366        candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
3367            _ if a.priority != b.priority => a.priority.cmp(&b.priority),
3368            (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3369            (Some(_), None) => std::cmp::Ordering::Less,
3370            (None, Some(_)) => std::cmp::Ordering::Greater,
3371            (None, None) => std::cmp::Ordering::Equal,
3372        });
3373
3374        candidates
3375    }
3376
3377    fn active_peer_matches_any_candidate(
3378        &self,
3379        peer_node_addr: &NodeAddr,
3380        candidates: &[PeerAddress],
3381    ) -> bool {
3382        candidates
3383            .iter()
3384            .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3385    }
3386
3387    pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3388        &self,
3389        peer_node_addr: &NodeAddr,
3390        candidates: &[PeerAddress],
3391    ) -> bool {
3392        if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3393            return false;
3394        }
3395        !self.active_peer_needs_same_path_refresh(peer_node_addr)
3396    }
3397
3398    fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3399        let Some(peer) = self.peers.get(peer_node_addr) else {
3400            return false;
3401        };
3402        let stale_after_ms = self
3403            .config
3404            .node
3405            .heartbeat_interval_secs
3406            .saturating_mul(1000)
3407            .max(1000);
3408        peer.idle_time(Self::now_ms()) > stale_after_ms
3409    }
3410
3411    fn active_peer_matches_candidate(
3412        &self,
3413        peer_node_addr: &NodeAddr,
3414        candidate: &PeerAddress,
3415    ) -> bool {
3416        let Some(peer) = self.peers.get(peer_node_addr) else {
3417            return false;
3418        };
3419        let Some(current_addr) = peer.current_addr() else {
3420            return false;
3421        };
3422        if let Some(peer_transport_id) = peer.transport_id()
3423            && let Some((candidate_transport_id, candidate_addr)) =
3424                self.resolve_peer_address_for_match(candidate)
3425        {
3426            return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3427        }
3428        if peer
3429            .transport_id()
3430            .map(|id| self.bootstrap_transports.contains(&id))
3431            .unwrap_or(false)
3432        {
3433            return false;
3434        }
3435        let current_addr = current_addr.to_string();
3436        let current_transport = peer
3437            .transport_id()
3438            .and_then(|id| self.transports.get(&id))
3439            .map(|transport| transport.transport_type().name);
3440
3441        candidate.addr == current_addr
3442            && current_transport
3443                .map(|transport| transport == candidate.transport)
3444                .unwrap_or(true)
3445    }
3446
3447    // === Control API methods ===
3448
3449    /// Connect to a peer via the control API.
3450    ///
3451    /// Creates an ephemeral peer connection (not persisted to config, no
3452    /// auto-reconnect). Reuses the same connection path as auto-connect
3453    /// peers. Returns JSON data on success or an error message.
3454    pub(crate) async fn api_connect(
3455        &mut self,
3456        npub: &str,
3457        address: &str,
3458        transport: &str,
3459    ) -> Result<serde_json::Value, String> {
3460        let peer_config = PeerConfig {
3461            npub: npub.to_string(),
3462            alias: None,
3463            addresses: vec![PeerAddress::new(transport, address)],
3464            connect_policy: ConnectPolicy::Manual,
3465            auto_reconnect: false,
3466            discovery_fallback_transit: true,
3467        };
3468
3469        // Pre-seed identity cache (same as initiate_peer_connections does)
3470        if let Ok(identity) = PeerIdentity::from_npub(npub) {
3471            self.peer_aliases
3472                .insert(*identity.node_addr(), identity.short_npub());
3473            self.register_identity(*identity.node_addr(), identity.pubkey_full());
3474        }
3475
3476        self.initiate_peer_connection(&peer_config)
3477            .await
3478            .map(|()| {
3479                info!(
3480                    npub = %npub,
3481                    address = %address,
3482                    transport = %transport,
3483                    "API connect initiated"
3484                );
3485                serde_json::json!({
3486                    "npub": npub,
3487                    "address": address,
3488                    "transport": transport,
3489                })
3490            })
3491            .map_err(|e| e.to_string())
3492    }
3493
3494    /// Disconnect a peer via the control API.
3495    ///
3496    /// Removes the peer and suppresses auto-reconnect.
3497    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3498        let peer_identity =
3499            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3500        let node_addr = *peer_identity.node_addr();
3501
3502        if !self.peers.contains_key(&node_addr) {
3503            return Err(format!("peer not found: {npub}"));
3504        }
3505
3506        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
3507        self.remove_active_peer(&node_addr);
3508
3509        // Suppress any pending auto-reconnect
3510        self.retry_pending.remove(&node_addr);
3511
3512        info!(npub = %npub, "API disconnect completed");
3513
3514        Ok(serde_json::json!({
3515            "npub": npub,
3516            "disconnected": true,
3517        }))
3518    }
3519
3520    /// Adopt an already-established UDP traversal and start the normal FIPS
3521    /// Noise handshake over it.
3522    ///
3523    /// This is intended for integration with an external rendezvous runtime
3524    /// that has already completed relay signaling, STUN observation, and UDP
3525    /// hole punching. After handoff, the adopted socket is owned by FIPS.
3526    pub async fn adopt_established_traversal(
3527        &mut self,
3528        traversal: EstablishedTraversal,
3529    ) -> Result<BootstrapHandoffResult, NodeError> {
3530        debug!(
3531            peer_npub = %traversal.peer_npub,
3532            session_id = %traversal.session_id,
3533            remote_addr = %traversal.remote_addr,
3534            "adopting established traversal socket"
3535        );
3536
3537        if !self.state.is_operational() {
3538            return Err(NodeError::NotStarted);
3539        }
3540
3541        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3542        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3543            NodeError::InvalidPeerNpub {
3544                npub: traversal.peer_npub.clone(),
3545                reason: e.to_string(),
3546            }
3547        })?;
3548        let peer_node_addr = *peer_identity.node_addr();
3549        if self.peers.contains_key(&peer_node_addr) {
3550            debug!(
3551                peer_npub = %traversal.peer_npub,
3552                "Adopting NAT traversal handoff as alternate path for already-connected peer"
3553            );
3554        }
3555
3556        self.peer_aliases
3557            .insert(peer_node_addr, peer_identity.short_npub());
3558        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3559
3560        let transport_id = self.allocate_transport_id();
3561        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
3562        // (and accept_connections / advertise flags) from the operator's
3563        // configured [transports.udp] when the bootstrap runtime doesn't
3564        // pass an explicit override. Lookup tries `transport_name` first
3565        // (covers the `Named` multi-listener variant) and falls back to the
3566        // unnamed `Single` listener, so single- and named-listener configs
3567        // both inherit cleanly.
3568        //
3569        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
3570        // only value guaranteed to survive arbitrary middlebox paths.
3571        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
3572        // initially attempt that MTU and may black-hole on tighter paths
3573        // until reactive `MtuExceeded` recovery kicks in. Operators who
3574        // raise the primary MTU based on known-clean topology accept that
3575        // tradeoff; the silent drop on a too-low default was strictly
3576        // worse for the common case where the primary MTU is reachable.
3577        //
3578        // Bind / external address fields are cleared since the socket is
3579        // already bound.
3580        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3581            let mut cfg = self
3582                .lookup_udp_config(traversal.transport_name.as_deref())
3583                .or_else(|| self.lookup_udp_config(None))
3584                .cloned()
3585                .unwrap_or_default();
3586            cfg.bind_addr = None;
3587            cfg.external_addr = None;
3588            cfg
3589        });
3590        let mut transport = crate::transport::udp::UdpTransport::new(
3591            transport_id,
3592            traversal.transport_name.clone(),
3593            inherited_config,
3594            packet_tx,
3595        );
3596
3597        transport
3598            .adopt_socket_async(traversal.socket)
3599            .await
3600            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
3601
3602        let local_addr = transport.local_addr().ok_or_else(|| {
3603            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
3604        })?;
3605
3606        self.transports.insert(
3607            transport_id,
3608            crate::transport::TransportHandle::Udp(transport),
3609        );
3610        self.bootstrap_transports.insert(transport_id);
3611        self.bootstrap_transport_npubs
3612            .insert(transport_id, traversal.peer_npub.clone());
3613
3614        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
3615        if let Err(err) = self
3616            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
3617            .await
3618        {
3619            self.bootstrap_transports.remove(&transport_id);
3620            self.bootstrap_transport_npubs.remove(&transport_id);
3621            if let Some(mut handle) = self.transports.remove(&transport_id) {
3622                let _ = handle.stop().await;
3623            }
3624            return Err(err);
3625        }
3626
3627        info!(
3628            peer = %self.peer_display_name(&peer_node_addr),
3629            transport_id = %transport_id,
3630            local_addr = %local_addr,
3631            remote_addr = %traversal.remote_addr,
3632            session_id = %traversal.session_id,
3633            "adopted NAT traversal socket; handshake initiated"
3634        );
3635
3636        Ok(BootstrapHandoffResult {
3637            transport_id,
3638            local_addr,
3639            remote_addr: traversal.remote_addr,
3640            peer_node_addr,
3641            session_id: traversal.session_id,
3642        })
3643    }
3644}