// fips_core/node/lifecycle.rs

//! Node lifecycle management: start, stop, and peer connection initiation.
2
3use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7    OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::{HashMap, HashSet};
18use std::net::{IpAddr, SocketAddr};
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// Best-effort startup trace used while debugging node bring-up.
///
/// Debug builds append one `"<timestamp> <message>"` line to
/// `nvpn-fips-endpoint-debug.log` inside the system temp directory. Any
/// failure to open or write the file is deliberately ignored so that
/// tracing can never influence startup behaviour.
#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    let log_path = std::env::temp_dir().join("nvpn-fips-endpoint-debug.log");
    let Ok(mut log_file) = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(log_path)
    else {
        // Could not open the log file: drop the message silently.
        return;
    };

    let timestamp = std::time::SystemTime::now();
    let _ = writeln!(log_file, "{:?} {}", timestamp, message.as_ref());
}

/// Release-build stand-in: the debug trace compiles down to a no-op.
#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
43
/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN.
///
/// Covers, for IPv4: RFC 1918 private space, loopback, link-local,
/// unspecified, multicast, broadcast, documentation ranges, CGNAT
/// (100.64.0.0/10, RFC 6598) and the benchmarking range (198.18.0.0/15,
/// RFC 2544). For IPv6: loopback, unspecified, unique-local, multicast and
/// link-local (fe80::/10). IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`)
/// are classified by the IPv4 address they embed, so a private IPv4 address
/// cannot slip through in IPv6 clothing.
///
/// We never publish these as the peer's primary `runtime_endpoint`: an
/// off-LAN consumer can't route to them, and latching onto one leaves that
/// consumer on a slow overlay-relay fallback — the original bug this guard
/// exists to prevent.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => is_unroutable_advert_ipv4(v4),
        IpAddr::V6(v6) => match v6.to_ipv4_mapped() {
            // `::ffff:a.b.c.d` is just an IPv4 endpoint seen through a
            // dual-stack socket; judge it by the IPv4 rules.
            Some(mapped) => is_unroutable_advert_ipv4(mapped),
            None => {
                v6.is_loopback()
                    || v6.is_unspecified()
                    || v6.is_unique_local()
                    || v6.is_multicast()
                    // IPv6 link-local: fe80::/10
                    || (v6.segments()[0] & 0xffc0) == 0xfe80
            }
        },
    }
}

/// IPv4 half of [`is_unroutable_advert_ip`].
fn is_unroutable_advert_ipv4(v4: std::net::Ipv4Addr) -> bool {
    let [first, second, ..] = v4.octets();

    v4.is_private()
        || v4.is_loopback()
        || v4.is_link_local()
        || v4.is_unspecified()
        || v4.is_multicast()
        || v4.is_broadcast()
        || v4.is_documentation()
        // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the public
        // internet; behaves like an extra NAT layer.
        || (first == 100 && (second & 0xc0) == 64)
        // 198.18.0.0/15 — benchmarking, RFC 2544. Reserved for lab use and
        // never reachable from the public internet.
        || (first == 198 && (second & 0xfe) == 18)
}
75
/// Returns `true` when `local` and `remote` belong to the same IP family
/// (both IPv4 or both IPv6).
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    // `SocketAddr` has exactly two variants, so agreeing on "is IPv4"
    // means agreeing on the family.
    local.is_ipv4() == remote.is_ipv4()
}
82
/// Multiplier for an open-discovery retry lifetime.
/// NOTE(review): not referenced in this part of the file — confirm the
/// exact quantity it scales at its use site.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
/// Cap on simultaneously in-flight connection attempts (handshakes plus
/// pending connects) toward a single peer; see
/// `path_candidate_attempt_budget`.
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
/// Upper bound returned by `graph_session_warmup_budget`, i.e. the most
/// session warm-ups / lookups one warm-up pass may start.
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
/// Upper bound returned by `discovery_connect_budget`.
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;
87
88impl Node {
89    /// Initiate connections to configured static peers.
90    ///
91    /// For each peer configured with AutoConnect policy, creates a link and
92    /// peer entry, then starts the Noise handshake by sending the first message.
93    /// Replace the runtime peer list. Newly added auto-connect peers get
94    /// `initiate_peer_connection` immediately; removed peers are dropped from
95    /// the retry queue (the regular liveness timeout reaps any active session
96    /// — we don't proactively disconnect, since the same npub might be on its
97    /// way back via a fresh advert). Existing auto-connect entries with new
98    /// direct addresses are dialed immediately, and retry state is refreshed
99    /// so later attempts also use the latest hints.
100    pub(super) async fn update_peers(
101        &mut self,
102        new_peers: Vec<crate::config::PeerConfig>,
103    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
104        use std::collections::{HashMap, HashSet};
105
106        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
107            HashMap::with_capacity(new_peers.len());
108        for peer in new_peers {
109            let identity = match PeerIdentity::from_npub(&peer.npub) {
110                Ok(id) => id,
111                Err(e) => {
112                    return Err(crate::node::NodeError::InvalidPeerNpub {
113                        npub: peer.npub.clone(),
114                        reason: e.to_string(),
115                    });
116                }
117            };
118            // Last-write-wins on duplicates so callers passing a multi-source
119            // candidate list (e.g. operator hints + recent-peers cache for
120            // the same npub) get the merge they meant.
121            new_by_addr.insert(*identity.node_addr(), peer);
122        }
123
124        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
125            .config
126            .peers()
127            .iter()
128            .filter_map(|pc| {
129                PeerIdentity::from_npub(&pc.npub)
130                    .ok()
131                    .map(|id| (*id.node_addr(), pc.clone()))
132            })
133            .collect();
134
135        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
136        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();
137
138        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
139        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
140        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();
141
142        let mut outcome = crate::node::UpdatePeersOutcome::default();
143
144        for node_addr in &removed {
145            if self.retry_pending.remove(node_addr).is_some() {
146                debug!(
147                    peer = %self.peer_display_name(node_addr),
148                    "Dropping retry entry for peer removed from runtime peer list"
149                );
150            }
151            self.peer_aliases.remove(node_addr);
152            self.set_discovery_fallback_transit_allowed(*node_addr, false);
153            outcome.removed += 1;
154        }
155
156        let mut auto_connect_refresh_configs = Vec::new();
157        for node_addr in &kept {
158            let new_pc = &new_by_addr[node_addr];
159            let current_pc = &current_by_addr[node_addr];
160            if new_pc.addresses != current_pc.addresses
161                || new_pc.alias != current_pc.alias
162                || new_pc.connect_policy != current_pc.connect_policy
163                || new_pc.auto_reconnect != current_pc.auto_reconnect
164                || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
165            {
166                outcome.updated += 1;
167                self.set_discovery_fallback_transit_allowed(
168                    *node_addr,
169                    new_pc.discovery_fallback_transit,
170                );
171                if let Some(state) = self.retry_pending.get_mut(node_addr) {
172                    state.peer_config = new_pc.clone();
173                    state.retry_after_ms = Self::now_ms();
174                }
175                if let Some(alias) = new_pc.alias.clone() {
176                    self.peer_aliases.insert(*node_addr, alias);
177                }
178                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
179                    auto_connect_refresh_configs.push(new_pc.clone());
180                }
181            } else {
182                outcome.unchanged += 1;
183                self.set_discovery_fallback_transit_allowed(
184                    *node_addr,
185                    new_pc.discovery_fallback_transit,
186                );
187                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
188                    auto_connect_refresh_configs.push(new_pc.clone());
189                }
190            }
191        }
192
193        let added_configs: Vec<crate::config::PeerConfig> =
194            added.iter().map(|addr| new_by_addr[addr].clone()).collect();
195
196        // Replace the live config peer list before initiating connections so
197        // any helper that consults `self.config.peers()` during the dial
198        // (alias lookup, retry-state seeding) sees the new entries.
199        self.config.peers = new_by_addr.into_values().collect();
200
201        for peer_config in added_configs {
202            outcome.added += 1;
203            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
204                continue;
205            };
206            let name = peer_config
207                .alias
208                .clone()
209                .unwrap_or_else(|| identity.short_npub());
210            self.peer_aliases.insert(*identity.node_addr(), name);
211            self.set_discovery_fallback_transit_allowed(
212                *identity.node_addr(),
213                peer_config.discovery_fallback_transit,
214            );
215            self.register_identity(*identity.node_addr(), identity.pubkey_full());
216
217            if !peer_config.is_auto_connect() {
218                continue;
219            }
220
221            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
222                warn!(
223                    npub = %peer_config.npub,
224                    error = %e,
225                    "Failed to initiate connection for newly added peer"
226                );
227                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
228                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
229                }
230                if matches!(e, crate::node::NodeError::NoTransportForType(_))
231                    && let Some(bootstrap) = self.nostr_discovery.clone()
232                {
233                    bootstrap
234                        .request_advert_stale_check(peer_config.npub.clone())
235                        .await;
236                }
237            }
238        }
239
240        for peer_config in auto_connect_refresh_configs {
241            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
242                continue;
243            };
244            let node_addr = *peer_identity.node_addr();
245
246            if self.peers.contains_key(&node_addr) {
247                match self
248                    .initiate_active_peer_alternative_connection(&peer_config)
249                    .await
250                {
251                    Ok(attempted) => {
252                        if attempted {
253                            debug!(
254                                peer = %self.peer_display_name(&node_addr),
255                                "Started non-disruptive alternate-path handshake for active peer"
256                            );
257                        }
258                    }
259                    Err(e) => {
260                        debug!(
261                            npub = %peer_config.npub,
262                            error = %e,
263                            "Active peer alternate-path refresh did not start"
264                        );
265                    }
266                }
267                continue;
268            }
269
270            match self.initiate_peer_connection(&peer_config).await {
271                Ok(()) => {
272                    let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
273                    if let Some(state) = self.retry_pending.get_mut(&node_addr) {
274                        state.peer_config = peer_config;
275                        state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
276                    }
277                }
278                Err(e) => {
279                    debug!(
280                        npub = %peer_config.npub,
281                        error = %e,
282                        "Refreshed peer addresses did not initiate a direct connection"
283                    );
284                    self.schedule_retry(node_addr, Self::now_ms());
285                }
286            }
287        }
288
289        self.warm_auto_connect_graph_sessions().await;
290
291        Ok(outcome)
292    }
293
294    pub(super) async fn initiate_peer_connections(&mut self) {
295        // Build display name map from all configured peers (alias or short npub),
296        // and pre-seed the identity cache from each peer's npub so that TUN packets
297        // addressed to a configured peer can be dispatched (and trigger session
298        // initiation) immediately on startup — without waiting for the link-layer
299        // handshake to complete first.
300        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
301            .config
302            .peers()
303            .iter()
304            .filter_map(|pc| {
305                PeerIdentity::from_npub(&pc.npub)
306                    .ok()
307                    .map(|id| (id, pc.alias.clone()))
308            })
309            .collect();
310
311        for (identity, alias) in peer_identities {
312            let name = alias.unwrap_or_else(|| identity.short_npub());
313            self.peer_aliases.insert(*identity.node_addr(), name);
314            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
315            // but will be corrected to the real value when the peer is promoted
316            // after a successful Noise handshake.
317            self.register_identity(*identity.node_addr(), identity.pubkey_full());
318        }
319
320        // Collect peer configs to avoid borrow conflicts
321        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
322
323        if peer_configs.is_empty() {
324            debug!("No static peers configured");
325            return;
326        }
327
328        debug!(
329            count = peer_configs.len(),
330            "Initiating static peer connections"
331        );
332
333        for peer_config in peer_configs {
334            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
335                warn!(
336                    npub = %peer_config.npub,
337                    alias = ?peer_config.alias,
338                    error = %e,
339                    "Failed to initiate peer connection"
340                );
341                // Schedule a retry so transient address-resolution failures
342                // (e.g. cached endpoints stale, NAT rebinds, all addresses
343                // currently unreachable) recover without a daemon restart.
344                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
345                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
346                }
347                // No-transport failures most often mean the cached overlay
348                // advert is pointing at a dead post-NAT-rebind address. The
349                // advert cache is read-only inside fetch_advert, so retries
350                // would loop on the same dead address until expiry. Force a
351                // re-fetch so the next retry tick picks up fresh endpoints.
352                if matches!(e, crate::node::NodeError::NoTransportForType(_))
353                    && let Some(bootstrap) = self.nostr_discovery.clone()
354                {
355                    bootstrap
356                        .request_advert_stale_check(peer_config.npub.clone())
357                        .await;
358                }
359            }
360        }
361
362        self.warm_auto_connect_graph_sessions().await;
363    }
364
    /// Initiate a connection to a single peer.
    ///
    /// Creates a link, starts the Noise handshake, and sends the first message.
    ///
    /// This is a thin entry point: all work is delegated to
    /// `initiate_peer_connection_inner`, which returns `Ok(())` without
    /// dialing when the peer is already present in `self.peers`.
    ///
    /// # Errors
    ///
    /// Propagates whatever `NodeError` the inner routine returns, including
    /// `NodeError::InvalidPeerNpub` when `peer_config.npub` does not parse.
    pub(super) async fn initiate_peer_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
374
    /// Initiate a connection from the retry path. Identical to
    /// [`initiate_peer_connection`] today — both paths fan out across every
    /// known address (overlay-fresh first, then operator/cache hints) in a
    /// single pass. The two entry points stay separate so callers can be
    /// distinguished in tracing.
    ///
    /// NOTE(review): neither entry point opens a tracing span in the code
    /// shown here, so the distinction currently rests on the function name
    /// only — confirm whether instrumentation is applied elsewhere.
    ///
    /// # Errors
    ///
    /// Propagates whatever `NodeError` `initiate_peer_connection_inner`
    /// returns.
    pub(super) async fn initiate_peer_retry_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }
386
387    async fn initiate_active_peer_alternative_connection(
388        &mut self,
389        peer_config: &crate::config::PeerConfig,
390    ) -> Result<bool, NodeError> {
391        let peer_identity =
392            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
393                npub: peer_config.npub.clone(),
394                reason: e.to_string(),
395            })?;
396        let peer_node_addr = *peer_identity.node_addr();
397
398        if !self.peers.contains_key(&peer_node_addr) {
399            self.initiate_peer_connection(peer_config).await?;
400            return Ok(true);
401        }
402
403        // Keep the current link live and race fresh concrete candidates.
404        // Cross-connection resolution still decides which replacement link
405        // wins if both peers try the same upgrade; the important part here is
406        // that a stale path does not depend on the remote peer receiving our
407        // hint first before either side attempts the better address.
408        self.try_active_peer_alternative_addresses(peer_config, peer_identity)
409            .await
410    }
411
412    async fn initiate_peer_connection_inner(
413        &mut self,
414        peer_config: &crate::config::PeerConfig,
415    ) -> Result<(), NodeError> {
416        // Parse the peer's npub to get their identity
417        let peer_identity =
418            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
419                npub: peer_config.npub.clone(),
420                reason: e.to_string(),
421            })?;
422
423        let peer_node_addr = *peer_identity.node_addr();
424
425        // Check if peer already exists (fully authenticated)
426        if self.peers.contains_key(&peer_node_addr) {
427            debug!(
428                npub = %peer_config.npub,
429                "Peer already exists, skipping"
430            );
431            return Ok(());
432        }
433
434        self.try_peer_addresses(peer_config, peer_identity, true)
435            .await
436    }
437
438    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
439        self.connections.values().any(|conn| {
440            conn.expected_identity()
441                .map(|id| id.node_addr() == peer_node_addr)
442                .unwrap_or(false)
443        })
444    }
445
446    fn is_connecting_to_peer_on_path(
447        &self,
448        peer_node_addr: &NodeAddr,
449        transport_id: TransportId,
450        remote_addr: &TransportAddr,
451    ) -> bool {
452        self.connections.values().any(|conn| {
453            conn.expected_identity()
454                .map(|id| id.node_addr() == peer_node_addr)
455                .unwrap_or(false)
456                && conn.transport_id() == Some(transport_id)
457                && conn.source_addr() == Some(remote_addr)
458        }) || self.pending_connects.iter().any(|pending| {
459            pending.peer_identity.node_addr() == peer_node_addr
460                && pending.transport_id == transport_id
461                && &pending.remote_addr == remote_addr
462        })
463    }
464
465    pub(in crate::node) fn should_warm_auto_connect_session(
466        &self,
467        peer_node_addr: &NodeAddr,
468    ) -> bool {
469        if self.peers.contains_key(peer_node_addr)
470            || self
471                .sessions
472                .get(peer_node_addr)
473                .is_some_and(|entry| entry.is_established())
474        {
475            return false;
476        }
477
478        self.config.peers().iter().any(|peer| {
479            peer.is_auto_connect()
480                && PeerIdentity::from_npub(&peer.npub)
481                    .map(|identity| identity.node_addr() == peer_node_addr)
482                    .unwrap_or(false)
483        })
484    }
485
486    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
487        if !self.peers.values().any(|peer| peer.can_send()) {
488            return 0;
489        }
490
491        let mut budget = self.graph_session_warmup_budget();
492        if budget == 0 {
493            return 0;
494        }
495
496        let peer_identities: Vec<_> = self
497            .config
498            .auto_connect_peers()
499            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
500            .collect();
501
502        let mut warmed = 0;
503        for identity in peer_identities {
504            if budget == 0 {
505                break;
506            }
507
508            let peer_node_addr = *identity.node_addr();
509            if peer_node_addr == *self.identity.node_addr()
510                || !self.should_warm_auto_connect_session(&peer_node_addr)
511                || self
512                    .sessions
513                    .get(&peer_node_addr)
514                    .is_some_and(|entry| entry.is_initiating())
515            {
516                continue;
517            }
518
519            self.register_identity(peer_node_addr, identity.pubkey_full());
520
521            if self.find_next_hop(&peer_node_addr).is_some() {
522                match self
523                    .initiate_session(peer_node_addr, identity.pubkey_full())
524                    .await
525                {
526                    Ok(()) => {
527                        warmed += 1;
528                        budget = budget.saturating_sub(1);
529                        debug!(
530                            peer = %self.peer_display_name(&peer_node_addr),
531                            "Warmed auto-connect peer session over existing FIPS graph"
532                        );
533                    }
534                    Err(NodeError::SendFailed { node_addr, reason })
535                        if node_addr == peer_node_addr && reason == "no route to destination" =>
536                    {
537                        self.maybe_initiate_lookup(&peer_node_addr).await;
538                        warmed += 1;
539                        budget = budget.saturating_sub(1);
540                    }
541                    Err(err) => {
542                        debug!(
543                            peer = %self.peer_display_name(&peer_node_addr),
544                            error = %err,
545                            "Failed to warm auto-connect peer session"
546                        );
547                    }
548                }
549            } else {
550                self.maybe_initiate_lookup(&peer_node_addr).await;
551                warmed += 1;
552                budget = budget.saturating_sub(1);
553            }
554        }
555
556        warmed
557    }
558
559    pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
560        let max_destinations = self.config.node.session.pending_max_destinations;
561        if max_destinations == 0 {
562            return 0;
563        }
564
565        let pending_sessions = self
566            .sessions
567            .values()
568            .filter(|entry| !entry.is_established())
569            .count();
570        let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
571        max_destinations
572            .saturating_sub(pending_total)
573            .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
574    }
575
576    fn outbound_handshake_slots(&self) -> usize {
577        let used = self
578            .connections
579            .len()
580            .saturating_add(self.pending_connects.len());
581        if self.max_connections == 0 {
582            usize::MAX
583        } else {
584            self.max_connections.saturating_sub(used)
585        }
586    }
587
588    fn outbound_link_slots(&self) -> usize {
589        if self.max_links == 0 {
590            usize::MAX
591        } else {
592            self.max_links.saturating_sub(self.links.len())
593        }
594    }
595
596    fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
597        if !self.peers.contains_key(peer_node_addr)
598            && self.max_peers > 0
599            && self.peers.len() >= self.max_peers
600        {
601            return 0;
602        }
603
604        let in_flight_for_peer = self
605            .connections
606            .values()
607            .filter(|conn| {
608                conn.expected_identity()
609                    .map(|id| id.node_addr() == peer_node_addr)
610                    .unwrap_or(false)
611            })
612            .count()
613            .saturating_add(
614                self.pending_connects
615                    .iter()
616                    .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
617                    .count(),
618            );
619
620        self.outbound_handshake_slots()
621            .min(self.outbound_link_slots())
622            .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
623    }
624
625    fn discovery_connect_budget(&self) -> usize {
626        self.outbound_handshake_slots()
627            .min(self.outbound_link_slots())
628            .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
629    }
630
631    /// Find a UDP transport whose bound socket can send to `remote_addr`.
632    ///
633    /// LAN discovery can surface both IPv4 and IPv6 addresses for the same
634    /// service. A wildcard IPv4 socket cannot send to an IPv6 link-local
635    /// target, and vice versa, so callers must choose by socket family rather
636    /// than by transport type alone.
637    fn find_udp_transport_for_remote_addr(
638        &self,
639        remote_addr: SocketAddr,
640    ) -> Option<(TransportId, SocketAddr)> {
641        self.transports
642            .iter()
643            .filter(|(id, handle)| {
644                handle.transport_type().name == "udp"
645                    && handle.is_operational()
646                    && !self.bootstrap_transports.contains(id)
647            })
648            .filter_map(|(id, handle)| {
649                let local_addr = handle.local_addr()?;
650                socket_addr_families_compatible(local_addr, remote_addr)
651                    .then_some((*id, local_addr))
652            })
653            .min_by_key(|(id, _)| id.as_u32())
654    }
655
656    pub(super) fn transport_discovery_candidate(
657        &self,
658        discovered_transport_id: TransportId,
659        discovered_addr: TransportAddr,
660    ) -> Option<(TransportId, TransportAddr, &'static str)> {
661        let transport = self.transports.get(&discovered_transport_id)?;
662        let transport_name = transport.transport_type().name;
663
664        if transport_name != "udp" {
665            return Some((discovered_transport_id, discovered_addr, transport_name));
666        }
667
668        let Some(remote_socket_addr) = discovered_addr
669            .as_str()
670            .and_then(|addr| addr.parse::<SocketAddr>().ok())
671        else {
672            if self.bootstrap_transports.contains(&discovered_transport_id) {
673                debug!(
674                    transport_id = %discovered_transport_id,
675                    remote_addr = %discovered_addr,
676                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
677                );
678                return None;
679            }
680            return Some((discovered_transport_id, discovered_addr, transport_name));
681        };
682
683        let Some((transport_id, local_addr)) =
684            self.find_udp_transport_for_remote_addr(remote_socket_addr)
685        else {
686            debug!(
687                transport_id = %discovered_transport_id,
688                remote_addr = %discovered_addr,
689                "transport discovery: skip UDP peer with no compatible local socket"
690            );
691            return None;
692        };
693
694        if transport_id != discovered_transport_id {
695            debug!(
696                discovered_transport_id = %discovered_transport_id,
697                selected_transport_id = %transport_id,
698                local_addr = %local_addr,
699                remote_addr = %remote_socket_addr,
700                "transport discovery: selected compatible UDP transport"
701            );
702        }
703
704        Some((
705            transport_id,
706            TransportAddr::from_socket_addr(remote_socket_addr),
707            transport_name,
708        ))
709    }
710
711    fn peer_address_string_for_transport_candidate(
712        &self,
713        transport_id: TransportId,
714        transport_name: &str,
715        remote_addr: &TransportAddr,
716    ) -> String {
717        #[cfg(any(target_os = "linux", target_os = "macos"))]
718        if transport_name == "ethernet"
719            && remote_addr.as_bytes().len() == 6
720            && let Some(interface) = self
721                .transports
722                .get(&transport_id)
723                .and_then(|transport| transport.interface_name())
724        {
725            let mut mac = [0u8; 6];
726            mac.copy_from_slice(remote_addr.as_bytes());
727            return format!(
728                "{interface}/{}",
729                crate::transport::ethernet::format_mac(&mac)
730            );
731        }
732
733        remote_addr.to_string()
734    }
735
736    fn resolve_peer_address_for_match(
737        &self,
738        candidate: &PeerAddress,
739    ) -> Option<(TransportId, TransportAddr)> {
740        if candidate.transport == "udp" && candidate.addr.eq_ignore_ascii_case("nat") {
741            return None;
742        }
743
744        if candidate.transport == "ethernet" {
745            return self.resolve_ethernet_addr(&candidate.addr).ok();
746        }
747
748        if candidate.transport == "ble" {
749            #[cfg(bluer_available)]
750            {
751                return self.resolve_ble_addr(&candidate.addr).ok();
752            }
753            #[cfg(not(bluer_available))]
754            {
755                return None;
756            }
757        }
758
759        let transport_id = if candidate.transport == "udp"
760            && let Ok(remote_socket_addr) = candidate.addr.parse::<SocketAddr>()
761        {
762            self.find_udp_transport_for_remote_addr(remote_socket_addr)
763                .map(|(id, _)| id)?
764        } else {
765            self.find_transport_for_type(&candidate.transport)?
766        };
767
768        Some((transport_id, TransportAddr::from_string(&candidate.addr)))
769    }
770
771    /// Initiate a connection to a peer on a specific transport and address.
772    ///
773    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
774    /// the Noise IK handshake, sends msg1, and registers the connection for
775    /// msg2 dispatch.
776    ///
777    /// For connection-oriented transports (TCP, Tor): allocates a link and
778    /// starts a non-blocking transport connect. The handshake is deferred
779    /// until the transport connection is established — the tick handler
780    /// polls `connection_state()` and initiates the handshake when ready.
781    pub(super) async fn initiate_connection(
782        &mut self,
783        transport_id: TransportId,
784        remote_addr: TransportAddr,
785        peer_identity: PeerIdentity,
786    ) -> Result<(), NodeError> {
787        let peer_node_addr = *peer_identity.node_addr();
788
789        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
790            debug!(
791                peer = %self.peer_display_name(&peer_node_addr),
792                transport_id = %transport_id,
793                remote_addr = %remote_addr,
794                "Connection already in progress for candidate path"
795            );
796            return Ok(());
797        }
798
799        if self.outbound_handshake_slots() == 0 {
800            return Err(NodeError::MaxConnectionsExceeded {
801                max: self.max_connections,
802            });
803        }
804
805        if self.outbound_link_slots() == 0 {
806            return Err(NodeError::MaxLinksExceeded {
807                max: self.max_links,
808            });
809        }
810
811        if !self.peers.contains_key(&peer_node_addr)
812            && self.max_peers > 0
813            && self.peers.len() >= self.max_peers
814        {
815            return Err(NodeError::MaxPeersExceeded {
816                max: self.max_peers,
817            });
818        }
819
820        self.authorize_peer(
821            &peer_identity,
822            PeerAclContext::OutboundConnect,
823            transport_id,
824            &remote_addr,
825        )?;
826
827        let is_connection_oriented = self
828            .transports
829            .get(&transport_id)
830            .map(|t| t.transport_type().connection_oriented)
831            .unwrap_or(false);
832
833        // Allocate link ID and create link
834        let link_id = self.allocate_link_id();
835
836        let link = if is_connection_oriented {
837            Link::new(
838                link_id,
839                transport_id,
840                remote_addr.clone(),
841                LinkDirection::Outbound,
842                Duration::from_millis(self.config.node.base_rtt_ms),
843            )
844        } else {
845            Link::connectionless(
846                link_id,
847                transport_id,
848                remote_addr.clone(),
849                LinkDirection::Outbound,
850                Duration::from_millis(self.config.node.base_rtt_ms),
851            )
852        };
853
854        self.links.insert(link_id, link);
855
856        // Add reverse lookup for packet dispatch
857        self.addr_to_link
858            .insert((transport_id, remote_addr.clone()), link_id);
859
860        if is_connection_oriented {
861            // Connection-oriented: start non-blocking connect, defer handshake
862            if let Some(transport) = self.transports.get(&transport_id) {
863                match transport.connect(&remote_addr).await {
864                    Ok(()) => {
865                        debug!(
866                            peer = %self.peer_display_name(&peer_node_addr),
867                            transport_id = %transport_id,
868                            remote_addr = %remote_addr,
869                            link_id = %link_id,
870                            "Transport connect initiated (non-blocking)"
871                        );
872                        self.pending_connects.push(super::PendingConnect {
873                            link_id,
874                            transport_id,
875                            remote_addr,
876                            peer_identity,
877                        });
878                    }
879                    Err(e) => {
880                        // Clean up link
881                        self.links.remove(&link_id);
882                        self.addr_to_link.remove(&(transport_id, remote_addr));
883                        return Err(NodeError::TransportError(e.to_string()));
884                    }
885                }
886            }
887            Ok(())
888        } else {
889            // Connectionless: proceed with immediate handshake
890            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
891                .await
892        }
893    }
894
895    /// Start the Noise handshake on a link and send msg1.
896    ///
897    /// Called immediately for connectionless transports, or after the
898    /// transport connection is established for connection-oriented transports.
899    pub(super) async fn start_handshake(
900        &mut self,
901        link_id: LinkId,
902        transport_id: TransportId,
903        remote_addr: TransportAddr,
904        peer_identity: PeerIdentity,
905    ) -> Result<(), NodeError> {
906        let peer_node_addr = *peer_identity.node_addr();
907
908        // Create connection in handshake phase (outbound knows expected identity)
909        let current_time_ms = Self::now_ms();
910        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);
911
912        // Allocate a session index for this handshake
913        let our_index = match self.index_allocator.allocate() {
914            Ok(idx) => idx,
915            Err(e) => {
916                // Clean up the link we just created
917                self.links.remove(&link_id);
918                self.addr_to_link.remove(&(transport_id, remote_addr));
919                return Err(NodeError::IndexAllocationFailed(e.to_string()));
920            }
921        };
922
923        // Start the Noise handshake and get message 1
924        let our_keypair = self.identity.keypair();
925        let noise_msg1 =
926            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
927                Ok(msg) => msg,
928                Err(e) => {
929                    // Clean up the index and link
930                    let _ = self.index_allocator.free(our_index);
931                    self.links.remove(&link_id);
932                    self.addr_to_link.remove(&(transport_id, remote_addr));
933                    return Err(NodeError::HandshakeFailed(e.to_string()));
934                }
935            };
936
937        // Set index and transport info on the connection
938        connection.set_our_index(our_index);
939        connection.set_transport_id(transport_id);
940        connection.set_source_addr(remote_addr.clone());
941
942        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
943        let wire_msg1 = build_msg1(our_index, &noise_msg1);
944
945        debug!(
946            peer = %self.peer_display_name(&peer_node_addr),
947            transport_id = %transport_id,
948            remote_addr = %remote_addr,
949            link_id = %link_id,
950            our_index = %our_index,
951            "Connection initiated"
952        );
953
954        // Store msg1 for resend and schedule first resend
955        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
956        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);
957
958        // Track in pending_outbound for msg2 dispatch
959        self.pending_outbound
960            .insert((transport_id, our_index.as_u32()), link_id);
961        self.connections.insert(link_id, connection);
962
963        // Send the wire format handshake message. If the very first send fails
964        // synchronously (for example an IPv6 candidate on an IPv4-only UDP
965        // socket), undo this candidate so the caller can try the next address
966        // in the same dial pass.
967        let send_result = match self.transports.get(&transport_id) {
968            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
969            None => None,
970        };
971        match send_result {
972            Some(send_result) => {
973                self.note_local_send_outcome(&send_result);
974                match send_result {
975                    Ok(bytes) => {
976                        debug!(
977                            link_id = %link_id,
978                            our_index = %our_index,
979                            bytes,
980                            "Sent Noise handshake message 1 (wire format)"
981                        );
982                    }
983                    Err(e) => {
984                        warn!(
985                            link_id = %link_id,
986                            transport_id = %transport_id,
987                            remote_addr = %remote_addr,
988                            our_index = %our_index,
989                            error = %e,
990                            "Failed to send handshake message"
991                        );
992                        self.pending_outbound
993                            .remove(&(transport_id, our_index.as_u32()));
994                        self.connections.remove(&link_id);
995                        self.links.remove(&link_id);
996                        self.addr_to_link
997                            .remove(&(transport_id, remote_addr.clone()));
998                        let _ = self.index_allocator.free(our_index);
999                        return Err(NodeError::TransportError(e.to_string()));
1000                    }
1001                }
1002            }
1003            None => {
1004                self.pending_outbound
1005                    .remove(&(transport_id, our_index.as_u32()));
1006                self.connections.remove(&link_id);
1007                self.links.remove(&link_id);
1008                self.addr_to_link
1009                    .remove(&(transport_id, remote_addr.clone()));
1010                let _ = self.index_allocator.free(our_index);
1011                return Err(NodeError::TransportError(format!(
1012                    "transport {transport_id} disappeared before first handshake send"
1013                )));
1014            }
1015        }
1016
1017        Ok(())
1018    }
1019
1020    /// Poll all transports for discovered peers and auto-connect.
1021    ///
1022    /// Called from the tick handler. Iterates operational transports,
1023    /// drains their discovery buffers, and initiates connections to
1024    /// newly discovered peers (if auto_connect is enabled).
1025    pub(super) async fn poll_transport_discovery(&mut self) {
1026        // Collect discoveries first to avoid borrow conflict with self
1027        let mut to_connect = Vec::new();
1028        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
1029        let mut connect_budget = self.discovery_connect_budget();
1030        let mut skipped_budget = 0usize;
1031
1032        for transport in self.transports.values() {
1033            if !transport.is_operational() {
1034                continue;
1035            }
1036            if !transport.auto_connect() {
1037                // Still drain the buffer so it doesn't grow unbounded
1038                let _ = transport.discover();
1039                continue;
1040            }
1041            let discovered = match transport.discover() {
1042                Ok(peers) => peers,
1043                Err(_) => continue,
1044            };
1045            for peer in discovered {
1046                let discovered_transport_id = peer.transport_id;
1047                let pubkey = match peer.pubkey_hint {
1048                    Some(pk) => pk,
1049                    None => continue,
1050                };
1051                let identity = PeerIdentity::from_pubkey(pubkey);
1052                let node_addr = *identity.node_addr();
1053
1054                // Skip self
1055                if node_addr == *self.identity.node_addr() {
1056                    continue;
1057                }
1058
1059                let Some((candidate_transport_id, remote_addr, transport_name)) =
1060                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
1061                else {
1062                    continue;
1063                };
1064
1065                if self.peers.contains_key(&node_addr) {
1066                    let candidate = PeerAddress::new(
1067                        transport_name,
1068                        self.peer_address_string_for_transport_candidate(
1069                            candidate_transport_id,
1070                            transport_name,
1071                            &remote_addr,
1072                        ),
1073                    );
1074                    if self.active_peer_candidate_is_fresh_enough_to_skip(
1075                        &node_addr,
1076                        std::slice::from_ref(&candidate),
1077                    ) {
1078                        continue;
1079                    }
1080                    if self.is_connecting_to_peer_on_path(
1081                        &node_addr,
1082                        candidate_transport_id,
1083                        &remote_addr,
1084                    ) {
1085                        continue;
1086                    }
1087                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1088                    if connect_budget == 0
1089                        || self
1090                            .path_candidate_attempt_budget(&node_addr)
1091                            .saturating_sub(queued_for_peer)
1092                            == 0
1093                    {
1094                        skipped_budget = skipped_budget.saturating_add(1);
1095                        continue;
1096                    }
1097                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
1098                    *queued_per_peer.entry(node_addr).or_default() += 1;
1099                    connect_budget = connect_budget.saturating_sub(1);
1100                    continue;
1101                }
1102
1103                if self.is_connecting_to_peer_on_path(
1104                    &node_addr,
1105                    candidate_transport_id,
1106                    &remote_addr,
1107                ) {
1108                    continue;
1109                }
1110
1111                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
1112                if connect_budget == 0
1113                    || self
1114                        .path_candidate_attempt_budget(&node_addr)
1115                        .saturating_sub(queued_for_peer)
1116                        == 0
1117                {
1118                    skipped_budget = skipped_budget.saturating_add(1);
1119                    continue;
1120                }
1121                to_connect.push((candidate_transport_id, remote_addr, identity, false));
1122                *queued_per_peer.entry(node_addr).or_default() += 1;
1123                connect_budget = connect_budget.saturating_sub(1);
1124            }
1125        }
1126
1127        if skipped_budget > 0 {
1128            debug!(
1129                skipped = skipped_budget,
1130                queued = to_connect.len(),
1131                "Transport discovery connect budget exhausted"
1132            );
1133        }
1134
1135        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
1136            info!(
1137                peer = %self.peer_display_name(identity.node_addr()),
1138                transport_id = %transport_id,
1139                remote_addr = %remote_addr,
1140                active_refresh,
1141                "Auto-connecting to discovered peer"
1142            );
1143            if let Err(e) = self
1144                .initiate_connection(transport_id, remote_addr, identity)
1145                .await
1146            {
1147                warn!(error = %e, "Failed to auto-connect to discovered peer");
1148            }
1149        }
1150    }
1151
1152    pub(super) async fn poll_nostr_discovery(&mut self) {
1153        let Some(bootstrap) = self.nostr_discovery.clone() else {
1154            return;
1155        };
1156
1157        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
1158            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
1159        }
1160
1161        for event in bootstrap.drain_events().await {
1162            match event {
1163                BootstrapEvent::Established { traversal } => {
1164                    let peer_npub = traversal.peer_npub.clone();
1165                    match self.adopt_established_traversal(traversal).await {
1166                        Ok(_) => {
1167                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
1168                        }
1169                        Err(err) => {
1170                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
1171                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
1172                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
1173                            }
1174                        }
1175                    }
1176                }
1177                BootstrapEvent::Failed {
1178                    peer_config,
1179                    reason,
1180                } => {
1181                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
1182                        Ok(identity) => identity,
1183                        Err(_) => continue,
1184                    };
1185                    let node_addr = *peer_identity.node_addr();
1186                    if self.peers.contains_key(&node_addr) {
1187                        debug!(
1188                            npub = %peer_config.npub,
1189                            error = %reason,
1190                            "Ignoring failed NAT traversal for already-connected peer"
1191                        );
1192                        continue;
1193                    }
1194                    if self.is_connecting_to_peer(&node_addr) {
1195                        debug!(
1196                            npub = %peer_config.npub,
1197                            error = %reason,
1198                            "Ignoring failed NAT traversal while peer handshake is already in progress"
1199                        );
1200                        continue;
1201                    }
1202
1203                    let now_ms = Self::now_ms();
1204                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
1205                    if decision.should_warn {
1206                        warn!(
1207                            npub = %peer_config.npub,
1208                            error = %reason,
1209                            consecutive_failures = decision.consecutive_failures,
1210                            cooldown_secs = decision
1211                                .cooldown_until_ms
1212                                .map(|t| t.saturating_sub(now_ms) / 1000),
1213                            "NAT traversal failed"
1214                        );
1215                    } else {
1216                        debug!(
1217                            npub = %peer_config.npub,
1218                            error = %reason,
1219                            consecutive_failures = decision.consecutive_failures,
1220                            "NAT traversal failed (suppressed by warn-rate-limit)"
1221                        );
1222                    }
1223
1224                    // B6: stale-advert eviction on the streak-threshold
1225                    // crossing. Fire-and-forget; the outcome is logged so
1226                    // operators can see when peers get cleaned up.
1227                    if decision.crossed_threshold {
1228                        bootstrap
1229                            .request_advert_stale_check(peer_config.npub.clone())
1230                            .await;
1231                    }
1232
1233                    if self
1234                        .try_peer_addresses(&peer_config, peer_identity, false)
1235                        .await
1236                        .is_ok()
1237                    {
1238                        continue;
1239                    }
1240
1241                    self.schedule_retry(node_addr, now_ms);
1242                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
1243                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
1244                    {
1245                        // Push the next retry past the cooldown so the
1246                        // open-discovery sweep doesn't re-enqueue and the
1247                        // per-attempt backoff doesn't fire sooner.
1248                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
1249                    }
1250                }
1251            }
1252        }
1253
1254        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
1255            .await;
1256        self.queue_open_discovery_retries(&bootstrap).await;
1257    }
1258
1259    /// Resolve the LAN-only discovery scope. Applications with explicit
1260    /// connectivity config can set `node.discovery.lan.scope` without
1261    /// changing the public Nostr discovery `app` tag. The older fallback
1262    /// extracts a scope from the Nostr app tag used by default scoped
1263    /// discovery.
1264    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
1265        if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
1266            let scope = scope.trim();
1267            if !scope.is_empty() {
1268                return Some(scope.to_string());
1269            }
1270        }
1271
1272        let app = self.config.node.discovery.nostr.app.trim();
1273        if app.is_empty() {
1274            return None;
1275        }
1276        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
1277            let scope = rest.trim();
1278            if scope.is_empty() {
1279                None
1280            } else {
1281                Some(scope.to_string())
1282            }
1283        } else {
1284            Some(app.to_string())
1285        }
1286    }
1287
1288    /// Drain mDNS-discovered peers and initiate Noise XX handshakes. For
1289    /// active peers this is a non-disruptive alternate-path refresh: the
1290    /// current link stays live until a new handshake authenticates and
1291    /// promotes. The handshake itself is the authentication — a spoofed
1292    /// mDNS advert with someone else's npub fails the XX exchange and
1293    /// is dropped.
1294    pub(super) async fn poll_lan_discovery(&mut self) {
1295        let Some(runtime) = self.lan_discovery.clone() else {
1296            return;
1297        };
1298        let events = runtime.drain_events().await;
1299        if events.is_empty() {
1300            return;
1301        }
1302        let mut connect_budget = self.discovery_connect_budget();
1303        let mut skipped_budget = 0usize;
1304        for event in events {
1305            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
1306            let Some((transport_id, local_addr)) =
1307                self.find_udp_transport_for_remote_addr(peer.addr)
1308            else {
1309                debug!(
1310                    addr = %peer.addr,
1311                    "lan: skip discovered peer with no compatible UDP transport"
1312                );
1313                continue;
1314            };
1315            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
1316                Ok(id) => id,
1317                Err(err) => {
1318                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
1319                    continue;
1320                }
1321            };
1322            let peer_node_addr = *identity.node_addr();
1323            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
1324            if self.peers.contains_key(&peer_node_addr) {
1325                let candidate = PeerAddress::new("udp", peer.addr.to_string());
1326                if self.active_peer_candidate_is_fresh_enough_to_skip(
1327                    &peer_node_addr,
1328                    std::slice::from_ref(&candidate),
1329                ) {
1330                    continue;
1331                }
1332                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1333                    continue;
1334                }
1335                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1336                    skipped_budget = skipped_budget.saturating_add(1);
1337                    continue;
1338                }
1339                info!(
1340                    npub = %identity.short_npub(),
1341                    addr = %peer.addr,
1342                    local_addr = %local_addr,
1343                    "lan: initiating alternate-path handshake to active peer"
1344                );
1345                if let Err(err) = self
1346                    .initiate_connection(transport_id, remote_addr, identity)
1347                    .await
1348                {
1349                    debug!(
1350                        npub = %peer.npub,
1351                        error = %err,
1352                        "lan: failed to initiate active peer alternate-path handshake"
1353                    );
1354                }
1355                connect_budget = connect_budget.saturating_sub(1);
1356                continue;
1357            }
1358            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
1359                continue;
1360            }
1361            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
1362                skipped_budget = skipped_budget.saturating_add(1);
1363                continue;
1364            }
1365            info!(
1366                npub = %identity.short_npub(),
1367                addr = %peer.addr,
1368                local_addr = %local_addr,
1369                "lan: initiating handshake to discovered peer"
1370            );
1371            if let Err(err) = self
1372                .initiate_connection(transport_id, remote_addr, identity)
1373                .await
1374            {
1375                debug!(
1376                    npub = %peer.npub,
1377                    error = %err,
1378                    "lan: failed to initiate connection to discovered peer"
1379                );
1380            }
1381            connect_budget = connect_budget.saturating_sub(1);
1382        }
1383        if skipped_budget > 0 {
1384            debug!(
1385                skipped = skipped_budget,
1386                "lan: discovery connect budget exhausted"
1387            );
1388        }
1389    }
1390
1391    /// Poll pending transport connects and initiate handshakes for ready ones.
1392    ///
1393    /// Called from the tick handler. For each pending connect, queries the
1394    /// transport's connection state. When a connection is established,
1395    /// marks the link as Connected and starts the Noise handshake.
1396    /// Failed connections are cleaned up and scheduled for retry.
1397    pub(super) async fn poll_pending_connects(&mut self) {
1398        if self.pending_connects.is_empty() {
1399            return;
1400        }
1401
1402        let mut completed = Vec::new();
1403
1404        for (i, pending) in self.pending_connects.iter().enumerate() {
1405            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
1406                transport.connection_state(&pending.remote_addr)
1407            } else {
1408                crate::transport::ConnectionState::Failed("transport removed".into())
1409            };
1410
1411            match state {
1412                crate::transport::ConnectionState::Connected => {
1413                    completed.push((i, true, None));
1414                }
1415                crate::transport::ConnectionState::Failed(reason) => {
1416                    completed.push((i, false, Some(reason)));
1417                }
1418                crate::transport::ConnectionState::Connecting => {
1419                    // Still in progress, check on next tick
1420                }
1421                crate::transport::ConnectionState::None => {
1422                    // Shouldn't happen — treat as failure
1423                    completed.push((i, false, Some("no connection attempt found".into())));
1424                }
1425            }
1426        }
1427
1428        // Process completions in reverse order to preserve indices
1429        for (i, success, reason) in completed.into_iter().rev() {
1430            let pending = self.pending_connects.remove(i);
1431
1432            if success {
1433                // Mark link as Connected
1434                if let Some(link) = self.links.get_mut(&pending.link_id) {
1435                    link.set_connected();
1436                }
1437
1438                debug!(
1439                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1440                    transport_id = %pending.transport_id,
1441                    remote_addr = %pending.remote_addr,
1442                    link_id = %pending.link_id,
1443                    "Transport connected, starting handshake"
1444                );
1445
1446                // Start the handshake now that the transport is connected
1447                if let Err(e) = self
1448                    .start_handshake(
1449                        pending.link_id,
1450                        pending.transport_id,
1451                        pending.remote_addr.clone(),
1452                        pending.peer_identity,
1453                    )
1454                    .await
1455                {
1456                    warn!(
1457                        link_id = %pending.link_id,
1458                        error = %e,
1459                        "Failed to start handshake after transport connect"
1460                    );
1461                    // Clean up link on handshake failure
1462                    self.remove_link(&pending.link_id);
1463                }
1464            } else {
1465                let reason = reason.unwrap_or_default();
1466                warn!(
1467                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
1468                    transport_id = %pending.transport_id,
1469                    remote_addr = %pending.remote_addr,
1470                    link_id = %pending.link_id,
1471                    reason = %reason,
1472                    "Transport connect failed"
1473                );
1474
1475                // Clean up link and schedule retry
1476                self.remove_link(&pending.link_id);
1477                self.links.remove(&pending.link_id);
1478                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
1479            }
1480        }
1481    }
1482
1483    // === State Transitions ===
1484
1485    /// Start the node.
1486    ///
1487    /// Initializes the TUN interface (if configured), spawns I/O threads,
1488    /// and transitions to the Running state.
1489    pub async fn start(&mut self) -> Result<(), NodeError> {
1490        node_start_debug_log("Node::start begin");
1491        if !self.state.can_start() {
1492            return Err(NodeError::AlreadyStarted);
1493        }
1494        self.state = NodeState::Starting;
1495        node_start_debug_log("Node::start state set to starting");
1496
1497        // Create packet channel for transport -> Node communication
1498        let packet_buffer_size = self.config.node.buffers.packet_channel;
1499        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
1500        self.packet_tx = Some(packet_tx.clone());
1501        self.packet_rx = Some(packet_rx);
1502        node_start_debug_log("Node::start packet channel created");
1503
1504        // Initialize transports first (before TUN, before Nostr discovery).
1505        node_start_debug_log("Node::start create transports begin");
1506        let transport_handles = self.create_transports(&packet_tx).await;
1507        node_start_debug_log(format!(
1508            "Node::start create transports complete count={}",
1509            transport_handles.len()
1510        ));
1511
1512        for mut handle in transport_handles {
1513            let transport_id = handle.transport_id();
1514            let transport_type = handle.transport_type().name;
1515            let name = handle.name().map(|s| s.to_string());
1516
1517            node_start_debug_log(format!(
1518                "Node::start transport start begin id={} type={} name={:?}",
1519                transport_id, transport_type, name
1520            ));
1521            match handle.start().await {
1522                Ok(()) => {
1523                    node_start_debug_log(format!(
1524                        "Node::start transport start ok id={} type={}",
1525                        transport_id, transport_type
1526                    ));
1527                    self.transports.insert(transport_id, handle);
1528                }
1529                Err(e) => {
1530                    node_start_debug_log(format!(
1531                        "Node::start transport start error id={} type={} error={}",
1532                        transport_id, transport_type, e
1533                    ));
1534                    if let Some(ref n) = name {
1535                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
1536                    } else {
1537                        warn!(transport_type, error = %e, "Transport failed to start");
1538                    }
1539                }
1540            }
1541        }
1542
1543        if !self.transports.is_empty() {
1544            info!(count = self.transports.len(), "Transports initialized");
1545        }
1546
1547        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
1548        // worker pools. **Unix only** — both pools issue direct
1549        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
1550        // raw file descriptors via `AsRawFd`, which is a unix-only
1551        // trait. On Windows the rx_loop's tokio-based send/recv
1552        // remain the canonical path; the perf overhaul lands its
1553        // gains on unix.
1554        //
1555        // Worker count defaults to the number of CPUs, overridable
1556        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
1557        // for debug / benchmarking. Hash-by-destination means a
1558        // single TCP flow pins to one worker (preserves wire
1559        // ordering); additional workers light up under multi-flow
1560        // / multi-peer load. See `node::encrypt_worker` /
1561        // `node::decrypt_worker` for full rationale.
1562        #[cfg(unix)]
1563        {
1564            if self.config.node.worker_pools_enabled {
1565                node_start_debug_log("Node::start worker pools begin");
1566                let cpu_default = std::thread::available_parallelism()
1567                    .map(|n| n.get())
1568                    .unwrap_or(1)
1569                    .max(1);
1570                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
1571                    .ok()
1572                    .and_then(|s| s.parse().ok())
1573                    .unwrap_or(cpu_default)
1574                    .max(1);
1575                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
1576                    encrypt_worker_count,
1577                ));
1578                info!(
1579                    workers = encrypt_worker_count,
1580                    "Spawned FMP-encrypt worker pool"
1581                );
1582
1583                // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
1584                // falls through to the in-line rx_loop decrypt path (the
1585                // "test-mode" branch in `handle_encrypted_frame`, which is
1586                // in fact a fully functional synchronous decrypt). Useful
1587                // as an A/B against the worker pipeline when chasing
1588                // scheduling/queueing regressions on the native macOS
1589                // path. Any non-zero value (env or default) spawns the
1590                // pool as before.
1591                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
1592                    .ok()
1593                    .and_then(|s| s.parse().ok())
1594                    .unwrap_or(cpu_default);
1595                if decrypt_worker_count == 0 {
1596                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
1597                } else {
1598                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
1599                        decrypt_worker_count,
1600                    ));
1601                    info!(
1602                        workers = decrypt_worker_count,
1603                        "Spawned FMP+FSP-decrypt worker pool"
1604                    );
1605                }
1606                node_start_debug_log("Node::start worker pools complete");
1607            } else {
1608                node_start_debug_log("Node::start worker pools disabled");
1609                info!("FIPS worker pools disabled; using in-line crypto/send path");
1610            }
1611        }
1612
1613        if self.config.node.discovery.nostr.enabled {
1614            node_start_debug_log("Node::start nostr discovery start begin");
1615            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
1616                .await
1617            {
1618                Ok(runtime) => {
1619                    node_start_debug_log("Node::start nostr discovery runtime created");
1620                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
1621                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
1622                    }
1623                    node_start_debug_log("Node::start nostr overlay advert refreshed");
1624                    self.nostr_discovery = Some(runtime);
1625                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
1626                    info!("Nostr overlay discovery enabled");
1627                }
1628                Err(err) => {
1629                    node_start_debug_log(format!(
1630                        "Node::start nostr discovery start error error={}",
1631                        err
1632                    ));
1633                    warn!(error = %err, "Failed to start Nostr overlay discovery");
1634                }
1635            }
1636        }
1637
1638        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
1639        // when Nostr is disabled, since it gives us sub-second pairing
1640        // on the same link without any relay or NAT-traversal roundtrip.
1641        if self.config.node.discovery.lan.enabled {
1642            node_start_debug_log("Node::start lan discovery start begin");
1643            let advertised_udp_port = self
1644                .transports
1645                .values()
1646                .filter(|h| h.is_operational())
1647                .filter(|h| h.transport_type().name == "udp")
1648                .find_map(|h| h.local_addr().map(|addr| addr.port()))
1649                .unwrap_or(0);
1650            let scope = self.lan_discovery_scope();
1651            match crate::discovery::lan::LanDiscovery::start(
1652                &self.identity,
1653                scope,
1654                advertised_udp_port,
1655                self.config.node.discovery.lan.clone(),
1656            )
1657            .await
1658            {
1659                Ok(runtime) => {
1660                    node_start_debug_log("Node::start lan discovery start ok");
1661                    self.lan_discovery = Some(runtime);
1662                    info!("LAN mDNS discovery enabled");
1663                }
1664                Err(err) => {
1665                    node_start_debug_log(format!(
1666                        "Node::start lan discovery start error error={}",
1667                        err
1668                    ));
1669                    debug!(error = %err, "LAN mDNS discovery not started");
1670                }
1671            }
1672        }
1673
1674        // Connect to static peers before TUN is active
1675        // This allows handshake messages to be sent before we start accepting packets
1676        node_start_debug_log("Node::start initiate peer connections begin");
1677        self.initiate_peer_connections().await;
1678        node_start_debug_log("Node::start initiate peer connections complete");
1679
1680        // Initialize TUN interface last, after transports and peers are ready
1681        if self.config.tun.enabled {
1682            node_start_debug_log("Node::start tun init begin");
1683            let address = *self.identity.address();
1684            match TunDevice::create(&self.config.tun, address).await {
1685                Ok(device) => {
1686                    let mtu = device.mtu();
1687                    let name = device.name().to_string();
1688                    let our_addr = *device.address();
1689
1690                    info!("TUN device active:");
1691                    info!("     name: {}", name);
1692                    info!("  address: {}", device.address());
1693                    info!("      mtu: {}", mtu);
1694
1695                    // Calculate max MSS for TCP clamping
1696                    let effective_mtu = self.effective_ipv6_mtu();
1697                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
1698
1699                    info!("effective MTU: {} bytes", effective_mtu);
1700                    debug!("   max TCP MSS: {} bytes", max_mss);
1701
1702                    // On macOS, create a shutdown pipe. Writing to it unblocks the
1703                    // reader thread's select() loop without closing the TUN fd
1704                    // (which would cause a double-close when TunDevice drops).
1705                    #[cfg(target_os = "macos")]
1706                    let (shutdown_read_fd, shutdown_write_fd) = {
1707                        let mut fds = [0i32; 2];
1708                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
1709                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
1710                                "failed to create shutdown pipe".into(),
1711                            )));
1712                        }
1713                        (fds[0], fds[1])
1714                    };
1715
1716                    // Create writer (dups the fd for independent write access).
1717                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
1718                    // per-destination path MTU learned via discovery.
1719                    let (writer, tun_tx) =
1720                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
1721
1722                    // Spawn writer thread
1723                    let writer_handle = thread::spawn(move || {
1724                        writer.run();
1725                    });
1726
1727                    // Clone tun_tx for the reader
1728                    let reader_tun_tx = tun_tx.clone();
1729
1730                    // Create outbound channel for TUN reader → Node
1731                    let tun_channel_size = self.config.node.buffers.tun_channel;
1732                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
1733
1734                    // Spawn reader thread
1735                    let transport_mtu = self.transport_mtu();
1736                    let path_mtu_lookup = self.path_mtu_lookup.clone();
1737                    #[cfg(target_os = "macos")]
1738                    let reader_handle = thread::spawn(move || {
1739                        run_tun_reader(
1740                            device,
1741                            mtu,
1742                            our_addr,
1743                            reader_tun_tx,
1744                            outbound_tx,
1745                            transport_mtu,
1746                            path_mtu_lookup,
1747                            shutdown_read_fd,
1748                        );
1749                    });
1750                    #[cfg(not(target_os = "macos"))]
1751                    let reader_handle = thread::spawn(move || {
1752                        run_tun_reader(
1753                            device,
1754                            mtu,
1755                            our_addr,
1756                            reader_tun_tx,
1757                            outbound_tx,
1758                            transport_mtu,
1759                            path_mtu_lookup,
1760                        );
1761                    });
1762
1763                    self.tun_state = TunState::Active;
1764                    self.tun_name = Some(name);
1765                    self.tun_tx = Some(tun_tx);
1766                    self.tun_outbound_rx = Some(outbound_rx);
1767                    self.tun_reader_handle = Some(reader_handle);
1768                    self.tun_writer_handle = Some(writer_handle);
1769                    #[cfg(target_os = "macos")]
1770                    {
1771                        self.tun_shutdown_fd = Some(shutdown_write_fd);
1772                    }
1773                }
1774                Err(e) => {
1775                    self.tun_state = TunState::Failed;
1776                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
1777                }
1778            }
1779            node_start_debug_log("Node::start tun init complete");
1780        }
1781
1782        // Initialize DNS responder (independent of TUN).
1783        //
1784        // Default bind_addr is "::1" (IPv6 loopback). The shipped
1785        // fips-dns-setup configures systemd-resolved via a global
1786        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
1787        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
1788        // where self-destined traffic to fips0's address is attributed
1789        // to fips0 in PKTINFO and gets silently dropped by the
1790        // mesh-interface filter in src/upper/dns.rs.
1791        //
1792        // For mesh-reachable resolution (rare), set bind_addr: "::"
1793        // in fips.yaml. The mesh-interface filter remains active to
1794        // prevent hosts-file alias enumeration in that mode.
1795        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
1796        // 127.0.0.1 still reach us regardless of kernel sysctl
1797        // defaults — but only when bind is on a wildcard / IPv6 path.
1798        if self.config.dns.enabled {
1799            node_start_debug_log("Node::start dns init begin");
1800            let addr_str = self.config.dns.bind_addr();
1801            match addr_str.parse::<std::net::IpAddr>() {
1802                Ok(ip) => {
1803                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
1804                    match Self::bind_dns_socket(bind) {
1805                        Ok(socket) => {
1806                            let dns_channel_size = self.config.node.buffers.dns_channel;
1807                            let (identity_tx, identity_rx) =
1808                                tokio::sync::mpsc::channel(dns_channel_size);
1809                            let dns_ttl = self.config.dns.ttl();
1810                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
1811                                self.config.peers(),
1812                            );
1813                            let reloader = if self.config.node.system_files_enabled {
1814                                let hosts_path = std::path::PathBuf::from(
1815                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
1816                                );
1817                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
1818                            } else {
1819                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
1820                            };
1821                            // Resolve the TUN ifindex so the responder can
1822                            // drop queries arriving on the mesh interface
1823                            // (fips0). Without this, the `::` bind exposes
1824                            // /etc/fips/hosts alias probing to any mesh peer.
1825                            // When TUN isn't enabled or the name can't be
1826                            // resolved, `None` disables the filter (there
1827                            // is no mesh surface to defend anyway).
1828                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
1829                            info!(
1830                                bind = %bind,
1831                                hosts = reloader.hosts().len(),
1832                                mesh_ifindex = ?mesh_ifindex,
1833                                "DNS responder started for .fips domain (auto-reload enabled)"
1834                            );
1835                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
1836                                socket,
1837                                identity_tx,
1838                                dns_ttl,
1839                                reloader,
1840                                mesh_ifindex,
1841                            ));
1842                            self.dns_identity_rx = Some(identity_rx);
1843                            self.dns_task = Some(handle);
1844                        }
1845                        Err(e) => {
1846                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
1847                        }
1848                    }
1849                }
1850                Err(e) => {
1851                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
1852                }
1853            }
1854            node_start_debug_log("Node::start dns init complete");
1855        }
1856
1857        self.state = NodeState::Running;
1858        node_start_debug_log("Node::start running");
1859        info!("Node started:");
1860        info!("       state: {}", self.state);
1861        info!("  transports: {}", self.transports.len());
1862        info!(" connections: {}", self.connections.len());
1863        Ok(())
1864    }
1865
1866    /// Bind a UDP socket for the DNS responder.
1867    ///
1868    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
1869    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
1870    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
1871    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
1872    /// land on the same socket.
1873    ///
1874    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
1875    /// can learn the arrival interface per packet. The responder uses that
1876    /// to drop queries arriving on the mesh TUN, closing the hosts-file
1877    /// probing side-channel created by the `::` bind.
1878    fn bind_dns_socket(
1879        addr: std::net::SocketAddr,
1880    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1881        use socket2::{Domain, Protocol, Socket, Type};
1882        let domain = if addr.is_ipv4() {
1883            Domain::IPV4
1884        } else {
1885            Domain::IPV6
1886        };
1887        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1888        if addr.is_ipv6() {
1889            sock.set_only_v6(false)?;
1890            #[cfg(unix)]
1891            Self::set_recv_pktinfo_v6(&sock)?;
1892        }
1893        sock.set_nonblocking(true)?;
1894        sock.bind(&addr.into())?;
1895        tokio::net::UdpSocket::from_std(sock.into())
1896    }
1897
1898    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
1899    ///
1900    /// After this setsockopt, each `recvmsg()` call on the socket receives
1901    /// an `IPV6_PKTINFO` control message containing the arrival interface
1902    /// index, which the DNS responder uses for its mesh-interface filter.
1903    #[cfg(unix)]
1904    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
1905        use std::os::fd::AsRawFd;
1906        let enable: libc::c_int = 1;
1907        let ret = unsafe {
1908            libc::setsockopt(
1909                sock.as_raw_fd(),
1910                libc::IPPROTO_IPV6,
1911                libc::IPV6_RECVPKTINFO,
1912                &enable as *const _ as *const libc::c_void,
1913                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
1914            )
1915        };
1916        if ret < 0 {
1917            return Err(std::io::Error::last_os_error());
1918        }
1919        Ok(())
1920    }
1921
1922    /// Resolve the mesh TUN interface index by name.
1923    ///
1924    /// Returns `None` if the interface does not exist (e.g. TUN disabled
1925    /// or not yet created). A `None` result disables the DNS responder's
1926    /// mesh-interface filter — safe, because if there is no fips0 there
1927    /// is no mesh exposure to defend against.
1928    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1929        #[cfg(unix)]
1930        {
1931            let c_name = std::ffi::CString::new(name).ok()?;
1932            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1933            if idx == 0 { None } else { Some(idx) }
1934        }
1935        #[cfg(not(unix))]
1936        {
1937            let _ = name;
1938            None
1939        }
1940    }
1941
1942    /// Stop the node.
1943    ///
1944    /// Shuts down TUN interface, stops I/O threads, and transitions to
1945    /// the Stopped state.
1946    pub async fn stop(&mut self) -> Result<(), NodeError> {
1947        if !self.state.can_stop() {
1948            return Err(NodeError::NotStarted);
1949        }
1950        self.state = NodeState::Stopping;
1951        info!(state = %self.state, "Node stopping");
1952
1953        // Stop DNS responder
1954        if let Some(handle) = self.dns_task.take() {
1955            handle.abort();
1956            debug!("DNS responder stopped");
1957        }
1958
1959        // Send disconnect notifications to all active peers before closing transports
1960        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
1961            .await;
1962
1963        // Stop Nostr overlay discovery background work and withdraw any advert.
1964        if let Some(bootstrap) = self.nostr_discovery.take()
1965            && let Err(e) = bootstrap.shutdown().await
1966        {
1967            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
1968        }
1969
1970        // Tear down LAN mDNS responder + browser. Best-effort: the
1971        // OS will eventually time the advert out via its TTL even if
1972        // we don't get a clean unregister out before the daemon exits.
1973        if let Some(lan) = self.lan_discovery.take() {
1974            lan.shutdown().await;
1975        }
1976
1977        // Shutdown transports (they're packet producers)
1978        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
1979        for transport_id in transport_ids {
1980            if let Some(mut handle) = self.transports.remove(&transport_id) {
1981                let transport_type = handle.transport_type().name;
1982                match handle.stop().await {
1983                    Ok(()) => {
1984                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
1985                    }
1986                    Err(e) => {
1987                        warn!(
1988                            transport_id = %transport_id,
1989                            transport_type,
1990                            error = %e,
1991                            "Transport stop failed"
1992                        );
1993                    }
1994                }
1995            }
1996        }
1997
1998        // Drop packet channels
1999        self.packet_tx.take();
2000        self.packet_rx.take();
2001
2002        // Shutdown TUN interface
2003        if let Some(name) = self.tun_name.take() {
2004            info!(name = %name, "Shutting down TUN interface");
2005
2006            // Drop the tun_tx to signal the writer to stop
2007            self.tun_tx.take();
2008
2009            // Delete the interface (on Linux, causes reader to get EFAULT)
2010            if let Err(e) = shutdown_tun_interface(&name).await {
2011                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
2012            }
2013
2014            // On macOS, signal the reader thread to exit by writing to the
2015            // shutdown pipe. The reader's select() will wake up and break.
2016            #[cfg(target_os = "macos")]
2017            if let Some(fd) = self.tun_shutdown_fd.take() {
2018                unsafe {
2019                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
2020                    libc::close(fd);
2021                }
2022            }
2023
2024            // Wait for threads to finish
2025            if let Some(handle) = self.tun_reader_handle.take() {
2026                let _ = handle.join();
2027            }
2028            if let Some(handle) = self.tun_writer_handle.take() {
2029                let _ = handle.join();
2030            }
2031
2032            self.tun_state = TunState::Disabled;
2033        }
2034
2035        self.state = NodeState::Stopped;
2036        info!(state = %self.state, "Node stopped");
2037        Ok(())
2038    }
2039
2040    /// Send disconnect notifications to all active peers.
2041    ///
2042    /// Best-effort: send failures are logged and ignored since the transport
2043    /// may already be degraded. This runs before transports are shut down.
2044    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
2045        let disconnect = Disconnect::new(reason);
2046        let plaintext = disconnect.encode();
2047
2048        // Collect node_addrs to avoid borrow conflict with send helper
2049        let peer_addrs: Vec<NodeAddr> = self
2050            .peers
2051            .iter()
2052            .filter(|(_, peer)| peer.can_send() && peer.has_session())
2053            .map(|(addr, _)| *addr)
2054            .collect();
2055
2056        if peer_addrs.is_empty() {
2057            debug!(
2058                total_peers = self.peers.len(),
2059                "No sendable peers for disconnect notification"
2060            );
2061            return;
2062        }
2063
2064        let mut sent = 0usize;
2065        for node_addr in &peer_addrs {
2066            match self
2067                .send_encrypted_link_message(node_addr, &plaintext)
2068                .await
2069            {
2070                Ok(()) => sent += 1,
2071                Err(e) => {
2072                    debug!(
2073                        peer = %self.peer_display_name(node_addr),
2074                        error = %e,
2075                        "Failed to send disconnect (transport may be down)"
2076                    );
2077                }
2078            }
2079        }
2080
2081        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2082    }
2083
2084    fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2085        peer_config
2086            .addresses_by_priority()
2087            .into_iter()
2088            .cloned()
2089            .collect()
2090    }
2091
2092    async fn nostr_peer_fallback_addresses(
2093        &self,
2094        peer_config: &PeerConfig,
2095        existing: &[PeerAddress],
2096    ) -> Vec<PeerAddress> {
2097        if !self.config.node.discovery.nostr.enabled
2098            || self.config.node.discovery.nostr.policy
2099                == crate::config::NostrDiscoveryPolicy::Disabled
2100        {
2101            return Vec::new();
2102        }
2103
2104        let Some(bootstrap) = self.nostr_discovery.clone() else {
2105            return Vec::new();
2106        };
2107        let endpoints = match bootstrap
2108            .cached_advert_endpoints_for_peer(&peer_config.npub)
2109            .await
2110        {
2111            Some(endpoints) => endpoints,
2112            None => {
2113                debug!(
2114                    npub = %peer_config.npub,
2115                    "No cached Nostr advert endpoints for configured peer"
2116                );
2117                return Vec::new();
2118            }
2119        };
2120
2121        let mut fallback = Vec::new();
2122        let mut next_priority = existing
2123            .iter()
2124            .map(|addr| addr.priority)
2125            .max()
2126            .unwrap_or(100)
2127            .saturating_add(1);
2128        // Stamp every overlay-derived candidate with the current wall clock
2129        // so the dialer ranks it ahead of unstamped operator hints. We use a
2130        // single timestamp per fetch (rather than per-endpoint) because all
2131        // candidates in a single advert are equally fresh.
2132        let seen_at_ms = Self::now_ms();
2133        for endpoint in endpoints {
2134            let Some(candidate) =
2135                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2136            else {
2137                continue;
2138            };
2139            if existing
2140                .iter()
2141                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2142                || fallback.iter().any(|addr: &PeerAddress| {
2143                    addr.transport == candidate.transport && addr.addr == candidate.addr
2144                })
2145            {
2146                continue;
2147            }
2148            fallback.push(candidate);
2149            next_priority = next_priority.saturating_add(1);
2150        }
2151        fallback
2152    }
2153
2154    async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2155        if !self.config.node.discovery.nostr.enabled
2156            || self.config.node.discovery.nostr.policy
2157                == crate::config::NostrDiscoveryPolicy::Disabled
2158        {
2159            return false;
2160        }
2161        let Some(bootstrap) = self.nostr_discovery.clone() else {
2162            return false;
2163        };
2164        bootstrap.request_connect(peer_config.clone()).await;
2165        info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
2166        true
2167    }
2168
2169    fn overlay_endpoint_to_peer_address(
2170        endpoint: &OverlayEndpointAdvert,
2171        priority: u8,
2172        seen_at_ms: u64,
2173    ) -> Option<PeerAddress> {
2174        let transport = match endpoint.transport {
2175            OverlayTransportKind::Udp => "udp",
2176            OverlayTransportKind::Tcp => "tcp",
2177            OverlayTransportKind::Tor => "tor",
2178            OverlayTransportKind::WebRtc => "webrtc",
2179        };
2180        Some(
2181            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2182                .with_seen_at_ms(seen_at_ms),
2183        )
2184    }
2185
    /// Walks `addresses` in the given order and starts connection attempts
    /// toward `peer_identity`.
    ///
    /// Returns `Ok(())` when at least one attempt was started, was already
    /// in flight on the same path, or was handed to the Nostr bootstrap
    /// runtime. Returns `NodeError::AccessDenied` immediately if
    /// `initiate_connection` reports it, and `NodeError::NoTransportForType`
    /// when no address led to an attempt.
    ///
    /// `allow_bootstrap_nat` controls whether `udp:nat` placeholder
    /// addresses may trigger a Nostr bootstrap request; when `false` they
    /// are skipped.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        let mut attempted = false;
        let peer_node_addr = *peer_identity.node_addr();
        // Number of new concrete connection attempts this call may start;
        // decremented after each successful `initiate_connection`.
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
            // `udp:nat` is not a dialable address: it is routed to the
            // Nostr bootstrap runtime instead of a transport.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            // Resolve the address to a (transport, remote address) pair.
            // Ethernet and BLE have dedicated resolvers; everything else
            // picks a transport by type and parses the address string.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // A UDP address that parses as a socket address is matched
                // against the remote address itself; any other address
                // falls back to a lookup by transport type only.
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            // An attempt already in flight on this exact path counts as
            // "attempted" and is checked before the budget, so it does not
            // consume a budget slot.
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            // Budget exhaustion ends the whole walk, not just this address.
            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                // An access-control denial aborts the walk and is returned
                // to the caller unchanged; no further address is tried.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
2324
2325    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2326        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2327            .await;
2328    }
2329
    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
    /// queues retries for non-configured, not-yet-connected peers.
    ///
    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
    ///
    /// `caller` is a short label included in log lines so per-tick and
    /// startup sweeps are distinguishable in operator-facing logs.
    ///
    /// Adverts that belong to configured peers never create a new retry
    /// entry; instead, an existing retry scheduled in the future is pulled
    /// forward to the current time.
    ///
    /// Does nothing unless Nostr discovery is enabled with the `Open`
    /// policy.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        // NOTE(review): returning here also skips the configured-peer retry
        // expediting performed inside the loop below, even though that path
        // does not consume enqueue budget — confirm this is intended.
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        // `64` is passed straight to the cache query; presumably the maximum
        // number of candidates returned — TODO confirm against the cache API.
        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        // Per-reason skip counters, reported in the summary log line.
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            // Once the budget is spent the remaining candidates are not
            // examined at all, so they appear in none of the skip counters.
            // NOTE(review): this also stops expediting configured peers
            // that come later in the candidate list.
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            if configured_npubs.contains(&npub) {
                // Configured peers don't go through the open-discovery
                // enqueue path — their `PeerConfig` is already in
                // `self.config.peers()`, so the regular retry queue is
                // what drives their reconnect. But on cold start with
                // NAT'd peers, every initial `initiate_peer_connection`
                // fails (no overlay data yet, static cache hints empty
                // or stale), each pushes the peer into `retry_pending`
                // with exponential backoff (5/10/20/40/80s), and by the
                // time the next backoff slot fires the Nostr advert is
                // already cached — we just don't act on it for ~80s.
                //
                // The arrival of an advert (which this sweep sees) means
                // we now have a path to dial. If the peer's retry is
                // scheduled in the future, pull it forward to "now" so
                // the next `process_pending_retries` tick fires it
                // immediately. The retry path (`initiate_peer_retry_
                // connection` → `try_peer_addresses`) then refetches
                // the advert and dials it — no behavioral change
                // beyond schedule timing.
                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                    let configured_addr = *identity.node_addr();
                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                        && state.retry_after_ms > now_ms
                    {
                        state.retry_after_ms = now_ms;
                        debug!(
                            caller = %caller,
                            peer = %self.peer_display_name(&configured_addr),
                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
                            "Expediting configured-peer retry after fresh overlay advert"
                        );
                    }
                }
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            // Never queue a dial to our own advert.
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            // Skip peers that already have a connection in progress whose
            // expected identity matches this node address.
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Convert the advert's endpoints into peer addresses, dropping
            // duplicates (same transport and address). Priorities are
            // assigned in advert order starting at 120 and saturate at
            // `u8::MAX`.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) =
                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            // Keep any alias already recorded for this node address; only
            // fall back to the short npub when none exists.
            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            // Synthesize a peer config for the discovered peer and queue it
            // as a retry that is due immediately and expires at the time
            // computed by `open_discovery_retry_expires_at_ms`.
            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
                discovery_fallback_transit: false,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        // Always log a one-line summary on the startup sweep so operators
        // can verify it ran. Per-tick sweeps are noisier; only summarize
        // when something happened.
        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
2547
2548    /// One-shot startup sweep: runs once after the configured settle
2549    /// delay, iterating the cached overlay adverts and queueing retries
2550    /// for any peer with a recent enough advert that we haven't already
2551    /// configured statically or established a link to.
2552    ///
2553    /// Gated identically to [`run_open_discovery_sweep`]: requires
2554    /// `node.discovery.nostr.enabled` and `policy == open`.
2555    async fn maybe_run_startup_open_discovery_sweep(
2556        &mut self,
2557        bootstrap: &std::sync::Arc<NostrDiscovery>,
2558    ) {
2559        if self.startup_open_discovery_sweep_done {
2560            return;
2561        }
2562        if !self.config.node.discovery.nostr.enabled
2563            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2564        {
2565            // Mark done so we don't keep re-checking on every tick.
2566            self.startup_open_discovery_sweep_done = true;
2567            return;
2568        }
2569        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
2570            return;
2571        };
2572        let now_ms = Self::now_ms();
2573        let delay_ms = self
2574            .config
2575            .node
2576            .discovery
2577            .nostr
2578            .startup_sweep_delay_secs
2579            .saturating_mul(1000);
2580        if now_ms < started_at_ms.saturating_add(delay_ms) {
2581            return;
2582        }
2583
2584        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
2585        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
2586            .await;
2587        self.startup_open_discovery_sweep_done = true;
2588    }
2589
2590    fn available_outbound_slots(&self) -> usize {
2591        let connection_used = self
2592            .connections
2593            .len()
2594            .saturating_add(self.pending_connects.len());
2595        let connection_slots = if self.max_connections == 0 {
2596            usize::MAX
2597        } else {
2598            self.max_connections.saturating_sub(connection_used)
2599        };
2600
2601        let peer_slots = if self.max_peers == 0 {
2602            usize::MAX
2603        } else {
2604            self.max_peers.saturating_sub(self.peers.len())
2605        };
2606
2607        connection_slots.min(peer_slots)
2608    }
2609
2610    fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
2611        let current_open_discovery_pending = self
2612            .retry_pending
2613            .values()
2614            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
2615            .count();
2616
2617        let cap_remaining = self
2618            .config
2619            .node
2620            .discovery
2621            .nostr
2622            .open_discovery_max_pending
2623            .saturating_sub(current_open_discovery_pending);
2624
2625        cap_remaining.min(self.available_outbound_slots())
2626    }
2627
2628    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
2629        now_ms.saturating_add(
2630            self.config
2631                .node
2632                .discovery
2633                .nostr
2634                .advert_ttl_secs
2635                .saturating_mul(1000)
2636                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
2637        )
2638    }
2639
    /// Builds the overlay advert listing the endpoints this node is willing
    /// to publish over Nostr.
    ///
    /// Only operational transports are considered, and each one must have a
    /// matching config entry with `advertise_on_nostr()` set. Per transport:
    ///
    /// - `udp`, public: explicit external address, else a routable specific
    ///   bind address, else the address learned via
    ///   `learn_public_udp_addr`; otherwise no endpoint (with a warning).
    /// - `udp`, not public: the placeholder address `"nat"`.
    /// - `webrtc`: the hex-encoded serialized full public key.
    /// - `tcp`: explicit external address, else a routable specific bind
    ///   address; otherwise no endpoint (with a warning).
    /// - `tor`: the onion address joined with the configured advertised
    ///   port, when an onion address is available.
    ///
    /// Returns `None` when Nostr discovery is disabled or no endpoint
    /// qualified. `signal_relays` and `stun_servers` are populated only
    /// when a `udp:nat` or WebRTC endpoint is advertised.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Track which endpoint kinds were published; they decide whether the
        // relay/STUN lists are attached at the end.
        let mut has_udp_nat = false;
        let mut has_webrtc = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Precedence:
                        // 1. operator-supplied `external_addr` (skips STUN)
                        // 2. non-wildcard *public* `local_addr` (operator
                        //    bound to a specific public IP directly)
                        // 3. STUN auto-discovery against ephemeral socket
                        //    (also taken when bind is wildcard *or* private —
                        //    a private bind is not peer-reachable, so we
                        //    must publish the public reflexive instead)
                        // 4. loud warn + omit endpoint
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard or unroutable bind: ask the
                                // discovery runtime for the public address,
                                // keyed by transport id and local port.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                            or private and STUN observation failed; \
                                            advertising no UDP endpoint. Either set \
                                            transports.udp.external_addr, bind to a \
                                            specific *public* IP, or ensure \
                                            node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Non-public UDP is advertised as the `nat`
                        // placeholder rather than a concrete address.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "webrtc" => {
                    let Some(cfg) = self.lookup_webrtc_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // The WebRTC "address" is this node's full public key in
                    // hex, not a network address.
                    endpoints.push(OverlayEndpointAdvert {
                        transport: OverlayTransportKind::WebRtc,
                        addr: hex::encode(self.identity.pubkey_full().serialize()),
                    });
                    has_webrtc = true;
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Precedence:
                    // 1. operator-supplied `external_addr` (only path that
                    //    works on cloud-NAT setups where the public IP is
                    //    not on a host interface).
                    // 2. non-wildcard *public* `local_addr` (operator bound
                    //    to a specific public IP directly).
                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
                    //
                    // A wildcard *or* private bind is never advertised as-is
                    // — peers off-LAN can't reach a private bind, and there
                    // is no TCP STUN to discover a public reflexive.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                    or private IP and no transports.tcp.external_addr set; \
                                    advertising no TCP endpoint. Either set external_addr \
                                    to the public IP (recommended for cloud 1:1-NAT setups) \
                                    or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Without an onion address nothing is advertised for
                    // this transport.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                // Any other transport type is never advertised.
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        // Relay and STUN lists are attached only when the advert contains a
        // `udp:nat` or WebRTC endpoint; otherwise both fields are `None`.
        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: (has_udp_nat || has_webrtc)
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
2818
2819    async fn refresh_overlay_advert(
2820        &self,
2821        bootstrap: &std::sync::Arc<NostrDiscovery>,
2822    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
2823        let advert = self.build_overlay_advert(bootstrap).await;
2824        bootstrap.update_local_advert(advert).await
2825    }
2826
2827    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
2828        match (&self.config.transports.udp, transport_name) {
2829            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2830            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2831            _ => None,
2832        }
2833    }
2834
2835    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
2836        match (&self.config.transports.tcp, transport_name) {
2837            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2838            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2839            _ => None,
2840        }
2841    }
2842
2843    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
2844        match (&self.config.transports.tor, transport_name) {
2845            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2846            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2847            _ => None,
2848        }
2849    }
2850
2851    fn lookup_webrtc_config(
2852        &self,
2853        transport_name: Option<&str>,
2854    ) -> Option<&crate::config::WebRtcConfig> {
2855        match (&self.config.transports.webrtc, transport_name) {
2856            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2857            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2858            _ => None,
2859        }
2860    }
2861
2862    pub(in crate::node) async fn try_peer_addresses(
2863        &mut self,
2864        peer_config: &PeerConfig,
2865        peer_identity: PeerIdentity,
2866        allow_bootstrap_nat: bool,
2867    ) -> Result<(), NodeError> {
2868        let peer_node_addr = *peer_identity.node_addr();
2869        if self.peers.contains_key(&peer_node_addr) {
2870            debug!(
2871                npub = %peer_config.npub,
2872                "Peer already exists, skipping address attempts"
2873            );
2874            return Ok(());
2875        }
2876
2877        let candidates = self.peer_address_candidates(peer_config).await;
2878
2879        if candidates.is_empty() {
2880            if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
2881                return Ok(());
2882            }
2883            return Err(NodeError::NoTransportForType(format!(
2884                "no addresses known for {}",
2885                peer_config.npub
2886            )));
2887        }
2888
2889        if self
2890            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
2891            .await
2892            .is_ok()
2893        {
2894            return Ok(());
2895        }
2896
2897        Err(NodeError::NoTransportForType(format!(
2898            "no operational transport for any of {}'s addresses",
2899            peer_config.npub
2900        )))
2901    }
2902
2903    async fn try_active_peer_alternative_addresses(
2904        &mut self,
2905        peer_config: &PeerConfig,
2906        peer_identity: PeerIdentity,
2907    ) -> Result<bool, NodeError> {
2908        let peer_node_addr = *peer_identity.node_addr();
2909        let candidates = self.peer_address_candidates(peer_config).await;
2910
2911        if candidates.is_empty() {
2912            return Err(NodeError::NoTransportForType(format!(
2913                "no addresses known for {}",
2914                peer_config.npub
2915            )));
2916        }
2917
2918        let alternatives: Vec<_> = candidates
2919            .into_iter()
2920            .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
2921            .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
2922            .collect();
2923
2924        if alternatives.is_empty() {
2925            return Err(NodeError::NoTransportForType(format!(
2926                "no concrete alternate addresses known for {}",
2927                peer_config.npub
2928            )));
2929        }
2930
2931        self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
2932            .await?;
2933        Ok(true)
2934    }
2935
2936    async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2937        // Merge every candidate from every source we have for this peer
2938        // (operator-configured static addresses, freshly fetched overlay
2939        // adverts, callers' recent-peers caches via `update_peers`) and
2940        // try them in order of `seen_at_ms` descending — most-recent
2941        // first, source-agnostic. Addresses without a freshness signal
2942        // sort last. Addresses are hints, not the final word; concrete
2943        // candidates in a single pass race each other and `udp:nat` only
2944        // starts background Nostr traversal without suppressing direct
2945        // static/LAN attempts that appear later in the list.
2946        let static_addresses = self.static_peer_addresses(peer_config);
2947        let overlay_addresses = self
2948            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
2949            .await;
2950
2951        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
2952        for addr in overlay_addresses.into_iter().chain(static_addresses) {
2953            if !candidates.iter().any(|existing: &PeerAddress| {
2954                existing.transport == addr.transport && existing.addr == addr.addr
2955            }) {
2956                candidates.push(addr);
2957            }
2958        }
2959
2960        // Stable sort: most-recently-observed first (Some > None), tiebreak
2961        // by input order so equal-timestamp addresses keep operator-supplied
2962        // priority and the overlay-then-static merge order survives when
2963        // nothing has a freshness signal.
2964        candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
2965            (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
2966            (Some(_), None) => std::cmp::Ordering::Less,
2967            (None, Some(_)) => std::cmp::Ordering::Greater,
2968            (None, None) => std::cmp::Ordering::Equal,
2969        });
2970
2971        candidates
2972    }
2973
2974    fn active_peer_matches_any_candidate(
2975        &self,
2976        peer_node_addr: &NodeAddr,
2977        candidates: &[PeerAddress],
2978    ) -> bool {
2979        candidates
2980            .iter()
2981            .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
2982    }
2983
2984    pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
2985        &self,
2986        peer_node_addr: &NodeAddr,
2987        candidates: &[PeerAddress],
2988    ) -> bool {
2989        if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
2990            return false;
2991        }
2992        !self.active_peer_needs_same_path_refresh(peer_node_addr)
2993    }
2994
2995    fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
2996        let Some(peer) = self.peers.get(peer_node_addr) else {
2997            return false;
2998        };
2999        let stale_after_ms = self
3000            .config
3001            .node
3002            .heartbeat_interval_secs
3003            .saturating_mul(1000)
3004            .max(1000);
3005        peer.idle_time(Self::now_ms()) > stale_after_ms
3006    }
3007
3008    fn active_peer_matches_candidate(
3009        &self,
3010        peer_node_addr: &NodeAddr,
3011        candidate: &PeerAddress,
3012    ) -> bool {
3013        let Some(peer) = self.peers.get(peer_node_addr) else {
3014            return false;
3015        };
3016        let Some(current_addr) = peer.current_addr() else {
3017            return false;
3018        };
3019        if let Some(peer_transport_id) = peer.transport_id()
3020            && let Some((candidate_transport_id, candidate_addr)) =
3021                self.resolve_peer_address_for_match(candidate)
3022        {
3023            return peer_transport_id == candidate_transport_id && current_addr == &candidate_addr;
3024        }
3025        if peer
3026            .transport_id()
3027            .map(|id| self.bootstrap_transports.contains(&id))
3028            .unwrap_or(false)
3029        {
3030            return false;
3031        }
3032        let current_addr = current_addr.to_string();
3033        let current_transport = peer
3034            .transport_id()
3035            .and_then(|id| self.transports.get(&id))
3036            .map(|transport| transport.transport_type().name);
3037
3038        candidate.addr == current_addr
3039            && current_transport
3040                .map(|transport| transport == candidate.transport)
3041                .unwrap_or(true)
3042    }
3043
3044    // === Control API methods ===
3045
3046    /// Connect to a peer via the control API.
3047    ///
3048    /// Creates an ephemeral peer connection (not persisted to config, no
3049    /// auto-reconnect). Reuses the same connection path as auto-connect
3050    /// peers. Returns JSON data on success or an error message.
3051    pub(crate) async fn api_connect(
3052        &mut self,
3053        npub: &str,
3054        address: &str,
3055        transport: &str,
3056    ) -> Result<serde_json::Value, String> {
3057        let peer_config = PeerConfig {
3058            npub: npub.to_string(),
3059            alias: None,
3060            addresses: vec![PeerAddress::new(transport, address)],
3061            connect_policy: ConnectPolicy::Manual,
3062            auto_reconnect: false,
3063            discovery_fallback_transit: true,
3064        };
3065
3066        // Pre-seed identity cache (same as initiate_peer_connections does)
3067        if let Ok(identity) = PeerIdentity::from_npub(npub) {
3068            self.peer_aliases
3069                .insert(*identity.node_addr(), identity.short_npub());
3070            self.register_identity(*identity.node_addr(), identity.pubkey_full());
3071        }
3072
3073        self.initiate_peer_connection(&peer_config)
3074            .await
3075            .map(|()| {
3076                info!(
3077                    npub = %npub,
3078                    address = %address,
3079                    transport = %transport,
3080                    "API connect initiated"
3081                );
3082                serde_json::json!({
3083                    "npub": npub,
3084                    "address": address,
3085                    "transport": transport,
3086                })
3087            })
3088            .map_err(|e| e.to_string())
3089    }
3090
3091    /// Disconnect a peer via the control API.
3092    ///
3093    /// Removes the peer and suppresses auto-reconnect.
3094    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
3095        let peer_identity =
3096            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
3097        let node_addr = *peer_identity.node_addr();
3098
3099        if !self.peers.contains_key(&node_addr) {
3100            return Err(format!("peer not found: {npub}"));
3101        }
3102
3103        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
3104        self.remove_active_peer(&node_addr);
3105
3106        // Suppress any pending auto-reconnect
3107        self.retry_pending.remove(&node_addr);
3108
3109        info!(npub = %npub, "API disconnect completed");
3110
3111        Ok(serde_json::json!({
3112            "npub": npub,
3113            "disconnected": true,
3114        }))
3115    }
3116
3117    /// Adopt an already-established UDP traversal and start the normal FIPS
3118    /// Noise handshake over it.
3119    ///
3120    /// This is intended for integration with an external rendezvous runtime
3121    /// that has already completed relay signaling, STUN observation, and UDP
3122    /// hole punching. After handoff, the adopted socket is owned by FIPS.
3123    pub async fn adopt_established_traversal(
3124        &mut self,
3125        traversal: EstablishedTraversal,
3126    ) -> Result<BootstrapHandoffResult, NodeError> {
3127        debug!(
3128            peer_npub = %traversal.peer_npub,
3129            session_id = %traversal.session_id,
3130            remote_addr = %traversal.remote_addr,
3131            "adopting established traversal socket"
3132        );
3133
3134        if !self.state.is_operational() {
3135            return Err(NodeError::NotStarted);
3136        }
3137
3138        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3139        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3140            NodeError::InvalidPeerNpub {
3141                npub: traversal.peer_npub.clone(),
3142                reason: e.to_string(),
3143            }
3144        })?;
3145        let peer_node_addr = *peer_identity.node_addr();
3146        if self.peers.contains_key(&peer_node_addr) {
3147            debug!(
3148                peer_npub = %traversal.peer_npub,
3149                "Adopting NAT traversal handoff as alternate path for already-connected peer"
3150            );
3151        }
3152
3153        self.peer_aliases
3154            .insert(peer_node_addr, peer_identity.short_npub());
3155        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3156
3157        let transport_id = self.allocate_transport_id();
3158        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
3159        // (and accept_connections / advertise flags) from the operator's
3160        // configured [transports.udp] when the bootstrap runtime doesn't
3161        // pass an explicit override. Lookup tries `transport_name` first
3162        // (covers the `Named` multi-listener variant) and falls back to the
3163        // unnamed `Single` listener, so single- and named-listener configs
3164        // both inherit cleanly.
3165        //
3166        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
3167        // only value guaranteed to survive arbitrary middlebox paths.
3168        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
3169        // initially attempt that MTU and may black-hole on tighter paths
3170        // until reactive `MtuExceeded` recovery kicks in. Operators who
3171        // raise the primary MTU based on known-clean topology accept that
3172        // tradeoff; the silent drop on a too-low default was strictly
3173        // worse for the common case where the primary MTU is reachable.
3174        //
3175        // Bind / external address fields are cleared since the socket is
3176        // already bound.
3177        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3178            let mut cfg = self
3179                .lookup_udp_config(traversal.transport_name.as_deref())
3180                .or_else(|| self.lookup_udp_config(None))
3181                .cloned()
3182                .unwrap_or_default();
3183            cfg.bind_addr = None;
3184            cfg.external_addr = None;
3185            cfg
3186        });
3187        let mut transport = crate::transport::udp::UdpTransport::new(
3188            transport_id,
3189            traversal.transport_name.clone(),
3190            inherited_config,
3191            packet_tx,
3192        );
3193
3194        transport
3195            .adopt_socket_async(traversal.socket)
3196            .await
3197            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
3198
3199        let local_addr = transport.local_addr().ok_or_else(|| {
3200            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
3201        })?;
3202
3203        self.transports.insert(
3204            transport_id,
3205            crate::transport::TransportHandle::Udp(transport),
3206        );
3207        self.bootstrap_transports.insert(transport_id);
3208        self.bootstrap_transport_npubs
3209            .insert(transport_id, traversal.peer_npub.clone());
3210
3211        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
3212        if let Err(err) = self
3213            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
3214            .await
3215        {
3216            self.bootstrap_transports.remove(&transport_id);
3217            self.bootstrap_transport_npubs.remove(&transport_id);
3218            if let Some(mut handle) = self.transports.remove(&transport_id) {
3219                let _ = handle.stop().await;
3220            }
3221            return Err(err);
3222        }
3223
3224        info!(
3225            peer = %self.peer_display_name(&peer_node_addr),
3226            transport_id = %transport_id,
3227            local_addr = %local_addr,
3228            remote_addr = %traversal.remote_addr,
3229            session_id = %traversal.session_id,
3230            "adopted NAT traversal socket; handshake initiated"
3231        );
3232
3233        Ok(BootstrapHandoffResult {
3234            transport_id,
3235            local_addr,
3236            remote_addr: traversal.remote_addr,
3237            peer_node_addr,
3238            session_id: traversal.session_id,
3239        })
3240    }
3241}