fips_core/node/lifecycle.rs

//! Node lifecycle management: start, stop, and peer connection initiation.

use super::{Node, NodeError, NodeState};
use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
use crate::discovery::nostr::{
    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
    OverlayEndpointAdvert, OverlayTransportKind,
};
use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
use crate::node::acl::PeerAclContext;
use crate::node::wire::build_msg1;
use crate::peer::PeerConnection;
use crate::protocol::{Disconnect, DisconnectReason};
use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
use crate::{NodeAddr, PeerIdentity};
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
use std::thread;
use std::time::Duration;
use tracing::{debug, info, warn};

#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    if let Ok(mut file) = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(std::env::temp_dir().join("nvpn-fips-endpoint-debug.log"))
    {
        let _ = writeln!(
            file,
            "{:?} {}",
            std::time::SystemTime::now(),
            message.as_ref()
        );
    }
}

#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}

/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN. Covers RFC1918, loopback, link-local, IPv4
/// CGNAT (100.64/10), unspecified, multicast/broadcast/documentation,
/// and IPv6 unique-local/loopback/unspecified. We never publish these
/// as the peer's primary `runtime_endpoint`: an off-LAN consumer can't
/// route to them, and latching one in (forcing traffic onto a slow
/// overlay-relay fallback) is the original bug this guard exists to
/// prevent.
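///
/// A minimal sketch of the classification (these addresses are arbitrary
/// examples, not values used elsewhere in this module):
///
/// ```ignore
/// assert!(is_unroutable_advert_ip("10.1.2.3".parse().unwrap()));   // RFC1918
/// assert!(is_unroutable_advert_ip("100.64.0.1".parse().unwrap())); // CGNAT, RFC 6598
/// assert!(is_unroutable_advert_ip("fe80::1".parse().unwrap()));    // IPv6 link-local
/// assert!(!is_unroutable_advert_ip("8.8.8.8".parse().unwrap()));   // public IPv4
/// ```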
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the
                // public internet; behaves like an extra NAT layer.
                || (v4.octets()[0] == 100 && (v4.octets()[1] & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // IPv6 link-local: fe80::/10
                || (v6.segments()[0] & 0xffc0) == 0xfe80
        }
    }
}

fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    matches!(
        (local, remote),
        (SocketAddr::V4(_), SocketAddr::V4(_)) | (SocketAddr::V6(_), SocketAddr::V6(_))
    )
}

const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;

impl Node {
    /// Replace the runtime peer list.
    ///
    /// Newly added auto-connect peers get `initiate_peer_connection`
    /// immediately; removed peers are dropped from the retry queue (the
    /// regular liveness timeout reaps any active session — we don't
    /// proactively disconnect, since the same npub might be on its way back
    /// via a fresh advert). Existing auto-connect entries with new direct
    /// addresses are dialed immediately, and retry state is refreshed so
    /// later attempts also use the latest hints.
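    ///
    /// A hedged sketch of the outcome accounting (`cfg_*` values are
    /// illustrative, not real configs):
    ///
    /// ```ignore
    /// // cfg_kept matches an existing entry, cfg_new does not, and one old
    /// // entry is absent from the new list:
    /// let outcome = node.update_peers(vec![cfg_kept, cfg_new]).await?;
    /// assert_eq!((outcome.added, outcome.removed), (1, 1));
    /// ```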
    pub(super) async fn update_peers(
        &mut self,
        new_peers: Vec<crate::config::PeerConfig>,
    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
        use std::collections::{HashMap, HashSet};

        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
            HashMap::with_capacity(new_peers.len());
        for peer in new_peers {
            let identity = match PeerIdentity::from_npub(&peer.npub) {
                Ok(id) => id,
                Err(e) => {
                    return Err(crate::node::NodeError::InvalidPeerNpub {
                        npub: peer.npub.clone(),
                        reason: e.to_string(),
                    });
                }
            };
            // Last-write-wins on duplicates so callers passing a multi-source
            // candidate list (e.g. operator hints + recent-peers cache for
            // the same npub) get the merge they meant.
            new_by_addr.insert(*identity.node_addr(), peer);
        }

        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
            .config
            .peers()
            .iter()
            .filter_map(|pc| {
                PeerIdentity::from_npub(&pc.npub)
                    .ok()
                    .map(|id| (*id.node_addr(), pc.clone()))
            })
            .collect();

        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();

        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();

        let mut outcome = crate::node::UpdatePeersOutcome::default();

        for node_addr in &removed {
            if self.retry_pending.remove(node_addr).is_some() {
                debug!(
                    peer = %self.peer_display_name(node_addr),
                    "Dropping retry entry for peer removed from runtime peer list"
                );
            }
            self.peer_aliases.remove(node_addr);
            self.set_discovery_fallback_transit_allowed(*node_addr, false);
            outcome.removed += 1;
        }

        let mut auto_connect_refresh_configs = Vec::new();
        for node_addr in &kept {
            let new_pc = &new_by_addr[node_addr];
            let current_pc = &current_by_addr[node_addr];
            if new_pc.addresses != current_pc.addresses
                || new_pc.alias != current_pc.alias
                || new_pc.connect_policy != current_pc.connect_policy
                || new_pc.auto_reconnect != current_pc.auto_reconnect
                || new_pc.discovery_fallback_transit != current_pc.discovery_fallback_transit
            {
                outcome.updated += 1;
                self.set_discovery_fallback_transit_allowed(
                    *node_addr,
                    new_pc.discovery_fallback_transit,
                );
                if let Some(state) = self.retry_pending.get_mut(node_addr) {
                    state.peer_config = new_pc.clone();
                    state.retry_after_ms = Self::now_ms();
                }
                if let Some(alias) = new_pc.alias.clone() {
                    self.peer_aliases.insert(*node_addr, alias);
                }
                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
                    auto_connect_refresh_configs.push(new_pc.clone());
                }
            } else {
                outcome.unchanged += 1;
                self.set_discovery_fallback_transit_allowed(
                    *node_addr,
                    new_pc.discovery_fallback_transit,
                );
                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
                    auto_connect_refresh_configs.push(new_pc.clone());
                }
            }
        }

        let added_configs: Vec<crate::config::PeerConfig> =
            added.iter().map(|addr| new_by_addr[addr].clone()).collect();

        // Replace the live config peer list before initiating connections so
        // any helper that consults `self.config.peers()` during the dial
        // (alias lookup, retry-state seeding) sees the new entries.
        self.config.peers = new_by_addr.into_values().collect();

        for peer_config in added_configs {
            outcome.added += 1;
            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
                continue;
            };
            let name = peer_config
                .alias
                .clone()
                .unwrap_or_else(|| identity.short_npub());
            self.peer_aliases.insert(*identity.node_addr(), name);
            self.set_discovery_fallback_transit_allowed(
                *identity.node_addr(),
                peer_config.discovery_fallback_transit,
            );
            self.register_identity(*identity.node_addr(), identity.pubkey_full());

            if !peer_config.is_auto_connect() {
                continue;
            }

            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
                warn!(
                    npub = %peer_config.npub,
                    error = %e,
                    "Failed to initiate connection for newly added peer"
                );
                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                }
                if matches!(e, crate::node::NodeError::NoTransportForType(_))
                    && let Some(bootstrap) = self.nostr_discovery.clone()
                {
                    bootstrap
                        .request_advert_stale_check(peer_config.npub.clone())
                        .await;
                }
            }
        }

        for peer_config in auto_connect_refresh_configs {
            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
                continue;
            };
            let node_addr = *peer_identity.node_addr();

            if self.peers.contains_key(&node_addr) {
                match self
                    .initiate_active_peer_alternative_connection(&peer_config)
                    .await
                {
                    Ok(attempted) => {
                        if attempted {
                            debug!(
                                peer = %self.peer_display_name(&node_addr),
                                "Started non-disruptive alternate-path handshake for active peer"
                            );
                        }
                    }
                    Err(e) => {
                        debug!(
                            npub = %peer_config.npub,
                            error = %e,
                            "Active peer alternate-path refresh did not start"
                        );
                    }
                }
                continue;
            }

            match self.initiate_peer_connection(&peer_config).await {
                Ok(()) => {
                    let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
                    if let Some(state) = self.retry_pending.get_mut(&node_addr) {
                        state.peer_config = peer_config;
                        state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
                    }
                }
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        error = %e,
                        "Refreshed peer addresses did not initiate a direct connection"
                    );
                    self.schedule_retry(node_addr, Self::now_ms());
                }
            }
        }

        self.warm_auto_connect_graph_sessions().await;

        Ok(outcome)
    }

    pub(super) async fn initiate_peer_connections(&mut self) {
        // Build display name map from all configured peers (alias or short npub),
        // and pre-seed the identity cache from each peer's npub so that TUN packets
        // addressed to a configured peer can be dispatched (and trigger session
        // initiation) immediately on startup — without waiting for the link-layer
        // handshake to complete first.
        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
            .config
            .peers()
            .iter()
            .filter_map(|pc| {
                PeerIdentity::from_npub(&pc.npub)
                    .ok()
                    .map(|id| (id, pc.alias.clone()))
            })
            .collect();

        for (identity, alias) in peer_identities {
            let name = alias.unwrap_or_else(|| identity.short_npub());
            self.peer_aliases.insert(*identity.node_addr(), name);
            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
            // but will be corrected to the real value when the peer is promoted
            // after a successful Noise handshake.
            self.register_identity(*identity.node_addr(), identity.pubkey_full());
        }

        // Collect peer configs to avoid borrow conflicts
        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();

        if peer_configs.is_empty() {
            debug!("No static peers configured");
            return;
        }

        debug!(
            count = peer_configs.len(),
            "Initiating static peer connections"
        );

        for peer_config in peer_configs {
            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
                warn!(
                    npub = %peer_config.npub,
                    alias = ?peer_config.alias,
                    error = %e,
                    "Failed to initiate peer connection"
                );
                // Schedule a retry so transient address-resolution failures
                // (e.g. cached endpoints stale, NAT rebinds, all addresses
                // currently unreachable) recover without a daemon restart.
                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                }
                // No-transport failures most often mean the cached overlay
                // advert is pointing at a dead post-NAT-rebind address. The
                // advert cache is read-only inside fetch_advert, so retries
                // would loop on the same dead address until expiry. Force a
                // re-fetch so the next retry tick picks up fresh endpoints.
                if matches!(e, crate::node::NodeError::NoTransportForType(_))
                    && let Some(bootstrap) = self.nostr_discovery.clone()
                {
                    bootstrap
                        .request_advert_stale_check(peer_config.npub.clone())
                        .await;
                }
            }
        }

        self.warm_auto_connect_graph_sessions().await;
    }

    /// Initiate a connection to a single peer.
    ///
    /// Creates a link, starts the Noise handshake, and sends the first message.
    pub(super) async fn initiate_peer_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }

    /// Initiate a connection from the retry path. Identical to
    /// [`initiate_peer_connection`] today — both paths fan out across every
    /// known address (overlay-fresh first, then operator/cache hints) in a
    /// single pass. The two entry points stay separate so callers can be
    /// distinguished in tracing.
    pub(super) async fn initiate_peer_retry_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }

    async fn initiate_active_peer_alternative_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<bool, NodeError> {
        let peer_identity =
            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
                npub: peer_config.npub.clone(),
                reason: e.to_string(),
            })?;
        let peer_node_addr = *peer_identity.node_addr();

        if !self.peers.contains_key(&peer_node_addr) {
            self.initiate_peer_connection(peer_config).await?;
            return Ok(true);
        }

        // Keep the current link live and race fresh concrete candidates.
        // Cross-connection resolution still decides which replacement link
        // wins if both peers try the same upgrade; the important part here
        // is that replacing a stale path does not depend on the remote peer
        // receiving our hint before either side attempts the better address.
        self.try_active_peer_alternative_addresses(peer_config, peer_identity)
            .await
    }

    async fn initiate_peer_connection_inner(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        // Parse the peer's npub to get their identity
        let peer_identity =
            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
                npub: peer_config.npub.clone(),
                reason: e.to_string(),
            })?;

        let peer_node_addr = *peer_identity.node_addr();

        // Check if peer already exists (fully authenticated)
        if self.peers.contains_key(&peer_node_addr) {
            debug!(
                npub = %peer_config.npub,
                "Peer already exists, skipping"
            );
            return Ok(());
        }

        self.try_peer_addresses(peer_config, peer_identity, true)
            .await
    }

    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
        self.connections.values().any(|conn| {
            conn.expected_identity()
                .map(|id| id.node_addr() == peer_node_addr)
                .unwrap_or(false)
        })
    }

    fn is_connecting_to_peer_on_path(
        &self,
        peer_node_addr: &NodeAddr,
        transport_id: TransportId,
        remote_addr: &TransportAddr,
    ) -> bool {
        self.connections.values().any(|conn| {
            conn.expected_identity()
                .map(|id| id.node_addr() == peer_node_addr)
                .unwrap_or(false)
                && conn.transport_id() == Some(transport_id)
                && conn.source_addr() == Some(remote_addr)
        }) || self.pending_connects.iter().any(|pending| {
            pending.peer_identity.node_addr() == peer_node_addr
                && pending.transport_id == transport_id
                && &pending.remote_addr == remote_addr
        })
    }

    pub(in crate::node) fn should_warm_auto_connect_session(
        &self,
        peer_node_addr: &NodeAddr,
    ) -> bool {
        if self.peers.contains_key(peer_node_addr)
            || self
                .sessions
                .get(peer_node_addr)
                .is_some_and(|entry| entry.is_established())
        {
            return false;
        }

        self.config.peers().iter().any(|peer| {
            peer.is_auto_connect()
                && PeerIdentity::from_npub(&peer.npub)
                    .map(|identity| identity.node_addr() == peer_node_addr)
                    .unwrap_or(false)
        })
    }

    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
        if !self.peers.values().any(|peer| peer.can_send()) {
            return 0;
        }

        let mut budget = self.graph_session_warmup_budget();
        if budget == 0 {
            return 0;
        }

        let peer_identities: Vec<_> = self
            .config
            .auto_connect_peers()
            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
            .collect();

        let mut warmed = 0;
        for identity in peer_identities {
            if budget == 0 {
                break;
            }

            let peer_node_addr = *identity.node_addr();
            if peer_node_addr == *self.identity.node_addr()
                || !self.should_warm_auto_connect_session(&peer_node_addr)
                || self
                    .sessions
                    .get(&peer_node_addr)
                    .is_some_and(|entry| entry.is_initiating())
            {
                continue;
            }

            self.register_identity(peer_node_addr, identity.pubkey_full());

            if self.find_next_hop(&peer_node_addr).is_some() {
                match self
                    .initiate_session(peer_node_addr, identity.pubkey_full())
                    .await
                {
                    Ok(()) => {
                        warmed += 1;
                        budget = budget.saturating_sub(1);
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            "Warmed auto-connect peer session over existing FIPS graph"
                        );
                    }
                    Err(NodeError::SendFailed { node_addr, reason })
                        if node_addr == peer_node_addr && reason == "no route to destination" =>
                    {
                        self.maybe_initiate_lookup(&peer_node_addr).await;
                        warmed += 1;
                        budget = budget.saturating_sub(1);
                    }
                    Err(err) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            error = %err,
                            "Failed to warm auto-connect peer session"
                        );
                    }
                }
            } else {
                self.maybe_initiate_lookup(&peer_node_addr).await;
                warmed += 1;
                budget = budget.saturating_sub(1);
            }
        }

        warmed
    }

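    /// Remaining session-warmup slots for this tick. A hedged worked example
    /// (numbers are illustrative): with `pending_max_destinations = 32`,
    /// 20 not-yet-established sessions, and 4 pending lookups, the budget is
    /// `min(32 - (20 + 4), MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK) = min(8, 16) = 8`.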
    pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
        let max_destinations = self.config.node.session.pending_max_destinations;
        if max_destinations == 0 {
            return 0;
        }

        let pending_sessions = self
            .sessions
            .values()
            .filter(|entry| !entry.is_established())
            .count();
        let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
        max_destinations
            .saturating_sub(pending_total)
            .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
    }

    fn outbound_handshake_slots(&self) -> usize {
        let used = self
            .connections
            .len()
            .saturating_add(self.pending_connects.len());
        if self.max_connections == 0 {
            usize::MAX
        } else {
            self.max_connections.saturating_sub(used)
        }
    }

    fn outbound_link_slots(&self) -> usize {
        if self.max_links == 0 {
            usize::MAX
        } else {
            self.max_links.saturating_sub(self.links.len())
        }
    }

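    /// How many more parallel path candidates we may dial toward one peer.
    /// A hedged worked example: with 2 candidates already in flight for the
    /// peer, the per-peer term is `MAX_PARALLEL_PATH_CANDIDATES_PER_PEER - 2
    /// = 2`, further clamped by the global connection and link slots below.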
    fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
        if !self.peers.contains_key(peer_node_addr)
            && self.max_peers > 0
            && self.peers.len() >= self.max_peers
        {
            return 0;
        }

        let in_flight_for_peer = self
            .connections
            .values()
            .filter(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == peer_node_addr)
                    .unwrap_or(false)
            })
            .count()
            .saturating_add(
                self.pending_connects
                    .iter()
                    .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
                    .count(),
            );

        self.outbound_handshake_slots()
            .min(self.outbound_link_slots())
            .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
    }

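    /// Per-tick cap on new discovery-driven dials. A hedged worked example:
    /// with 3 outbound handshake slots left, 10 link slots left, and the
    /// constant cap of 16, the budget is `min(3, 10, 16) = 3`.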
    fn discovery_connect_budget(&self) -> usize {
        self.outbound_handshake_slots()
            .min(self.outbound_link_slots())
            .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
    }

    /// Find a UDP transport whose bound socket can send to `remote_addr`.
    ///
    /// LAN discovery can surface both IPv4 and IPv6 addresses for the same
    /// service. A wildcard IPv4 socket cannot send to an IPv6 link-local
    /// target, and vice versa, so callers must choose by socket family rather
    /// than by transport type alone.
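    ///
    /// A sketch of the family rule this helper enforces (addresses are
    /// arbitrary examples):
    ///
    /// ```ignore
    /// let v4_local: SocketAddr = "0.0.0.0:4242".parse().unwrap();
    /// assert!(socket_addr_families_compatible(v4_local, "192.0.2.1:9".parse().unwrap()));
    /// assert!(!socket_addr_families_compatible(v4_local, "[2001:db8::1]:9".parse().unwrap()));
    /// ```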
    fn find_udp_transport_for_remote_addr(
        &self,
        remote_addr: SocketAddr,
    ) -> Option<(TransportId, SocketAddr)> {
        self.transports
            .iter()
            .filter(|(id, handle)| {
                handle.transport_type().name == "udp"
                    && handle.is_operational()
                    && !self.bootstrap_transports.contains(id)
            })
            .filter_map(|(id, handle)| {
                let local_addr = handle.local_addr()?;
                socket_addr_families_compatible(local_addr, remote_addr)
                    .then_some((*id, local_addr))
            })
            .min_by_key(|(id, _)| id.as_u32())
    }

    pub(super) fn transport_discovery_candidate(
        &self,
        discovered_transport_id: TransportId,
        discovered_addr: TransportAddr,
    ) -> Option<(TransportId, TransportAddr, &'static str)> {
        let transport = self.transports.get(&discovered_transport_id)?;
        let transport_name = transport.transport_type().name;

        if transport_name != "udp" {
            return Some((discovered_transport_id, discovered_addr, transport_name));
        }

        let Some(remote_socket_addr) = discovered_addr
            .as_str()
            .and_then(|addr| addr.parse::<SocketAddr>().ok())
        else {
            if self.bootstrap_transports.contains(&discovered_transport_id) {
                debug!(
                    transport_id = %discovered_transport_id,
                    remote_addr = %discovered_addr,
                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
                );
                return None;
            }
            return Some((discovered_transport_id, discovered_addr, transport_name));
        };

        let Some((transport_id, local_addr)) =
            self.find_udp_transport_for_remote_addr(remote_socket_addr)
        else {
            debug!(
                transport_id = %discovered_transport_id,
                remote_addr = %discovered_addr,
                "transport discovery: skip UDP peer with no compatible local socket"
            );
            return None;
        };

        if transport_id != discovered_transport_id {
            debug!(
                discovered_transport_id = %discovered_transport_id,
                selected_transport_id = %transport_id,
                local_addr = %local_addr,
                remote_addr = %remote_socket_addr,
                "transport discovery: selected compatible UDP transport"
            );
        }

        Some((
            transport_id,
            TransportAddr::from_socket_addr(remote_socket_addr),
            transport_name,
        ))
    }

    /// Initiate a connection to a peer on a specific transport and address.
    ///
    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
    /// the Noise IK handshake, sends msg1, and registers the connection for
    /// msg2 dispatch.
    ///
    /// For connection-oriented transports (TCP, Tor): allocates a link and
    /// starts a non-blocking transport connect. The handshake is deferred
    /// until the transport connection is established — the tick handler
    /// polls `connection_state()` and initiates the handshake when ready.
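    ///
    /// A hedged usage sketch (`transport_id` and `identity` come from
    /// discovery; the address literal is illustrative):
    ///
    /// ```ignore
    /// let addr = TransportAddr::from_string("198.51.100.7:4242");
    /// node.initiate_connection(transport_id, addr, identity).await?;
    /// ```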
    pub(super) async fn initiate_connection(
        &mut self,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
            debug!(
                peer = %self.peer_display_name(&peer_node_addr),
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                "Connection already in progress for candidate path"
            );
            return Ok(());
        }

        if self.outbound_handshake_slots() == 0 {
            return Err(NodeError::MaxConnectionsExceeded {
                max: self.max_connections,
            });
        }

        if self.outbound_link_slots() == 0 {
            return Err(NodeError::MaxLinksExceeded {
                max: self.max_links,
            });
        }

        if !self.peers.contains_key(&peer_node_addr)
            && self.max_peers > 0
            && self.peers.len() >= self.max_peers
        {
            return Err(NodeError::MaxPeersExceeded {
                max: self.max_peers,
            });
        }

        self.authorize_peer(
            &peer_identity,
            PeerAclContext::OutboundConnect,
            transport_id,
            &remote_addr,
        )?;

        let is_connection_oriented = self
            .transports
            .get(&transport_id)
            .map(|t| t.transport_type().connection_oriented)
            .unwrap_or(false);

        // Allocate link ID and create link
        let link_id = self.allocate_link_id();

        let link = if is_connection_oriented {
            Link::new(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        } else {
            Link::connectionless(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        };

        self.links.insert(link_id, link);

        // Add reverse lookup for packet dispatch
        self.addr_to_link
            .insert((transport_id, remote_addr.clone()), link_id);

        if is_connection_oriented {
            // Connection-oriented: start non-blocking connect, defer handshake
            if let Some(transport) = self.transports.get(&transport_id) {
                match transport.connect(&remote_addr).await {
                    Ok(()) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            link_id = %link_id,
                            "Transport connect initiated (non-blocking)"
                        );
                        self.pending_connects.push(super::PendingConnect {
                            link_id,
                            transport_id,
                            remote_addr,
                            peer_identity,
                        });
                    }
                    Err(e) => {
                        // Clean up link
                        self.links.remove(&link_id);
                        self.addr_to_link.remove(&(transport_id, remote_addr));
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            Ok(())
        } else {
            // Connectionless: proceed with immediate handshake
            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
                .await
        }
    }

    /// Start the Noise handshake on a link and send msg1.
    ///
    /// Called immediately for connectionless transports, or after the
    /// transport connection is established for connection-oriented transports.
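    ///
    /// A hedged sketch of the msg1 wire layout (sizes follow the comment in
    /// the body below):
    ///
    /// ```ignore
    /// // [0x01][sender_idx: u32 LE][noise_msg1: 82 bytes]
    /// let wire_msg1 = build_msg1(our_index, &noise_msg1);
    /// assert_eq!(wire_msg1.len(), 1 + 4 + 82);
    /// ```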
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Create connection in handshake phase (outbound knows expected identity)
        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Allocate a session index for this handshake
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Clean up the link we just created
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        // Start the Noise handshake and get message 1
        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Clean up the index and link
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        // Set index and transport info on the connection
        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Store msg1 for resend and schedule first resend
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Track in pending_outbound for msg2 dispatch
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send the wire format handshake message. If the very first send fails
        // synchronously (for example an IPv6 candidate on an IPv4-only UDP
        // socket), undo this candidate so the caller can try the next address
        // in the same dial pass.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        match send_result {
            Some(send_result) => {
                self.note_local_send_outcome(&send_result);
                match send_result {
                    Ok(bytes) => {
                        debug!(
                            link_id = %link_id,
                            our_index = %our_index,
                            bytes,
                            "Sent Noise handshake message 1 (wire format)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            link_id = %link_id,
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            our_index = %our_index,
                            error = %e,
                            "Failed to send handshake message"
                        );
                        self.pending_outbound
                            .remove(&(transport_id, our_index.as_u32()));
                        self.connections.remove(&link_id);
                        self.links.remove(&link_id);
                        self.addr_to_link
                            .remove(&(transport_id, remote_addr.clone()));
                        let _ = self.index_allocator.free(our_index);
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            None => {
                self.pending_outbound
                    .remove(&(transport_id, our_index.as_u32()));
                self.connections.remove(&link_id);
                self.links.remove(&link_id);
                self.addr_to_link
                    .remove(&(transport_id, remote_addr.clone()));
                let _ = self.index_allocator.free(our_index);
                return Err(NodeError::TransportError(format!(
                    "transport {transport_id} disappeared before first handshake send"
                )));
            }
        }

        Ok(())
    }

    /// Poll all transports for discovered peers and auto-connect.
    ///
    /// Called from the tick handler. Iterates operational transports,
    /// drains their discovery buffers, and initiates connections to
    /// newly discovered peers (if auto_connect is enabled).
    pub(super) async fn poll_transport_discovery(&mut self) {
        // Collect discoveries first to avoid borrow conflict with self
        let mut to_connect = Vec::new();
        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
        let mut connect_budget = self.discovery_connect_budget();
        let mut skipped_budget = 0usize;

        for transport in self.transports.values() {
            if !transport.is_operational() {
                continue;
            }
            if !transport.auto_connect() {
                // Still drain the buffer so it doesn't grow unbounded
                let _ = transport.discover();
                continue;
            }
            let discovered = match transport.discover() {
                Ok(peers) => peers,
                Err(_) => continue,
            };
            for peer in discovered {
                let discovered_transport_id = peer.transport_id;
                let pubkey = match peer.pubkey_hint {
                    Some(pk) => pk,
                    None => continue,
                };
                let identity = PeerIdentity::from_pubkey(pubkey);
                let node_addr = *identity.node_addr();

                // Skip self
                if node_addr == *self.identity.node_addr() {
                    continue;
                }

                let Some((candidate_transport_id, remote_addr, transport_name)) =
                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
                else {
                    continue;
                };

                if self.peers.contains_key(&node_addr) {
                    let candidate = PeerAddress::new(transport_name, remote_addr.to_string());
                    if self.active_peer_candidate_is_fresh_enough_to_skip(
                        &node_addr,
                        std::slice::from_ref(&candidate),
                    ) {
                        continue;
                    }
                    if self.is_connecting_to_peer_on_path(
                        &node_addr,
                        candidate_transport_id,
                        &remote_addr,
                    ) {
                        continue;
                    }
                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
                    if connect_budget == 0
                        || self
                            .path_candidate_attempt_budget(&node_addr)
                            .saturating_sub(queued_for_peer)
                            == 0
                    {
                        skipped_budget = skipped_budget.saturating_add(1);
                        continue;
                    }
                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
                    *queued_per_peer.entry(node_addr).or_default() += 1;
                    connect_budget = connect_budget.saturating_sub(1);
                    continue;
                }

                if self.is_connecting_to_peer_on_path(
                    &node_addr,
                    candidate_transport_id,
                    &remote_addr,
                ) {
                    continue;
                }

                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
                if connect_budget == 0
                    || self
                        .path_candidate_attempt_budget(&node_addr)
                        .saturating_sub(queued_for_peer)
                        == 0
                {
                    skipped_budget = skipped_budget.saturating_add(1);
                    continue;
                }
                to_connect.push((candidate_transport_id, remote_addr, identity, false));
                *queued_per_peer.entry(node_addr).or_default() += 1;
                connect_budget = connect_budget.saturating_sub(1);
            }
        }

        if skipped_budget > 0 {
            debug!(
                skipped = skipped_budget,
                queued = to_connect.len(),
                "Transport discovery connect budget exhausted"
            );
        }

        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
            info!(
                peer = %self.peer_display_name(identity.node_addr()),
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                active_refresh,
                "Auto-connecting to discovered peer"
            );
            if let Err(e) = self
                .initiate_connection(transport_id, remote_addr, identity)
                .await
            {
                warn!(error = %e, "Failed to auto-connect to discovered peer");
            }
        }
    }

    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    let now_ms = Self::now_ms();
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // B6: stale-advert eviction on the streak-threshold
                    // crossing. Fire-and-forget; the outcome is logged so
                    // operators can see when peers get cleaned up.
                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        // Push the next retry past the cooldown so the
                        // open-discovery sweep doesn't re-enqueue and the
                        // per-attempt backoff doesn't fire sooner.
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }

    /// Resolve the LAN-only discovery scope. Applications with explicit
    /// connectivity config can set `node.discovery.lan.scope` without
    /// changing the public Nostr discovery `app` tag. Failing that, the
    /// older fallback extracts a scope from the Nostr `app` tag that
    /// default scoped discovery uses.
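    ///
    /// A sketch of the resolution order (values are illustrative):
    ///
    /// ```ignore
    /// // lan.scope = Some("ops")                       → Some("ops")
    /// // lan.scope = None, app = "fips-overlay-v1:ops" → Some("ops")
    /// // lan.scope = None, app = "custom-tag"          → Some("custom-tag")
    /// // lan.scope = None, app = ""                    → None
    /// ```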
    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
        if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
            let scope = scope.trim();
            if !scope.is_empty() {
                return Some(scope.to_string());
            }
        }

        let app = self.config.node.discovery.nostr.app.trim();
        if app.is_empty() {
            return None;
        }
        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
            let scope = rest.trim();
            if scope.is_empty() {
                None
            } else {
                Some(scope.to_string())
            }
        } else {
            Some(app.to_string())
        }
    }

    /// Drain mDNS-discovered peers and initiate Noise XX handshakes. For
    /// active peers this is a non-disruptive alternate-path refresh: the
    /// current link stays live until a new handshake authenticates and
    /// promotes. The handshake itself is the authentication — a spoofed
    /// mDNS advert with someone else's npub fails the XX exchange and
    /// is dropped.
    pub(super) async fn poll_lan_discovery(&mut self) {
        let Some(runtime) = self.lan_discovery.clone() else {
            return;
        };
        let events = runtime.drain_events().await;
        if events.is_empty() {
            return;
        }
        let mut connect_budget = self.discovery_connect_budget();
        let mut skipped_budget = 0usize;
        for event in events {
            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
            let Some((transport_id, local_addr)) =
                self.find_udp_transport_for_remote_addr(peer.addr)
            else {
                debug!(
                    addr = %peer.addr,
                    "lan: skip discovered peer with no compatible UDP transport"
                );
                continue;
            };
            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
                Ok(id) => id,
                Err(err) => {
                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
                    continue;
                }
            };
            let peer_node_addr = *identity.node_addr();
            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
            if self.peers.contains_key(&peer_node_addr) {
                let candidate = PeerAddress::new("udp", peer.addr.to_string());
                if self.active_peer_candidate_is_fresh_enough_to_skip(
                    &peer_node_addr,
                    std::slice::from_ref(&candidate),
                ) {
                    continue;
                }
                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                    continue;
                }
                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
                    skipped_budget = skipped_budget.saturating_add(1);
                    continue;
                }
                info!(
                    npub = %identity.short_npub(),
                    addr = %peer.addr,
                    local_addr = %local_addr,
                    "lan: initiating alternate-path handshake to active peer"
                );
                if let Err(err) = self
                    .initiate_connection(transport_id, remote_addr, identity)
                    .await
                {
                    debug!(
                        npub = %peer.npub,
                        error = %err,
                        "lan: failed to initiate active peer alternate-path handshake"
                    );
                }
                connect_budget = connect_budget.saturating_sub(1);
                continue;
            }
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                continue;
            }
            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
                skipped_budget = skipped_budget.saturating_add(1);
                continue;
            }
            info!(
                npub = %identity.short_npub(),
                addr = %peer.addr,
                local_addr = %local_addr,
                "lan: initiating handshake to discovered peer"
            );
            if let Err(err) = self
                .initiate_connection(transport_id, remote_addr, identity)
                .await
            {
                debug!(
                    npub = %peer.npub,
                    error = %err,
                    "lan: failed to initiate connection to discovered peer"
                );
            }
            connect_budget = connect_budget.saturating_sub(1);
        }
        if skipped_budget > 0 {
            debug!(
                skipped = skipped_budget,
                "lan: discovery connect budget exhausted"
            );
        }
    }

    /// Poll pending transport connects and initiate handshakes for ready ones.
    ///
    /// Called from the tick handler. For each pending connect, queries the
    /// transport's connection state. When a connection is established,
    /// marks the link as Connected and starts the Noise handshake.
    /// Failed connections are cleaned up and scheduled for retry.
1330    pub(super) async fn poll_pending_connects(&mut self) {
1331        if self.pending_connects.is_empty() {
1332            return;
1333        }
1334
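        // Each completed entry is (index into pending_connects, success,
        // failure reason).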
        let mut completed = Vec::new();

        for (i, pending) in self.pending_connects.iter().enumerate() {
            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
                transport.connection_state(&pending.remote_addr)
            } else {
                crate::transport::ConnectionState::Failed("transport removed".into())
            };

            match state {
                crate::transport::ConnectionState::Connected => {
                    completed.push((i, true, None));
                }
                crate::transport::ConnectionState::Failed(reason) => {
                    completed.push((i, false, Some(reason)));
                }
                crate::transport::ConnectionState::Connecting => {
                    // Still in progress, check on next tick
                }
                crate::transport::ConnectionState::None => {
                    // Shouldn't happen — treat as failure
                    completed.push((i, false, Some("no connection attempt found".into())));
                }
            }
        }

        // Process completions in descending index order so each remove()
        // leaves the remaining (smaller) indices valid.
        for (i, success, reason) in completed.into_iter().rev() {
            let pending = self.pending_connects.remove(i);

            if success {
                // Mark link as Connected
                if let Some(link) = self.links.get_mut(&pending.link_id) {
                    link.set_connected();
                }

                debug!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    "Transport connected, starting handshake"
                );

                // Start the handshake now that the transport is connected
                if let Err(e) = self
                    .start_handshake(
                        pending.link_id,
                        pending.transport_id,
                        pending.remote_addr.clone(),
                        pending.peer_identity,
                    )
                    .await
                {
                    warn!(
                        link_id = %pending.link_id,
                        error = %e,
                        "Failed to start handshake after transport connect"
                    );
                    // Clean up link on handshake failure
                    self.remove_link(&pending.link_id);
                }
            } else {
                let reason = reason.unwrap_or_default();
                warn!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    reason = %reason,
                    "Transport connect failed"
                );

                // Clean up link and schedule retry (remove_link also evicts
                // the map entry, as on the handshake-failure path above, so
                // a separate self.links.remove here would be redundant)
                self.remove_link(&pending.link_id);
                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
            }
        }
    }

    // === State Transitions ===

    /// Start the node.
    ///
    /// Initializes the TUN interface (if configured), spawns I/O threads,
    /// and transitions to the Running state.
    pub async fn start(&mut self) -> Result<(), NodeError> {
        node_start_debug_log("Node::start begin");
        if !self.state.can_start() {
            return Err(NodeError::AlreadyStarted);
        }
        self.state = NodeState::Starting;
        node_start_debug_log("Node::start state set to starting");

        // Create packet channel for transport -> Node communication
        let packet_buffer_size = self.config.node.buffers.packet_channel;
        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
        self.packet_tx = Some(packet_tx.clone());
        self.packet_rx = Some(packet_rx);
        node_start_debug_log("Node::start packet channel created");

        // Initialize transports first (before TUN, before Nostr discovery).
        node_start_debug_log("Node::start create transports begin");
        let transport_handles = self.create_transports(&packet_tx).await;
        node_start_debug_log(format!(
            "Node::start create transports complete count={}",
            transport_handles.len()
        ));

        for mut handle in transport_handles {
            let transport_id = handle.transport_id();
            let transport_type = handle.transport_type().name;
            let name = handle.name().map(|s| s.to_string());

            node_start_debug_log(format!(
                "Node::start transport start begin id={} type={} name={:?}",
                transport_id, transport_type, name
            ));
            match handle.start().await {
                Ok(()) => {
                    node_start_debug_log(format!(
                        "Node::start transport start ok id={} type={}",
                        transport_id, transport_type
                    ));
                    self.transports.insert(transport_id, handle);
                }
                Err(e) => {
                    node_start_debug_log(format!(
                        "Node::start transport start error id={} type={} error={}",
                        transport_id, transport_type, e
                    ));
                    if let Some(ref n) = name {
                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
                    } else {
                        warn!(transport_type, error = %e, "Transport failed to start");
                    }
                }
            }
        }

        if !self.transports.is_empty() {
            info!(count = self.transports.len(), "Transports initialized");
        }

        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
        // worker pools. **Unix only** — both pools issue direct
        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
        // raw file descriptors via `AsRawFd`, which is a unix-only
        // trait. On Windows the rx_loop's tokio-based send/recv
        // remain the canonical path; the perf overhaul lands its
        // gains on unix.
        //
        // Worker count defaults to the number of CPUs, overridable
        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
        // for debug / benchmarking. Hash-by-destination means a
        // single TCP flow pins to one worker (preserves wire
        // ordering); additional workers light up under multi-flow
        // / multi-peer load. See `node::encrypt_worker` /
        // `node::decrypt_worker` for full rationale.
        #[cfg(unix)]
        {
            if self.config.node.worker_pools_enabled {
                node_start_debug_log("Node::start worker pools begin");
                let cpu_default = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(1)
                    .max(1);
                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(cpu_default)
                    .max(1);
                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
                    encrypt_worker_count,
                ));
                info!(
                    workers = encrypt_worker_count,
                    "Spawned FMP-encrypt worker pool"
                );

                // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
                // falls through to the in-line rx_loop decrypt path (the
                // "test-mode" branch in `handle_encrypted_frame`, which is
                // in fact a fully functional synchronous decrypt). Useful
                // as an A/B against the worker pipeline when chasing
                // scheduling/queueing regressions on the native macOS
                // path. Any non-zero value (env or default) spawns the
                // pool as before.
                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(cpu_default);
                if decrypt_worker_count == 0 {
                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
                } else {
                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
                        decrypt_worker_count,
                    ));
                    info!(
                        workers = decrypt_worker_count,
                        "Spawned FMP+FSP-decrypt worker pool"
                    );
                }
                node_start_debug_log("Node::start worker pools complete");
            } else {
                node_start_debug_log("Node::start worker pools disabled");
                info!("FIPS worker pools disabled; using in-line crypto/send path");
            }
        }
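        // For example (hypothetical invocation; binary name and values are
        // illustrative only):
        //
        //   FIPS_ENCRYPT_WORKERS=2 FIPS_DECRYPT_WORKERS=0 ./fips-node
        //
        // pins the encrypt pool to two workers and forces in-line decrypt.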

        if self.config.node.discovery.nostr.enabled {
            node_start_debug_log("Node::start nostr discovery start begin");
            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
                .await
            {
                Ok(runtime) => {
                    node_start_debug_log("Node::start nostr discovery runtime created");
                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
                    }
                    node_start_debug_log("Node::start nostr overlay advert refreshed");
                    self.nostr_discovery = Some(runtime);
                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
                    info!("Nostr overlay discovery enabled");
                }
                Err(err) => {
                    node_start_debug_log(format!(
                        "Node::start nostr discovery start error error={}",
                        err
                    ));
                    warn!(error = %err, "Failed to start Nostr overlay discovery");
                }
            }
        }

        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
        // when Nostr is disabled, since it gives us sub-second pairing
        // on the same link without any relay or NAT-traversal roundtrip.
        if self.config.node.discovery.lan.enabled {
            node_start_debug_log("Node::start lan discovery start begin");
            let advertised_udp_port = self
                .transports
                .values()
                .filter(|h| h.is_operational())
                .filter(|h| h.transport_type().name == "udp")
                .find_map(|h| h.local_addr().map(|addr| addr.port()))
                .unwrap_or(0);
            let scope = self.lan_discovery_scope();
            match crate::discovery::lan::LanDiscovery::start(
                &self.identity,
                scope,
                advertised_udp_port,
                self.config.node.discovery.lan.clone(),
            )
            .await
            {
                Ok(runtime) => {
                    node_start_debug_log("Node::start lan discovery start ok");
                    self.lan_discovery = Some(runtime);
                    info!("LAN mDNS discovery enabled");
                }
                Err(err) => {
                    node_start_debug_log(format!(
                        "Node::start lan discovery start error error={}",
                        err
                    ));
                    debug!(error = %err, "LAN mDNS discovery not started");
                }
            }
        }

        // Connect to static peers before TUN is active, so handshake
        // messages can be sent before we start accepting packets.
        node_start_debug_log("Node::start initiate peer connections begin");
        self.initiate_peer_connections().await;
        node_start_debug_log("Node::start initiate peer connections complete");

        // Initialize TUN interface last, after transports and peers are ready
        if self.config.tun.enabled {
            node_start_debug_log("Node::start tun init begin");
            let address = *self.identity.address();
            match TunDevice::create(&self.config.tun, address).await {
                Ok(device) => {
                    let mtu = device.mtu();
                    let name = device.name().to_string();
                    let our_addr = *device.address();

                    info!("TUN device active:");
                    info!("     name: {}", name);
                    info!("  address: {}", device.address());
                    info!("      mtu: {}", mtu);

                    // Calculate max MSS for TCP clamping
                    let effective_mtu = self.effective_ipv6_mtu();
                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers

                    info!("effective MTU: {} bytes", effective_mtu);
                    debug!("   max TCP MSS: {} bytes", max_mss);
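                    // e.g. an effective MTU of 1420 yields a 1360-byte MSS
                    // (1420 - 40 IPv6 - 20 TCP).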

                    // On macOS, create a shutdown pipe. Writing to it unblocks the
                    // reader thread's select() loop without closing the TUN fd
                    // (which would cause a double-close when TunDevice drops).
                    #[cfg(target_os = "macos")]
                    let (shutdown_read_fd, shutdown_write_fd) = {
                        let mut fds = [0i32; 2];
                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
                                "failed to create shutdown pipe".into(),
                            )));
                        }
                        (fds[0], fds[1])
                    };

                    // Create writer (dups the fd for independent write access).
                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
                    // per-destination path MTU learned via discovery.
                    let (writer, tun_tx) =
                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;

                    // Spawn writer thread
                    let writer_handle = thread::spawn(move || {
                        writer.run();
                    });

                    // Clone tun_tx for the reader
                    let reader_tun_tx = tun_tx.clone();

                    // Create outbound channel for TUN reader → Node
                    let tun_channel_size = self.config.node.buffers.tun_channel;
                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);

                    // Spawn reader thread
                    let transport_mtu = self.transport_mtu();
                    let path_mtu_lookup = self.path_mtu_lookup.clone();
                    #[cfg(target_os = "macos")]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                            shutdown_read_fd,
                        );
                    });
                    #[cfg(not(target_os = "macos"))]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                        );
                    });

                    self.tun_state = TunState::Active;
                    self.tun_name = Some(name);
                    self.tun_tx = Some(tun_tx);
                    self.tun_outbound_rx = Some(outbound_rx);
                    self.tun_reader_handle = Some(reader_handle);
                    self.tun_writer_handle = Some(writer_handle);
                    #[cfg(target_os = "macos")]
                    {
                        self.tun_shutdown_fd = Some(shutdown_write_fd);
                    }
                }
                Err(e) => {
                    self.tun_state = TunState::Failed;
                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
                }
            }
            node_start_debug_log("Node::start tun init complete");
        }

        // Initialize DNS responder (independent of TUN).
        //
        // Default bind_addr is "::1" (IPv6 loopback). The shipped
        // fips-dns-setup configures systemd-resolved via a global
        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
        // where self-destined traffic to fips0's address is attributed
        // to fips0 in PKTINFO and gets silently dropped by the
        // mesh-interface filter in src/upper/dns.rs.
        //
        // For mesh-reachable resolution (rare), set bind_addr: "::"
        // in fips.yaml. The mesh-interface filter remains active to
        // prevent hosts-file alias enumeration in that mode.
        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
        // 127.0.0.1 still reach us regardless of kernel sysctl
        // defaults — but only when bind is on a wildcard / IPv6 path.
        if self.config.dns.enabled {
            node_start_debug_log("Node::start dns init begin");
            let addr_str = self.config.dns.bind_addr();
            match addr_str.parse::<std::net::IpAddr>() {
                Ok(ip) => {
                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
                    match Self::bind_dns_socket(bind) {
                        Ok(socket) => {
                            let dns_channel_size = self.config.node.buffers.dns_channel;
                            let (identity_tx, identity_rx) =
                                tokio::sync::mpsc::channel(dns_channel_size);
                            let dns_ttl = self.config.dns.ttl();
                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
                                self.config.peers(),
                            );
                            let reloader = if self.config.node.system_files_enabled {
                                let hosts_path = std::path::PathBuf::from(
                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
                                );
                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
                            } else {
                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
                            };
                            // Resolve the TUN ifindex so the responder can
                            // drop queries arriving on the mesh interface
                            // (fips0). Without this, the `::` bind exposes
                            // /etc/fips/hosts alias probing to any mesh peer.
                            // When TUN isn't enabled or the name can't be
                            // resolved, `None` disables the filter (there
                            // is no mesh surface to defend anyway).
                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
                            info!(
                                bind = %bind,
                                hosts = reloader.hosts().len(),
                                mesh_ifindex = ?mesh_ifindex,
                                "DNS responder started for .fips domain (auto-reload enabled)"
                            );
                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
                                socket,
                                identity_tx,
                                dns_ttl,
                                reloader,
                                mesh_ifindex,
                            ));
                            self.dns_identity_rx = Some(identity_rx);
                            self.dns_task = Some(handle);
                        }
                        Err(e) => {
                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
                        }
                    }
                }
                Err(e) => {
                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
                }
            }
            node_start_debug_log("Node::start dns init complete");
        }

        self.state = NodeState::Running;
        node_start_debug_log("Node::start running");
        info!("Node started:");
        info!("       state: {}", self.state);
        info!("  transports: {}", self.transports.len());
        info!(" connections: {}", self.connections.len());
        Ok(())
    }

    /// Bind a UDP socket for the DNS responder.
    ///
    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
    /// land on the same socket.
    ///
    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
    /// can learn the arrival interface per packet. The responder uses that
    /// to drop queries arriving on the mesh TUN, closing the hosts-file
    /// probing side-channel created by the `::` bind.
    fn bind_dns_socket(
        addr: std::net::SocketAddr,
    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
        use socket2::{Domain, Protocol, Socket, Type};
        let domain = if addr.is_ipv4() {
            Domain::IPV4
        } else {
            Domain::IPV6
        };
        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
        if addr.is_ipv6() {
            sock.set_only_v6(false)?;
            #[cfg(unix)]
            Self::set_recv_pktinfo_v6(&sock)?;
        }
        sock.set_nonblocking(true)?;
        sock.bind(&addr.into())?;
        tokio::net::UdpSocket::from_std(sock.into())
    }

    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
    ///
    /// After this setsockopt, each `recvmsg()` call on the socket receives
    /// an `IPV6_PKTINFO` control message containing the arrival interface
    /// index, which the DNS responder uses for its mesh-interface filter.
    #[cfg(unix)]
    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
        use std::os::fd::AsRawFd;
        let enable: libc::c_int = 1;
        let ret = unsafe {
            libc::setsockopt(
                sock.as_raw_fd(),
                libc::IPPROTO_IPV6,
                libc::IPV6_RECVPKTINFO,
                &enable as *const _ as *const libc::c_void,
                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
            )
        };
        if ret < 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(())
    }

    /// Resolve the mesh TUN interface index by name.
    ///
    /// Returns `None` if the interface does not exist (e.g. TUN disabled
    /// or not yet created). A `None` result disables the DNS responder's
    /// mesh-interface filter — safe, because if there is no fips0 there
    /// is no mesh exposure to defend against.
    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
        #[cfg(unix)]
        {
            let c_name = std::ffi::CString::new(name).ok()?;
            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
            if idx == 0 { None } else { Some(idx) }
        }
        #[cfg(not(unix))]
        {
            let _ = name;
            None
        }
    }

    /// Stop the node.
    ///
    /// Shuts down the TUN interface, stops I/O threads, and transitions to
    /// the Stopped state.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        // Stop DNS responder
        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Send disconnect notifications to all active peers before closing transports
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        // Stop Nostr overlay discovery background work and withdraw any advert.
        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        // Tear down LAN mDNS responder + browser. Best-effort: the
        // OS will eventually time the advert out via its TTL even if
        // we don't get a clean unregister out before the daemon exits.
        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Shut down transports (they're packet producers)
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop packet channels
        self.packet_tx.take();
        self.packet_rx.take();

        // Shut down the TUN interface
        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Drop the tun_tx to signal the writer to stop
            self.tun_tx.take();

            // Delete the interface (on Linux, causes reader to get EFAULT)
            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // On macOS, signal the reader thread to exit by writing to the
            // shutdown pipe. The reader's select() will wake up and break.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // Wait for threads to finish
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }

    /// Send disconnect notifications to all active peers.
    ///
    /// Best-effort: send failures are logged and ignored since the transport
    /// may already be degraded. This runs before transports are shut down.
    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
        let disconnect = Disconnect::new(reason);
        let plaintext = disconnect.encode();

        // Collect node_addrs to avoid borrow conflict with send helper
        let peer_addrs: Vec<NodeAddr> = self
            .peers
            .iter()
            .filter(|(_, peer)| peer.can_send() && peer.has_session())
            .map(|(addr, _)| *addr)
            .collect();

        if peer_addrs.is_empty() {
            debug!(
                total_peers = self.peers.len(),
                "No sendable peers for disconnect notification"
            );
            return;
        }

        let mut sent = 0usize;
        for node_addr in &peer_addrs {
            match self
                .send_encrypted_link_message(node_addr, &plaintext)
                .await
            {
                Ok(()) => sent += 1,
                Err(e) => {
                    debug!(
                        peer = %self.peer_display_name(node_addr),
                        error = %e,
                        "Failed to send disconnect (transport may be down)"
                    );
                }
            }
        }

        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
    }

    fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
        peer_config
            .addresses_by_priority()
            .into_iter()
            .cloned()
            .collect()
    }

    async fn nostr_peer_fallback_addresses(
        &self,
        peer_config: &PeerConfig,
        existing: &[PeerAddress],
    ) -> Vec<PeerAddress> {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy
                == crate::config::NostrDiscoveryPolicy::Disabled
        {
            return Vec::new();
        }

        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return Vec::new();
        };
        let endpoints = match bootstrap
            .cached_advert_endpoints_for_peer(&peer_config.npub)
            .await
        {
            Some(endpoints) => endpoints,
            None => {
                debug!(
                    npub = %peer_config.npub,
                    "No cached Nostr advert endpoints for configured peer"
                );
                return Vec::new();
            }
        };

        let mut fallback = Vec::new();
        let mut next_priority = existing
            .iter()
            .map(|addr| addr.priority)
            .max()
            .unwrap_or(100)
            .saturating_add(1);
        // Stamp every overlay-derived candidate with the current wall clock
        // so the dialer ranks it ahead of unstamped operator hints. We use a
        // single timestamp per fetch (rather than per-endpoint) because all
        // candidates in a single advert are equally fresh.
        let seen_at_ms = Self::now_ms();
        for endpoint in endpoints {
            let Some(candidate) =
                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
            else {
                continue;
            };
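            // Drop duplicates against both the operator-configured list and
            // candidates already collected from this advert.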
            if existing
                .iter()
                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
                || fallback.iter().any(|addr: &PeerAddress| {
                    addr.transport == candidate.transport && addr.addr == candidate.addr
                })
            {
                continue;
            }
            fallback.push(candidate);
            next_priority = next_priority.saturating_add(1);
        }
        fallback
    }

    async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy
                == crate::config::NostrDiscoveryPolicy::Disabled
        {
            return false;
        }
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return false;
        };
        bootstrap.request_connect(peer_config.clone()).await;
        info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
        true
    }

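    /// Map an advert endpoint to a dialable `PeerAddress`. For example, a
    /// Udp endpoint "203.0.113.7:51820" (illustrative address) at priority
    /// 120 becomes a `PeerAddress` with transport "udp", that address, and
    /// priority 120, stamped with `seen_at_ms`.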
    fn overlay_endpoint_to_peer_address(
        endpoint: &OverlayEndpointAdvert,
        priority: u8,
        seen_at_ms: u64,
    ) -> Option<PeerAddress> {
        let transport = match endpoint.transport {
            OverlayTransportKind::Udp => "udp",
            OverlayTransportKind::Tcp => "tcp",
            OverlayTransportKind::Tor => "tor",
        };
        Some(
            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
                .with_seen_at_ms(seen_at_ms),
        )
    }

    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        let mut attempted = false;
        let peer_node_addr = *peer_identity.node_addr();
        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);

        for addr in addresses {
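            // The `udp` address "nat" is a sentinel, not a dialable
            // endpoint: it requests a background Nostr NAT-traversal
            // attempt instead of a direct connect.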
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                if self.request_nostr_bootstrap(peer_config).await {
                    attempted = true;
                    continue;
                }
                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                continue;
            }

            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                let tid = if addr.transport == "udp"
                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
                {
                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
                        Some((id, _)) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No compatible operational UDP transport for address"
                            );
                            continue;
                        }
                    }
                } else {
                    match self.find_transport_for_type(&addr.transport) {
                        Some(id) => id,
                        None => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                "No operational transport for address type"
                            );
                            continue;
                        }
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                attempted = true;
                debug!(
                    npub = %peer_config.npub,
                    transport_id = %transport_id,
                    remote_addr = %remote_addr,
                    "Skipping duplicate in-flight candidate path"
                );
                continue;
            }

            if concrete_budget == 0 {
                debug!(
                    npub = %peer_config.npub,
                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
                    "Path candidate race budget exhausted"
                );
                break;
            }

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => {
                    attempted = true;
                    concrete_budget = concrete_budget.saturating_sub(1);
                }
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        if attempted {
            return Ok(());
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }

    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
            .await;
    }

    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
    /// queues retries for non-configured, not-yet-connected peers.
    ///
    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
    ///
    /// `caller` is a short label included in log lines so per-tick and
    /// startup sweeps are distinguishable in operator-facing logs.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            if configured_npubs.contains(&npub) {
                // Configured peers don't go through the open-discovery
                // enqueue path — their `PeerConfig` is already in
                // `self.config.peers()`, so the regular retry queue is
                // what drives their reconnect. But on cold start with
                // NAT'd peers, every initial `initiate_peer_connection`
                // fails (no overlay data yet, static cache hints empty
                // or stale), each pushes the peer into `retry_pending`
                // with exponential backoff (5/10/20/40/80s), and by the
                // time the next backoff slot fires the Nostr advert is
                // already cached — we just don't act on it for ~80s.
                //
                // The arrival of an advert (which this sweep sees) means
                // we now have a path to dial. If the peer's retry is
                // scheduled in the future, pull it forward to "now" so
                // the next `process_pending_retries` tick fires it
                // immediately. The retry path (`initiate_peer_retry_
                // connection` → `try_peer_addresses`) then refetches
                // the advert and dials it — no behavioral change
                // beyond schedule timing.
                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
                    let configured_addr = *identity.node_addr();
                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
                        && state.retry_after_ms > now_ms
                    {
                        state.retry_after_ms = now_ms;
                        debug!(
                            caller = %caller,
                            peer = %self.peer_display_name(&configured_addr),
                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
                            "Expediting configured-peer retry after fresh overlay advert"
                        );
                    }
                }
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            let mut addresses = Vec::new();
            let mut priority = 120u8;
            let seen_at_ms = Self::now_ms();
            for endpoint in endpoints {
                let Some(candidate) =
                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

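            // Synthesize a one-shot PeerConfig for the discovered peer:
            // `reconnect = false` below means a failed dial isn't retried
            // indefinitely, and the entry ages out at `expires_at_ms`.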
            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
                discovery_fallback_transit: false,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        // Always log a one-line summary on the startup sweep so operators
        // can verify it ran. Per-tick sweeps are noisier; only summarize
        // when something happened.
        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }

    /// One-shot startup sweep: runs once after the configured settle
    /// delay, iterating the cached overlay adverts and queueing retries
    /// for any peer with a recent enough advert that we haven't already
    /// configured statically or established a link to.
    ///
    /// Gated identically to [`run_open_discovery_sweep`]: requires
    /// `node.discovery.nostr.enabled` and `policy == open`.
    async fn maybe_run_startup_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) {
        if self.startup_open_discovery_sweep_done {
            return;
        }
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            // Mark done so we don't keep re-checking on every tick.
            self.startup_open_discovery_sweep_done = true;
            return;
        }
        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
            return;
        };
        let now_ms = Self::now_ms();
        let delay_ms = self
            .config
            .node
            .discovery
            .nostr
            .startup_sweep_delay_secs
            .saturating_mul(1000);
        if now_ms < started_at_ms.saturating_add(delay_ms) {
            return;
        }

        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
            .await;
        self.startup_open_discovery_sweep_done = true;
    }

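    /// Remaining capacity for new outbound attempts: the tighter of the
    /// connection budget (active connections plus pending connects vs.
    /// `max_connections`) and the peer budget (`max_peers`), where a
    /// configured limit of 0 means unlimited.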
    fn available_outbound_slots(&self) -> usize {
        let connection_used = self
            .connections
            .len()
            .saturating_add(self.pending_connects.len());
        let connection_slots = if self.max_connections == 0 {
            usize::MAX
        } else {
            self.max_connections.saturating_sub(connection_used)
        };

        let peer_slots = if self.max_peers == 0 {
            usize::MAX
        } else {
            self.max_peers.saturating_sub(self.peers.len())
        };

        connection_slots.min(peer_slots)
    }

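    /// How many open-discovery retries this sweep may enqueue: the headroom
    /// left under `open_discovery_max_pending` (counting only non-configured
    /// peers already in the retry queue), further capped by the node's free
    /// outbound slots.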
    fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
        let current_open_discovery_pending = self
            .retry_pending
            .values()
            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
            .count();

        let cap_remaining = self
            .config
            .node
            .discovery
            .nostr
            .open_discovery_max_pending
            .saturating_sub(current_open_discovery_pending);

        cap_remaining.min(self.available_outbound_slots())
    }

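    /// Retry lifetime is the advert TTL times
    /// `OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER`: with a (hypothetical)
    /// `advert_ttl_secs` of 3600, an entry queued now would expire two
    /// hours after `now_ms`.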
2560    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
2561        now_ms.saturating_add(
2562            self.config
2563                .node
2564                .discovery
2565                .nostr
2566                .advert_ttl_secs
2567                .saturating_mul(1000)
2568                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
2569        )
2570    }
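    // Worked expiry sketch (illustrative TTL): with advert_ttl_secs = 300
    // and OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER = 2, a retry queued at
    // now_ms = 1_000_000 expires at
    //   1_000_000 + 300 * 1000 * 2 = 1_600_000 ms,
    // i.e. an open-discovery retry outlives the advert that spawned it by
    // one extra TTL before it is dropped.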

    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        let mut has_udp_nat = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Precedence:
                        // 1. operator-supplied `external_addr` (skips STUN)
                        // 2. non-wildcard *public* `local_addr` (operator
                        //    bound to a specific public IP directly)
                        // 3. STUN auto-discovery against ephemeral socket
                        //    (also taken when bind is wildcard *or* private —
                        //    a private bind is not peer-reachable, so we
                        //    must publish the public reflexive instead)
                        // 4. loud warn + omit endpoint
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                            or private and STUN observation failed; \
                                            advertising no UDP endpoint. Either set \
                                            transports.udp.external_addr, bind to a \
                                            specific *public* IP, or ensure \
                                            node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Precedence:
                    // 1. operator-supplied `external_addr` (only path that
                    //    works on cloud-NAT setups where the public IP is
                    //    not on a host interface).
                    // 2. non-wildcard *public* `local_addr` (operator bound
                    //    to a specific public IP directly).
                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
                    //
                    // A wildcard *or* private bind is never advertised as-is
                    // — peers off-LAN can't reach a private bind, and there
                    // is no TCP STUN to discover a public reflexive.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                    or private IP and no transports.tcp.external_addr set; \
                                    advertising no TCP endpoint. Either set external_addr \
                                    to the public IP (recommended for cloud 1:1-NAT setups) \
                                    or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: has_udp_nat
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
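    // Shape sketch of the advert this builds for a node with one public
    // UDP listener and one NATed UDP listener (field names follow the
    // struct above; the addresses and relay lists are placeholders, and
    // the wire encoding is whatever `OverlayAdvert`'s serializer emits):
    //
    //   OverlayAdvert {
    //       identifier: ADVERT_IDENTIFIER,
    //       version: ADVERT_VERSION,
    //       endpoints: [
    //           { transport: Udp, addr: "203.0.113.7:51820" }, // public
    //           { transport: Udp, addr: "nat" },               // NATed
    //       ],
    //       signal_relays: Some(dm_relays),     // present only due to "nat"
    //       stun_servers: Some(stun_servers),   // likewise
    //   }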

    async fn refresh_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
        let advert = self.build_overlay_advert(bootstrap).await;
        bootstrap.update_local_advert(advert).await
    }

    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
        match (&self.config.transports.udp, transport_name) {
            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
            _ => None,
        }
    }

    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
        match (&self.config.transports.tcp, transport_name) {
            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
            _ => None,
        }
    }

    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
        match (&self.config.transports.tor, transport_name) {
            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
            _ => None,
        }
    }
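    // Config-shape sketch for the three lookup_* helpers (hypothetical
    // TOML, table names inferred from the `config.transports.*` paths;
    // leaf keys omitted):
    //
    //   [transports.udp]       # TransportInstances::Single(cfg); pairs with
    //                          # a transport handle whose name() is None
    //   [transports.udp.wan]   # TransportInstances::Named({"wan": cfg});
    //                          # pairs with name() == Some("wan")
    //
    // Any other pairing falls through to None, so a misnamed listener
    // silently loses its advert settings instead of borrowing another
    // listener's.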

    pub(in crate::node) async fn try_peer_addresses(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();
        if self.peers.contains_key(&peer_node_addr) {
            debug!(
                npub = %peer_config.npub,
                "Peer already exists, skipping address attempts"
            );
            return Ok(());
        }

        let candidates = self.peer_address_candidates(peer_config).await;

        if candidates.is_empty() {
            if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
                return Ok(());
            }
            return Err(NodeError::NoTransportForType(format!(
                "no addresses known for {}",
                peer_config.npub
            )));
        }

        if self
            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
            .await
            .is_ok()
        {
            return Ok(());
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }

    async fn try_active_peer_alternative_addresses(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
    ) -> Result<bool, NodeError> {
        let peer_node_addr = *peer_identity.node_addr();
        let candidates = self.peer_address_candidates(peer_config).await;

        if candidates.is_empty() {
            return Err(NodeError::NoTransportForType(format!(
                "no addresses known for {}",
                peer_config.npub
            )));
        }

        let alternatives: Vec<_> = candidates
            .into_iter()
            .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
            .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
            .collect();

        if alternatives.is_empty() {
            return Err(NodeError::NoTransportForType(format!(
                "no concrete alternate addresses known for {}",
                peer_config.npub
            )));
        }

        self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
            .await?;
        Ok(true)
    }
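    // Filter sketch (placeholder addresses): if the peer is currently
    // latched to udp 198.51.100.4:51820 and the merged candidates are
    //   [udp "nat", udp "198.51.100.4:51820", tcp "198.51.100.4:443"],
    // the `udp:nat` pseudo-address and the already-active udp candidate
    // are both dropped, leaving only the tcp endpoint to try as an
    // alternate path.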

    async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
        // Merge every candidate from every source we have for this peer
        // (operator-configured static addresses, freshly fetched overlay
        // adverts, callers' recent-peers caches via `update_peers`) and
        // try them in order of `seen_at_ms` descending — most-recent
        // first, source-agnostic. Addresses without a freshness signal
        // sort last. Addresses are hints, not the final word: within a
        // single pass, concrete candidates race each other, and `udp:nat`
        // merely starts background Nostr traversal without suppressing
        // direct static/LAN attempts that appear later in the list.
        let static_addresses = self.static_peer_addresses(peer_config);
        let overlay_addresses = self
            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
            .await;

        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
        for addr in overlay_addresses.into_iter().chain(static_addresses) {
            if !candidates.iter().any(|existing: &PeerAddress| {
                existing.transport == addr.transport && existing.addr == addr.addr
            }) {
                candidates.push(addr);
            }
        }

        // Stable sort: most-recently-observed first (Some > None), tiebreak
        // by input order so equal-timestamp addresses keep operator-supplied
        // priority and the overlay-then-static merge order survives when
        // nothing has a freshness signal.
        candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
            (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
            (Some(_), None) => std::cmp::Ordering::Less,
            (None, Some(_)) => std::cmp::Ordering::Greater,
            (None, None) => std::cmp::Ordering::Equal,
        });

        candidates
    }
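    // Ordering sketch (made-up timestamps): candidates merged as
    //   overlay udp A (seen_at_ms = Some(2_000)),
    //   overlay tcp B (seen_at_ms = Some(5_000)),
    //   static  udp C (seen_at_ms = None)
    // sort to [B, A, C]: freshest observation first, while C's missing
    // freshness signal sends it to the back and the stable sort preserves
    // merge order among equal keys.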

    fn active_peer_matches_any_candidate(
        &self,
        peer_node_addr: &NodeAddr,
        candidates: &[PeerAddress],
    ) -> bool {
        candidates
            .iter()
            .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
    }

    pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
        &self,
        peer_node_addr: &NodeAddr,
        candidates: &[PeerAddress],
    ) -> bool {
        if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
            return false;
        }
        !self.active_peer_needs_same_path_refresh(peer_node_addr)
    }

    fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
        let Some(peer) = self.peers.get(peer_node_addr) else {
            return false;
        };
        let stale_after_ms = self
            .config
            .node
            .heartbeat_interval_secs
            .saturating_mul(1000)
            .max(1000);
        peer.idle_time(Self::now_ms()) > stale_after_ms
    }
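    // Staleness sketch (illustrative heartbeat): with
    // heartbeat_interval_secs = 15, stale_after_ms = max(15 * 1000, 1000)
    // = 15_000 ms, so a peer idle for 20_000 ms wants a same-path refresh
    // while one idle for 5_000 ms is fresh enough to skip. The
    // max(..., 1000) floor keeps a zeroed heartbeat interval from forcing
    // a refresh on every tick.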

    fn active_peer_matches_candidate(
        &self,
        peer_node_addr: &NodeAddr,
        candidate: &PeerAddress,
    ) -> bool {
        let Some(peer) = self.peers.get(peer_node_addr) else {
            return false;
        };
        let Some(current_addr) = peer.current_addr() else {
            return false;
        };
        if peer
            .transport_id()
            .map(|id| self.bootstrap_transports.contains(&id))
            .unwrap_or(false)
        {
            return false;
        }
        let current_addr = current_addr.to_string();
        let current_transport = peer
            .transport_id()
            .and_then(|id| self.transports.get(&id))
            .map(|transport| transport.transport_type().name);

        candidate.addr == current_addr
            && current_transport
                .map(|transport| transport == candidate.transport)
                .unwrap_or(true)
    }

    // === Control API methods ===

    /// Connect to a peer via the control API.
    ///
    /// Creates an ephemeral peer connection (not persisted to config, no
    /// auto-reconnect). Reuses the same connection path as auto-connect
    /// peers. Returns JSON data on success or an error message.
    pub(crate) async fn api_connect(
        &mut self,
        npub: &str,
        address: &str,
        transport: &str,
    ) -> Result<serde_json::Value, String> {
        let peer_config = PeerConfig {
            npub: npub.to_string(),
            alias: None,
            addresses: vec![PeerAddress::new(transport, address)],
            connect_policy: ConnectPolicy::Manual,
            auto_reconnect: false,
            discovery_fallback_transit: true,
        };

        // Pre-seed identity cache (same as initiate_peer_connections does)
        if let Ok(identity) = PeerIdentity::from_npub(npub) {
            self.peer_aliases
                .insert(*identity.node_addr(), identity.short_npub());
            self.register_identity(*identity.node_addr(), identity.pubkey_full());
        }

        self.initiate_peer_connection(&peer_config)
            .await
            .map(|()| {
                info!(
                    npub = %npub,
                    address = %address,
                    transport = %transport,
                    "API connect initiated"
                );
                serde_json::json!({
                    "npub": npub,
                    "address": address,
                    "transport": transport,
                })
            })
            .map_err(|e| e.to_string())
    }
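    // Success-path sketch (placeholder values): an API connect called with
    //   npub = "npub1...", address = "203.0.113.7:51820", transport = "udp"
    // builds a one-address Manual peer with auto_reconnect = false and,
    // once initiate_peer_connection accepts it, answers
    //   {"npub": "npub1...", "address": "203.0.113.7:51820", "transport": "udp"}.
    // Any NodeError is rendered to a plain string for the API caller.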

    /// Disconnect a peer via the control API.
    ///
    /// Removes the peer and suppresses auto-reconnect.
    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
        let peer_identity =
            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
        let node_addr = *peer_identity.node_addr();

        if !self.peers.contains_key(&node_addr) {
            return Err(format!("peer not found: {npub}"));
        }

        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
        self.remove_active_peer(&node_addr);

        // Suppress any pending auto-reconnect
        self.retry_pending.remove(&node_addr);

        info!(npub = %npub, "API disconnect completed");

        Ok(serde_json::json!({
            "npub": npub,
            "disconnected": true,
        }))
    }

    /// Adopt an already-established UDP traversal and start the normal FIPS
    /// Noise handshake over it.
    ///
    /// This is intended for integration with an external rendezvous runtime
    /// that has already completed relay signaling, STUN observation, and UDP
    /// hole punching. After handoff, the adopted socket is owned by FIPS.
    pub async fn adopt_established_traversal(
        &mut self,
        traversal: EstablishedTraversal,
    ) -> Result<BootstrapHandoffResult, NodeError> {
        debug!(
            peer_npub = %traversal.peer_npub,
            session_id = %traversal.session_id,
            remote_addr = %traversal.remote_addr,
            "adopting established traversal socket"
        );

        if !self.state.is_operational() {
            return Err(NodeError::NotStarted);
        }

        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
            NodeError::InvalidPeerNpub {
                npub: traversal.peer_npub.clone(),
                reason: e.to_string(),
            }
        })?;
        let peer_node_addr = *peer_identity.node_addr();
        if self.peers.contains_key(&peer_node_addr) {
            debug!(
                peer_npub = %traversal.peer_npub,
                "Adopting NAT traversal handoff as alternate path for already-connected peer"
            );
        }

        self.peer_aliases
            .insert(peer_node_addr, peer_identity.short_npub());
        self.register_identity(peer_node_addr, peer_identity.pubkey_full());

        let transport_id = self.allocate_transport_id();
        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
        // (and accept_connections / advertise flags) from the operator's
        // configured [transports.udp] when the bootstrap runtime doesn't
        // pass an explicit override. Lookup tries `transport_name` first
        // (covers the `Named` multi-listener variant) and falls back to the
        // unnamed `Single` listener, so single- and named-listener configs
        // both inherit cleanly.
        //
        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
        // only value guaranteed to survive arbitrary middlebox paths.
        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
        // initially attempt that MTU and may black-hole on tighter paths
        // until reactive `MtuExceeded` recovery kicks in. Operators who
        // raise the primary MTU based on known-clean topology accept that
        // tradeoff; the silent drop on a too-low default was strictly
        // worse for the common case where the primary MTU is reachable.
        //
        // Bind / external address fields are cleared since the socket is
        // already bound.
        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
            let mut cfg = self
                .lookup_udp_config(traversal.transport_name.as_deref())
                .or_else(|| self.lookup_udp_config(None))
                .cloned()
                .unwrap_or_default();
            cfg.bind_addr = None;
            cfg.external_addr = None;
            cfg
        });
        let mut transport = crate::transport::udp::UdpTransport::new(
            transport_id,
            traversal.transport_name.clone(),
            inherited_config,
            packet_tx,
        );

        transport
            .adopt_socket_async(traversal.socket)
            .await
            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;

        let local_addr = transport.local_addr().ok_or_else(|| {
            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
        })?;

        self.transports.insert(
            transport_id,
            crate::transport::TransportHandle::Udp(transport),
        );
        self.bootstrap_transports.insert(transport_id);
        self.bootstrap_transport_npubs
            .insert(transport_id, traversal.peer_npub.clone());

        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
        if let Err(err) = self
            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
            .await
        {
            self.bootstrap_transports.remove(&transport_id);
            self.bootstrap_transport_npubs.remove(&transport_id);
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let _ = handle.stop().await;
            }
            return Err(err);
        }

        info!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            local_addr = %local_addr,
            remote_addr = %traversal.remote_addr,
            session_id = %traversal.session_id,
            "adopted NAT traversal socket; handshake initiated"
        );

        Ok(BootstrapHandoffResult {
            transport_id,
            local_addr,
            remote_addr: traversal.remote_addr,
            peer_node_addr,
            session_id: traversal.session_id,
        })
    }
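    // Caller-side sketch (hypothetical rendezvous runtime; only the
    // `EstablishedTraversal` fields this function reads are shown, and
    // the struct may carry more):
    //
    //   let traversal = EstablishedTraversal {
    //       peer_npub: "npub1...".into(),
    //       session_id,             // rendezvous session identifier
    //       remote_addr,            // SocketAddr proven by the hole punch
    //       socket,                 // the punched UDP socket being handed off
    //       transport_name: None,   // inherit the unnamed [transports.udp]
    //       transport_config: None, // ...including its MTU, per the tradeoff
    //   };
    //   let handoff = node.adopt_established_traversal(traversal).await?;
    //   // handoff.transport_id now addresses the adopted ephemeral transport.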
}