Skip to main content

fips_core/node/
lifecycle.rs

1//! Node lifecycle management: start, stop, and peer connection initiation.
2
3use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, NostrDiscoveryPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7    OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::{HashMap, HashSet};
18use std::net::{IpAddr, SocketAddr};
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Append one timestamped line (`{SystemTime:?} {message}`) to
/// `nvpn-fips-endpoint-debug.log` in the system temp directory.
///
/// Only compiled into debug builds. If the file cannot be opened or written,
/// the message is silently dropped.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let Ok(mut log_file) = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(log_path)
    else {
        return;
    };

    let _ = writeln!(
        log_file,
        "{:?} {}",
        std::time::SystemTime::now(),
        message.as_ref()
    );
}

/// Release builds: accepts and discards `message` so call sites need no `cfg`.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
43
/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN.
///
/// IPv4: RFC 1918 private, loopback, link-local, unspecified, multicast,
/// limited broadcast, documentation (TEST-NET-1/2/3), benchmarking
/// (198.18.0.0/15, RFC 2544) and CGNAT shared space (100.64.0.0/10,
/// RFC 6598).
///
/// IPv6: loopback, unspecified, unique-local (fc00::/7), multicast and
/// link-local (fe80::/10). IPv4-mapped addresses (`::ffff:a.b.c.d`) are
/// judged by their embedded IPv4 address, so a mapped private or CGNAT
/// address is rejected exactly like its plain IPv4 form.
///
/// We never publish these as the peer's primary `runtime_endpoint`: an
/// off-LAN consumer can't route to them, and latching one in (which leaves
/// the consumer on a slow overlay-relay fallback) is the original bug this
/// guard exists to prevent.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [first, second, _, _] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 198.18.0.0/15 — benchmarking, RFC 2544. Never routed on the
                // public internet (`Ipv4Addr::is_benchmarking` is unstable).
                || (first == 198 && (second & 0xfe) == 18)
                // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the
                // public internet; behaves like an extra NAT layer.
                || (first == 100 && (second & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            // `::ffff:a.b.c.d` is an IPv4 address in IPv6 form; without this
            // a mapped RFC 1918/CGNAT address would pass every IPv6 check.
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(v4));
            }
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // IPv6 link-local: fe80::/10
                || (v6.segments()[0] & 0xffc0) == 0xfe80
        }
    }
}
75
/// Whether a socket bound at `local` can send to `remote`, i.e. both
/// addresses belong to the same IP family (both IPv4 or both IPv6).
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    local.is_ipv4() == remote.is_ipv4()
}
82
/// Cap on fresh connection attempts started from discovery results in one
/// budget computation (see `discovery_connect_budget`).
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
/// Cap on auto-connect sessions warmed over the existing FIPS graph in one
/// budget computation (see `graph_session_warmup_budget`).
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Maximum in-flight path candidates (handshaking connections plus pending
/// connects) toward a single peer (see `path_candidate_attempt_budget`).
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Retry-lifetime multiplier for open discovery.
/// NOTE(review): not referenced in this part of the file — confirm semantics
/// at its use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
87
88impl Node {
89    /// Initiate connections to configured static peers.
90    ///
91    /// For each peer configured with AutoConnect policy, creates a link and
92    /// peer entry, then starts the Noise handshake by sending the first message.
93    /// Replace the runtime peer list. Newly added auto-connect peers get
94    /// `initiate_peer_connection` immediately; removed peers are dropped from
95    /// the retry queue (the regular liveness timeout reaps any active session
96    /// — we don't proactively disconnect, since the same npub might be on its
97    /// way back via a fresh advert). Existing auto-connect entries with new
98    /// direct addresses are dialed immediately, and retry state is refreshed
99    /// so later attempts also use the latest hints.
100    pub(super) async fn update_peers(
101        &mut self,
102        new_peers: Vec<crate::config::PeerConfig>,
103    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
104        use std::collections::{HashMap, HashSet};
105
106        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
107            HashMap::with_capacity(new_peers.len());
108        let mut new_order = Vec::with_capacity(new_peers.len());
109        for peer in new_peers {
110            let identity = match PeerIdentity::from_npub(&peer.npub) {
111                Ok(id) => id,
112                Err(e) => {
113                    return Err(crate::node::NodeError::InvalidPeerNpub {
114                        npub: peer.npub.clone(),
115                        reason: e.to_string(),
116                    });
117                }
118            };
119            // Last-write-wins on duplicates so callers passing a multi-source
120            // candidate list (e.g. operator hints + recent-peers cache for
121            // the same npub) get the merge they meant.
122            let node_addr = *identity.node_addr();
123            if !new_by_addr.contains_key(&node_addr) {
124                new_order.push(node_addr);
125            }
126            new_by_addr.insert(node_addr, peer);
127        }
128
129        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
130            .config
131            .peers()
132            .iter()
133            .filter_map(|pc| {
134                PeerIdentity::from_npub(&pc.npub)
135                    .ok()
136                    .map(|id| (*id.node_addr(), pc.clone()))
137            })
138            .collect();
139
140        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
141        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
142
143        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
144        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
145        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();
146
147        let mut outcome = crate::node::UpdatePeersOutcome::default();
148
149        for node_addr in &removed {
150            if self.retry_pending.remove(node_addr).is_some() {
151                debug!(
152                    peer = %self.peer_display_name(node_addr),
153                    "Dropping retry entry for peer removed from runtime peer list"
154                );
155            }
156            self.peer_aliases.remove(node_addr);
157            self.set_discovery_fallback_transit_allowed(*node_addr, false);
158            outcome.removed += 1;
159        }
160
161        let mut auto_connect_refresh_configs = Vec::new();
162        for node_addr in &kept {
163            let new_pc = &new_by_addr[node_addr];
164            let current_pc = &current_by_addr[node_addr];
165            if new_pc.addresses != current_pc.addresses
166                || new_pc.alias != current_pc.alias
167                || new_pc.connect_policy != current_pc.connect_policy
168                || new_pc.auto_reconnect != current_pc.auto_reconnect
169                || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
170            {
171                outcome.updated += 1;
172                self.set_discovery_fallback_transit_allowed(
173                    *node_addr,
174                    new_pc.discovery_fallback_transit,
175                );
176                if let Some(state) = self.retry_pending.get_mut(node_addr) {
177                    state.peer_config = new_pc.clone();
178                    state.retry_after_ms = Self::now_ms();
179                }
180                if let Some(alias) = new_pc.alias.clone() {
181                    self.peer_aliases.insert(*node_addr, alias);
182                }
183                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
184                    auto_connect_refresh_configs.push(new_pc.clone());
185                }
186            } else {
187                outcome.unchanged += 1;
188                self.set_discovery_fallback_transit_allowed(
189                    *node_addr,
190                    new_pc.discovery_fallback_transit,
191                );
192                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
193                    auto_connect_refresh_configs.push(new_pc.clone());
194                }
195            }
196        }
197
198        let added_configs: Vec<crate::config::PeerConfig> = new_order
199            .iter()
200            .filter(|addr| added.contains(addr))
201            .map(|addr| new_by_addr[addr].clone())
202            .collect();
203
204        // Replace the live config peer list before initiating connections so
205        // any helper that consults `self.config.peers()` during the dial
206        // (alias lookup, retry-state seeding) sees the new entries.
207        self.config.peers = new_order
208            .iter()
209            .filter_map(|addr| new_by_addr.get(addr).cloned())
210            .collect();
211        self.configured_peer_send_weights = Self::configured_peer_send_weights(&self.config);
212
213        for peer_config in added_configs {
214            outcome.added += 1;
215            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
216                continue;
217            };
218            let name = peer_config
219                .alias
220                .clone()
221                .unwrap_or_else(|| identity.short_npub());
222            self.peer_aliases.insert(*identity.node_addr(), name);
223            self.set_discovery_fallback_transit_allowed(
224                *identity.node_addr(),
225                peer_config.discovery_fallback_transit,
226            );
227            self.register_identity(*identity.node_addr(), identity.pubkey_full());
228
229            if !peer_config.is_auto_connect() {
230                continue;
231            }
232
233            match self
234                .try_auto_connect_graph_session(&peer_config, identity)
235                .await
236            {
237                Ok(true) => continue,
238                Ok(false) => {}
239                Err(err) => {
240                    debug!(
241                        npub = %peer_config.npub,
242                        error = %err,
243                        "Existing FIPS graph did not warm newly added peer"
244                    );
245                }
246            }
247
248            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
249                warn!(
250                    npub = %peer_config.npub,
251                    error = %e,
252                    "Failed to initiate connection for newly added peer"
253                );
254                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
255                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
256                }
257                if matches!(e, crate::node::NodeError::NoTransportForType(_))
258                    && let Some(bootstrap) = self.nostr_discovery.clone()
259                {
260                    bootstrap
261                        .request_advert_stale_check(peer_config.npub.clone())
262                        .await;
263                }
264            }
265        }
266
267        for peer_config in auto_connect_refresh_configs {
268            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
269                continue;
270            };
271            let node_addr = *peer_identity.node_addr();
272
273            if self.peers.contains_key(&node_addr) {
274                match self
275                    .initiate_active_peer_alternative_connection(&peer_config)
276                    .await
277                {
278                    Ok(attempted) => {
279                        if attempted {
280                            debug!(
281                                peer = %self.peer_display_name(&node_addr),
282                                "Started non-disruptive alternate-path handshake for active peer"
283                            );
284                        }
285                    }
286                    Err(e) => {
287                        debug!(
288                            npub = %peer_config.npub,
289                            error = %e,
290                            "Active peer alternate-path refresh did not start"
291                        );
292                    }
293                }
294                continue;
295            }
296
297            match self
298                .try_auto_connect_graph_session(&peer_config, peer_identity)
299                .await
300            {
301                Ok(true) => continue,
302                Ok(false) => {}
303                Err(err) => {
304                    debug!(
305                        npub = %peer_config.npub,
306                        error = %err,
307                        "Existing FIPS graph did not warm refreshed peer"
308                    );
309                }
310            }
311
312            match self.initiate_peer_connection(&peer_config).await {
313                Ok(()) => {
314                    let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
315                    if let Some(state) = self.retry_pending.get_mut(&node_addr) {
316                        state.peer_config = peer_config;
317                        state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
318                    }
319                }
320                Err(e) => {
321                    debug!(
322                        npub = %peer_config.npub,
323                        error = %e,
324                        "Refreshed peer addresses did not initiate a direct connection"
325                    );
326                    self.schedule_retry(node_addr, Self::now_ms());
327                }
328            }
329        }
330
331        self.warm_auto_connect_graph_sessions().await;
332
333        Ok(outcome)
334    }
335
336    pub(super) async fn initiate_peer_connections(&mut self) {
337        // Build display name map from all configured peers (alias or short npub),
338        // and pre-seed the identity cache from each peer's npub so that TUN packets
339        // addressed to a configured peer can be dispatched (and trigger session
340        // initiation) immediately on startup — without waiting for the link-layer
341        // handshake to complete first.
342        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
343            .config
344            .peers()
345            .iter()
346            .filter_map(|pc| {
347                PeerIdentity::from_npub(&pc.npub)
348                    .ok()
349                    .map(|id| (id, pc.alias.clone()))
350            })
351            .collect();
352
353        for (identity, alias) in peer_identities {
354            let name = alias.unwrap_or_else(|| identity.short_npub());
355            self.peer_aliases.insert(*identity.node_addr(), name);
356            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
357            // but will be corrected to the real value when the peer is promoted
358            // after a successful Noise handshake.
359            self.register_identity(*identity.node_addr(), identity.pubkey_full());
360        }
361
362        // Collect peer configs to avoid borrow conflicts
363        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
364
365        if peer_configs.is_empty() {
366            debug!("No static peers configured");
367            return;
368        }
369
370        debug!(
371            count = peer_configs.len(),
372            "Initiating static peer connections"
373        );
374
375        for peer_config in peer_configs {
376            let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
377                Ok(identity) => identity,
378                Err(_) => continue,
379            };
380            match self
381                .try_auto_connect_graph_session(&peer_config, peer_identity)
382                .await
383            {
384                Ok(true) => continue,
385                Ok(false) => {}
386                Err(err) => {
387                    debug!(
388                        npub = %peer_config.npub,
389                        error = %err,
390                        "Existing FIPS graph did not warm auto-connect peer"
391                    );
392                }
393            }
394            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
395                warn!(
396                    npub = %peer_config.npub,
397                    alias = ?peer_config.alias,
398                    error = %e,
399                    "Failed to initiate peer connection"
400                );
401                // Schedule a retry so transient address-resolution failures
402                // (e.g. cached endpoints stale, NAT rebinds, all addresses
403                // currently unreachable) recover without a daemon restart.
404                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
405                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
406                }
407                // No-transport failures most often mean the cached overlay
408                // advert is pointing at a dead post-NAT-rebind address. The
409                // advert cache is read-only inside fetch_advert, so retries
410                // would loop on the same dead address until expiry. Force a
411                // re-fetch so the next retry tick picks up fresh endpoints.
412                if matches!(e, crate::node::NodeError::NoTransportForType(_))
413                    && let Some(bootstrap) = self.nostr_discovery.clone()
414                {
415                    bootstrap
416                        .request_advert_stale_check(peer_config.npub.clone())
417                        .await;
418                }
419            }
420        }
421
422        self.warm_auto_connect_graph_sessions().await;
423    }
424
425    pub(in crate::node) async fn try_auto_connect_graph_session(
426        &mut self,
427        peer_config: &PeerConfig,
428        peer_identity: PeerIdentity,
429    ) -> Result<bool, NodeError> {
430        if !peer_config.is_auto_connect() {
431            return Ok(false);
432        }
433
434        let peer_node_addr = *peer_identity.node_addr();
435        if self.peers.contains_key(&peer_node_addr) {
436            return Ok(false);
437        }
438        if self
439            .sessions
440            .get(&peer_node_addr)
441            .is_some_and(|entry| entry.is_established() || entry.is_initiating())
442        {
443            return Ok(true);
444        }
445        if self.find_next_hop(&peer_node_addr).is_none() {
446            return Ok(false);
447        }
448
449        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
450        match self
451            .initiate_session(peer_node_addr, peer_identity.pubkey_full())
452            .await
453        {
454            Ok(()) => {
455                debug!(
456                    peer = %self.peer_display_name(&peer_node_addr),
457                    "Warmed auto-connect peer session over existing FIPS graph"
458                );
459                Ok(true)
460            }
461            Err(NodeError::SendFailed { node_addr, reason })
462                if node_addr == peer_node_addr && reason == "no route to destination" =>
463            {
464                self.maybe_initiate_lookup(&peer_node_addr).await;
465                Ok(false)
466            }
467            Err(err) => Err(err),
468        }
469    }
470
471    /// Initiate a connection to a single peer.
472    ///
473    /// Creates a link, starts the Noise handshake, and sends the first message.
474    pub(super) async fn initiate_peer_connection(
475        &mut self,
476        peer_config: &crate::config::PeerConfig,
477    ) -> Result<(), NodeError> {
478        self.initiate_peer_connection_inner(peer_config).await
479    }
480
481    /// Initiate a connection from the retry path. Identical to
482    /// [`initiate_peer_connection`] today — both paths fan out across every
483    /// known address (overlay-fresh first, then operator/cache hints) in a
484    /// single pass. The two entry points stay separate so callers can be
485    /// distinguished in tracing.
486    pub(super) async fn initiate_peer_retry_connection(
487        &mut self,
488        peer_config: &crate::config::PeerConfig,
489    ) -> Result<(), NodeError> {
490        self.initiate_peer_connection_inner(peer_config).await
491    }
492
493    async fn initiate_active_peer_alternative_connection(
494        &mut self,
495        peer_config: &crate::config::PeerConfig,
496    ) -> Result<bool, NodeError> {
497        let peer_identity =
498            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
499                npub: peer_config.npub.clone(),
500                reason: e.to_string(),
501            })?;
502        let peer_node_addr = *peer_identity.node_addr();
503
504        if !self.peers.contains_key(&peer_node_addr) {
505            self.initiate_peer_connection(peer_config).await?;
506            return Ok(true);
507        }
508
509        // Keep the current link live and race fresh concrete candidates.
510        // Cross-connection resolution still decides which replacement link
511        // wins if both peers try the same upgrade; the important part here is
512        // that a stale path does not depend on the remote peer receiving our
513        // hint first before either side attempts the better address.
514        self.try_active_peer_alternative_addresses(peer_config, peer_identity)
515            .await
516    }
517
518    async fn initiate_peer_connection_inner(
519        &mut self,
520        peer_config: &crate::config::PeerConfig,
521    ) -> Result<(), NodeError> {
522        // Parse the peer's npub to get their identity
523        let peer_identity =
524            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
525                npub: peer_config.npub.clone(),
526                reason: e.to_string(),
527            })?;
528
529        let peer_node_addr = *peer_identity.node_addr();
530
531        // Check if peer already exists (fully authenticated)
532        if self.peers.contains_key(&peer_node_addr) {
533            debug!(
534                npub = %peer_config.npub,
535                "Peer already exists, skipping"
536            );
537            return Ok(());
538        }
539
540        self.try_peer_addresses(peer_config, peer_identity, true)
541            .await
542    }
543
544    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
545        self.connections.values().any(|conn| {
546            conn.expected_identity()
547                .map(|id| id.node_addr() == peer_node_addr)
548                .unwrap_or(false)
549        })
550    }
551
552    fn is_connecting_to_peer_on_path(
553        &self,
554        peer_node_addr: &NodeAddr,
555        transport_id: TransportId,
556        remote_addr: &TransportAddr,
557    ) -> bool {
558        self.connections.values().any(|conn| {
559            conn.expected_identity()
560                .map(|id| id.node_addr() == peer_node_addr)
561                .unwrap_or(false)
562                && conn.transport_id() == Some(transport_id)
563                && conn.source_addr() == Some(remote_addr)
564        }) || self.pending_connects.iter().any(|pending| {
565            pending.peer_identity.node_addr() == peer_node_addr
566                && pending.transport_id == transport_id
567                && &pending.remote_addr == remote_addr
568        })
569    }
570
571    pub(in crate::node) fn should_warm_auto_connect_session(
572        &self,
573        peer_node_addr: &NodeAddr,
574    ) -> bool {
575        if self.peers.contains_key(peer_node_addr)
576            || self
577                .sessions
578                .get(peer_node_addr)
579                .is_some_and(|entry| entry.is_established())
580        {
581            return false;
582        }
583
584        self.config.peers().iter().any(|peer| {
585            peer.is_auto_connect()
586                && PeerIdentity::from_npub(&peer.npub)
587                    .map(|identity| identity.node_addr() == peer_node_addr)
588                    .unwrap_or(false)
589        })
590    }
591
592    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
593        if !self.peers.values().any(|peer| peer.can_send()) {
594            return 0;
595        }
596
597        let mut budget = self.graph_session_warmup_budget();
598        if budget == 0 {
599            return 0;
600        }
601
602        let peer_identities: Vec<_> = self
603            .config
604            .auto_connect_peers()
605            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
606            .collect();
607
608        let mut warmed = 0;
609        for identity in peer_identities {
610            if budget == 0 {
611                break;
612            }
613
614            let peer_node_addr = *identity.node_addr();
615            if peer_node_addr == *self.identity.node_addr()
616                || !self.should_warm_auto_connect_session(&peer_node_addr)
617                || self
618                    .sessions
619                    .get(&peer_node_addr)
620                    .is_some_and(|entry| entry.is_initiating())
621            {
622                continue;
623            }
624
625            self.register_identity(peer_node_addr, identity.pubkey_full());
626
627            if self.find_next_hop(&peer_node_addr).is_some() {
628                match self
629                    .initiate_session(peer_node_addr, identity.pubkey_full())
630                    .await
631                {
632                    Ok(()) => {
633                        warmed += 1;
634                        budget = budget.saturating_sub(1);
635                        debug!(
636                            peer = %self.peer_display_name(&peer_node_addr),
637                            "Warmed auto-connect peer session over existing FIPS graph"
638                        );
639                    }
640                    Err(NodeError::SendFailed { node_addr, reason })
641                        if node_addr == peer_node_addr && reason == "no route to destination" =>
642                    {
643                        self.maybe_initiate_lookup(&peer_node_addr).await;
644                        warmed += 1;
645                        budget = budget.saturating_sub(1);
646                    }
647                    Err(err) => {
648                        debug!(
649                            peer = %self.peer_display_name(&peer_node_addr),
650                            error = %err,
651                            "Failed to warm auto-connect peer session"
652                        );
653                    }
654                }
655            } else {
656                self.maybe_initiate_lookup(&peer_node_addr).await;
657                warmed += 1;
658                budget = budget.saturating_sub(1);
659            }
660        }
661
662        warmed
663    }
664
665    pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
666        let max_destinations = self.config.node.session.pending_max_destinations;
667        if max_destinations == 0 {
668            return 0;
669        }
670
671        let pending_sessions = self
672            .sessions
673            .values()
674            .filter(|entry| !entry.is_established())
675            .count();
676        let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
677        max_destinations
678            .saturating_sub(pending_total)
679            .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
680    }
681
682    fn outbound_handshake_slots(&self) -> usize {
683        let used = self
684            .connections
685            .len()
686            .saturating_add(self.pending_connects.len());
687        if self.max_connections == 0 {
688            usize::MAX
689        } else {
690            self.max_connections.saturating_sub(used)
691        }
692    }
693
694    fn outbound_link_slots(&self) -> usize {
695        if self.max_links == 0 {
696            usize::MAX
697        } else {
698            self.max_links.saturating_sub(self.links.len())
699        }
700    }
701
702    fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
703        if !self.peers.contains_key(peer_node_addr)
704            && self.max_peers > 0
705            && self.peers.len() >= self.max_peers
706        {
707            return 0;
708        }
709
710        let in_flight_for_peer = self
711            .connections
712            .values()
713            .filter(|conn| {
714                conn.expected_identity()
715                    .map(|id| id.node_addr() == peer_node_addr)
716                    .unwrap_or(false)
717            })
718            .count()
719            .saturating_add(
720                self.pending_connects
721                    .iter()
722                    .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
723                    .count(),
724            );
725
726        self.outbound_handshake_slots()
727            .min(self.outbound_link_slots())
728            .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
729    }
730
731    fn discovery_connect_budget(&self) -> usize {
732        self.outbound_handshake_slots()
733            .min(self.outbound_link_slots())
734            .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
735    }
736
737    /// Find a UDP transport whose bound socket can send to `remote_addr`.
738    ///
739    /// LAN discovery can surface both IPv4 and IPv6 addresses for the same
740    /// service. A wildcard IPv4 socket cannot send to an IPv6 link-local
741    /// target, and vice versa, so callers must choose by socket family rather
742    /// than by transport type alone.
743    fn find_udp_transport_for_remote_addr(
744        &self,
745        remote_addr: SocketAddr,
746    ) -> Option<(TransportId, SocketAddr)> {
747        self.transports
748            .iter()
749            .filter(|(id, handle)| {
750                handle.transport_type().name == "udp"
751                    && handle.is_operational()
752                    && !self.bootstrap_transports.contains(id)
753            })
754            .filter_map(|(id, handle)| {
755                let local_addr = handle.local_addr()?;
756                socket_addr_families_compatible(local_addr, remote_addr)
757                    .then_some((*id, local_addr))
758            })
759            .min_by_key(|(id, _)| id.as_u32())
760    }
761
762    pub(super) fn transport_discovery_candidate(
763        &self,
764        discovered_transport_id: TransportId,
765        discovered_addr: TransportAddr,
766    ) -> Option<(TransportId, TransportAddr, &'static str)> {
767        let transport = self.transports.get(&discovered_transport_id)?;
768        let transport_name = transport.transport_type().name;
769
770        if transport_name != "udp" {
771            return Some((discovered_transport_id, discovered_addr, transport_name));
772        }
773
774        let Some(remote_socket_addr) = discovered_addr
775            .as_str()
776            .and_then(|addr| addr.parse::<SocketAddr>().ok())
777        else {
778            if self.bootstrap_transports.contains(&discovered_transport_id) {
779                debug!(
780                    transport_id = %discovered_transport_id,
781                    remote_addr = %discovered_addr,
782                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
783                );
784                return None;
785            }
786            return Some((discovered_transport_id, discovered_addr, transport_name));
787        };
788
789        let Some((transport_id, local_addr)) =
790            self.find_udp_transport_for_remote_addr(remote_socket_addr)
791        else {
792            debug!(
793                transport_id = %discovered_transport_id,
794                remote_addr = %discovered_addr,
795                "transport discovery: skip UDP peer with no compatible local socket"
796            );
797            return None;
798        };
799
800        if transport_id != discovered_transport_id {
801            debug!(
802                discovered_transport_id = %discovered_transport_id,
803                selected_transport_id = %transport_id,
804                local_addr = %local_addr,
805                remote_addr = %remote_socket_addr,
806                "transport discovery: selected compatible UDP transport"
807            );
808        }
809
810        Some((
811            transport_id,
812            TransportAddr::from_socket_addr(remote_socket_addr),
813            transport_name,
814        ))
815    }
816
817    fn peer_address_string_for_transport_candidate(
818        &self,
819        transport_id: TransportId,
820        transport_name: &str,
821        remote_addr: &TransportAddr,
822    ) -> String {
823        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
824        let _ = (transport_id, transport_name);
825
826        #[cfg(any(target_os = "linux", target_os = "macos"))]
827        if transport_name == "ethernet"
828            && remote_addr.as_bytes().len() == 6
829            && let Some(interface) = self
830                .transports
831                .get(&transport_id)
832                .and_then(|transport| transport.interface_name())
833        {
834            let mut mac = [0u8; 6];
835            mac.copy_from_slice(remote_addr.as_bytes());
836            return format!(
837                "{interface}/{}",
838                crate::transport::ethernet::format_mac(&mac)
839            );
840        }
841
842        remote_addr.to_string()
843    }
844
845    fn resolve_peer_address_for_match(
846        &self,
847        candidate: &PeerAddress,
848    ) -> Option<(TransportId, TransportAddr)> {
849        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
850            return None;
851        }
852
853        if candidate.transport == "ethernet" {
854            return self.resolve_ethernet_addr(&candidate.addr).ok();
855        }
856
857        if candidate.transport == "ble" {
858            #[cfg(bluer_available)]
859            {
860                return self.resolve_ble_addr(&candidate.addr).ok();
861            }
862            #[cfg(not(bluer_available))]
863            {
864                return None;
865            }
866        }
867
868        let transport_id = if candidate.transport == "udp"
869            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
870        {
871            self.find_udp_transport_for_remote_addr(remote_socket_addr)
872                .map(|(id, _)| id)?
873        } else {
874            self.find_transport_for_type(&candidate.transport)?
875        };
876
877        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
878    }
879
    /// Initiate a connection to a peer on a specific transport and address.
    ///
    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
    /// the Noise IK handshake, sends msg1, and registers the connection for
    /// msg2 dispatch.
    ///
    /// For connection-oriented transports (TCP, Tor): allocates a link and
    /// starts a non-blocking transport connect. The handshake is deferred
    /// until the transport connection is established — the tick handler
    /// polls `connection_state()` and initiates the handshake when ready.
    ///
    /// Returns `Ok(())` without creating anything when a connection to this
    /// peer on the same transport/address is already in progress.
    ///
    /// # Errors
    ///
    /// - `NodeError::MaxConnectionsExceeded` when no outbound handshake slot
    ///   is free.
    /// - `NodeError::MaxLinksExceeded` when no outbound link slot is free.
    /// - `NodeError::MaxPeersExceeded` when the peer is not already known and
    ///   a non-zero `max_peers` limit has been reached.
    /// - Any error returned by `authorize_peer` for an outbound connect.
    /// - `NodeError::TransportError` when a connection-oriented connect fails
    ///   (the link and its reverse lookup are removed first).
    /// - For connectionless transports, any error from `start_handshake`.
    pub(super) async fn initiate_connection(
        &mut self,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // An attempt for this exact peer + path is already running; treat the
        // request as satisfied instead of starting a duplicate.
        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
            debug!(
                peer = %self.peer_display_name(&peer_node_addr),
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                "Connection already in progress for candidate path"
            );
            return Ok(());
        }

        // Capacity checks run before any state is created, so a rejected
        // attempt leaves nothing behind to clean up.
        if self.outbound_handshake_slots() == 0 {
            return Err(NodeError::MaxConnectionsExceeded {
                max: self.max_connections,
            });
        }

        if self.outbound_link_slots() == 0 {
            return Err(NodeError::MaxLinksExceeded {
                max: self.max_links,
            });
        }

        // Only a peer we are not already connected to counts against
        // `max_peers`; a value of 0 disables this check.
        if !self.peers.contains_key(&peer_node_addr)
            && self.max_peers > 0
            && self.peers.len() >= self.max_peers
        {
            return Err(NodeError::MaxPeersExceeded {
                max: self.max_peers,
            });
        }

        // Outbound ACL check; its error is propagated unchanged.
        self.authorize_peer(
            &peer_identity,
            PeerAclContext::OutboundConnect,
            transport_id,
            &remote_addr,
        )?;

        // An unknown transport id is treated as connectionless. The handshake
        // path then cleans up and returns a TransportError when it cannot find
        // the transport to send msg1 on.
        let is_connection_oriented = self
            .transports
            .get(&transport_id)
            .map(|t| t.transport_type().connection_oriented)
            .unwrap_or(false);

        // Allocate link ID and create link
        let link_id = self.allocate_link_id();

        let link = if is_connection_oriented {
            Link::new(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        } else {
            Link::connectionless(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        };

        self.links.insert(link_id, link);

        // Add reverse lookup for packet dispatch.
        // NOTE(review): `insert` replaces any existing entry for this
        // (transport, address) key. The failure paths below (and in
        // `start_handshake`) `remove` the key instead of restoring a previous
        // link id. Confirm this is intended when re-dialing a path that an
        // established link already uses.
        self.addr_to_link
            .insert((transport_id, remote_addr.clone()), link_id);

        if is_connection_oriented {
            // Connection-oriented: start non-blocking connect, defer handshake.
            // `is_connection_oriented` can only be true if this transport was
            // found above, and nothing in between modifies `self.transports`,
            // so this lookup is expected to succeed.
            if let Some(transport) = self.transports.get(&transport_id) {
                match transport.connect(&remote_addr).await {
                    Ok(()) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            link_id = %link_id,
                            "Transport connect initiated (non-blocking)"
                        );
                        // Picked up later by the tick handler, which starts
                        // the handshake once the transport is connected.
                        self.pending_connects.push(super::PendingConnect {
                            link_id,
                            transport_id,
                            remote_addr,
                            peer_identity,
                        });
                    }
                    Err(e) => {
                        // Clean up link
                        self.links.remove(&link_id);
                        self.addr_to_link.remove(&(transport_id, remote_addr));
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            Ok(())
        } else {
            // Connectionless: proceed with immediate handshake
            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
                .await
        }
    }
1003
    /// Start the Noise handshake on a link and send msg1.
    ///
    /// Called immediately for connectionless transports, or after the
    /// transport connection is established for connection-oriented transports.
    ///
    /// The caller is expected to have already inserted `link_id` into `links`
    /// and `(transport_id, remote_addr)` into `addr_to_link` (as
    /// `initiate_connection` does). On success, a new outbound
    /// `PeerConnection` is stored in `connections` under `link_id`. It is also
    /// registered in `pending_outbound` under `(transport_id, our_index)` for
    /// msg2 dispatch, with msg1 kept for resend.
    ///
    /// # Errors
    ///
    /// Every error path removes the link and its `addr_to_link` entry. It also
    /// removes any session index, connection or `pending_outbound` entry
    /// created here, so the caller can move on to another candidate.
    /// - `NodeError::IndexAllocationFailed` if no session index is available.
    /// - `NodeError::HandshakeFailed` if the Noise handshake cannot start.
    /// - `NodeError::TransportError` if the first msg1 send fails or the
    ///   transport is no longer registered.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Create connection in handshake phase (outbound knows expected identity)
        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Allocate a session index for this handshake
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Clean up the link we just created
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        // Start the Noise handshake and get message 1
        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Clean up the index and link
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        // Set index and transport info on the connection
        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Store msg1 for resend and schedule first resend
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Track in pending_outbound for msg2 dispatch
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send the wire format handshake message. If the very first send fails
        // synchronously (for example an IPv6 candidate on an IPv4-only UDP
        // socket), undo this candidate so the caller can try the next address
        // in the same dial pass.
        // The lookup result is materialised first so the borrow of
        // `self.transports` ends before the `&mut self` cleanup below.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        match send_result {
            Some(send_result) => {
                // Report the local send outcome whether it succeeded or not.
                self.note_local_send_outcome(&send_result);
                match send_result {
                    Ok(bytes) => {
                        debug!(
                            link_id = %link_id,
                            our_index = %our_index,
                            bytes,
                            "Sent Noise handshake message 1 (wire format)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            link_id = %link_id,
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            our_index = %our_index,
                            error = %e,
                            "Failed to send handshake message"
                        );
                        // Undo everything registered for this attempt.
                        self.pending_outbound
                            .remove(&(transport_id, our_index.as_u32()));
                        self.connections.remove(&link_id);
                        self.links.remove(&link_id);
                        self.addr_to_link
                            .remove(&(transport_id, remote_addr.clone()));
                        let _ = self.index_allocator.free(our_index);
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            None => {
                // Transport is no longer registered: same full undo as above.
                self.pending_outbound
                    .remove(&(transport_id, our_index.as_u32()));
                self.connections.remove(&link_id);
                self.links.remove(&link_id);
                self.addr_to_link
                    .remove(&(transport_id, remote_addr.clone()));
                let _ = self.index_allocator.free(our_index);
                return Err(NodeError::TransportError(format!(
                    "transport {transport_id} disappeared before first handshake send"
                )));
            }
        }

        Ok(())
    }
1128
1129    /// Poll all transports for discovered peers and auto-connect.
1130    ///
1131    /// Called from the tick handler. Iterates operational transports,
1132    /// drains their discovery buffers, and initiates connections to
1133    /// newly discovered peers (if auto_connect is enabled).
1134    pub(super) async fn poll_transport_discovery(&mut self) {
1135        // Collect discoveries first to avoid borrow conflict with self
1136        let mut to_connect = Vec::new();
1137        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1138        let mut connect_budget = self.discovery_connect_budget();
1139        let mut skipped_budget = 0usize;
1140
1141        for transport in self.transports.values() {
1142            if !transport.is_operational() {
1143                continue;
1144            }
1145            if !transport.auto_connect() {
1146                // Still drain the buffer so it doesn't grow unbounded
1147                let _ = transport.discover();
1148                continue;
1149            }
1150            let discovered = match transport.discover() {
1151                Ok(peers) => peers,
1152                Err(_) => continue,
1153            };
1154            for peer in discovered {
1155                let discovered_transport_id = peer.transport_id;
1156                let pubkey = match peer.pubkey_hint {
1157                    Some(pk) => pk,
1158                    None => continue,
1159                };
1160                let identity = PeerIdentity::from_pubkey(pubkey);
1161                let node_addr = *identity.node_addr();
1162
1163                // Skip self
1164                if node_addr == *self.identity.node_addr() {
1165                    continue;
1166                }
1167
1168                let Some((candidate_transport_id, remote_addr, transport_name)) =
1169                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1170                else {
1171                    continue;
1172                };
1173
1174                if self.peers.contains_key(&node_addr) {
1175                    let candidate = PeerAddress::new(
1176                        transport_name,
1177                        self.peer_address_string_for_transport_candidate(
1178                            candidate_transport_id,
1179                            transport_name,
1180                            &remote_addr,
1181                        ),
1182                    );
1183                    if self.active_peer_candidate_is_fresh_enough_to_skip(
1184                        &node_addr,
1185                        std::slice::from_ref(&candidate),
1186                    ) {
1187                        continue;
1188                    }
1189                    if self.is_connecting_to_peer_on_path(
1190                        &node_addr,
1191                        candidate_transport_id,
1192                        &remote_addr,
1193                    ) {
1194                        continue;
1195                    }
1196                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1197                    if connect_budget == 0
1198                        || self
1199                            .path_candidate_attempt_budget(&node_addr)
1200                            .saturating_sub(queued_for_peer)
1201                            == 0
1202                    {
1203                        skipped_budget = skipped_budget.saturating_add(1);
1204                        continue;
1205                    }
1206                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
1207                    *queued_per_peer.entry(node_addr).or_default() += 1;
1208                    connect_budget = connect_budget.saturating_sub(1);
1209                    continue;
1210                }
1211
1212                if self.is_connecting_to_peer_on_path(
1213                    &node_addr,
1214                    candidate_transport_id,
1215                    &remote_addr,
1216                ) {
1217                    continue;
1218                }
1219
1220                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1221                if connect_budget == 0
1222                    || self
1223                        .path_candidate_attempt_budget(&node_addr)
1224                        .saturating_sub(queued_for_peer)
1225                        == 0
1226                {
1227                    skipped_budget = skipped_budget.saturating_add(1);
1228                    continue;
1229                }
1230                to_connect.push((candidate_transport_id, remote_addr, identity, false));
1231                *queued_per_peer.entry(node_addr).or_default() += 1;
1232                connect_budget = connect_budget.saturating_sub(1);
1233            }
1234        }
1235
1236        if skipped_budget > 0 {
1237            debug!(
1238                skipped = skipped_budget,
1239                queued = to_connect.len(),
1240                "Transport discovery connect budget exhausted"
1241            );
1242        }
1243
1244        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1245            info!(
1246                peer = %self.peer_display_name(identity.node_addr()),
1247                transport_id = %transport_id,
1248                remote_addr = %remote_addr,
1249                active_refresh,
1250                "Auto-connecting to discovered peer"
1251            );
1252            if let Err(e) = self
1253                .initiate_connection(transport_id, remote_addr, identity)
1254                .await
1255            {
1256                warn!(error = %e, "Failed to auto-connect to discovered peer");
1257            }
1258        }
1259    }
1260
    /// Drive one tick of Nostr-based discovery and NAT traversal.
    ///
    /// Does nothing when no Nostr discovery handle is configured. Otherwise it:
    ///
    /// 1. Passes the current `outbound_admission_check()` result to the
    ///    bootstrap handle.
    /// 2. Refreshes our own overlay advert. Failures are logged at debug
    ///    level only.
    /// 3. Handles every drained `BootstrapEvent`:
    ///    - `Established`: the traversed socket is adopted unless outbound
    ///      admission is currently refused. If adoption fails, a retry is
    ///      scheduled for the peer (when its npub parses).
    ///    - `Failed`: ignored when the peer is already connected or a
    ///      handshake to it is in progress. Otherwise the failure is recorded,
    ///      which yields whether to warn, an optional cooldown, and whether
    ///      the failure-streak threshold was just crossed. The peer's
    ///      configured addresses are then tried directly. If that does not
    ///      succeed, a retry is scheduled no earlier than the cooldown.
    /// 4. Calls `maybe_run_startup_open_discovery_sweep` and
    ///    `queue_open_discovery_retries`.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        bootstrap.set_outbound_admission(self.outbound_admission_check());

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    // Admission may have changed since it was pushed to the
                    // bootstrap handle above; re-check before adopting.
                    if !self.outbound_admission_check() {
                        debug!(
                            peer_npub = %traversal.peer_npub,
                            peers = self.peers.len(),
                            max_peers = self.max_peers,
                            "Dropping established NAT traversal: at capacity"
                        );
                        continue;
                    }
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    // A peer config with an unparseable npub cannot be retried.
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    // Record the failure; the returned decision drives warn
                    // rate-limiting, cooldown and stale-advert eviction below.
                    let now_ms = Self::now_ms();
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // B6: stale-advert eviction on the streak-threshold
                    // crossing. Fire-and-forget; the outcome is logged so
                    // operators can see when peers get cleaned up.
                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    // Fall back to the peer's configured addresses.
                    // NOTE(review): the meaning of the `false` flag is defined
                    // by `try_peer_addresses`; confirm against its signature.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        // Push the next retry past the cooldown so the
                        // open-discovery sweep doesn't re-enqueue and the
                        // per-attempt backoff doesn't fire sooner.
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }
1378
1379    /// Resolve the LAN-only discovery scope. Applications with explicit
1380    /// connectivity config can set `node.discovery.lan.scope` without
1381    /// changing the public Nostr discovery `app` tag. The older fallback
1382    /// extracts a scope from the Nostr app tag used by default scoped
1383    /// discovery.
1384    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1385        if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1386            let scope = scope.trim();
1387            if !scope.is_empty() {
1388                return Some(scope.to_string());
1389            }
1390        }
1391
1392        let app = self.config.node.discovery.nostr.app.trim();
1393        if app.is_empty() {
1394            return None;
1395        }
1396        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1397            let scope = rest.trim();
1398            if scope.is_empty() {
1399                None
1400            } else {
1401                Some(scope.to_string())
1402            }
1403        } else {
1404            Some(app.to_string())
1405        }
1406    }
1407
1408    pub(super) fn start_local_instance_discovery(&mut self) {
1409        if !self.config.node.discovery.local.enabled {
1410            return;
1411        }
1412        let Some(scope) = self.lan_discovery_scope() else {
1413            debug!("local instance discovery not started: no discovery scope");
1414            return;
1415        };
1416        let now_ms = Self::now_ms();
1417        match crate::discovery::local::LocalInstanceRegistry::new(
1418            self.identity.npub(),
1419            scope,
1420            &self.config.node.discovery.local,
1421            now_ms,
1422        ) {
1423            Ok(registry) => {
1424                self.local_instance_registry = Some(registry);
1425                self.local_instance_started_at_ms = Some(now_ms);
1426                self.last_local_instance_publish_ms = None;
1427                self.last_local_instance_scan_ms = None;
1428                self.publish_local_instance_record(now_ms);
1429                info!("Same-host FIPS instance discovery enabled");
1430            }
1431            Err(crate::discovery::local::LocalInstanceRegistryError::Disabled) => {
1432                debug!("same-host FIPS instance discovery disabled");
1433            }
1434            Err(err) => {
1435                debug!(error = %err, "same-host FIPS instance discovery not started");
1436            }
1437        }
1438    }
1439
1440    fn local_instance_contacts(&self) -> Vec<crate::discovery::local::LocalInstanceContact> {
1441        let mut contacts = Vec::new();
1442        for handle in self.transports.values() {
1443            if !handle.is_operational() || !handle.accept_connections() {
1444                continue;
1445            }
1446            let transport = handle.transport_type().name;
1447            if transport != "udp" && transport != "tcp" {
1448                continue;
1449            }
1450            let Some(local_addr) = handle.local_addr() else {
1451                continue;
1452            };
1453            let Some(contact) =
1454                crate::discovery::local::contact_for_transport_addr(transport, local_addr)
1455            else {
1456                continue;
1457            };
1458            if contacts
1459                .iter()
1460                .any(|existing: &crate::discovery::local::LocalInstanceContact| {
1461                    existing.transport == contact.transport && existing.addr == contact.addr
1462                })
1463            {
1464                continue;
1465            }
1466            contacts.push(contact);
1467        }
1468        contacts
1469    }
1470
1471    fn publish_local_instance_record(&mut self, now_ms: u64) {
1472        let Some(registry) = self.local_instance_registry.clone() else {
1473            return;
1474        };
1475        let contacts = self.local_instance_contacts();
1476        match registry.publish(contacts, now_ms) {
1477            Ok(()) => {
1478                self.last_local_instance_publish_ms = Some(now_ms);
1479            }
1480            Err(err) => {
1481                debug!(error = %err, "failed to publish same-host FIPS instance record");
1482            }
1483        }
1484    }
1485
1486    fn maybe_publish_local_instance_record(&mut self, now_ms: u64) {
1487        if self.local_instance_registry.is_none() {
1488            return;
1489        }
1490        let interval_ms = self.config.node.discovery.local.publish_interval_ms();
1491        let due = self
1492            .last_local_instance_publish_ms
1493            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1494            .unwrap_or(true);
1495        if due {
1496            self.publish_local_instance_record(now_ms);
1497        }
1498    }
1499
1500    fn local_instance_scan_due(&self, now_ms: u64) -> bool {
1501        if self.local_instance_registry.is_none() {
1502            return false;
1503        }
1504        let cfg = &self.config.node.discovery.local;
1505        let interval_ms = if self
1506            .local_instance_started_at_ms
1507            .map(|started| now_ms.saturating_sub(started) <= cfg.startup_scan_duration_ms())
1508            .unwrap_or(false)
1509        {
1510            cfg.startup_scan_interval_ms()
1511        } else {
1512            cfg.scan_interval_ms()
1513        };
1514        self.last_local_instance_scan_ms
1515            .map(|last| now_ms.saturating_sub(last) >= interval_ms)
1516            .unwrap_or(true)
1517    }
1518
1519    fn local_instance_peer_allowed(&self, identity: &PeerIdentity) -> bool {
1520        if self.config.peers().iter().any(|peer| {
1521            PeerIdentity::from_npub(&peer.npub)
1522                .map(|configured| configured.node_addr() == identity.node_addr())
1523                .unwrap_or(false)
1524        }) {
1525            return true;
1526        }
1527        self.config.node.discovery.nostr.policy == NostrDiscoveryPolicy::Open
1528    }
1529
1530    fn local_instance_peer_addresses(
1531        &self,
1532        record: &crate::discovery::local::LocalInstanceRecord,
1533    ) -> Vec<PeerAddress> {
1534        let mut addresses = Vec::new();
1535        for contact in &record.contacts {
1536            if contact.transport != "udp" && contact.transport != "tcp" {
1537                continue;
1538            }
1539            let Ok(socket_addr) = contact.addr.parse::<SocketAddr>() else {
1540                debug!(
1541                    npub = %record.npub,
1542                    transport = %contact.transport,
1543                    addr = %contact.addr,
1544                    "local instance discovery: skip non-socket contact"
1545                );
1546                continue;
1547            };
1548            if !socket_addr.ip().is_loopback() {
1549                debug!(
1550                    npub = %record.npub,
1551                    addr = %contact.addr,
1552                    "local instance discovery: skip non-loopback contact"
1553                );
1554                continue;
1555            }
1556            let address =
1557                PeerAddress::with_priority(contact.transport.clone(), contact.addr.clone(), 10)
1558                    .with_seen_at_ms(record.updated_at_ms);
1559            if addresses.iter().any(|existing: &PeerAddress| {
1560                existing.transport == address.transport && existing.addr == address.addr
1561            }) {
1562                continue;
1563            }
1564            addresses.push(address);
1565        }
1566        addresses
1567    }
1568
1569    /// Scan the same-host registry only on startup and then at a low cadence.
1570    /// This avoids a per-second filesystem poll while preserving the fast path
1571    /// for processes launched around the same time.
1572    pub(super) async fn poll_local_instance_discovery(&mut self) {
1573        let Some(registry) = self.local_instance_registry.clone() else {
1574            return;
1575        };
1576        let now_ms = Self::now_ms();
1577        self.maybe_publish_local_instance_record(now_ms);
1578        if !self.local_instance_scan_due(now_ms) {
1579            return;
1580        }
1581        self.last_local_instance_scan_ms = Some(now_ms);
1582
1583        let records = match registry.scan(now_ms, self.config.node.discovery.local.stale_after_ms())
1584        {
1585            Ok(records) => records,
1586            Err(err) => {
1587                debug!(error = %err, "same-host FIPS instance scan failed");
1588                return;
1589            }
1590        };
1591        if records.is_empty() {
1592            return;
1593        }
1594
1595        let mut connect_budget = self.discovery_connect_budget();
1596        let mut skipped_budget = 0usize;
1597        for record in records {
1598            let identity = match PeerIdentity::from_npub(&record.npub) {
1599                Ok(identity) => identity,
1600                Err(err) => {
1601                    debug!(npub = %record.npub, error = %err, "local instance discovery: skip bad npub");
1602                    continue;
1603                }
1604            };
1605            let peer_node_addr = *identity.node_addr();
1606            if peer_node_addr == *self.identity.node_addr() {
1607                continue;
1608            }
1609            if !self.local_instance_peer_allowed(&identity) {
1610                debug!(
1611                    npub = %identity.short_npub(),
1612                    "local instance discovery: skip unconfigured peer"
1613                );
1614                continue;
1615            }
1616
1617            let addresses = self.local_instance_peer_addresses(&record);
1618            if addresses.is_empty() {
1619                continue;
1620            }
1621
1622            if self.peers.contains_key(&peer_node_addr)
1623                && self.active_peer_candidate_is_fresh_enough_to_skip(&peer_node_addr, &addresses)
1624            {
1625                continue;
1626            }
1627
1628            for address in addresses {
1629                let Some((transport_id, remote_addr)) =
1630                    self.resolve_peer_address_for_match(&address)
1631                else {
1632                    continue;
1633                };
1634                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1635                    continue;
1636                }
1637                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1638                    skipped_budget = skipped_budget.saturating_add(1);
1639                    continue;
1640                }
1641                info!(
1642                    npub = %identity.short_npub(),
1643                    transport = %address.transport,
1644                    addr = %address.addr,
1645                    "same-host FIPS instance discovery: initiating handshake"
1646                );
1647                if let Err(err) = self
1648                    .initiate_connection(transport_id, remote_addr, identity)
1649                    .await
1650                {
1651                    debug!(
1652                        npub = %record.npub,
1653                        error = %err,
1654                        "same-host FIPS instance discovery: failed to initiate connection"
1655                    );
1656                }
1657                connect_budget = connect_budget.saturating_sub(1);
1658            }
1659        }
1660        if skipped_budget > 0 {
1661            debug!(
1662                skipped = skipped_budget,
1663                "same-host FIPS instance discovery connect budget exhausted"
1664            );
1665        }
1666    }
1667
1668    /// Drain mDNS-discovered peers and initiate Noise XX handshakes. For
1669    /// active peers this is a non-disruptive alternate-path refresh: the
1670    /// current link stays live until a new handshake authenticates and
1671    /// promotes. The handshake itself is the authentication — a spoofed
1672    /// mDNS advert with someone else's npub fails the XX exchange and
1673    /// is dropped.
1674    pub(super) async fn poll_lan_discovery(&mut self) {
1675        let Some(runtime) = self.lan_discovery.clone() else {
1676            return;
1677        };
1678        let events = runtime.drain_events().await;
1679        if events.is_empty() {
1680            return;
1681        }
1682        let mut connect_budget = self.discovery_connect_budget();
1683        let mut skipped_budget = 0usize;
1684        for event in events {
1685            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1686            let Some((transport_id, local_addr)) =
1687                self.find_udp_transport_for_remote_addr(peer.addr)
1688            else {
1689                debug!(
1690                    addr = %peer.addr,
1691                    "lan: skip discovered peer with no compatible UDP transport"
1692                );
1693                continue;
1694            };
1695            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1696                Ok(id) => id,
1697                Err(err) => {
1698                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1699                    continue;
1700                }
1701            };
1702            let peer_node_addr = *identity.node_addr();
1703            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1704            if self.peers.contains_key(&peer_node_addr) {
1705                let candidate = PeerAddress::new("udp", peer.addr.to_string());
1706                if self.active_peer_candidate_is_fresh_enough_to_skip(
1707                    &peer_node_addr,
1708                    std::slice::from_ref(&candidate),
1709                ) {
1710                    continue;
1711                }
1712                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1713                    continue;
1714                }
1715                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1716                    skipped_budget = skipped_budget.saturating_add(1);
1717                    continue;
1718                }
1719                info!(
1720                    npub = %identity.short_npub(),
1721                    addr = %peer.addr,
1722                    local_addr = %local_addr,
1723                    "lan: initiating alternate-path handshake to active peer"
1724                );
1725                if let Err(err) = self
1726                    .initiate_connection(transport_id, remote_addr, identity)
1727                    .await
1728                {
1729                    debug!(
1730                        npub = %peer.npub,
1731                        error = %err,
1732                        "lan: failed to initiate active peer alternate-path handshake"
1733                    );
1734                }
1735                connect_budget = connect_budget.saturating_sub(1);
1736                continue;
1737            }
1738            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1739                continue;
1740            }
1741            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1742                skipped_budget = skipped_budget.saturating_add(1);
1743                continue;
1744            }
1745            info!(
1746                npub = %identity.short_npub(),
1747                addr = %peer.addr,
1748                local_addr = %local_addr,
1749                "lan: initiating handshake to discovered peer"
1750            );
1751            if let Err(err) = self
1752                .initiate_connection(transport_id, remote_addr, identity)
1753                .await
1754            {
1755                debug!(
1756                    npub = %peer.npub,
1757                    error = %err,
1758                    "lan: failed to initiate connection to discovered peer"
1759                );
1760            }
1761            connect_budget = connect_budget.saturating_sub(1);
1762        }
1763        if skipped_budget > 0 {
1764            debug!(
1765                skipped = skipped_budget,
1766                "lan: discovery connect budget exhausted"
1767            );
1768        }
1769    }
1770
1771    /// Poll pending transport connects and initiate handshakes for ready ones.
1772    ///
1773    /// Called from the tick handler. For each pending connect, queries the
1774    /// transport's connection state. When a connection is established,
1775    /// marks the link as Connected and starts the Noise handshake.
1776    /// Failed connections are cleaned up and scheduled for retry.
1777    pub(super) async fn poll_pending_connects(&mut self) {
1778        if self.pending_connects.is_empty() {
1779            return;
1780        }
1781
1782        let mut completed = Vec::new();
1783
1784        for (i, pending) in self.pending_connects.iter().enumerate() {
1785            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
1786                transport.connection_state(&pending.remote_addr)
1787            } else {
1788                crate::transport::ConnectionState::Failed("transport removed".into())
1789            };
1790
1791            match state {
1792                crate::transport::ConnectionState::Connected => {
1793                    completed.push((i, true, None));
1794                }
1795                crate::transport::ConnectionState::Failed(reason) => {
1796                    completed.push((i, false, Some(reason)));
1797                }
1798                crate::transport::ConnectionState::Connecting => {
1799                    // Still in progress, check on next tick
1800                }
1801                crate::transport::ConnectionState::None => {
1802                    // Shouldn't happen — treat as failure
1803                    completed.push((i, false, Some("no connection attempt found".into())));
1804                }
1805            }
1806        }
1807
1808        // Process completions in reverse order to preserve indices
1809        for (i, success, reason) in completed.into_iter().rev() {
1810            let pending = self.pending_connects.remove(i);
1811
1812            if success {
1813                // Mark link as Connected
1814                if let Some(link) = self.links.get_mut(&pending.link_id) {
1815                    link.set_connected();
1816                }
1817
1818                debug!(
1819                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1820                    transport_id = %pending.transport_id,
1821                    remote_addr = %pending.remote_addr,
1822                    link_id = %pending.link_id,
1823                    "Transport connected, starting handshake"
1824                );
1825
1826                // Start the handshake now that the transport is connected
1827                if let Err(e) = self
1828                    .start_handshake(
1829                        pending.link_id,
1830                        pending.transport_id,
1831                        pending.remote_addr.clone(),
1832                        pending.peer_identity,
1833                    )
1834                    .await
1835                {
1836                    warn!(
1837                        link_id = %pending.link_id,
1838                        error = %e,
1839                        "Failed to start handshake after transport connect"
1840                    );
1841                    // Clean up link on handshake failure
1842                    self.remove_link(&pending.link_id);
1843                }
1844            } else {
1845                let reason = reason.unwrap_or_default();
1846                warn!(
1847                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1848                    transport_id = %pending.transport_id,
1849                    remote_addr = %pending.remote_addr,
1850                    link_id = %pending.link_id,
1851                    reason = %reason,
1852                    "Transport connect failed"
1853                );
1854
1855                // Clean up link and schedule retry
1856                self.remove_link(&pending.link_id);
1857                self.links.remove(&pending.link_id);
1858                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
1859            }
1860        }
1861    }
1862
1863    // === State Transitions ===
1864
1865    /// Start the node.
1866    ///
1867    /// Initializes the TUN interface (if configured), spawns I/O threads,
1868    /// and transitions to the Running state.
1869    pub async fn start(&mut self) -> Result<(), NodeError> {
1870        node_start_debug_log("Node::start begin");
1871        if !self.state.can_start() {
1872            return Err(NodeError::AlreadyStarted);
1873        }
1874        self.state = NodeState::Starting;
1875        node_start_debug_log("Node::start state set to starting");
1876
1877        // Create packet channel for transport -> Node communication
1878        let packet_buffer_size = self.config.node.buffers.packet_channel;
1879        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
1880        self.packet_tx = Some(packet_tx.clone());
1881        self.packet_rx = Some(packet_rx);
1882        node_start_debug_log("Node::start packet channel created");
1883
1884        // Initialize transports first (before TUN, before Nostr discovery).
1885        node_start_debug_log("Node::start create transports begin");
1886        let transport_handles = self.create_transports(&packet_tx).await;
1887        node_start_debug_log(format!(
1888            "Node::start create transports complete count={}",
1889            transport_handles.len()
1890        ));
1891
1892        for mut handle in transport_handles {
1893            let transport_id = handle.transport_id();
1894            let transport_type = handle.transport_type().name;
1895            let name = handle.name().map(|s| s.to_string());
1896
1897            node_start_debug_log(format!(
1898                "Node::start transport start begin id={} type={} name={:?}",
1899                transport_id, transport_type, name
1900            ));
1901            match handle.start().await {
1902                Ok(()) => {
1903                    node_start_debug_log(format!(
1904                        "Node::start transport start ok id={} type={}",
1905                        transport_id, transport_type
1906                    ));
1907                    self.transports.insert(transport_id, handle);
1908                }
1909                Err(e) => {
1910                    node_start_debug_log(format!(
1911                        "Node::start transport start error id={} type={} error={}",
1912                        transport_id, transport_type, e
1913                    ));
1914                    if let Some(ref n) = name {
1915                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
1916                    } else {
1917                        warn!(transport_type, error = %e, "Transport failed to start");
1918                    }
1919                }
1920            }
1921        }
1922
1923        if !self.transports.is_empty() {
1924            info!(count = self.transports.len(), "Transports initialized");
1925        }
1926
1927        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
1928        // worker pools. **Unix only** — both pools issue direct
1929        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
1930        // raw file descriptors via `AsRawFd`, which is a unix-only
1931        // trait. On Windows the rx_loop's tokio-based send/recv
1932        // remain the canonical path; the perf overhaul lands its
1933        // gains on unix.
1934        //
1935        // Worker count defaults to the number of CPUs, overridable
1936        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
1937        // for debug / benchmarking. Hash-by-destination means a
1938        // single TCP flow pins to one worker (preserves wire
1939        // ordering); additional workers light up under multi-flow
1940        // / multi-peer load. See `node::encrypt_worker` /
1941        // `node::decrypt_worker` for full rationale.
1942        #[cfg(unix)]
1943        {
1944            if self.config.node.worker_pools_enabled {
1945                node_start_debug_log("Node::start worker pools begin");
1946                let cpu_default = std::thread::available_parallelism()
1947                    .map(|n| n.get())
1948                    .unwrap_or(1)
1949                    .max(1);
1950                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
1951                    .ok()
1952                    .and_then(|s| s.parse().ok())
1953                    .unwrap_or(cpu_default)
1954                    .max(1);
1955                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
1956                    encrypt_worker_count,
1957                ));
1958                info!(
1959                    workers = encrypt_worker_count,
1960                    "Spawned FMP-encrypt worker pool"
1961                );
1962
1963                // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
1964                // falls through to the in-line rx_loop decrypt path (the
1965                // "test-mode" branch in `handle_encrypted_frame`, which is
1966                // in fact a fully functional synchronous decrypt). Useful
1967                // as an A/B against the worker pipeline when chasing
1968                // scheduling/queueing regressions on the native macOS
1969                // path. Any non-zero value (env or default) spawns the
1970                // pool as before.
1971                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
1972                    .ok()
1973                    .and_then(|s| s.parse().ok())
1974                    .unwrap_or(cpu_default);
1975                if decrypt_worker_count == 0 {
1976                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
1977                } else {
1978                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
1979                        decrypt_worker_count,
1980                    ));
1981                    info!(
1982                        workers = decrypt_worker_count,
1983                        "Spawned FMP+FSP-decrypt worker pool"
1984                    );
1985                }
1986                node_start_debug_log("Node::start worker pools complete");
1987            } else {
1988                node_start_debug_log("Node::start worker pools disabled");
1989                info!("FIPS worker pools disabled; using in-line crypto/send path");
1990            }
1991        }
1992
1993        if self.config.node.discovery.nostr.enabled {
1994            node_start_debug_log("Node::start nostr discovery start begin");
1995            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
1996                .await
1997            {
1998                Ok(runtime) => {
1999                    node_start_debug_log("Node::start nostr discovery runtime created");
2000                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
2001                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
2002                    }
2003                    node_start_debug_log("Node::start nostr overlay advert refreshed");
2004                    self.nostr_discovery = Some(runtime);
2005                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
2006                    info!("Nostr overlay discovery enabled");
2007                }
2008                Err(err) => {
2009                    node_start_debug_log(format!(
2010                        "Node::start nostr discovery start error error={}",
2011                        err
2012                    ));
2013                    warn!(error = %err, "Failed to start Nostr overlay discovery");
2014                }
2015            }
2016        }
2017
2018        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
2019        // when Nostr is disabled, since it gives us sub-second pairing
2020        // on the same link without any relay or NAT-traversal roundtrip.
2021        if self.config.node.discovery.lan.enabled {
2022            node_start_debug_log("Node::start lan discovery start begin");
2023            let advertised_udp_port = self
2024                .transports
2025                .values()
2026                .filter(|h| h.is_operational())
2027                .filter(|h| h.transport_type().name == "udp")
2028                .find_map(|h| h.local_addr().map(|addr| addr.port()))
2029                .unwrap_or(0);
2030            let scope = self.lan_discovery_scope();
2031            match crate::discovery::lan::LanDiscovery::start(
2032                &self.identity,
2033                scope,
2034                advertised_udp_port,
2035                self.config.node.discovery.lan.clone(),
2036            )
2037            .await
2038            {
2039                Ok(runtime) => {
2040                    node_start_debug_log("Node::start lan discovery start ok");
2041                    self.lan_discovery = Some(runtime);
2042                    info!("LAN mDNS discovery enabled");
2043                }
2044                Err(err) => {
2045                    node_start_debug_log(format!(
2046                        "Node::start lan discovery start error error={}",
2047                        err
2048                    ));
2049                    debug!(error = %err, "LAN mDNS discovery not started");
2050                }
2051            }
2052        }
2053
2054        self.start_local_instance_discovery();
2055        self.poll_local_instance_discovery().await;
2056
2057        // Connect to static peers before TUN is active
2058        // This allows handshake messages to be sent before we start accepting packets
2059        node_start_debug_log("Node::start initiate peer connections begin");
2060        self.initiate_peer_connections().await;
2061        node_start_debug_log("Node::start initiate peer connections complete");
2062
2063        // Initialize TUN interface last, after transports and peers are ready
2064        if self.config.tun.enabled {
2065            node_start_debug_log("Node::start tun init begin");
2066            let address = *self.identity.address();
2067            match TunDevice::create(&self.config.tun, address).await {
2068                Ok(device) => {
2069                    let mtu = device.mtu();
2070                    let name = device.name().to_string();
2071                    let our_addr = *device.address();
2072
2073                    info!("TUN device active:");
2074                    info!("     name: {}", name);
2075                    info!("  address: {}", device.address());
2076                    info!("      mtu: {}", mtu);
2077
2078                    // Calculate max MSS for TCP clamping
2079                    let effective_mtu = self.effective_ipv6_mtu();
2080                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
2081
2082                    info!("effective MTU: {} bytes", effective_mtu);
2083                    debug!("   max TCP MSS: {} bytes", max_mss);
2084
2085                    // On macOS, create a shutdown pipe. Writing to it unblocks the
2086                    // reader thread's select() loop without closing the TUN fd
2087                    // (which would cause a double-close when TunDevice drops).
2088                    #[cfg(target_os = "macos")]
2089                    let (shutdown_read_fd, shutdown_write_fd) = {
2090                        let mut fds = [0i32; 2];
2091                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
2092                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
2093                                "failed to create shutdown pipe".into(),
2094                            )));
2095                        }
2096                        (fds[0], fds[1])
2097                    };
2098
2099                    // Create writer (dups the fd for independent write access).
2100                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
2101                    // per-destination path MTU learned via discovery.
2102                    let (writer, tun_tx) =
2103                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
2104
2105                    // Spawn writer thread
2106                    let writer_handle = thread::spawn(move || {
2107                        writer.run();
2108                    });
2109
2110                    // Clone tun_tx for the reader
2111                    let reader_tun_tx = tun_tx.clone();
2112
2113                    // Create outbound channel for TUN reader → Node
2114                    let tun_channel_size = self.config.node.buffers.tun_channel;
2115                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
2116
2117                    // Spawn reader thread
2118                    let transport_mtu = self.transport_mtu();
2119                    let path_mtu_lookup = self.path_mtu_lookup.clone();
2120                    #[cfg(target_os = "macos")]
2121                    let reader_handle = thread::spawn(move || {
2122                        run_tun_reader(
2123                            device,
2124                            mtu,
2125                            our_addr,
2126                            reader_tun_tx,
2127                            outbound_tx,
2128                            transport_mtu,
2129                            path_mtu_lookup,
2130                            shutdown_read_fd,
2131                        );
2132                    });
2133                    #[cfg(not(target_os = "macos"))]
2134                    let reader_handle = thread::spawn(move || {
2135                        run_tun_reader(
2136                            device,
2137                            mtu,
2138                            our_addr,
2139                            reader_tun_tx,
2140                            outbound_tx,
2141                            transport_mtu,
2142                            path_mtu_lookup,
2143                        );
2144                    });
2145
2146                    self.tun_state = TunState::Active;
2147                    self.tun_name = Some(name);
2148                    self.tun_tx = Some(tun_tx);
2149                    self.tun_outbound_rx = Some(outbound_rx);
2150                    self.tun_reader_handle = Some(reader_handle);
2151                    self.tun_writer_handle = Some(writer_handle);
2152                    #[cfg(target_os = "macos")]
2153                    {
2154                        self.tun_shutdown_fd = Some(shutdown_write_fd);
2155                    }
2156                }
2157                Err(e) => {
2158                    self.tun_state = TunState::Failed;
2159                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
2160                }
2161            }
2162            node_start_debug_log("Node::start tun init complete");
2163        }
2164
2165        // Initialize DNS responder (independent of TUN).
2166        //
2167        // Default bind_addr is "::1" (IPv6 loopback). The shipped
2168        // fips-dns-setup configures systemd-resolved via a global
2169        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
2170        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
2171        // where self-destined traffic to fips0's address is attributed
2172        // to fips0 in PKTINFO and gets silently dropped by the
2173        // mesh-interface filter in src/upper/dns.rs.
2174        //
2175        // For mesh-reachable resolution (rare), set bind_addr: "::"
2176        // in fips.yaml. The mesh-interface filter remains active to
2177        // prevent hosts-file alias enumeration in that mode.
2178        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
2179        // 127.0.0.1 still reach us regardless of kernel sysctl
2180        // defaults — but only when bind is on a wildcard / IPv6 path.
2181        if self.config.dns.enabled {
2182            node_start_debug_log("Node::start dns init begin");
2183            let addr_str = self.config.dns.bind_addr();
2184            match addr_str.parse::<std::net::IpAddr>() {
2185                Ok(ip) => {
2186                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
2187                    match Self::bind_dns_socket(bind) {
2188                        Ok(socket) => {
2189                            let dns_channel_size = self.config.node.buffers.dns_channel;
2190                            let (identity_tx, identity_rx) =
2191                                tokio::sync::mpsc::channel(dns_channel_size);
2192                            let dns_ttl = self.config.dns.ttl();
2193                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
2194                                self.config.peers(),
2195                            );
2196                            let reloader = if self.config.node.system_files_enabled {
2197                                let hosts_path = std::path::PathBuf::from(
2198                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
2199                                );
2200                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
2201                            } else {
2202                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
2203                            };
2204                            // Resolve the TUN ifindex so the responder can
2205                            // drop queries arriving on the mesh interface
2206                            // (fips0). Without this, the `::` bind exposes
2207                            // /etc/fips/hosts alias probing to any mesh peer.
2208                            // When TUN isn't enabled or the name can't be
2209                            // resolved, `None` disables the filter (there
2210                            // is no mesh surface to defend anyway).
2211                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
2212                            info!(
2213                                bind = %bind,
2214                                hosts = reloader.hosts().len(),
2215                                mesh_ifindex = ?mesh_ifindex,
2216                                "DNS responder started for .fips domain (auto-reload enabled)"
2217                            );
2218                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
2219                                socket,
2220                                identity_tx,
2221                                dns_ttl,
2222                                reloader,
2223                                mesh_ifindex,
2224                            ));
2225                            self.dns_identity_rx = Some(identity_rx);
2226                            self.dns_task = Some(handle);
2227                        }
2228                        Err(e) => {
2229                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
2230                        }
2231                    }
2232                }
2233                Err(e) => {
2234                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
2235                }
2236            }
2237            node_start_debug_log("Node::start dns init complete");
2238        }
2239
2240        self.state = NodeState::Running;
2241        node_start_debug_log("Node::start running");
2242        info!("Node started:");
2243        info!("       state: {}", self.state);
2244        info!("  transports: {}", self.transports.len());
2245        info!(" connections: {}", self.connections.len());
2246        Ok(())
2247    }
2248
2249    /// Bind a UDP socket for the DNS responder.
2250    ///
2251    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
2252    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
2253    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
2254    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
2255    /// land on the same socket.
2256    ///
2257    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
2258    /// can learn the arrival interface per packet. The responder uses that
2259    /// to drop queries arriving on the mesh TUN, closing the hosts-file
2260    /// probing side-channel created by the `::` bind.
2261    fn bind_dns_socket(
2262        addr: std::net::SocketAddr,
2263    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
2264        use socket2::{Domain, Protocol, Socket, Type};
2265        let domain = if addr.is_ipv4() {
2266            Domain::IPV4
2267        } else {
2268            Domain::IPV6
2269        };
2270        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
2271        if addr.is_ipv6() {
2272            sock.set_only_v6(false)?;
2273            #[cfg(unix)]
2274            Self::set_recv_pktinfo_v6(&sock)?;
2275        }
2276        sock.set_nonblocking(true)?;
2277        sock.bind(&addr.into())?;
2278        tokio::net::UdpSocket::from_std(sock.into())
2279    }
2280
2281    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
2282    ///
2283    /// After this setsockopt, each `recvmsg()` call on the socket receives
2284    /// an `IPV6_PKTINFO` control message containing the arrival interface
2285    /// index, which the DNS responder uses for its mesh-interface filter.
2286    #[cfg(unix)]
2287    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
2288        use std::os::fd::AsRawFd;
2289        let enable: libc::c_int = 1;
2290        let ret = unsafe {
2291            libc::setsockopt(
2292                sock.as_raw_fd(),
2293                libc::IPPROTO_IPV6,
2294                libc::IPV6_RECVPKTINFO,
2295                &enable as *const _ as *const libc::c_void,
2296                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
2297            )
2298        };
2299        if ret < 0 {
2300            return Err(std::io::Error::last_os_error());
2301        }
2302        Ok(())
2303    }
2304
2305    /// Resolve the mesh TUN interface index by name.
2306    ///
2307    /// Returns `None` if the interface does not exist (e.g. TUN disabled
2308    /// or not yet created). A `None` result disables the DNS responder's
2309    /// mesh-interface filter — safe, because if there is no fips0 there
2310    /// is no mesh exposure to defend against.
2311    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
2312        #[cfg(unix)]
2313        {
2314            let c_name = std::ffi::CString::new(name).ok()?;
2315            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
2316            if idx == 0 { None } else { Some(idx) }
2317        }
2318        #[cfg(not(unix))]
2319        {
2320            let _ = name;
2321            None
2322        }
2323    }
2324
    /// Stop the node.
    ///
    /// Shuts down TUN interface, stops I/O threads, and transitions to
    /// the Stopped state.
    ///
    /// Teardown order, as performed below:
    /// 1. state -> `Stopping`;
    /// 2. abort the DNS responder task;
    /// 3. best-effort `Disconnect(Shutdown)` to every sendable peer, done
    ///    while the transports are still up;
    /// 4. shut down Nostr overlay discovery, the LAN discovery handle, and
    ///    remove the same-host instance registry record;
    /// 5. stop every transport;
    /// 6. drop the packet channel ends;
    /// 7. if a TUN was set up: drop `tun_tx`, delete the interface, on macOS
    ///    signal the shutdown pipe, join the reader/writer threads, and set
    ///    `tun_state` to `Disabled`;
    /// 8. state -> `Stopped`.
    ///
    /// Failures in steps 4, 5 and 7 are logged and do not abort the stop.
    ///
    /// # Errors
    /// Returns `NodeError::NotStarted` if `self.state.can_stop()` is false.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        // Stop DNS responder
        // NOTE(review): only the task handle is cleared; `dns_identity_rx`
        // (set in `start`) is left in place here. Confirm that is intended.
        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Send disconnect notifications to all active peers before closing transports
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        // Stop Nostr overlay discovery background work and withdraw any advert.
        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        // Tear down LAN mDNS responder + browser. Best-effort: the
        // OS will eventually time the advert out via its TTL even if
        // we don't get a clean unregister out before the daemon exits.
        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Remove this process's same-host instance record. Failure is
        // logged at debug level only and does not stop the shutdown.
        if let Some(registry) = self.local_instance_registry.take()
            && let Err(err) = registry.remove()
        {
            debug!(error = %err, "failed to remove same-host FIPS instance record");
        }

        // Shutdown transports (they're packet producers)
        // Keys are collected first because `self.transports` is mutated
        // (entries removed) inside the loop.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop packet channels
        self.packet_tx.take();
        self.packet_rx.take();

        // Shutdown TUN interface
        // `tun_name` is only present when TUN setup produced an interface;
        // all thread/fd cleanup below is gated on it.
        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Drop the tun_tx to signal the writer to stop
            self.tun_tx.take();

            // Delete the interface (on Linux, causes reader to get EFAULT)
            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // On macOS, signal the reader thread to exit by writing to the
            // shutdown pipe. The reader's select() will wake up and break.
            // SAFETY: `fd` is the `shutdown_write_fd` stored by `start`.
            // `take()` guarantees it is written to and closed at most once
            // here. The 1-byte buffer is a static literal. Return values are
            // ignored on purpose, since this is best-effort.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // Wait for threads to finish
            // NOTE(review): if these are `std::thread` JoinHandles, `join()`
            // blocks the async executor thread until the TUN threads exit.
            // Confirm both threads are guaranteed to terminate after the
            // signals above.
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
2428
2429    /// Send disconnect notifications to all active peers.
2430    ///
2431    /// Best-effort: send failures are logged and ignored since the transport
2432    /// may already be degraded. This runs before transports are shut down.
2433    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2434        let disconnect = Disconnect::new(reason);
2435        let plaintext = disconnect.encode();
2436
2437        // Collect node_addrs to avoid borrow conflict with send helper
2438        let peer_addrs: Vec<NodeAddr> = self
2439            .peers
2440            .iter()
2441            .filter(|(_, peer)| peer.can_send() && peer.has_session())
2442            .map(|(addr, _)| *addr)
2443            .collect();
2444
2445        if peer_addrs.is_empty() {
2446            debug!(
2447                total_peers = self.peers.len(),
2448                "No sendable peers for disconnect notification"
2449            );
2450            return;
2451        }
2452
2453        let mut sent = 0usize;
2454        for node_addr in &peer_addrs {
2455            match self
2456                .send_encrypted_link_message(node_addr, &plaintext)
2457                .await
2458            {
2459                Ok(()) => sent += 1,
2460                Err(e) => {
2461                    debug!(
2462                        peer = %self.peer_display_name(node_addr),
2463                        error = %e,
2464                        "Failed to send disconnect (transport may be down)"
2465                    );
2466                }
2467            }
2468        }
2469
2470        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2471    }
2472
2473    fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2474        peer_config
2475            .addresses_by_priority()
2476            .into_iter()
2477            .cloned()
2478            .collect()
2479    }
2480
2481    async fn nostr_peer_fallback_addresses(
2482        &self,
2483        peer_config: &PeerConfig,
2484        existing: &[PeerAddress],
2485    ) -> Vec<PeerAddress> {
2486        if !self.config.node.discovery.nostr.enabled
2487            || self.config.node.discovery.nostr.policy
2488                == crate::config::NostrDiscoveryPolicy::Disabled
2489        {
2490            return Vec::new();
2491        }
2492
2493        let Some(bootstrap) = self.nostr_discovery.clone() else {
2494            return Vec::new();
2495        };
2496        let endpoints = match bootstrap
2497            .cached_advert_endpoints_for_peer(&peer_config.npub)
2498            .await
2499        {
2500            Some(endpoints) => endpoints,
2501            None => {
2502                debug!(
2503                    npub = %peer_config.npub,
2504                    "No cached Nostr advert endpoints for configured peer"
2505                );
2506                return Vec::new();
2507            }
2508        };
2509
2510        let mut fallback = Vec::new();
2511        let mut next_priority = existing
2512            .iter()
2513            .map(|addr| addr.priority)
2514            .max()
2515            .unwrap_or(100)
2516            .saturating_add(1);
2517        // Stamp every overlay-derived candidate with the current wall clock
2518        // so the dialer ranks it ahead of unstamped operator hints. We use a
2519        // single timestamp per fetch (rather than per-endpoint) because all
2520        // candidates in a single advert are equally fresh.
2521        let seen_at_ms = Self::now_ms();
2522        for endpoint in endpoints {
2523            let Some(candidate) =
2524                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2525            else {
2526                continue;
2527            };
2528            if existing
2529                .iter()
2530                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2531                || fallback.iter().any(|addr: &PeerAddress| {
2532                    addr.transport == candidate.transport && addr.addr == candidate.addr
2533                })
2534            {
2535                continue;
2536            }
2537            fallback.push(candidate);
2538            next_priority = next_priority.saturating_add(1);
2539        }
2540        fallback
2541    }
2542
2543    async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2544        if !self.config.node.discovery.nostr.enabled
2545            || self.config.node.discovery.nostr.policy
2546                == crate::config::NostrDiscoveryPolicy::Disabled
2547        {
2548            return false;
2549        }
2550        let Some(bootstrap) = self.nostr_discovery.clone() else {
2551            return false;
2552        };
2553        bootstrap.request_connect(peer_config.clone()).await;
2554        info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
2555        true
2556    }
2557
2558    fn overlay_endpoint_to_peer_address(
2559        endpoint: &OverlayEndpointAdvert,
2560        priority: u8,
2561        seen_at_ms: u64,
2562    ) -> Option<PeerAddress> {
2563        let transport = match endpoint.transport {
2564            OverlayTransportKind::Udp => "udp",
2565            OverlayTransportKind::Tcp => "tcp",
2566            OverlayTransportKind::Tor => "tor",
2567            OverlayTransportKind::WebRtc => "webrtc",
2568        };
2569        Some(
2570            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2571                .with_seen_at_ms(seen_at_ms),
2572        )
2573    }
2574
    /// Try to start connections to `peer_identity` over the candidate
    /// `addresses`, in slice order.
    ///
    /// Rules applied per address:
    /// - Overlay candidates (those with `seen_at_ms` set) are skipped once
    ///   any earlier candidate has been attempted.
    /// - A `udp` address whose text is `nat` (case-insensitive) is not
    ///   dialed. Instead it requests a background Nostr NAT-traversal
    ///   attempt. That happens only when `allow_bootstrap_nat` is true and
    ///   nothing has been attempted yet.
    /// - `ethernet`, `ble` (only when built with `bluer_available`), UDP
    ///   socket addresses and other transport types are each resolved to a
    ///   `(TransportId, TransportAddr)`. Unresolvable entries are skipped.
    /// - A path already being connected counts as attempted and does not
    ///   consume budget.
    /// - New dials are capped by `path_candidate_attempt_budget` for this
    ///   peer. When the remaining budget is 0, the loop stops.
    ///
    /// # Errors
    /// - Returns `NodeError::AccessDenied` as soon as `initiate_connection`
    ///   reports it.
    /// - Returns `NodeError::NoTransportForType` when no attempt was started
    ///   and none was already in flight.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        // True once any candidate was dialed, is already in flight, or a
        // Nostr bootstrap request was issued.
        let mut attempted = false;
        let peer_node_addr = *peer_identity.node_addr();
        // Remaining number of new concrete dials allowed for this peer.
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // Overlay-derived candidates are the ones stamped with
            // `seen_at_ms`. They are only used when nothing earlier was tried.
            if attempted && addr.seen_at_ms.is_some() {
                debug!(
                    npub = %peer_config.npub,
                    transport = %addr.transport,
                    addr = %addr.addr,
                    "Skipping overlay fallback because a configured direct candidate is already in flight"
                );
                continue;
            }

            // `udp:nat` is a sentinel meaning "use Nostr NAT traversal", not
            // a concrete address.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if attempted {
                    debug!(
                        npub = %peer_config.npub,
                        "Skipping Nostr NAT fallback because a configured direct candidate is already in flight"
                    );
                    continue;
                }
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the candidate into a local transport and a remote
            // address. Any resolution failure skips just this candidate.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // UDP addresses that parse as socket addresses pick a UDP
                // transport compatible with the remote address. Everything
                // else falls back to the first transport of the named type.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // Already dialing this exact path: count it as attempted, but
            // do not spend budget on it.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // ACL rejection applies to the peer as a whole, so stop
                // trying further addresses.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        // NOTE(review): this error is also returned when the budget was
        // already 0 on entry (other candidates for this peer in flight). The
        // message then overstates the problem. Confirm that callers treat
        // this case as expected.
        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
2730
2731    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2732        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2733            .await;
2734    }
2735
2736    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
2737    /// queues retries for non-configured, not-yet-connected peers.
2738    ///
2739    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
2740    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
2741    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
2742    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
2743    ///
2744    /// `caller` is a short label included in log lines so per-tick and
2745    /// startup sweeps are distinguishable in operator-facing logs.
2746    pub(in crate::node) async fn run_open_discovery_sweep(
2747        &mut self,
2748        bootstrap: &std::sync::Arc<NostrDiscovery>,
2749        max_age_secs: Option<u64>,
2750        caller: &'static str,
2751    ) {
2752        if !self.config.node.discovery.nostr.enabled
2753            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2754        {
2755            return;
2756        }
2757
2758        let configured_npubs = self
2759            .config
2760            .peers()
2761            .iter()
2762            .map(|peer| peer.npub.clone())
2763            .collect::<HashSet<_>>();
2764        let now_ms = Self::now_ms();
2765        let now_secs = now_ms / 1000;
2766        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
2767        if enqueue_budget == 0 {
2768            debug!(
2769                caller = %caller,
2770                "open-discovery sweep: enqueue budget is 0, skipping"
2771            );
2772            return;
2773        }
2774
2775        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
2776        let cached_count = candidates.len();
2777        let mut enqueued = 0usize;
2778        let mut skipped_age = 0usize;
2779        let mut skipped_configured = 0usize;
2780        let mut skipped_self = 0usize;
2781        let mut skipped_connected = 0usize;
2782        let mut skipped_retry_pending = 0usize;
2783        let mut skipped_connecting = 0usize;
2784        let mut skipped_no_endpoints = 0usize;
2785        let mut skipped_invalid_npub = 0usize;
2786        let mut skipped_cooldown = 0usize;
2787
2788        for (npub, endpoints, created_at_secs) in candidates {
2789            if enqueue_budget == 0 {
2790                break;
2791            }
2792
2793            if let Some(max_age) = max_age_secs
2794                && now_secs.saturating_sub(created_at_secs) > max_age
2795            {
2796                skipped_age = skipped_age.saturating_add(1);
2797                continue;
2798            }
2799
2800            if configured_npubs.contains(&npub) {
2801                // Configured peers don't go through the open-discovery
2802                // enqueue path — their `PeerConfig` is already in
2803                // `self.config.peers()`, so the regular retry queue is
2804                // what drives their reconnect. But on cold start with
2805                // NAT'd peers, every initial `initiate_peer_connection`
2806                // fails (no overlay data yet, static cache hints empty
2807                // or stale), each pushes the peer into `retry_pending`
2808                // with exponential backoff (5/10/20/40/80s), and by the
2809                // time the next backoff slot fires the Nostr advert is
2810                // already cached — we just don't act on it for ~80s.
2811                //
2812                // The arrival of an advert (which this sweep sees) means
2813                // we now have a path to dial. If the peer's retry is
2814                // scheduled in the future, pull it forward to "now" so
2815                // the next `process_pending_retries` tick fires it
2816                // immediately. The retry path (`initiate_peer_retry_
2817                // connection` → `try_peer_addresses`) then refetches
2818                // the advert and dials it — no behavioral change
2819                // beyond schedule timing.
2820                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
2821                    let configured_addr = *identity.node_addr();
2822                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
2823                        && state.retry_after_ms > now_ms
2824                    {
2825                        state.retry_after_ms = now_ms;
2826                        debug!(
2827                            caller = %caller,
2828                            peer = %self.peer_display_name(&configured_addr),
2829                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
2830                            "Expediting configured-peer retry after fresh overlay advert"
2831                        );
2832                    }
2833                }
2834                skipped_configured = skipped_configured.saturating_add(1);
2835                continue;
2836            }
2837
2838            let peer_identity = match PeerIdentity::from_npub(&npub) {
2839                Ok(identity) => identity,
2840                Err(_) => {
2841                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
2842                    continue;
2843                }
2844            };
2845            let node_addr = *peer_identity.node_addr();
2846            if node_addr == *self.identity.node_addr() {
2847                skipped_self = skipped_self.saturating_add(1);
2848                continue;
2849            }
2850            if self.peers.contains_key(&node_addr) {
2851                skipped_connected = skipped_connected.saturating_add(1);
2852                continue;
2853            }
2854            if self.retry_pending.contains_key(&node_addr) {
2855                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
2856                continue;
2857            }
2858            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
2859                skipped_cooldown = skipped_cooldown.saturating_add(1);
2860                continue;
2861            }
2862            let connecting = self.connections.values().any(|conn| {
2863                conn.expected_identity()
2864                    .map(|id| id.node_addr() == &node_addr)
2865                    .unwrap_or(false)
2866            });
2867            if connecting {
2868                skipped_connecting = skipped_connecting.saturating_add(1);
2869                continue;
2870            }
2871
2872            let mut addresses = Vec::new();
2873            let mut priority = 120u8;
2874            let seen_at_ms = Self::now_ms();
2875            for endpoint in endpoints {
2876                let Some(candidate) =
2877                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
2878                else {
2879                    continue;
2880                };
2881                if addresses.iter().any(|existing: &PeerAddress| {
2882                    existing.transport == candidate.transport && existing.addr == candidate.addr
2883                }) {
2884                    continue;
2885                }
2886                addresses.push(candidate);
2887                priority = priority.saturating_add(1);
2888            }
2889            if addresses.is_empty() {
2890                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
2891                continue;
2892            }
2893
2894            self.peer_aliases
2895                .entry(node_addr)
2896                .or_insert_with(|| peer_identity.short_npub());
2897            self.register_identity(node_addr, peer_identity.pubkey_full());
2898
2899            let mut state = super::retry::RetryState::new(PeerConfig {
2900                npub: npub.clone(),
2901                alias: None,
2902                addresses,
2903                connect_policy: ConnectPolicy::AutoConnect,
2904                auto_reconnect: true,
2905                discovery_fallback_transit: false,
2906            });
2907            state.reconnect = false;
2908            state.retry_after_ms = now_ms;
2909            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
2910            self.retry_pending.insert(node_addr, state);
2911            info!(
2912                caller = %caller,
2913                peer = %peer_identity.short_npub(),
2914                advert_age_secs = now_secs.saturating_sub(created_at_secs),
2915                "open-discovery sweep: queued retry for cached advert"
2916            );
2917            enqueue_budget = enqueue_budget.saturating_sub(1);
2918            enqueued = enqueued.saturating_add(1);
2919        }
2920
2921        // Always log a one-line summary on the startup sweep so operators
2922        // can verify it ran. Per-tick sweeps are noisier; only summarize
2923        // when something happened.
2924        let total_skipped = skipped_age
2925            + skipped_configured
2926            + skipped_self
2927            + skipped_connected
2928            + skipped_retry_pending
2929            + skipped_connecting
2930            + skipped_no_endpoints
2931            + skipped_invalid_npub
2932            + skipped_cooldown;
2933        let should_summarize = caller == "startup" || enqueued > 0;
2934        if should_summarize {
2935            info!(
2936                caller = %caller,
2937                cached = cached_count,
2938                queued = enqueued,
2939                skipped_age = skipped_age,
2940                skipped_configured = skipped_configured,
2941                skipped_self = skipped_self,
2942                skipped_connected = skipped_connected,
2943                skipped_retry_pending = skipped_retry_pending,
2944                skipped_connecting = skipped_connecting,
2945                skipped_no_endpoints = skipped_no_endpoints,
2946                skipped_invalid_npub = skipped_invalid_npub,
2947                skipped_cooldown = skipped_cooldown,
2948                skipped_total = total_skipped,
2949                "open-discovery sweep complete"
2950            );
2951        }
2952    }
2953
2954    /// One-shot startup sweep: runs once after the configured settle
2955    /// delay, iterating the cached overlay adverts and queueing retries
2956    /// for any peer with a recent enough advert that we haven't already
2957    /// configured statically or established a link to.
2958    ///
2959    /// Gated identically to [`run_open_discovery_sweep`]: requires
2960    /// `node.discovery.nostr.enabled` and `policy == open`.
2961    async fn maybe_run_startup_open_discovery_sweep(
2962        &mut self,
2963        bootstrap: &std::sync::Arc<NostrDiscovery>,
2964    ) {
2965        if self.startup_open_discovery_sweep_done {
2966            return;
2967        }
2968        if !self.config.node.discovery.nostr.enabled
2969            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2970        {
2971            // Mark done so we don't keep re-checking on every tick.
2972            self.startup_open_discovery_sweep_done = true;
2973            return;
2974        }
2975        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
2976            return;
2977        };
2978        let now_ms = Self::now_ms();
2979        let delay_ms = self
2980            .config
2981            .node
2982            .discovery
2983            .nostr
2984            .startup_sweep_delay_secs
2985            .saturating_mul(1000);
2986        if now_ms < started_at_ms.saturating_add(delay_ms) {
2987            return;
2988        }
2989
2990        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
2991        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
2992            .await;
2993        self.startup_open_discovery_sweep_done = true;
2994    }
2995
2996    fn available_outbound_slots(&self) -> usize {
2997        let connection_used = self
2998            .connections
2999            .len()
3000            .saturating_add(self.pending_connects.len());
3001        let connection_slots = if self.max_connections == 0 {
3002            usize::MAX
3003        } else {
3004            self.max_connections.saturating_sub(connection_used)
3005        };
3006
3007        let peer_slots = if self.max_peers == 0 {
3008            usize::MAX
3009        } else {
3010            self.max_peers.saturating_sub(self.peers.len())
3011        };
3012
3013        connection_slots.min(peer_slots)
3014    }
3015
3016    fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
3017        let current_open_discovery_pending = self
3018            .retry_pending
3019            .values()
3020            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
3021            .count();
3022
3023        let cap_remaining = self
3024            .config
3025            .node
3026            .discovery
3027            .nostr
3028            .open_discovery_max_pending
3029            .saturating_sub(current_open_discovery_pending);
3030
3031        cap_remaining.min(self.available_outbound_slots())
3032    }
3033
3034    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
3035        now_ms.saturating_add(
3036            self.config
3037                .node
3038                .discovery
3039                .nostr
3040                .advert_ttl_secs
3041                .saturating_mul(1000)
3042                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
3043        )
3044    }
3045
    /// Build the overlay advert describing how peers can reach this node.
    ///
    /// Walks every operational transport. Each transport whose config opts
    /// in via `advertise_on_nostr()` contributes at most one endpoint:
    ///
    /// - `udp`
    ///   - Public config: a concrete address. This is the explicit
    ///     `external_addr`, a routable bind address, or a reflexive address
    ///     from `learn_public_udp_addr` (STUN).
    ///   - Non-public config: the literal `"nat"` placeholder.
    /// - `webrtc`: this node's full public key, hex-encoded.
    /// - `tcp`: the explicit `external_addr` or a routable bind address only.
    ///   There is no TCP STUN fallback.
    /// - `tor`: `<onion address>:<advertised port>`, when the transport
    ///   reports an onion address.
    ///
    /// Transports of any other type are ignored. Returns `None` when Nostr
    /// discovery is disabled or no endpoint was produced.
    ///
    /// `signal_relays` (from `dm_relays`) and `stun_servers` are filled in
    /// only when a `"nat"` UDP endpoint or a WebRTC endpoint was added.
    /// Otherwise both are `None`.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // These flags decide whether relay and STUN hints are attached to
        // the advert at the end of this function.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            // Only transports that are currently operational are advertised.
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Precedence:
                        // 1. operator-supplied `external_addr` (skips STUN)
                        // 2. non-wildcard *public* `local_addr` (operator
                        //    bound to a specific public IP directly)
                        // 3. STUN auto-discovery against ephemeral socket
                        //    (also taken when bind is wildcard *or* private —
                        //    a private bind is not peer-reachable, so we
                        //    must publish the public reflexive instead)
                        // 4. loud warn + omit endpoint
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                Some(addr) => {
                                    // The transport id keys the learned
                                    // public address; the bind port is
                                    // passed along with it.
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                            or private and STUN observation failed; \
                                            advertising no UDP endpoint. Either set \
                                            transports.udp.external_addr, bind to a \
                                            specific *public* IP, or ensure \
                                            node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                // No local address reported: nothing to
                                // advertise, and no warning is emitted.
                                None => {}
                            }
                        }
                    } else {
                        // Non-public UDP: advertise the "nat" placeholder
                        // instead of an address.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // A WebRTC endpoint is identified by this node's
                    // serialized full public key, not by a network address.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Precedence:
                    // 1. operator-supplied `external_addr` (only path that
                    //    works on cloud-NAT setups where the public IP is
                    //    not on a host interface).
                    // 2. non-wildcard *public* `local_addr` (operator bound
                    //    to a specific public IP directly).
                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
                    //
                    // A wildcard *or* private bind is never advertised as-is
                    // — peers off-LAN can't reach a private bind, and there
                    // is no TCP STUN to discover a public reflexive.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                    or private IP and no transports.tcp.external_addr set; \
                                    advertising no TCP endpoint. Either set external_addr \
                                    to the public IP (recommended for cloud 1:1-NAT setups) \
                                    or bind explicitly to the public IP"
                                );
                            }
                            // No local address reported: silently omitted.
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Omitted (without a warning) until the transport
                    // reports an onion address.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            // Relay and STUN hints are only attached when a "nat" UDP or a
            // WebRTC endpoint is present.
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
3224
3225    async fn refresh_overlay_advert(
3226        &self,
3227        bootstrap: &std::sync::Arc<NostrDiscovery>,
3228    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
3229        let advert = self.build_overlay_advert(bootstrap).await;
3230        bootstrap.update_local_advert(advert).await
3231    }
3232
3233    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
3234        match (&self.config.transports.udp, transport_name) {
3235            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3236            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3237            _ => None,
3238        }
3239    }
3240
3241    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
3242        match (&self.config.transports.tcp, transport_name) {
3243            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3244            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3245            _ => None,
3246        }
3247    }
3248
3249    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
3250        match (&self.config.transports.tor, transport_name) {
3251            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3252            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3253            _ => None,
3254        }
3255    }
3256
3257    fn lookup_webrtc_config(
3258        &self,
3259        transport_name: Option<&str>,
3260    ) -> Option<&crate::config::WebRtcConfig> {
3261        match (&self.config.transports.webrtc, transport_name) {
3262            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
3263            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
3264            _ => None,
3265        }
3266    }
3267
3268    pub(in crate::node) async fn try_peer_addresses(
3269        &mut self,
3270        peer_config: &PeerConfig,
3271        peer_identity: PeerIdentity,
3272        allow_bootstrap_nat: bool,
3273    ) -> Result<(), NodeError> {
3274        let peer_node_addr = *peer_identity.node_addr();
3275        if self.peers.contains_key(&peer_node_addr) {
3276            debug!(
3277                npub = %peer_config.npub,
3278                "Peer already exists, skipping address attempts"
3279            );
3280            return Ok(());
3281        }
3282
3283        let candidates = self.peer_address_candidates(peer_config).await;
3284
3285        if candidates.is_empty() {
3286            if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
3287                return Ok(());
3288            }
3289            return Err(NodeError::NoTransportForType(format!(
3290                "no addresses known for {}",
3291                peer_config.npub
3292            )));
3293        }
3294
3295        if self
3296            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
3297            .await
3298            .is_ok()
3299        {
3300            return Ok(());
3301        }
3302
3303        Err(NodeError::NoTransportForType(format!(
3304            "no operational transport for any of {}'s addresses",
3305            peer_config.npub
3306        )))
3307    }
3308
3309    async fn try_active_peer_alternative_addresses(
3310        &mut self,
3311        peer_config: &PeerConfig,
3312        peer_identity: PeerIdentity,
3313    ) -> Result<bool, NodeError> {
3314        let peer_node_addr = *peer_identity.node_addr();
3315        let candidates = self.peer_address_candidates(peer_config).await;
3316
3317        if candidates.is_empty() {
3318            return Err(NodeError::NoTransportForType(format!(
3319                "no addresses known for {}",
3320                peer_config.npub
3321            )));
3322        }
3323
3324        let alternatives: Vec<_> = candidates
3325            .into_iter()
3326            .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
3327            .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
3328            .collect();
3329
3330        if alternatives.is_empty() {
3331            return Err(NodeError::NoTransportForType(format!(
3332                "no concrete alternate addresses known for {}",
3333                peer_config.npub
3334            )));
3335        }
3336
3337        self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
3338            .await?;
3339        Ok(true)
3340    }
3341
3342    async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
3343        // Merge every candidate from every source we have for this peer.
3344        // Explicitly configured addresses keep first shot, then freshly
3345        // fetched overlay adverts are appended as fallback candidates. This
3346        // lets native peers try known LAN/nvpn/static UDP routes before
3347        // slower WebRTC/Nostr-discovered paths, while still racing every
3348        // concrete candidate that fits in the per-peer budget.
3349        let static_addresses = self.static_peer_addresses(peer_config);
3350        let overlay_addresses = self
3351            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
3352            .await;
3353
3354        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
3355        for addr in overlay_addresses.into_iter().chain(static_addresses) {
3356            if !candidates.iter().any(|existing: &PeerAddress| {
3357                existing.transport == addr.transport && existing.addr == addr.addr
3358            }) {
3359                candidates.push(addr);
3360            }
3361        }
3362
3363        // Stable sort: explicit priority first, with recency only breaking
3364        // ties inside a source tier. `nostr_peer_fallback_addresses` assigns
3365        // overlay addresses priorities after the static max, so a fresh
3366        // overlay advert cannot leapfrog an operator-provided direct hint.
3367        candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
3368            _ if a.priority != b.priority => a.priority.cmp(&b.priority),
3369            (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
3370            (Some(_), None) => std::cmp::Ordering::Less,
3371            (None, Some(_)) => std::cmp::Ordering::Greater,
3372            (None, None) => std::cmp::Ordering::Equal,
3373        });
3374
3375        candidates
3376    }
3377
3378    fn active_peer_matches_any_candidate(
3379        &self,
3380        peer_node_addr: &NodeAddr,
3381        candidates: &[PeerAddress],
3382    ) -> bool {
3383        candidates
3384            .iter()
3385            .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
3386    }
3387
3388    pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
3389        &self,
3390        peer_node_addr: &NodeAddr,
3391        candidates: &[PeerAddress],
3392    ) -> bool {
3393        if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
3394            return false;
3395        }
3396        !self.active_peer_needs_same_path_refresh(peer_node_addr)
3397    }
3398
3399    fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
3400        let Some(peer) = self.peers.get(peer_node_addr) else {
3401            return false;
3402        };
3403        let stale_after_ms = self
3404            .config
3405            .node
3406            .heartbeat_interval_secs
3407            .saturating_mul(1000)
3408            .max(1000);
3409        peer.idle_time(Self::now_ms()) > stale_after_ms
3410    }
3411
    /// Whether the currently active link to `peer_node_addr` already uses
    /// the path described by `candidate`.
    ///
    /// Returns `false` when the peer is unknown or has no current address.
    /// Otherwise the checks below are tried in order:
    /// 1. Exact match. If the peer has a transport id *and* the candidate
    ///    resolves via `resolve_peer_address_for_match`, both the transport
    ///    id and the address must be equal.
    /// 2. Bootstrap transport. A peer whose transport is in
    ///    `bootstrap_transports` (e.g. an adopted NAT-traversal socket)
    ///    never matches.
    /// 3. Fallback string match. The candidate's address string must equal
    ///    the peer's current address rendered with `to_string()`. The
    ///    transport type name must also match when the peer's transport can
    ///    be looked up; if it can't, the address alone decides.
    fn active_peer_matches_candidate(
        &self,
        peer_node_addr: &NodeAddr,
        candidate: &PeerAddress,
    ) -> bool {
        let Some(peer) = self.peers.get(peer_node_addr) else {
            return false;
        };
        let Some(current_addr) = peer.current_addr() else {
            return false;
        };
        // Preferred path: compare resolved (transport id, address) pairs.
        if let Some(peer_transport_id) = peer.transport_id()
            && let Some((candidate_transport_id, candidate_addr)) =
                self.resolve_peer_address_for_match(candidate)
        {
            return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
        }
        // Links on bootstrap transports are never treated as matching a
        // candidate by the weaker string comparison below.
        if peer
            .transport_id()
            .map(|id| self.bootstrap_transports.contains(&id))
            .unwrap_or(false)
        {
            return false;
        }
        // Fallback: textual address comparison plus transport type name.
        let current_addr = current_addr.to_string();
        let current_transport = peer
            .transport_id()
            .and_then(|id| self.transports.get(&id))
            .map(|transport| transport.transport_type().name);

        candidate.addr == current_addr
            && current_transport
                .map(|transport| transport == candidate.transport)
                .unwrap_or(true)
    }
3447
3448    // === Control API methods ===
3449
3450    /// Connect to a peer via the control API.
3451    ///
3452    /// Creates an ephemeral peer connection (not persisted to config, no
3453    /// auto-reconnect). Reuses the same connection path as auto-connect
3454    /// peers. Returns JSON data on success or an error message.
3455    pub(crate) async fn api_connect(
3456        &mut self,
3457        npub: &str,
3458        address: &str,
3459        transport: &str,
3460    ) -> Result<serde_json::Value, String> {
3461        let peer_config = PeerConfig {
3462            npub: npub.to_string(),
3463            alias: None,
3464            addresses: vec![PeerAddress::new(transport, address)],
3465            connect_policy: ConnectPolicy::Manual,
3466            auto_reconnect: false,
3467            discovery_fallback_transit: true,
3468        };
3469
3470        // Pre-seed identity cache (same as initiate_peer_connections does)
3471        if let Ok(identity) = PeerIdentity::from_npub(npub) {
3472            self.peer_aliases
3473                .insert(*identity.node_addr(), identity.short_npub());
3474            self.register_identity(*identity.node_addr(), identity.pubkey_full());
3475        }
3476
3477        self.initiate_peer_connection(&peer_config)
3478            .await
3479            .map(|()| {
3480                info!(
3481                    npub = %npub,
3482                    address = %address,
3483                    transport = %transport,
3484                    "API connect initiated"
3485                );
3486                serde_json::json!({
3487                    "npub": npub,
3488                    "address": address,
3489                    "transport": transport,
3490                })
3491            })
3492            .map_err(|e| e.to_string())
3493    }
3494
3495    /// Disconnect a peer via the control API.
3496    ///
3497    /// Removes the peer and suppresses auto-reconnect.
3498    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3499        let peer_identity =
3500            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3501        let node_addr = *peer_identity.node_addr();
3502
3503        if !self.peers.contains_key(&node_addr) {
3504            return Err(format!("peer not found: {npub}"));
3505        }
3506
3507        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
3508        self.remove_active_peer(&node_addr);
3509
3510        // Suppress any pending auto-reconnect
3511        self.retry_pending.remove(&node_addr);
3512
3513        info!(npub = %npub, "API disconnect completed");
3514
3515        Ok(serde_json::json!({
3516            "npub": npub,
3517            "disconnected": true,
3518        }))
3519    }
3520
    /// Adopt an already-established UDP traversal and start the normal FIPS
    /// Noise handshake over it.
    ///
    /// This is intended for integration with an external rendezvous runtime
    /// that has already completed relay signaling, STUN observation, and UDP
    /// hole punching. After handoff, the adopted socket is owned by FIPS.
    ///
    /// Steps:
    /// 1. Wrap the socket in a freshly allocated UDP transport.
    /// 2. Register that transport as a bootstrap transport for the peer.
    /// 3. Initiate a connection to `traversal.remote_addr`.
    ///
    /// A peer that is already connected is not rejected; the traversal is
    /// adopted as an alternate path. The peer's alias and identity are
    /// (re)registered before the transport is created.
    ///
    /// # Errors
    ///
    /// - [`NodeError::NotStarted`]: the node is not operational, or it has
    ///   no packet channel.
    /// - [`NodeError::InvalidPeerNpub`]: `traversal.peer_npub` does not parse.
    /// - [`NodeError::BootstrapHandoff`]: socket adoption fails, or the
    ///   adopted transport reports no local address.
    /// - Any error from `initiate_connection`. In that case the transport
    ///   registration is rolled back and the transport is stopped on a
    ///   best-effort basis.
    pub async fn adopt_established_traversal(
        &mut self,
        traversal: EstablishedTraversal,
    ) -> Result<BootstrapHandoffResult, NodeError> {
        debug!(
            peer_npub = %traversal.peer_npub,
            session_id = %traversal.session_id,
            remote_addr = %traversal.remote_addr,
            "adopting established traversal socket"
        );

        if !self.state.is_operational() {
            return Err(NodeError::NotStarted);
        }

        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
            NodeError::InvalidPeerNpub {
                npub: traversal.peer_npub.clone(),
                reason: e.to_string(),
            }
        })?;
        let peer_node_addr = *peer_identity.node_addr();
        // Already-connected peers are allowed: the traversal becomes an
        // additional path rather than an error.
        if self.peers.contains_key(&peer_node_addr) {
            debug!(
                peer_npub = %traversal.peer_npub,
                "Adopting NAT traversal handoff as alternate path for already-connected peer"
            );
        }

        self.peer_aliases
            .insert(peer_node_addr, peer_identity.short_npub());
        self.register_identity(peer_node_addr, peer_identity.pubkey_full());

        let transport_id = self.allocate_transport_id();
        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
        // (and accept_connections / advertise flags) from the operator's
        // configured [transports.udp] when the bootstrap runtime doesn't
        // pass an explicit override. Lookup tries `transport_name` first
        // (covers the `Named` multi-listener variant) and falls back to the
        // unnamed `Single` listener, so single- and named-listener configs
        // both inherit cleanly.
        //
        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
        // only value guaranteed to survive arbitrary middlebox paths.
        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
        // initially attempt that MTU and may black-hole on tighter paths
        // until reactive `MtuExceeded` recovery kicks in. Operators who
        // raise the primary MTU based on known-clean topology accept that
        // tradeoff; the silent drop on a too-low default was strictly
        // worse for the common case where the primary MTU is reachable.
        //
        // Bind / external address fields are cleared since the socket is
        // already bound. (An explicit `traversal.transport_config` is used
        // verbatim; the clearing applies only to the inherited config.)
        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
            let mut cfg = self
                .lookup_udp_config(traversal.transport_name.as_deref())
                .or_else(|| self.lookup_udp_config(None))
                .cloned()
                .unwrap_or_default();
            cfg.bind_addr = None;
            cfg.external_addr = None;
            cfg
        });
        let mut transport = crate::transport::udp::UdpTransport::new(
            transport_id,
            traversal.transport_name.clone(),
            inherited_config,
            packet_tx,
        );

        transport
            .adopt_socket_async(traversal.socket)
            .await
            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;

        let local_addr = transport.local_addr().ok_or_else(|| {
            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
        })?;

        // Register the transport and remember which peer it was adopted for.
        self.transports.insert(
            transport_id,
            crate::transport::TransportHandle::Udp(transport),
        );
        self.bootstrap_transports.insert(transport_id);
        self.bootstrap_transport_npubs
            .insert(transport_id, traversal.peer_npub.clone());

        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
        if let Err(err) = self
            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
            .await
        {
            // Roll back the registrations above. Stopping the transport is
            // best-effort: its result is ignored.
            self.bootstrap_transports.remove(&transport_id);
            self.bootstrap_transport_npubs.remove(&transport_id);
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let _ = handle.stop().await;
            }
            return Err(err);
        }

        info!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            local_addr = %local_addr,
            remote_addr = %traversal.remote_addr,
            session_id = %traversal.session_id,
            "adopted NAT traversal socket; handshake initiated"
        );

        Ok(BootstrapHandoffResult {
            transport_id,
            local_addr,
            remote_addr: traversal.remote_addr,
            peer_node_addr,
            session_id: traversal.session_id,
        })
    }
3645}