
fips_core/node/lifecycle.rs

//! Node lifecycle management: start, stop, and peer connection initiation.

use super::{Node, NodeError, NodeState};
use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
use crate::discovery::nostr::{
    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
    OverlayEndpointAdvert, OverlayTransportKind,
};
use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
use crate::node::acl::PeerAclContext;
use crate::node::wire::build_msg1;
use crate::peer::PeerConnection;
use crate::protocol::{Disconnect, DisconnectReason};
use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
use crate::{NodeAddr, PeerIdentity};
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
use std::thread;
use std::time::Duration;
use tracing::{debug, info, warn};

#[cfg(debug_assertions)]
fn node_start_debug_log(message: impl AsRef<str>) {
    use std::io::Write as _;

    if let Ok(mut file) = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(std::env::temp_dir().join("nvpn-fips-endpoint-debug.log"))
    {
        let _ = writeln!(
            file,
            "{:?} {}",
            std::time::SystemTime::now(),
            message.as_ref()
        );
    }
}

#[cfg(not(debug_assertions))]
fn node_start_debug_log(_message: impl AsRef<str>) {}
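// Usage sketch (the status string is an illustrative assumption, not a call
// taken from this module): `node_start_debug_log("selected runtime endpoint");`
// appends a timestamped line to <tmp>/nvpn-fips-endpoint-debug.log in debug
// builds and compiles to a no-op in release builds.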

/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN. Covers RFC1918, loopback, link-local, IPv4
/// CGNAT (100.64/10), unspecified, multicast/broadcast/documentation,
/// and IPv6 unique-local/loopback/unspecified. We never publish these as
/// the peer's primary `runtime_endpoint`: an off-LAN consumer can't route
/// to them, and latching onto one pushes the connection down to a slow
/// overlay-relay fallback, which is the original bug this guard exists to
/// prevent.
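/// A quick classification sketch (illustrative addresses only):
///
/// ```ignore
/// assert!(is_unroutable_advert_ip("192.168.1.10".parse().unwrap())); // RFC1918
/// assert!(is_unroutable_advert_ip("100.64.0.1".parse().unwrap()));   // CGNAT
/// assert!(is_unroutable_advert_ip("fe80::1".parse().unwrap()));      // v6 link-local
/// assert!(!is_unroutable_advert_ip("8.8.8.8".parse().unwrap()));     // publicly routable
/// ```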
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the
                // public internet; behaves like an extra NAT layer.
                || (v4.octets()[0] == 100 && (v4.octets()[1] & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // IPv6 link-local: fe80::/10
                || (v6.segments()[0] & 0xffc0) == 0xfe80
        }
    }
}

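/// True when `local` and `remote` are in the same address family (v4 with v4,
/// v6 with v6). A wildcard IPv4 socket cannot reach an IPv6 target and vice
/// versa, so the family must match before a UDP transport is chosen.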
fn socket_addr_families_compatible(local: SocketAddr, remote: SocketAddr) -> bool {
    matches!(
        (local, remote),
        (SocketAddr::V4(_), SocketAddr::V4(_)) | (SocketAddr::V6(_), SocketAddr::V6(_))
    )
}

const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
const MAX_PARALLEL_PATH_CANDIDATES_PER_PEER: usize = 4;
const MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK: usize = 16;
const MAX_DISCOVERY_CONNECTS_PER_TICK: usize = 16;

impl Node {
    /// Replace the runtime peer list and initiate connections to newly
    /// configured static peers.
    ///
    /// For each added peer with an auto-connect policy, this creates a link
    /// and peer entry, then starts the Noise handshake by sending the first
    /// message (via `initiate_peer_connection`). Removed peers are dropped
    /// from the retry queue (the regular liveness timeout reaps any active
    /// session; we don't proactively disconnect, since the same npub might be
    /// on its way back via a fresh advert). Existing auto-connect entries with
    /// new direct addresses are dialed immediately, and retry state is
    /// refreshed so later attempts also use the latest hints.
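    /// A minimal call-site sketch (hypothetical `node` handle and peer
    /// configs, not taken from this crate's tests):
    ///
    /// ```ignore
    /// let outcome = node.update_peers(vec![peer_a_cfg, peer_b_cfg]).await?;
    /// debug!(
    ///     added = outcome.added,
    ///     removed = outcome.removed,
    ///     updated = outcome.updated,
    ///     unchanged = outcome.unchanged,
    ///     "runtime peer list replaced"
    /// );
    /// ```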
    pub(super) async fn update_peers(
        &mut self,
        new_peers: Vec<crate::config::PeerConfig>,
    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
        use std::collections::{HashMap, HashSet};

        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
            HashMap::with_capacity(new_peers.len());
        for peer in new_peers {
            let identity = match PeerIdentity::from_npub(&peer.npub) {
                Ok(id) => id,
                Err(e) => {
                    return Err(crate::node::NodeError::InvalidPeerNpub {
                        npub: peer.npub.clone(),
                        reason: e.to_string(),
                    });
                }
            };
            // Last-write-wins on duplicates so callers passing a multi-source
            // candidate list (e.g. operator hints + recent-peers cache for
            // the same npub) get the merge they meant.
            new_by_addr.insert(*identity.node_addr(), peer);
        }

        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
            .config
            .peers()
            .iter()
            .filter_map(|pc| {
                PeerIdentity::from_npub(&pc.npub)
                    .ok()
                    .map(|id| (*id.node_addr(), pc.clone()))
            })
            .collect();

        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();

        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();

        let mut outcome = crate::node::UpdatePeersOutcome::default();

        for node_addr in &removed {
            if self.retry_pending.remove(node_addr).is_some() {
                debug!(
                    peer = %self.peer_display_name(node_addr),
                    "Dropping retry entry for peer removed from runtime peer list"
                );
            }
            self.peer_aliases.remove(node_addr);
            outcome.removed += 1;
        }

        let mut auto_connect_refresh_configs = Vec::new();
        for node_addr in &kept {
            let new_pc = &new_by_addr[node_addr];
            let current_pc = &current_by_addr[node_addr];
            if new_pc.addresses != current_pc.addresses
                || new_pc.alias != current_pc.alias
                || new_pc.connect_policy != current_pc.connect_policy
                || new_pc.auto_reconnect != current_pc.auto_reconnect
            {
                outcome.updated += 1;
                if let Some(state) = self.retry_pending.get_mut(node_addr) {
                    state.peer_config = new_pc.clone();
                    state.retry_after_ms = Self::now_ms();
                }
                if let Some(alias) = new_pc.alias.clone() {
                    self.peer_aliases.insert(*node_addr, alias);
                }
                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
                    auto_connect_refresh_configs.push(new_pc.clone());
                }
            } else {
                outcome.unchanged += 1;
                if new_pc.is_auto_connect() && !new_pc.addresses.is_empty() {
                    auto_connect_refresh_configs.push(new_pc.clone());
                }
            }
        }

        let added_configs: Vec<crate::config::PeerConfig> =
            added.iter().map(|addr| new_by_addr[addr].clone()).collect();

        // Replace the live config peer list before initiating connections so
        // any helper that consults `self.config.peers()` during the dial
        // (alias lookup, retry-state seeding) sees the new entries.
        self.config.peers = new_by_addr.into_values().collect();

        for peer_config in added_configs {
            outcome.added += 1;
            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
                continue;
            };
            let name = peer_config
                .alias
                .clone()
                .unwrap_or_else(|| identity.short_npub());
            self.peer_aliases.insert(*identity.node_addr(), name);
            self.register_identity(*identity.node_addr(), identity.pubkey_full());

            if !peer_config.is_auto_connect() {
                continue;
            }

            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
                warn!(
                    npub = %peer_config.npub,
                    error = %e,
                    "Failed to initiate connection for newly added peer"
                );
                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                }
                if matches!(e, crate::node::NodeError::NoTransportForType(_))
                    && let Some(bootstrap) = self.nostr_discovery.clone()
                {
                    bootstrap
                        .request_advert_stale_check(peer_config.npub.clone())
                        .await;
                }
            }
        }

        for peer_config in auto_connect_refresh_configs {
            let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) else {
                continue;
            };
            let node_addr = *peer_identity.node_addr();

            if self.peers.contains_key(&node_addr) {
                match self
                    .initiate_active_peer_alternative_connection(&peer_config)
                    .await
                {
                    Ok(attempted) => {
                        if attempted {
                            debug!(
                                peer = %self.peer_display_name(&node_addr),
                                "Started non-disruptive alternate-path handshake for active peer"
                            );
                        }
                    }
                    Err(e) => {
                        debug!(
                            npub = %peer_config.npub,
                            error = %e,
                            "Active peer alternate-path refresh did not start"
                        );
                    }
                }
                continue;
            }

            match self.initiate_peer_connection(&peer_config).await {
                Ok(()) => {
                    let hs_timeout_ms = self.config.node.rate_limit.handshake_timeout_secs * 1000;
                    if let Some(state) = self.retry_pending.get_mut(&node_addr) {
                        state.peer_config = peer_config;
                        state.retry_after_ms = Self::now_ms().saturating_add(hs_timeout_ms);
                    }
                }
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        error = %e,
                        "Refreshed peer addresses did not initiate a direct connection"
                    );
                    self.schedule_retry(node_addr, Self::now_ms());
                }
            }
        }

        self.warm_auto_connect_graph_sessions().await;

        Ok(outcome)
    }

    pub(super) async fn initiate_peer_connections(&mut self) {
        // Build display name map from all configured peers (alias or short npub),
        // and pre-seed the identity cache from each peer's npub so that TUN packets
        // addressed to a configured peer can be dispatched (and trigger session
        // initiation) immediately on startup — without waiting for the link-layer
        // handshake to complete first.
        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
            .config
            .peers()
            .iter()
            .filter_map(|pc| {
                PeerIdentity::from_npub(&pc.npub)
                    .ok()
                    .map(|id| (id, pc.alias.clone()))
            })
            .collect();

        for (identity, alias) in peer_identities {
            let name = alias.unwrap_or_else(|| identity.short_npub());
            self.peer_aliases.insert(*identity.node_addr(), name);
            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
            // but will be corrected to the real value when the peer is promoted
            // after a successful Noise handshake.
            self.register_identity(*identity.node_addr(), identity.pubkey_full());
        }

        // Collect peer configs to avoid borrow conflicts
        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();

        if peer_configs.is_empty() {
            debug!("No static peers configured");
            return;
        }

        debug!(
            count = peer_configs.len(),
            "Initiating static peer connections"
        );

        for peer_config in peer_configs {
            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
                warn!(
                    npub = %peer_config.npub,
                    alias = ?peer_config.alias,
                    error = %e,
                    "Failed to initiate peer connection"
                );
                // Schedule a retry so transient address-resolution failures
                // (e.g. cached endpoints stale, NAT rebinds, all addresses
                // currently unreachable) recover without a daemon restart.
                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                }
                // No-transport failures most often mean the cached overlay
                // advert is pointing at a dead post-NAT-rebind address. The
                // advert cache is read-only inside fetch_advert, so retries
                // would loop on the same dead address until expiry. Force a
                // re-fetch so the next retry tick picks up fresh endpoints.
                if matches!(e, crate::node::NodeError::NoTransportForType(_))
                    && let Some(bootstrap) = self.nostr_discovery.clone()
                {
                    bootstrap
                        .request_advert_stale_check(peer_config.npub.clone())
                        .await;
                }
            }
        }

        self.warm_auto_connect_graph_sessions().await;
    }

    /// Initiate a connection to a single peer.
    ///
    /// Creates a link, starts the Noise handshake, and sends the first message.
    pub(super) async fn initiate_peer_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }

    /// Initiate a connection from the retry path. Identical to
    /// [`initiate_peer_connection`] today — both paths fan out across every
    /// known address (overlay-fresh first, then operator/cache hints) in a
    /// single pass. The two entry points stay separate so callers can be
    /// distinguished in tracing.
    pub(super) async fn initiate_peer_retry_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }

    async fn initiate_active_peer_alternative_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<bool, NodeError> {
        let peer_identity =
            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
                npub: peer_config.npub.clone(),
                reason: e.to_string(),
            })?;
        let peer_node_addr = *peer_identity.node_addr();

        if !self.peers.contains_key(&peer_node_addr) {
            self.initiate_peer_connection(peer_config).await?;
            return Ok(true);
        }

        // Keep the current link live and race fresh concrete candidates.
        // Cross-connection resolution still decides which replacement link
        // wins if both peers try the same upgrade; the important part here is
        // that a stale path does not depend on the remote peer receiving our
        // hint first before either side attempts the better address.
        self.try_active_peer_alternative_addresses(peer_config, peer_identity)
            .await
    }

    async fn initiate_peer_connection_inner(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        // Parse the peer's npub to get their identity
        let peer_identity =
            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
                npub: peer_config.npub.clone(),
                reason: e.to_string(),
            })?;

        let peer_node_addr = *peer_identity.node_addr();

        // Check if peer already exists (fully authenticated)
        if self.peers.contains_key(&peer_node_addr) {
            debug!(
                npub = %peer_config.npub,
                "Peer already exists, skipping"
            );
            return Ok(());
        }

        self.try_peer_addresses(peer_config, peer_identity, true)
            .await
    }

    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
        self.connections.values().any(|conn| {
            conn.expected_identity()
                .map(|id| id.node_addr() == peer_node_addr)
                .unwrap_or(false)
        })
    }

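    /// True if an outbound handshake or a queued transport connect is already
    /// in flight to `peer_node_addr` on exactly this `(transport_id,
    /// remote_addr)` path; used to avoid dialing the same candidate twice.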
    fn is_connecting_to_peer_on_path(
        &self,
        peer_node_addr: &NodeAddr,
        transport_id: TransportId,
        remote_addr: &TransportAddr,
    ) -> bool {
        self.connections.values().any(|conn| {
            conn.expected_identity()
                .map(|id| id.node_addr() == peer_node_addr)
                .unwrap_or(false)
                && conn.transport_id() == Some(transport_id)
                && conn.source_addr() == Some(remote_addr)
        }) || self.pending_connects.iter().any(|pending| {
            pending.peer_identity.node_addr() == peer_node_addr
                && pending.transport_id == transport_id
                && &pending.remote_addr == remote_addr
        })
    }

    pub(in crate::node) fn should_warm_auto_connect_session(
        &self,
        peer_node_addr: &NodeAddr,
    ) -> bool {
        if self.peers.contains_key(peer_node_addr)
            || self
                .sessions
                .get(peer_node_addr)
                .is_some_and(|entry| entry.is_established())
        {
            return false;
        }

        self.config.peers().iter().any(|peer| {
            peer.is_auto_connect()
                && PeerIdentity::from_npub(&peer.npub)
                    .map(|identity| identity.node_addr() == peer_node_addr)
                    .unwrap_or(false)
        })
    }

    pub(in crate::node) async fn warm_auto_connect_graph_sessions(&mut self) -> usize {
        if !self.peers.values().any(|peer| peer.can_send()) {
            return 0;
        }

        let mut budget = self.graph_session_warmup_budget();
        if budget == 0 {
            return 0;
        }

        let peer_identities: Vec<_> = self
            .config
            .auto_connect_peers()
            .filter_map(|peer| PeerIdentity::from_npub(&peer.npub).ok())
            .collect();

        let mut warmed = 0;
        for identity in peer_identities {
            if budget == 0 {
                break;
            }

            let peer_node_addr = *identity.node_addr();
            if peer_node_addr == *self.identity.node_addr()
                || !self.should_warm_auto_connect_session(&peer_node_addr)
                || self
                    .sessions
                    .get(&peer_node_addr)
                    .is_some_and(|entry| entry.is_initiating())
            {
                continue;
            }

            self.register_identity(peer_node_addr, identity.pubkey_full());

            if self.find_next_hop(&peer_node_addr).is_some() {
                match self
                    .initiate_session(peer_node_addr, identity.pubkey_full())
                    .await
                {
                    Ok(()) => {
                        warmed += 1;
                        budget = budget.saturating_sub(1);
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            "Warmed auto-connect peer session over existing FIPS graph"
                        );
                    }
                    Err(NodeError::SendFailed { node_addr, reason })
                        if node_addr == peer_node_addr && reason == "no route to destination" =>
                    {
                        self.maybe_initiate_lookup(&peer_node_addr).await;
                        warmed += 1;
                        budget = budget.saturating_sub(1);
                    }
                    Err(err) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            error = %err,
                            "Failed to warm auto-connect peer session"
                        );
                    }
                }
            } else {
                self.maybe_initiate_lookup(&peer_node_addr).await;
                warmed += 1;
                budget = budget.saturating_sub(1);
            }
        }

        warmed
    }

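    /// Per-tick warmup budget: `pending_max_destinations` minus sessions not
    /// yet established and pending lookups, capped at
    /// `MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK`, and zero outright when the
    /// configured maximum is zero.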
    pub(in crate::node) fn graph_session_warmup_budget(&self) -> usize {
        let max_destinations = self.config.node.session.pending_max_destinations;
        if max_destinations == 0 {
            return 0;
        }

        let pending_sessions = self
            .sessions
            .values()
            .filter(|entry| !entry.is_established())
            .count();
        let pending_total = pending_sessions.saturating_add(self.pending_lookups.len());
        max_destinations
            .saturating_sub(pending_total)
            .min(MAX_AUTO_CONNECT_GRAPH_WARMUPS_PER_TICK)
    }

    fn outbound_handshake_slots(&self) -> usize {
        let used = self
            .connections
            .len()
            .saturating_add(self.pending_connects.len());
        if self.max_connections == 0 {
            usize::MAX
        } else {
            self.max_connections.saturating_sub(used)
        }
    }

    fn outbound_link_slots(&self) -> usize {
        if self.max_links == 0 {
            usize::MAX
        } else {
            self.max_links.saturating_sub(self.links.len())
        }
    }

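    /// How many new path candidates we may dial toward this peer right now:
    /// zero when the peer table is full and the peer is not already in it,
    /// otherwise the minimum of free handshake slots, free link slots, and
    /// `MAX_PARALLEL_PATH_CANDIDATES_PER_PEER` minus candidates already in
    /// flight for this peer.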
    fn path_candidate_attempt_budget(&self, peer_node_addr: &NodeAddr) -> usize {
        if !self.peers.contains_key(peer_node_addr)
            && self.max_peers > 0
            && self.peers.len() >= self.max_peers
        {
            return 0;
        }

        let in_flight_for_peer = self
            .connections
            .values()
            .filter(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == peer_node_addr)
                    .unwrap_or(false)
            })
            .count()
            .saturating_add(
                self.pending_connects
                    .iter()
                    .filter(|pending| pending.peer_identity.node_addr() == peer_node_addr)
                    .count(),
            );

        self.outbound_handshake_slots()
            .min(self.outbound_link_slots())
            .min(MAX_PARALLEL_PATH_CANDIDATES_PER_PEER.saturating_sub(in_flight_for_peer))
    }

    fn discovery_connect_budget(&self) -> usize {
        self.outbound_handshake_slots()
            .min(self.outbound_link_slots())
            .min(MAX_DISCOVERY_CONNECTS_PER_TICK)
    }

    /// Find a UDP transport whose bound socket can send to `remote_addr`.
    ///
    /// LAN discovery can surface both IPv4 and IPv6 addresses for the same
    /// service. A wildcard IPv4 socket cannot send to an IPv6 link-local
    /// target, and vice versa, so callers must choose by socket family rather
    /// than by transport type alone.
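    /// Selection sketch (addresses are illustrative): with sockets bound on
    /// `0.0.0.0:4600` and `[::]:4600`, a discovered `192.0.2.10:4600` resolves
    /// to the IPv4 socket and `[2001:db8::1]:4600` to the IPv6 one; with only
    /// an IPv4 socket bound, the IPv6 candidate yields `None`. Ties are broken
    /// by the lowest transport id.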
    fn find_udp_transport_for_remote_addr(
        &self,
        remote_addr: SocketAddr,
    ) -> Option<(TransportId, SocketAddr)> {
        self.transports
            .iter()
            .filter(|(id, handle)| {
                handle.transport_type().name == "udp"
                    && handle.is_operational()
                    && !self.bootstrap_transports.contains(id)
            })
            .filter_map(|(id, handle)| {
                let local_addr = handle.local_addr()?;
                socket_addr_families_compatible(local_addr, remote_addr)
                    .then_some((*id, local_addr))
            })
            .min_by_key(|(id, _)| id.as_u32())
    }

    pub(super) fn transport_discovery_candidate(
        &self,
        discovered_transport_id: TransportId,
        discovered_addr: TransportAddr,
    ) -> Option<(TransportId, TransportAddr, &'static str)> {
        let transport = self.transports.get(&discovered_transport_id)?;
        let transport_name = transport.transport_type().name;

        if transport_name != "udp" {
            return Some((discovered_transport_id, discovered_addr, transport_name));
        }

        let Some(remote_socket_addr) = discovered_addr
            .as_str()
            .and_then(|addr| addr.parse::<SocketAddr>().ok())
        else {
            if self.bootstrap_transports.contains(&discovered_transport_id) {
                debug!(
                    transport_id = %discovered_transport_id,
                    remote_addr = %discovered_addr,
                    "transport discovery: skip non-numeric UDP address from bootstrap transport"
                );
                return None;
            }
            return Some((discovered_transport_id, discovered_addr, transport_name));
        };

        let Some((transport_id, local_addr)) =
            self.find_udp_transport_for_remote_addr(remote_socket_addr)
        else {
            debug!(
                transport_id = %discovered_transport_id,
                remote_addr = %discovered_addr,
                "transport discovery: skip UDP peer with no compatible local socket"
            );
            return None;
        };

        if transport_id != discovered_transport_id {
            debug!(
                discovered_transport_id = %discovered_transport_id,
                selected_transport_id = %transport_id,
                local_addr = %local_addr,
                remote_addr = %remote_socket_addr,
                "transport discovery: selected compatible UDP transport"
            );
        }

        Some((
            transport_id,
            TransportAddr::from_socket_addr(remote_socket_addr),
            transport_name,
        ))
    }

    /// Initiate a connection to a peer on a specific transport and address.
    ///
    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
    /// the Noise IK handshake, sends msg1, and registers the connection for
    /// msg2 dispatch.
    ///
    /// For connection-oriented transports (TCP, Tor): allocates a link and
    /// starts a non-blocking transport connect. The handshake is deferred
    /// until the transport connection is established — the tick handler
    /// polls `connection_state()` and initiates the handshake when ready.
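    /// Flow sketch for the two transport classes (ids and addresses are
    /// hypothetical):
    ///
    /// ```ignore
    /// // UDP: the link is created and msg1 is sent within this call.
    /// node.initiate_connection(udp_id, udp_addr, identity).await?;
    /// // TCP: this call only queues a non-blocking connect; the tick handler
    /// // later sees ConnectionState::Connected and calls start_handshake.
    /// node.initiate_connection(tcp_id, tcp_addr, identity).await?;
    /// ```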
    pub(super) async fn initiate_connection(
        &mut self,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
            debug!(
                peer = %self.peer_display_name(&peer_node_addr),
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                "Connection already in progress for candidate path"
            );
            return Ok(());
        }

        if self.outbound_handshake_slots() == 0 {
            return Err(NodeError::MaxConnectionsExceeded {
                max: self.max_connections,
            });
        }

        if self.outbound_link_slots() == 0 {
            return Err(NodeError::MaxLinksExceeded {
                max: self.max_links,
            });
        }

        if !self.peers.contains_key(&peer_node_addr)
            && self.max_peers > 0
            && self.peers.len() >= self.max_peers
        {
            return Err(NodeError::MaxPeersExceeded {
                max: self.max_peers,
            });
        }

        self.authorize_peer(
            &peer_identity,
            PeerAclContext::OutboundConnect,
            transport_id,
            &remote_addr,
        )?;

        let is_connection_oriented = self
            .transports
            .get(&transport_id)
            .map(|t| t.transport_type().connection_oriented)
            .unwrap_or(false);

        // Allocate link ID and create link
        let link_id = self.allocate_link_id();

        let link = if is_connection_oriented {
            Link::new(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        } else {
            Link::connectionless(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        };

        self.links.insert(link_id, link);

        // Add reverse lookup for packet dispatch
        self.addr_to_link
            .insert((transport_id, remote_addr.clone()), link_id);

        if is_connection_oriented {
            // Connection-oriented: start non-blocking connect, defer handshake
            if let Some(transport) = self.transports.get(&transport_id) {
                match transport.connect(&remote_addr).await {
                    Ok(()) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            link_id = %link_id,
                            "Transport connect initiated (non-blocking)"
                        );
                        self.pending_connects.push(super::PendingConnect {
                            link_id,
                            transport_id,
                            remote_addr,
                            peer_identity,
                        });
                    }
                    Err(e) => {
                        // Clean up link
                        self.links.remove(&link_id);
                        self.addr_to_link.remove(&(transport_id, remote_addr));
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            Ok(())
        } else {
            // Connectionless: proceed with immediate handshake
            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
                .await
        }
    }

    /// Start the Noise handshake on a link and send msg1.
    ///
    /// Called immediately for connectionless transports, or after the
    /// transport connection is established for connection-oriented transports.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Create connection in handshake phase (outbound knows expected identity)
        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Allocate a session index for this handshake
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Clean up the link we just created
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        // Start the Noise handshake and get message 1
        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Clean up the index and link
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        // Set index and transport info on the connection
        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Store msg1 for resend and schedule first resend
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Track in pending_outbound for msg2 dispatch
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send the wire format handshake message. If the very first send fails
        // synchronously (for example an IPv6 candidate on an IPv4-only UDP
        // socket), undo this candidate so the caller can try the next address
        // in the same dial pass.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        match send_result {
            Some(send_result) => {
                self.note_local_send_outcome(&send_result);
                match send_result {
                    Ok(bytes) => {
                        debug!(
                            link_id = %link_id,
                            our_index = %our_index,
                            bytes,
                            "Sent Noise handshake message 1 (wire format)"
                        );
                    }
                    Err(e) => {
                        warn!(
                            link_id = %link_id,
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            our_index = %our_index,
                            error = %e,
                            "Failed to send handshake message"
                        );
                        self.pending_outbound
                            .remove(&(transport_id, our_index.as_u32()));
                        self.connections.remove(&link_id);
                        self.links.remove(&link_id);
                        self.addr_to_link
                            .remove(&(transport_id, remote_addr.clone()));
                        let _ = self.index_allocator.free(our_index);
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            None => {
                self.pending_outbound
                    .remove(&(transport_id, our_index.as_u32()));
                self.connections.remove(&link_id);
                self.links.remove(&link_id);
                self.addr_to_link
                    .remove(&(transport_id, remote_addr.clone()));
                let _ = self.index_allocator.free(our_index);
                return Err(NodeError::TransportError(format!(
                    "transport {transport_id} disappeared before first handshake send"
                )));
            }
        }

        Ok(())
    }

    /// Poll all transports for discovered peers and auto-connect.
    ///
    /// Called from the tick handler. Iterates operational transports,
    /// drains their discovery buffers, and initiates connections to
    /// newly discovered peers (if auto_connect is enabled).
    pub(super) async fn poll_transport_discovery(&mut self) {
        // Collect discoveries first to avoid borrow conflict with self
        let mut to_connect = Vec::new();
        let mut queued_per_peer: HashMap<NodeAddr, usize> = HashMap::new();
        let mut connect_budget = self.discovery_connect_budget();
        let mut skipped_budget = 0usize;

        for transport in self.transports.values() {
            if !transport.is_operational() {
                continue;
            }
            if !transport.auto_connect() {
                // Still drain the buffer so it doesn't grow unbounded
                let _ = transport.discover();
                continue;
            }
            let discovered = match transport.discover() {
                Ok(peers) => peers,
                Err(_) => continue,
            };
            for peer in discovered {
                let discovered_transport_id = peer.transport_id;
                let pubkey = match peer.pubkey_hint {
                    Some(pk) => pk,
                    None => continue,
                };
                let identity = PeerIdentity::from_pubkey(pubkey);
                let node_addr = *identity.node_addr();

                // Skip self
                if node_addr == *self.identity.node_addr() {
                    continue;
                }

                let Some((candidate_transport_id, remote_addr, transport_name)) =
                    self.transport_discovery_candidate(discovered_transport_id, peer.addr)
                else {
                    continue;
                };

                if self.peers.contains_key(&node_addr) {
                    let candidate = PeerAddress::new(transport_name, remote_addr.to_string());
                    if self.active_peer_candidate_is_fresh_enough_to_skip(
                        &node_addr,
                        std::slice::from_ref(&candidate),
                    ) {
                        continue;
                    }
                    if self.is_connecting_to_peer_on_path(
                        &node_addr,
                        candidate_transport_id,
                        &remote_addr,
                    ) {
                        continue;
                    }
                    let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
                    if connect_budget == 0
                        || self
                            .path_candidate_attempt_budget(&node_addr)
                            .saturating_sub(queued_for_peer)
                            == 0
                    {
                        skipped_budget = skipped_budget.saturating_add(1);
                        continue;
                    }
                    to_connect.push((candidate_transport_id, remote_addr, identity, true));
                    *queued_per_peer.entry(node_addr).or_default() += 1;
                    connect_budget = connect_budget.saturating_sub(1);
                    continue;
                }

                if self.is_connecting_to_peer_on_path(
                    &node_addr,
                    candidate_transport_id,
                    &remote_addr,
                ) {
                    continue;
                }

                let queued_for_peer = queued_per_peer.get(&node_addr).copied().unwrap_or(0);
                if connect_budget == 0
                    || self
                        .path_candidate_attempt_budget(&node_addr)
                        .saturating_sub(queued_for_peer)
                        == 0
                {
                    skipped_budget = skipped_budget.saturating_add(1);
                    continue;
                }
                to_connect.push((candidate_transport_id, remote_addr, identity, false));
                *queued_per_peer.entry(node_addr).or_default() += 1;
                connect_budget = connect_budget.saturating_sub(1);
            }
        }

        if skipped_budget > 0 {
            debug!(
                skipped = skipped_budget,
                queued = to_connect.len(),
                "Transport discovery connect budget exhausted"
            );
        }

        for (transport_id, remote_addr, identity, active_refresh) in to_connect {
            info!(
                peer = %self.peer_display_name(identity.node_addr()),
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                active_refresh,
                "Auto-connecting to discovered peer"
            );
            if let Err(e) = self
                .initiate_connection(transport_id, remote_addr, identity)
                .await
            {
                warn!(error = %e, "Failed to auto-connect to discovered peer");
            }
        }
    }

    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    let now_ms = Self::now_ms();
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // B6: stale-advert eviction on the streak-threshold
                    // crossing. Fire-and-forget; the outcome is logged so
                    // operators can see when peers get cleaned up.
                    if decision.crossed_threshold {
                        bootstrap
                            .request_advert_stale_check(peer_config.npub.clone())
                            .await;
                    }

                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        // Push the next retry past the cooldown so the
                        // open-discovery sweep doesn't re-enqueue and the
                        // per-attempt backoff doesn't fire sooner.
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }

    /// Resolve the LAN-only discovery scope. Applications with explicit
    /// connectivity config can set `node.discovery.lan.scope` without
    /// changing the public Nostr discovery `app` tag. The older fallback
    /// extracts a scope from the Nostr app tag used by default scoped
    /// discovery.
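    /// Resolution sketch (values are illustrative):
    ///
    /// ```ignore
    /// // node.discovery.lan.scope = Some("ops")   -> Some("ops")
    /// // app = "fips-overlay-v1:homelab"          -> Some("homelab")
    /// // app = "fips-overlay-v1:"                 -> None
    /// // app = "my-app"                           -> Some("my-app")
    /// // app = ""                                 -> None
    /// ```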
    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
        if let Some(scope) = self.config.node.discovery.lan.scope.as_deref() {
            let scope = scope.trim();
            if !scope.is_empty() {
                return Some(scope.to_string());
            }
        }

        let app = self.config.node.discovery.nostr.app.trim();
        if app.is_empty() {
            return None;
        }
        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
            let scope = rest.trim();
            if scope.is_empty() {
                None
            } else {
                Some(scope.to_string())
            }
        } else {
            Some(app.to_string())
        }
    }

    /// Drain mDNS-discovered peers and initiate Noise XX handshakes. For
    /// active peers this is a non-disruptive alternate-path refresh: the
    /// current link stays live until a new handshake authenticates and
    /// promotes. The handshake itself is the authentication — a spoofed
    /// mDNS advert with someone else's npub fails the XX exchange and
    /// is dropped.
    pub(super) async fn poll_lan_discovery(&mut self) {
        let Some(runtime) = self.lan_discovery.clone() else {
            return;
        };
        let events = runtime.drain_events().await;
        if events.is_empty() {
            return;
        }
        let mut connect_budget = self.discovery_connect_budget();
        let mut skipped_budget = 0usize;
        for event in events {
            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
            let Some((transport_id, local_addr)) =
                self.find_udp_transport_for_remote_addr(peer.addr)
            else {
                debug!(
                    addr = %peer.addr,
                    "lan: skip discovered peer with no compatible UDP transport"
                );
                continue;
            };
            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
                Ok(id) => id,
                Err(err) => {
                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
                    continue;
                }
            };
            let peer_node_addr = *identity.node_addr();
            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
            if self.peers.contains_key(&peer_node_addr) {
                let candidate = PeerAddress::new("udp", peer.addr.to_string());
                if self.active_peer_candidate_is_fresh_enough_to_skip(
                    &peer_node_addr,
                    std::slice::from_ref(&candidate),
                ) {
                    continue;
                }
                if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                    continue;
                }
                if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
                    skipped_budget = skipped_budget.saturating_add(1);
                    continue;
                }
                info!(
                    npub = %identity.short_npub(),
                    addr = %peer.addr,
                    local_addr = %local_addr,
                    "lan: initiating alternate-path handshake to active peer"
                );
                if let Err(err) = self
                    .initiate_connection(transport_id, remote_addr, identity)
                    .await
                {
                    debug!(
                        npub = %peer.npub,
                        error = %err,
                        "lan: failed to initiate active peer alternate-path handshake"
                    );
                }
                connect_budget = connect_budget.saturating_sub(1);
                continue;
            }
            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
                continue;
            }
            if connect_budget == 0 || self.path_candidate_attempt_budget(&peer_node_addr) == 0 {
                skipped_budget = skipped_budget.saturating_add(1);
                continue;
            }
            info!(
                npub = %identity.short_npub(),
                addr = %peer.addr,
                local_addr = %local_addr,
                "lan: initiating handshake to discovered peer"
            );
            if let Err(err) = self
                .initiate_connection(transport_id, remote_addr, identity)
                .await
            {
                debug!(
                    npub = %peer.npub,
                    error = %err,
                    "lan: failed to initiate connection to discovered peer"
                );
            }
            connect_budget = connect_budget.saturating_sub(1);
        }
        if skipped_budget > 0 {
            debug!(
                skipped = skipped_budget,
                "lan: discovery connect budget exhausted"
            );
        }
    }

    /// Poll pending transport connects and initiate handshakes for ready ones.
    ///
    /// Called from the tick handler. For each pending connect, queries the
    /// transport's connection state. When a connection is established,
    /// marks the link as Connected and starts the Noise handshake.
    /// Failed connections are cleaned up and scheduled for retry.
    pub(super) async fn poll_pending_connects(&mut self) {
        if self.pending_connects.is_empty() {
            return;
        }

        let mut completed = Vec::new();

        for (i, pending) in self.pending_connects.iter().enumerate() {
            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
                transport.connection_state(&pending.remote_addr)
            } else {
                crate::transport::ConnectionState::Failed("transport removed".into())
            };

            match state {
                crate::transport::ConnectionState::Connected => {
                    completed.push((i, true, None));
                }
                crate::transport::ConnectionState::Failed(reason) => {
                    completed.push((i, false, Some(reason)));
                }
                crate::transport::ConnectionState::Connecting => {
                    // Still in progress, check on next tick
                }
                crate::transport::ConnectionState::None => {
                    // Shouldn't happen — treat as failure
                    completed.push((i, false, Some("no connection attempt found".into())));
                }
            }
        }

        // Process completions in reverse order to preserve indices
        for (i, success, reason) in completed.into_iter().rev() {
            let pending = self.pending_connects.remove(i);

            if success {
                // Mark link as Connected
                if let Some(link) = self.links.get_mut(&pending.link_id) {
                    link.set_connected();
                }

                debug!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    "Transport connected, starting handshake"
                );

                // Start the handshake now that the transport is connected
                if let Err(e) = self
                    .start_handshake(
                        pending.link_id,
                        pending.transport_id,
                        pending.remote_addr.clone(),
                        pending.peer_identity,
                    )
                    .await
                {
                    warn!(
                        link_id = %pending.link_id,
                        error = %e,
                        "Failed to start handshake after transport connect"
                    );
                    // Clean up link on handshake failure
                    self.remove_link(&pending.link_id);
                }
            } else {
                let reason = reason.unwrap_or_default();
                warn!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    reason = %reason,
                    "Transport connect failed"
                );

                // Clean up link and schedule retry
1395                self.remove_link(&pending.link_id);
1396                self.links.remove(&pending.link_id);
1397                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
1398            }
1399        }
1400    }
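
    // Illustrative sketch (added for exposition, never called by the node):
    // why `poll_pending_connects` walks its completion list in reverse.
    // Removing by ascending index would shift every later entry left and
    // invalidate the remaining indices; removing in descending order only
    // ever shifts entries that have already been handled. `pending` and
    // `completed_indices` are hypothetical stand-ins for the real
    // `pending_connects` bookkeeping above.
    #[allow(dead_code)]
    fn remove_completed_in_reverse<T>(pending: &mut Vec<T>, completed_indices: &[usize]) {
        // `completed_indices` is assumed ascending, as produced by the
        // enumeration pass; iterating back-to-front means each `remove(i)`
        // leaves every not-yet-processed index valid.
        for &i in completed_indices.iter().rev() {
            pending.remove(i);
        }
    }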
1401
1402    // === State Transitions ===
1403
1404    /// Start the node.
1405    ///
1406    /// Initializes the TUN interface (if configured), spawns I/O threads,
1407    /// and transitions to the Running state.
1408    pub async fn start(&mut self) -> Result<(), NodeError> {
1409        node_start_debug_log("Node::start begin");
1410        if !self.state.can_start() {
1411            return Err(NodeError::AlreadyStarted);
1412        }
1413        self.state = NodeState::Starting;
1414        node_start_debug_log("Node::start state set to starting");
1415
1416        // Create packet channel for transport -> Node communication
1417        let packet_buffer_size = self.config.node.buffers.packet_channel;
1418        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
1419        self.packet_tx = Some(packet_tx.clone());
1420        self.packet_rx = Some(packet_rx);
1421        node_start_debug_log("Node::start packet channel created");
1422
1423        // Initialize transports first (before TUN, before Nostr discovery).
1424        node_start_debug_log("Node::start create transports begin");
1425        let transport_handles = self.create_transports(&packet_tx).await;
1426        node_start_debug_log(format!(
1427            "Node::start create transports complete count={}",
1428            transport_handles.len()
1429        ));
1430
1431        for mut handle in transport_handles {
1432            let transport_id = handle.transport_id();
1433            let transport_type = handle.transport_type().name;
1434            let name = handle.name().map(|s| s.to_string());
1435
1436            node_start_debug_log(format!(
1437                "Node::start transport start begin id={} type={} name={:?}",
1438                transport_id, transport_type, name
1439            ));
1440            match handle.start().await {
1441                Ok(()) => {
1442                    node_start_debug_log(format!(
1443                        "Node::start transport start ok id={} type={}",
1444                        transport_id, transport_type
1445                    ));
1446                    self.transports.insert(transport_id, handle);
1447                }
1448                Err(e) => {
1449                    node_start_debug_log(format!(
1450                        "Node::start transport start error id={} type={} error={}",
1451                        transport_id, transport_type, e
1452                    ));
1453                    if let Some(ref n) = name {
1454                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
1455                    } else {
1456                        warn!(transport_type, error = %e, "Transport failed to start");
1457                    }
1458                }
1459            }
1460        }
1461
1462        if !self.transports.is_empty() {
1463            info!(count = self.transports.len(), "Transports initialized");
1464        }
1465
1466        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
1467        // worker pools. **Unix only** — both pools issue direct
1468        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
1469        // raw file descriptors via `AsRawFd`, which is a unix-only
1470        // trait. On Windows the rx_loop's tokio-based send/recv
1471        // remain the canonical path; the perf overhaul lands its
1472        // gains on unix.
1473        //
1474        // Worker count defaults to the number of CPUs, overridable
1475        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
1476        // for debug / benchmarking. Hash-by-destination means a
1477        // single TCP flow pins to one worker (preserves wire
1478        // ordering); additional workers light up under multi-flow
1479        // / multi-peer load. See `node::encrypt_worker` /
1480        // `node::decrypt_worker` for full rationale.
1481        #[cfg(unix)]
1482        {
1483            if self.config.node.worker_pools_enabled {
1484                node_start_debug_log("Node::start worker pools begin");
1485                let cpu_default = std::thread::available_parallelism()
1486                    .map(|n| n.get())
1487                    .unwrap_or(1)
1488                    .max(1);
1489                let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
1490                    .ok()
1491                    .and_then(|s| s.parse().ok())
1492                    .unwrap_or(cpu_default)
1493                    .max(1);
1494                self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
1495                    encrypt_worker_count,
1496                ));
1497                info!(
1498                    workers = encrypt_worker_count,
1499                    "Spawned FMP-encrypt worker pool"
1500                );
1501
1502                // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
1503                // falls through to the in-line rx_loop decrypt path (the
1504                // "test-mode" branch in `handle_encrypted_frame`, which is
1505                // in fact a fully functional synchronous decrypt). Useful
1506                // as an A/B against the worker pipeline when chasing
1507                // scheduling/queueing regressions on the native macOS
1508                // path. Any non-zero value (env or default) spawns the
1509                // pool as before.
1510                let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
1511                    .ok()
1512                    .and_then(|s| s.parse().ok())
1513                    .unwrap_or(cpu_default);
1514                if decrypt_worker_count == 0 {
1515                    info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
1516                } else {
1517                    self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
1518                        decrypt_worker_count,
1519                    ));
1520                    info!(
1521                        workers = decrypt_worker_count,
1522                        "Spawned FMP+FSP-decrypt worker pool"
1523                    );
1524                }
1525                node_start_debug_log("Node::start worker pools complete");
1526            } else {
1527                node_start_debug_log("Node::start worker pools disabled");
1528                info!("FIPS worker pools disabled; using in-line crypto/send path");
1529            }
1530        }
1531
1532        if self.config.node.discovery.nostr.enabled {
1533            node_start_debug_log("Node::start nostr discovery start begin");
1534            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
1535                .await
1536            {
1537                Ok(runtime) => {
1538                    node_start_debug_log("Node::start nostr discovery runtime created");
1539                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
1540                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
1541                    }
1542                    node_start_debug_log("Node::start nostr overlay advert refreshed");
1543                    self.nostr_discovery = Some(runtime);
1544                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
1545                    info!("Nostr overlay discovery enabled");
1546                }
1547                Err(err) => {
1548                    node_start_debug_log(format!(
1549                        "Node::start nostr discovery start error error={}",
1550                        err
1551                    ));
1552                    warn!(error = %err, "Failed to start Nostr overlay discovery");
1553                }
1554            }
1555        }
1556
1557        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
1558        // when Nostr is disabled, since it gives us sub-second pairing
1559        // on the same link without any relay or NAT-traversal roundtrip.
1560        if self.config.node.discovery.lan.enabled {
1561            node_start_debug_log("Node::start lan discovery start begin");
1562            let advertised_udp_port = self
1563                .transports
1564                .values()
1565                .filter(|h| h.is_operational())
1566                .filter(|h| h.transport_type().name == "udp")
1567                .find_map(|h| h.local_addr().map(|addr| addr.port()))
1568                .unwrap_or(0);
1569            let scope = self.lan_discovery_scope();
1570            match crate::discovery::lan::LanDiscovery::start(
1571                &self.identity,
1572                scope,
1573                advertised_udp_port,
1574                self.config.node.discovery.lan.clone(),
1575            )
1576            .await
1577            {
1578                Ok(runtime) => {
1579                    node_start_debug_log("Node::start lan discovery start ok");
1580                    self.lan_discovery = Some(runtime);
1581                    info!("LAN mDNS discovery enabled");
1582                }
1583                Err(err) => {
1584                    node_start_debug_log(format!(
1585                        "Node::start lan discovery start error error={}",
1586                        err
1587                    ));
1588                    debug!(error = %err, "LAN mDNS discovery not started");
1589                }
1590            }
1591        }
1592
1593        // Connect to static peers before TUN is active
1594        // This allows handshake messages to be sent before we start accepting packets
1595        node_start_debug_log("Node::start initiate peer connections begin");
1596        self.initiate_peer_connections().await;
1597        node_start_debug_log("Node::start initiate peer connections complete");
1598
1599        // Initialize TUN interface last, after transports and peers are ready
1600        if self.config.tun.enabled {
1601            node_start_debug_log("Node::start tun init begin");
1602            let address = *self.identity.address();
1603            match TunDevice::create(&self.config.tun, address).await {
1604                Ok(device) => {
1605                    let mtu = device.mtu();
1606                    let name = device.name().to_string();
1607                    let our_addr = *device.address();
1608
1609                    info!("TUN device active:");
1610                    info!("     name: {}", name);
1611                    info!("  address: {}", device.address());
1612                    info!("      mtu: {}", mtu);
1613
1614                    // Calculate max MSS for TCP clamping
1615                    let effective_mtu = self.effective_ipv6_mtu();
1616                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
1617
1618                    info!("effective MTU: {} bytes", effective_mtu);
1619                    debug!("   max TCP MSS: {} bytes", max_mss);
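                    // Worked example with assumed numbers: an effective IPv6
                    // MTU of 1380 bytes clamps MSS to 1380 - 40 - 20 = 1320.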
1620
1621                    // On macOS, create a shutdown pipe. Writing to it unblocks the
1622                    // reader thread's select() loop without closing the TUN fd
1623                    // (which would cause a double-close when TunDevice drops).
1624                    #[cfg(target_os = "macos")]
1625                    let (shutdown_read_fd, shutdown_write_fd) = {
1626                        let mut fds = [0i32; 2];
1627                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
1628                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
1629                                "failed to create shutdown pipe".into(),
1630                            )));
1631                        }
1632                        (fds[0], fds[1])
1633                    };
1634
1635                    // Create writer (dups the fd for independent write access).
1636                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
1637                    // per-destination path MTU learned via discovery.
1638                    let (writer, tun_tx) =
1639                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
1640
1641                    // Spawn writer thread
1642                    let writer_handle = thread::spawn(move || {
1643                        writer.run();
1644                    });
1645
1646                    // Clone tun_tx for the reader
1647                    let reader_tun_tx = tun_tx.clone();
1648
1649                    // Create outbound channel for TUN reader → Node
1650                    let tun_channel_size = self.config.node.buffers.tun_channel;
1651                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
1652
1653                    // Spawn reader thread
1654                    let transport_mtu = self.transport_mtu();
1655                    let path_mtu_lookup = self.path_mtu_lookup.clone();
1656                    #[cfg(target_os = "macos")]
1657                    let reader_handle = thread::spawn(move || {
1658                        run_tun_reader(
1659                            device,
1660                            mtu,
1661                            our_addr,
1662                            reader_tun_tx,
1663                            outbound_tx,
1664                            transport_mtu,
1665                            path_mtu_lookup,
1666                            shutdown_read_fd,
1667                        );
1668                    });
1669                    #[cfg(not(target_os = "macos"))]
1670                    let reader_handle = thread::spawn(move || {
1671                        run_tun_reader(
1672                            device,
1673                            mtu,
1674                            our_addr,
1675                            reader_tun_tx,
1676                            outbound_tx,
1677                            transport_mtu,
1678                            path_mtu_lookup,
1679                        );
1680                    });
1681
1682                    self.tun_state = TunState::Active;
1683                    self.tun_name = Some(name);
1684                    self.tun_tx = Some(tun_tx);
1685                    self.tun_outbound_rx = Some(outbound_rx);
1686                    self.tun_reader_handle = Some(reader_handle);
1687                    self.tun_writer_handle = Some(writer_handle);
1688                    #[cfg(target_os = "macos")]
1689                    {
1690                        self.tun_shutdown_fd = Some(shutdown_write_fd);
1691                    }
1692                }
1693                Err(e) => {
1694                    self.tun_state = TunState::Failed;
1695                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
1696                }
1697            }
1698            node_start_debug_log("Node::start tun init complete");
1699        }
1700
1701        // Initialize DNS responder (independent of TUN).
1702        //
1703        // Default bind_addr is "::1" (IPv6 loopback). The shipped
1704        // fips-dns-setup configures systemd-resolved via a global
1705        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
1706        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
1707        // where self-destined traffic to fips0's address is attributed
1708        // to fips0 in PKTINFO and gets silently dropped by the
1709        // mesh-interface filter in src/upper/dns.rs.
1710        //
1711        // For mesh-reachable resolution (rare), set bind_addr: "::"
1712        // in fips.yaml. The mesh-interface filter remains active to
1713        // prevent hosts-file alias enumeration in that mode.
1714        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
1715        // 127.0.0.1 still reach us regardless of kernel sysctl
1716        // defaults — but only when bind is on a wildcard / IPv6 path.
1717        if self.config.dns.enabled {
1718            node_start_debug_log("Node::start dns init begin");
1719            let addr_str = self.config.dns.bind_addr();
1720            match addr_str.parse::<std::net::IpAddr>() {
1721                Ok(ip) => {
1722                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
1723                    match Self::bind_dns_socket(bind) {
1724                        Ok(socket) => {
1725                            let dns_channel_size = self.config.node.buffers.dns_channel;
1726                            let (identity_tx, identity_rx) =
1727                                tokio::sync::mpsc::channel(dns_channel_size);
1728                            let dns_ttl = self.config.dns.ttl();
1729                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
1730                                self.config.peers(),
1731                            );
1732                            let reloader = if self.config.node.system_files_enabled {
1733                                let hosts_path = std::path::PathBuf::from(
1734                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
1735                                );
1736                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
1737                            } else {
1738                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
1739                            };
1740                            // Resolve the TUN ifindex so the responder can
1741                            // drop queries arriving on the mesh interface
1742                            // (fips0). Without this, the `::` bind exposes
1743                            // /etc/fips/hosts alias probing to any mesh peer.
1744                            // When TUN isn't enabled or the name can't be
1745                            // resolved, `None` disables the filter (there
1746                            // is no mesh surface to defend anyway).
1747                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
1748                            info!(
1749                                bind = %bind,
1750                                hosts = reloader.hosts().len(),
1751                                mesh_ifindex = ?mesh_ifindex,
1752                                "DNS responder started for .fips domain (auto-reload enabled)"
1753                            );
1754                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
1755                                socket,
1756                                identity_tx,
1757                                dns_ttl,
1758                                reloader,
1759                                mesh_ifindex,
1760                            ));
1761                            self.dns_identity_rx = Some(identity_rx);
1762                            self.dns_task = Some(handle);
1763                        }
1764                        Err(e) => {
1765                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
1766                        }
1767                    }
1768                }
1769                Err(e) => {
1770                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
1771                }
1772            }
1773            node_start_debug_log("Node::start dns init complete");
1774        }
1775
1776        self.state = NodeState::Running;
1777        node_start_debug_log("Node::start running");
1778        info!("Node started:");
1779        info!("       state: {}", self.state);
1780        info!("  transports: {}", self.transports.len());
1781        info!(" connections: {}", self.connections.len());
1782        Ok(())
1783    }
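
    // A minimal sketch of the hash-by-destination pinning described in the
    // worker-pool comment inside `start()` above. Illustrative only, not the
    // pool's actual dispatch code; the hasher choice and the `dest` key type
    // are assumptions. The property it demonstrates is the one that matters:
    // the same destination always maps to the same worker, so a single flow
    // keeps its wire ordering, while distinct destinations spread across the
    // pool.
    #[allow(dead_code)]
    fn worker_index_for_destination(dest: &std::net::SocketAddr, workers: usize) -> usize {
        use std::hash::{Hash, Hasher};
        // Hash the destination address, then reduce modulo the worker count.
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        dest.hash(&mut hasher);
        (hasher.finish() as usize) % workers.max(1)
    }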
1784
1785    /// Bind a UDP socket for the DNS responder.
1786    ///
1787    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
1788    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
1789    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
1790    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
1791    /// land on the same socket.
1792    ///
1793    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
1794    /// can learn the arrival interface per packet. The responder uses that
1795    /// to drop queries arriving on the mesh TUN, closing the hosts-file
1796    /// probing side-channel created by the `::` bind.
1797    fn bind_dns_socket(
1798        addr: std::net::SocketAddr,
1799    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1800        use socket2::{Domain, Protocol, Socket, Type};
1801        let domain = if addr.is_ipv4() {
1802            Domain::IPV4
1803        } else {
1804            Domain::IPV6
1805        };
1806        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1807        if addr.is_ipv6() {
1808            sock.set_only_v6(false)?;
1809            #[cfg(unix)]
1810            Self::set_recv_pktinfo_v6(&sock)?;
1811        }
1812        sock.set_nonblocking(true)?;
1813        sock.bind(&addr.into())?;
1814        tokio::net::UdpSocket::from_std(sock.into())
1815    }
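
    // Illustrative sketch of what "dual-stack delivery" means for the socket
    // bound above (not used by the responder itself): with IPV6_V6ONLY=0, an
    // IPv4 client on 127.0.0.1 shows up as the IPv4-mapped IPv6 address
    // ::ffff:127.0.0.1 on the same socket. The helper below just recognises
    // that mapped form; its name and placement are assumptions for exposition.
    #[allow(dead_code)]
    fn is_ipv4_mapped_v6(addr: &std::net::Ipv6Addr) -> bool {
        // An IPv4-mapped address is ::ffff:a.b.c.d — five zero segments,
        // then 0xffff, then the embedded IPv4 address.
        let seg = addr.segments();
        seg[..5] == [0, 0, 0, 0, 0] && seg[5] == 0xffff
    }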
1816
1817    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
1818    ///
1819    /// After this setsockopt, each `recvmsg()` call on the socket receives
1820    /// an `IPV6_PKTINFO` control message containing the arrival interface
1821    /// index, which the DNS responder uses for its mesh-interface filter.
1822    #[cfg(unix)]
1823    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
1824        use std::os::fd::AsRawFd;
1825        let enable: libc::c_int = 1;
1826        let ret = unsafe {
1827            libc::setsockopt(
1828                sock.as_raw_fd(),
1829                libc::IPPROTO_IPV6,
1830                libc::IPV6_RECVPKTINFO,
1831                &enable as *const _ as *const libc::c_void,
1832                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
1833            )
1834        };
1835        if ret < 0 {
1836            return Err(std::io::Error::last_os_error());
1837        }
1838        Ok(())
1839    }
1840
1841    /// Resolve the mesh TUN interface index by name.
1842    ///
1843    /// Returns `None` if the interface does not exist (e.g. TUN disabled
1844    /// or not yet created). A `None` result disables the DNS responder's
1845    /// mesh-interface filter — safe, because if there is no fips0 there
1846    /// is no mesh exposure to defend against.
1847    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1848        #[cfg(unix)]
1849        {
1850            let c_name = std::ffi::CString::new(name).ok()?;
1851            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1852            if idx == 0 { None } else { Some(idx) }
1853        }
1854        #[cfg(not(unix))]
1855        {
1856            let _ = name;
1857            None
1858        }
1859    }
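
    // Illustrative sketch of the filter decision the resolved index feeds
    // (the real filter lives in src/upper/dns.rs; the function and parameter
    // names here are hypothetical). A query is dropped only when we both
    // know the mesh ifindex and observed the packet arriving on it; `None`
    // on either side leaves the query alone, matching the "no fips0, no
    // mesh surface to defend" reasoning in the doc comment above.
    #[allow(dead_code)]
    fn mesh_filter_would_drop(mesh_ifindex: Option<u32>, arrival_ifindex: Option<u32>) -> bool {
        matches!(
            (mesh_ifindex, arrival_ifindex),
            (Some(mesh), Some(arrival)) if mesh == arrival
        )
    }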
1860
1861    /// Stop the node.
1862    ///
1863    /// Shuts down TUN interface, stops I/O threads, and transitions to
1864    /// the Stopped state.
1865    pub async fn stop(&mut self) -> Result<(), NodeError> {
1866        if !self.state.can_stop() {
1867            return Err(NodeError::NotStarted);
1868        }
1869        self.state = NodeState::Stopping;
1870        info!(state = %self.state, "Node stopping");
1871
1872        // Stop DNS responder
1873        if let Some(handle) = self.dns_task.take() {
1874            handle.abort();
1875            debug!("DNS responder stopped");
1876        }
1877
1878        // Send disconnect notifications to all active peers before closing transports
1879        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
1880            .await;
1881
1882        // Stop Nostr overlay discovery background work and withdraw any advert.
1883        if let Some(bootstrap) = self.nostr_discovery.take()
1884            && let Err(e) = bootstrap.shutdown().await
1885        {
1886            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
1887        }
1888
1889        // Tear down LAN mDNS responder + browser. Best-effort: the
1890        // OS will eventually time the advert out via its TTL even if
1891        // we don't manage a clean unregister before the daemon exits.
1892        if let Some(lan) = self.lan_discovery.take() {
1893            lan.shutdown().await;
1894        }
1895
1896        // Shutdown transports (they're packet producers)
1897        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
1898        for transport_id in transport_ids {
1899            if let Some(mut handle) = self.transports.remove(&transport_id) {
1900                let transport_type = handle.transport_type().name;
1901                match handle.stop().await {
1902                    Ok(()) => {
1903                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
1904                    }
1905                    Err(e) => {
1906                        warn!(
1907                            transport_id = %transport_id,
1908                            transport_type,
1909                            error = %e,
1910                            "Transport stop failed"
1911                        );
1912                    }
1913                }
1914            }
1915        }
1916
1917        // Drop packet channels
1918        self.packet_tx.take();
1919        self.packet_rx.take();
1920
1921        // Shutdown TUN interface
1922        if let Some(name) = self.tun_name.take() {
1923            info!(name = %name, "Shutting down TUN interface");
1924
1925            // Drop the tun_tx to signal the writer to stop
1926            self.tun_tx.take();
1927
1928            // Delete the interface (on Linux, causes reader to get EFAULT)
1929            if let Err(e) = shutdown_tun_interface(&name).await {
1930                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
1931            }
1932
1933            // On macOS, signal the reader thread to exit by writing to the
1934            // shutdown pipe. The reader's select() will wake up and break.
1935            #[cfg(target_os = "macos")]
1936            if let Some(fd) = self.tun_shutdown_fd.take() {
1937                unsafe {
1938                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
1939                    libc::close(fd);
1940                }
1941            }
1942
1943            // Wait for threads to finish
1944            if let Some(handle) = self.tun_reader_handle.take() {
1945                let _ = handle.join();
1946            }
1947            if let Some(handle) = self.tun_writer_handle.take() {
1948                let _ = handle.join();
1949            }
1950
1951            self.tun_state = TunState::Disabled;
1952        }
1953
1954        self.state = NodeState::Stopped;
1955        info!(state = %self.state, "Node stopped");
1956        Ok(())
1957    }
1958
1959    /// Send disconnect notifications to all active peers.
1960    ///
1961    /// Best-effort: send failures are logged and ignored since the transport
1962    /// may already be degraded. This runs before transports are shut down.
1963    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
1964        let disconnect = Disconnect::new(reason);
1965        let plaintext = disconnect.encode();
1966
1967        // Collect node_addrs to avoid borrow conflict with send helper
1968        let peer_addrs: Vec<NodeAddr> = self
1969            .peers
1970            .iter()
1971            .filter(|(_, peer)| peer.can_send() && peer.has_session())
1972            .map(|(addr, _)| *addr)
1973            .collect();
1974
1975        if peer_addrs.is_empty() {
1976            debug!(
1977                total_peers = self.peers.len(),
1978                "No sendable peers for disconnect notification"
1979            );
1980            return;
1981        }
1982
1983        let mut sent = 0usize;
1984        for node_addr in &peer_addrs {
1985            match self
1986                .send_encrypted_link_message(node_addr, &plaintext)
1987                .await
1988            {
1989                Ok(()) => sent += 1,
1990                Err(e) => {
1991                    debug!(
1992                        peer = %self.peer_display_name(node_addr),
1993                        error = %e,
1994                        "Failed to send disconnect (transport may be down)"
1995                    );
1996                }
1997            }
1998        }
1999
2000        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
2001    }
2002
2003    fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2004        peer_config
2005            .addresses_by_priority()
2006            .into_iter()
2007            .cloned()
2008            .collect()
2009    }
2010
2011    async fn nostr_peer_fallback_addresses(
2012        &self,
2013        peer_config: &PeerConfig,
2014        existing: &[PeerAddress],
2015    ) -> Vec<PeerAddress> {
2016        if !self.config.node.discovery.nostr.enabled
2017            || self.config.node.discovery.nostr.policy
2018                == crate::config::NostrDiscoveryPolicy::Disabled
2019        {
2020            return Vec::new();
2021        }
2022
2023        let Some(bootstrap) = self.nostr_discovery.clone() else {
2024            return Vec::new();
2025        };
2026        let endpoints = match bootstrap
2027            .cached_advert_endpoints_for_peer(&peer_config.npub)
2028            .await
2029        {
2030            Some(endpoints) => endpoints,
2031            None => {
2032                debug!(
2033                    npub = %peer_config.npub,
2034                    "No cached Nostr advert endpoints for configured peer"
2035                );
2036                return Vec::new();
2037            }
2038        };
2039
2040        let mut fallback = Vec::new();
2041        let mut next_priority = existing
2042            .iter()
2043            .map(|addr| addr.priority)
2044            .max()
2045            .unwrap_or(100)
2046            .saturating_add(1);
2047        // Stamp every overlay-derived candidate with the current wall clock
2048        // so the dialer ranks it ahead of unstamped operator hints. We use a
2049        // single timestamp per fetch (rather than per-endpoint) because all
2050        // candidates in a single advert are equally fresh.
2051        let seen_at_ms = Self::now_ms();
2052        for endpoint in endpoints {
2053            let Some(candidate) =
2054                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
2055            else {
2056                continue;
2057            };
2058            if existing
2059                .iter()
2060                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
2061                || fallback.iter().any(|addr: &PeerAddress| {
2062                    addr.transport == candidate.transport && addr.addr == candidate.addr
2063                })
2064            {
2065                continue;
2066            }
2067            fallback.push(candidate);
2068            next_priority = next_priority.saturating_add(1);
2069        }
2070        fallback
2071    }
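
    // A minimal sketch of the ordering property the `seen_at_ms` stamp above
    // is relied on for (hypothetical helper; the real ranking lives in the
    // dialer): stamped candidates sort ahead of unstamped operator hints,
    // with ties falling back to the configured priority (assumed
    // lower-numbered-first here).
    #[allow(dead_code)]
    fn advert_candidate_rank(seen_at_ms: Option<u64>, priority: u8) -> (bool, u8) {
        // `false < true`, so `is_none() == false` (stamped) orders first.
        (seen_at_ms.is_none(), priority)
    }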
2072
2073    async fn request_nostr_bootstrap(&self, peer_config: &PeerConfig) -> bool {
2074        if !self.config.node.discovery.nostr.enabled
2075            || self.config.node.discovery.nostr.policy
2076                == crate::config::NostrDiscoveryPolicy::Disabled
2077        {
2078            return false;
2079        }
2080        let Some(bootstrap) = self.nostr_discovery.clone() else {
2081            return false;
2082        };
2083        bootstrap.request_connect(peer_config.clone()).await;
2084        info!(npub = %peer_config.npub, "Started background Nostr UDP NAT traversal attempt");
2085        true
2086    }
2087
2088    fn overlay_endpoint_to_peer_address(
2089        endpoint: &OverlayEndpointAdvert,
2090        priority: u8,
2091        seen_at_ms: u64,
2092    ) -> Option<PeerAddress> {
2093        let transport = match endpoint.transport {
2094            OverlayTransportKind::Udp => "udp",
2095            OverlayTransportKind::Tcp => "tcp",
2096            OverlayTransportKind::Tor => "tor",
2097        };
2098        Some(
2099            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
2100                .with_seen_at_ms(seen_at_ms),
2101        )
2102    }
2103
2104    async fn attempt_peer_address_list(
2105        &mut self,
2106        peer_config: &PeerConfig,
2107        peer_identity: PeerIdentity,
2108        allow_bootstrap_nat: bool,
2109        addresses: &[PeerAddress],
2110    ) -> Result<(), NodeError> {
2111        let mut attempted = false;
2112        let peer_node_addr = *peer_identity.node_addr();
2113        let mut concrete_budget = self.path_candidate_attempt_budget(&peer_node_addr);
2114
2115        for addr in addresses {
2116            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
2117                if !allow_bootstrap_nat {
2118                    continue;
2119                }
2120                if self.request_nostr_bootstrap(peer_config).await {
2121                    attempted = true;
2122                    continue;
2123                }
2124                debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
2125                continue;
2126            }
2127
2128            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
2129                match self.resolve_ethernet_addr(&addr.addr) {
2130                    Ok(result) => result,
2131                    Err(e) => {
2132                        debug!(
2133                            transport = %addr.transport,
2134                            addr = %addr.addr,
2135                            error = %e,
2136                            "Failed to resolve Ethernet address"
2137                        );
2138                        continue;
2139                    }
2140                }
2141            } else if addr.transport == "ble" {
2142                #[cfg(bluer_available)]
2143                {
2144                    match self.resolve_ble_addr(&addr.addr) {
2145                        Ok(result) => result,
2146                        Err(e) => {
2147                            debug!(
2148                                transport = %addr.transport,
2149                                addr = %addr.addr,
2150                                error = %e,
2151                                "Failed to resolve BLE address"
2152                            );
2153                            continue;
2154                        }
2155                    }
2156                }
2157                #[cfg(not(bluer_available))]
2158                {
2159                    debug!(transport = %addr.transport, "BLE transport not available on this build");
2160                    continue;
2161                }
2162            } else {
2163                let tid = if addr.transport == "udp"
2164                    && let Ok(remote_socket_addr) = addr.addr.parse::<SocketAddr>()
2165                {
2166                    match self.find_udp_transport_for_remote_addr(remote_socket_addr) {
2167                        Some((id, _)) => id,
2168                        None => {
2169                            debug!(
2170                                transport = %addr.transport,
2171                                addr = %addr.addr,
2172                                "No compatible operational UDP transport for address"
2173                            );
2174                            continue;
2175                        }
2176                    }
2177                } else {
2178                    match self.find_transport_for_type(&addr.transport) {
2179                        Some(id) => id,
2180                        None => {
2181                            debug!(
2182                                transport = %addr.transport,
2183                                addr = %addr.addr,
2184                                "No operational transport for address type"
2185                            );
2186                            continue;
2187                        }
2188                    }
2189                };
2190                (tid, TransportAddr::from_string(&addr.addr))
2191            };
2192
2193            if self.is_connecting_to_peer_on_path(&peer_node_addr, transport_id, &remote_addr) {
2194                attempted = true;
2195                debug!(
2196                    npub = %peer_config.npub,
2197                    transport_id = %transport_id,
2198                    remote_addr = %remote_addr,
2199                    "Skipping duplicate in-flight candidate path"
2200                );
2201                continue;
2202            }
2203
2204            if concrete_budget == 0 {
2205                debug!(
2206                    npub = %peer_config.npub,
2207                    max_candidates = MAX_PARALLEL_PATH_CANDIDATES_PER_PEER,
2208                    "Path candidate race budget exhausted"
2209                );
2210                break;
2211            }
2212
2213            match self
2214                .initiate_connection(transport_id, remote_addr, peer_identity)
2215                .await
2216            {
2217                Ok(()) => {
2218                    attempted = true;
2219                    concrete_budget = concrete_budget.saturating_sub(1);
2220                }
2221                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
2222                Err(e) => {
2223                    debug!(
2224                        npub = %peer_config.npub,
2225                        transport_id = %transport_id,
2226                        error = %e,
2227                        "Connection attempt failed, trying next address"
2228                    );
2229                }
2230            }
2231        }
2232
2233        if attempted {
2234            return Ok(());
2235        }
2236
2237        Err(NodeError::NoTransportForType(format!(
2238            "no operational transport for any of {}'s addresses",
2239            peer_config.npub
2240        )))
2241    }
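
    // Illustrative only: the shape of an address list that exercises the
    // `udp:nat` sentinel handled above. The concrete endpoint uses a
    // documentation IP and an arbitrary port; neither is a real default.
    // The sentinel never resolves to a transport address; it only asks the
    // Nostr overlay runtime for a background NAT-traversal attempt when
    // `allow_bootstrap_nat` is set.
    #[allow(dead_code)]
    fn example_nat_fallback_addresses() -> Vec<PeerAddress> {
        vec![
            PeerAddress::new("udp", "203.0.113.7:4500".to_string()),
            PeerAddress::new("udp", "nat".to_string()),
        ]
    }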
2242
2243    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
2244        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
2245            .await;
2246    }
2247
2248    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
2249    /// queues retries for non-configured, not-yet-connected peers.
2250    ///
2251    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
2252    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
2253    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
2254    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
2255    ///
2256    /// `caller` is a short label included in log lines so per-tick and
2257    /// startup sweeps are distinguishable in operator-facing logs.
2258    pub(in crate::node) async fn run_open_discovery_sweep(
2259        &mut self,
2260        bootstrap: &std::sync::Arc<NostrDiscovery>,
2261        max_age_secs: Option<u64>,
2262        caller: &'static str,
2263    ) {
2264        if !self.config.node.discovery.nostr.enabled
2265            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2266        {
2267            return;
2268        }
2269
2270        let configured_npubs = self
2271            .config
2272            .peers()
2273            .iter()
2274            .map(|peer| peer.npub.clone())
2275            .collect::<HashSet<_>>();
2276        let now_ms = Self::now_ms();
2277        let now_secs = now_ms / 1000;
2278        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
2279        if enqueue_budget == 0 {
2280            debug!(
2281                caller = %caller,
2282                "open-discovery sweep: enqueue budget is 0, skipping"
2283            );
2284            return;
2285        }
2286
2287        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
2288        let cached_count = candidates.len();
2289        let mut enqueued = 0usize;
2290        let mut skipped_age = 0usize;
2291        let mut skipped_configured = 0usize;
2292        let mut skipped_self = 0usize;
2293        let mut skipped_connected = 0usize;
2294        let mut skipped_retry_pending = 0usize;
2295        let mut skipped_connecting = 0usize;
2296        let mut skipped_no_endpoints = 0usize;
2297        let mut skipped_invalid_npub = 0usize;
2298        let mut skipped_cooldown = 0usize;
2299
2300        for (npub, endpoints, created_at_secs) in candidates {
2301            if enqueue_budget == 0 {
2302                break;
2303            }
2304
2305            if let Some(max_age) = max_age_secs
2306                && now_secs.saturating_sub(created_at_secs) > max_age
2307            {
2308                skipped_age = skipped_age.saturating_add(1);
2309                continue;
2310            }
2311
2312            if configured_npubs.contains(&npub) {
2313                // Configured peers don't go through the open-discovery
2314                // enqueue path — their `PeerConfig` is already in
2315                // `self.config.peers()`, so the regular retry queue is
2316                // what drives their reconnect. But on cold start with
2317                // NAT'd peers, every initial `initiate_peer_connection`
2318                // or stale); each failure pushes the peer into `retry_pending`
2319                // or stale), each pushes the peer into `retry_pending`
2320                // with exponential backoff (5/10/20/40/80s), and by the
2321                // time the next backoff slot fires the Nostr advert is
2322                // already cached — we just don't act on it for ~80s.
2323                //
2324                // The arrival of an advert (which this sweep sees) means
2325                // we now have a path to dial. If the peer's retry is
2326                // scheduled in the future, pull it forward to "now" so
2327                // the next `process_pending_retries` tick fires it
2328                // immediately. The retry path (`initiate_peer_retry_
2329                // immediately. The retry path
2330                // (`initiate_peer_retry_connection` → `try_peer_addresses`)
2331                // then refetches the advert and dials it — no behavioral
2332                // change beyond schedule timing.
2333                    let configured_addr = *identity.node_addr();
2334                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
2335                        && state.retry_after_ms > now_ms
2336                    {
2337                        state.retry_after_ms = now_ms;
2338                        debug!(
2339                            caller = %caller,
2340                            peer = %self.peer_display_name(&configured_addr),
2341                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
2342                            "Expediting configured-peer retry after fresh overlay advert"
2343                        );
2344                    }
2345                }
2346                skipped_configured = skipped_configured.saturating_add(1);
2347                continue;
2348            }
2349
2350            let peer_identity = match PeerIdentity::from_npub(&npub) {
2351                Ok(identity) => identity,
2352                Err(_) => {
2353                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
2354                    continue;
2355                }
2356            };
2357            let node_addr = *peer_identity.node_addr();
2358            if node_addr == *self.identity.node_addr() {
2359                skipped_self = skipped_self.saturating_add(1);
2360                continue;
2361            }
2362            if self.peers.contains_key(&node_addr) {
2363                skipped_connected = skipped_connected.saturating_add(1);
2364                continue;
2365            }
2366            if self.retry_pending.contains_key(&node_addr) {
2367                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
2368                continue;
2369            }
2370            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
2371                skipped_cooldown = skipped_cooldown.saturating_add(1);
2372                continue;
2373            }
2374            let connecting = self.connections.values().any(|conn| {
2375                conn.expected_identity()
2376                    .map(|id| id.node_addr() == &node_addr)
2377                    .unwrap_or(false)
2378            });
2379            if connecting {
2380                skipped_connecting = skipped_connecting.saturating_add(1);
2381                continue;
2382            }
2383
2384            let mut addresses = Vec::new();
2385            let mut priority = 120u8;
2386            let seen_at_ms = Self::now_ms();
2387            for endpoint in endpoints {
2388                let Some(candidate) =
2389                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
2390                else {
2391                    continue;
2392                };
2393                if addresses.iter().any(|existing: &PeerAddress| {
2394                    existing.transport == candidate.transport && existing.addr == candidate.addr
2395                }) {
2396                    continue;
2397                }
2398                addresses.push(candidate);
2399                priority = priority.saturating_add(1);
2400            }
2401            if addresses.is_empty() {
2402                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
2403                continue;
2404            }
2405
2406            self.peer_aliases
2407                .entry(node_addr)
2408                .or_insert_with(|| peer_identity.short_npub());
2409            self.register_identity(node_addr, peer_identity.pubkey_full());
2410
2411            let mut state = super::retry::RetryState::new(PeerConfig {
2412                npub: npub.clone(),
2413                alias: None,
2414                addresses,
2415                connect_policy: ConnectPolicy::AutoConnect,
2416                auto_reconnect: true,
2417            });
2418            state.reconnect = false;
2419            state.retry_after_ms = now_ms;
2420            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
2421            self.retry_pending.insert(node_addr, state);
2422            info!(
2423                caller = %caller,
2424                peer = %peer_identity.short_npub(),
2425                advert_age_secs = now_secs.saturating_sub(created_at_secs),
2426                "open-discovery sweep: queued retry for cached advert"
2427            );
2428            enqueue_budget = enqueue_budget.saturating_sub(1);
2429            enqueued = enqueued.saturating_add(1);
2430        }
2431
2432        // Always log a one-line summary on the startup sweep so operators
2433        // can verify it ran. Per-tick sweeps are noisier; only summarize
2434        // when something happened.
2435        let total_skipped = skipped_age
2436            + skipped_configured
2437            + skipped_self
2438            + skipped_connected
2439            + skipped_retry_pending
2440            + skipped_connecting
2441            + skipped_no_endpoints
2442            + skipped_invalid_npub
2443            + skipped_cooldown;
2444        let should_summarize = caller == "startup" || enqueued > 0;
2445        if should_summarize {
2446            info!(
2447                caller = %caller,
2448                cached = cached_count,
2449                queued = enqueued,
2450                skipped_age = skipped_age,
2451                skipped_configured = skipped_configured,
2452                skipped_self = skipped_self,
2453                skipped_connected = skipped_connected,
2454                skipped_retry_pending = skipped_retry_pending,
2455                skipped_connecting = skipped_connecting,
2456                skipped_no_endpoints = skipped_no_endpoints,
2457                skipped_invalid_npub = skipped_invalid_npub,
2458                skipped_cooldown = skipped_cooldown,
2459                skipped_total = total_skipped,
2460                "open-discovery sweep complete"
2461            );
2462        }
2463    }
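
    // A minimal sketch of the `max_age_secs` gate applied per candidate in
    // the sweep above (a hypothetical free-standing form of the inline
    // check): `None` lets everything through and defers to the cache's own
    // `valid_until_ms` filtering; `Some(max)` rejects adverts older than
    // `now - max`.
    #[allow(dead_code)]
    fn advert_passes_age_gate(
        now_secs: u64,
        created_at_secs: u64,
        max_age_secs: Option<u64>,
    ) -> bool {
        match max_age_secs {
            Some(max_age) => now_secs.saturating_sub(created_at_secs) <= max_age,
            None => true,
        }
    }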
2464
2465    /// One-shot startup sweep: runs once after the configured settle
2466    /// delay, iterating the cached overlay adverts and queueing retries
2467    /// for any peer with a recent enough advert that we haven't already
2468    /// configured statically or established a link to.
2469    ///
2470    /// Gated identically to [`run_open_discovery_sweep`]: requires
2471    /// `node.discovery.nostr.enabled` and `policy == open`.
2472    async fn maybe_run_startup_open_discovery_sweep(
2473        &mut self,
2474        bootstrap: &std::sync::Arc<NostrDiscovery>,
2475    ) {
2476        if self.startup_open_discovery_sweep_done {
2477            return;
2478        }
2479        if !self.config.node.discovery.nostr.enabled
2480            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2481        {
2482            // Mark done so we don't keep re-checking on every tick.
2483            self.startup_open_discovery_sweep_done = true;
2484            return;
2485        }
2486        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
2487            return;
2488        };
2489        let now_ms = Self::now_ms();
2490        let delay_ms = self
2491            .config
2492            .node
2493            .discovery
2494            .nostr
2495            .startup_sweep_delay_secs
2496            .saturating_mul(1000);
2497        if now_ms < started_at_ms.saturating_add(delay_ms) {
2498            return;
2499        }
2500
2501        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
2502        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
2503            .await;
2504        self.startup_open_discovery_sweep_done = true;
2505    }
2506
2507    fn available_outbound_slots(&self) -> usize {
2508        let connection_used = self
2509            .connections
2510            .len()
2511            .saturating_add(self.pending_connects.len());
2512        let connection_slots = if self.max_connections == 0 {
2513            usize::MAX
2514        } else {
2515            self.max_connections.saturating_sub(connection_used)
2516        };
2517
2518        let peer_slots = if self.max_peers == 0 {
2519            usize::MAX
2520        } else {
2521            self.max_peers.saturating_sub(self.peers.len())
2522        };
2523
2524        connection_slots.min(peer_slots)
2525    }
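
    // Worked sketch of the "0 means unlimited" convention used above
    // (hypothetical helper; the figures are made-up illustration): a cap of
    // 0 yields usize::MAX free slots, while a cap of 8 with 5 in use yields 3.
    #[allow(dead_code)]
    fn slots_remaining(cap: usize, used: usize) -> usize {
        if cap == 0 {
            usize::MAX
        } else {
            cap.saturating_sub(used)
        }
    }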
2526
2527    fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
2528        let current_open_discovery_pending = self
2529            .retry_pending
2530            .values()
2531            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
2532            .count();
2533
2534        let cap_remaining = self
2535            .config
2536            .node
2537            .discovery
2538            .nostr
2539            .open_discovery_max_pending
2540            .saturating_sub(current_open_discovery_pending);
2541
2542        cap_remaining.min(self.available_outbound_slots())
2543    }
2544
2545    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
2546        now_ms.saturating_add(
2547            self.config
2548                .node
2549                .discovery
2550                .nostr
2551                .advert_ttl_secs
2552                .saturating_mul(1000)
2553                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
2554        )
2555    }
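
    // Worked example with assumed numbers: with advert_ttl_secs = 3600 and
    // the lifetime multiplier of 2 above, an open-discovery retry entry
    // queued now expires 3600 * 1000 * 2 = 7_200_000 ms (two hours) later.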
2556
2557    async fn build_overlay_advert(
2558        &self,
2559        bootstrap: &std::sync::Arc<NostrDiscovery>,
2560    ) -> Option<OverlayAdvert> {
2561        if !self.config.node.discovery.nostr.enabled {
2562            return None;
2563        }
2564
2565        let mut endpoints = Vec::new();
2566        let mut has_udp_nat = false;
2567
2568        for handle in self.transports.values() {
2569            if !handle.is_operational() {
2570                continue;
2571            }
2572
2573            match handle.transport_type().name {
2574                "udp" => {
2575                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
2576                        continue;
2577                    };
2578                    if !cfg.advertise_on_nostr() {
2579                        continue;
2580                    }
2581                    if cfg.is_public() {
2582                        // Precedence:
2583                        // 1. operator-supplied `external_addr` (skips STUN)
2584                        // 2. non-wildcard *public* `local_addr` (operator
2585                        //    bound to a specific public IP directly)
2586                        // 3. STUN auto-discovery against ephemeral socket
2587                        //    (also taken when bind is wildcard *or* private —
2588                        //    a private bind is not peer-reachable, so we
2589                        //    must publish the public reflexive instead)
2590                        // 4. loud warn + omit endpoint
2591                        if let Some(explicit) = cfg.external_advert_addr() {
2592                            endpoints.push(OverlayEndpointAdvert {
2593                                transport: OverlayTransportKind::Udp,
2594                                addr: explicit.to_string(),
2595                            });
2596                        } else {
2597                            match handle.local_addr() {
2598                                Some(addr)
2599                                    if !addr.ip().is_unspecified()
2600                                        && !is_unroutable_advert_ip(addr.ip()) =>
2601                                {
2602                                    endpoints.push(OverlayEndpointAdvert {
2603                                        transport: OverlayTransportKind::Udp,
2604                                        addr: addr.to_string(),
2605                                    });
2606                                }
2607                                Some(addr) => {
2608                                    let key = handle.transport_id().as_u32();
2609                                    let port = addr.port();
2610                                    if let Some(public) =
2611                                        bootstrap.learn_public_udp_addr(key, port).await
2612                                    {
2613                                        endpoints.push(OverlayEndpointAdvert {
2614                                            transport: OverlayTransportKind::Udp,
2615                                            addr: public.to_string(),
2616                                        });
2617                                    } else {
2618                                        warn!(
2619                                            transport_id = key,
2620                                            bind_addr = %addr,
2621                                            "advert: udp public=true but bind is wildcard \
2622                                            or private and STUN observation failed; \
2623                                            advertising no UDP endpoint. Either set \
2624                                            transports.udp.external_addr, bind to a \
2625                                            specific *public* IP, or ensure \
2626                                            node.discovery.nostr.stun_servers lists a reachable server"
2627                                        );
2628                                    }
2629                                }
2630                                None => {}
2631                            }
2632                        }
2633                    } else {
2634                        endpoints.push(OverlayEndpointAdvert {
2635                            transport: OverlayTransportKind::Udp,
2636                            addr: "nat".to_string(),
2637                        });
2638                        has_udp_nat = true;
2639                    }
2640                }
2641                "tcp" => {
2642                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
2643                        continue;
2644                    };
2645                    if !cfg.advertise_on_nostr() {
2646                        continue;
2647                    }
2648                    // Precedence:
2649                    // 1. operator-supplied `external_addr` (only path that
2650                    //    works on cloud-NAT setups where the public IP is
2651                    //    not on a host interface).
2652                    // 2. non-wildcard *public* `local_addr` (operator bound
2653                    //    to a specific public IP directly).
2654                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
2655                    //
2656                    // A wildcard *or* private bind is never advertised as-is
2657                    // — peers off-LAN can't reach a private bind, and there
2658                    // is no TCP STUN to discover a public reflexive.
2659                    if let Some(explicit) = cfg.external_advert_addr() {
2660                        endpoints.push(OverlayEndpointAdvert {
2661                            transport: OverlayTransportKind::Tcp,
2662                            addr: explicit.to_string(),
2663                        });
2664                    } else {
2665                        match handle.local_addr() {
2666                            Some(addr)
2667                                if !addr.ip().is_unspecified()
2668                                    && !is_unroutable_advert_ip(addr.ip()) =>
2669                            {
2670                                endpoints.push(OverlayEndpointAdvert {
2671                                    transport: OverlayTransportKind::Tcp,
2672                                    addr: addr.to_string(),
2673                                });
2674                            }
2675                            Some(addr) => {
2676                                warn!(
2677                                    bind_addr = %addr,
2678                                    "advert: tcp advertise_on_nostr=true but bound to a wildcard \
2679                                    or private IP with no transports.tcp.external_addr set; \
2680                                    advertising no TCP endpoint. Either set external_addr \
2681                                    to the public IP (recommended for cloud 1:1-NAT setups) \
2682                                    or bind explicitly to the public IP"
2683                                );
2684                            }
2685                            None => {}
2686                        }
2687                    }
2688                }
2689                "tor" => {
2690                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
2691                        continue;
2692                    };
2693                    if !cfg.advertise_on_nostr() {
2694                        continue;
2695                    }
2696                    if let Some(addr) = handle.onion_address() {
2697                        endpoints.push(OverlayEndpointAdvert {
2698                            transport: OverlayTransportKind::Tor,
2699                            addr: format!("{}:{}", addr, cfg.advertised_port()),
2700                        });
2701                    }
2702                }
2703                _ => {}
2704            }
2705        }
2706
2707        if endpoints.is_empty() {
2708            return None;
2709        }
2710
2711        Some(OverlayAdvert {
2712            identifier: ADVERT_IDENTIFIER.to_string(),
2713            version: ADVERT_VERSION,
2714            endpoints,
2715            signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
2716            stun_servers: has_udp_nat
2717                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
2718        })
2719    }
2720
2721    async fn refresh_overlay_advert(
2722        &self,
2723        bootstrap: &std::sync::Arc<NostrDiscovery>,
2724    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
2725        let advert = self.build_overlay_advert(bootstrap).await;
2726        bootstrap.update_local_advert(advert).await
2727    }
2728
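    /// Map a transport handle back to its config entry: an unnamed handle
    /// matches the `Single` variant, a named handle is looked up in the
    /// `Named` map. `lookup_tcp_config` and `lookup_tor_config` below follow
    /// the same rule.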
2729    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
2730        match (&self.config.transports.udp, transport_name) {
2731            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2732            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2733            _ => None,
2734        }
2735    }
2736
2737    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
2738        match (&self.config.transports.tcp, transport_name) {
2739            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2740            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2741            _ => None,
2742        }
2743    }
2744
2745    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
2746        match (&self.config.transports.tor, transport_name) {
2747            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2748            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2749            _ => None,
2750        }
2751    }
2752
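    /// Try every known address for `peer_config` (static config merged with
    /// overlay adverts), most recently observed first. When no candidate at
    /// all is known and `allow_bootstrap_nat` is set, a Nostr bootstrap
    /// request takes the place of a direct dial; otherwise failure to reach
    /// any candidate is reported as `NoTransportForType`.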
2753    pub(in crate::node) async fn try_peer_addresses(
2754        &mut self,
2755        peer_config: &PeerConfig,
2756        peer_identity: PeerIdentity,
2757        allow_bootstrap_nat: bool,
2758    ) -> Result<(), NodeError> {
2759        let peer_node_addr = *peer_identity.node_addr();
2760        if self.peers.contains_key(&peer_node_addr) {
2761            debug!(
2762                npub = %peer_config.npub,
2763                "Peer already exists, skipping address attempts"
2764            );
2765            return Ok(());
2766        }
2767
2768        let candidates = self.peer_address_candidates(peer_config).await;
2769
2770        if candidates.is_empty() {
2771            if allow_bootstrap_nat && self.request_nostr_bootstrap(peer_config).await {
2772                return Ok(());
2773            }
2774            return Err(NodeError::NoTransportForType(format!(
2775                "no addresses known for {}",
2776                peer_config.npub
2777            )));
2778        }
2779
2780        if self
2781            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
2782            .await
2783            .is_ok()
2784        {
2785            return Ok(());
2786        }
2787
2788        Err(NodeError::NoTransportForType(format!(
2789            "no operational transport for any of {}'s addresses",
2790            peer_config.npub
2791        )))
2792    }
2793
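    /// For a peer that is already connected, dial concrete candidate
    /// addresses that differ from the currently active path. The `udp:nat`
    /// sentinel and candidates matching the current path are filtered out,
    /// and an error is returned when nothing concrete remains.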
2794    async fn try_active_peer_alternative_addresses(
2795        &mut self,
2796        peer_config: &PeerConfig,
2797        peer_identity: PeerIdentity,
2798    ) -> Result<bool, NodeError> {
2799        let peer_node_addr = *peer_identity.node_addr();
2800        let candidates = self.peer_address_candidates(peer_config).await;
2801
2802        if candidates.is_empty() {
2803            return Err(NodeError::NoTransportForType(format!(
2804                "no addresses known for {}",
2805                peer_config.npub
2806            )));
2807        }
2808
2809        let alternatives: Vec<_> = candidates
2810            .into_iter()
2811            .filter(|addr| !(addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat")))
2812            .filter(|addr| !self.active_peer_matches_candidate(&peer_node_addr, addr))
2813            .collect();
2814
2815        if alternatives.is_empty() {
2816            return Err(NodeError::NoTransportForType(format!(
2817                "no concrete alternate addresses known for {}",
2818                peer_config.npub
2819            )));
2820        }
2821
2822        self.attempt_peer_address_list(peer_config, peer_identity, false, &alternatives)
2823            .await?;
2824        Ok(true)
2825    }
2826
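    /// Collect and order connection candidates for `peer_config`; the
    /// comments below spell out the merge and ordering rules. For example,
    /// given an overlay address with `seen_at_ms = Some(1_000)`, a static
    /// address with `seen_at_ms = Some(5_000)`, and a static address with
    /// `seen_at_ms = None`, the resulting order is: static (5_000),
    /// overlay (1_000), static (none).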
2827    async fn peer_address_candidates(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
2828        // Merge every candidate from every source we have for this peer
2829        // (operator-configured static addresses, freshly fetched overlay
2830        // adverts, callers' recent-peers caches via `update_peers`) and
2831        // try them in order of `seen_at_ms` descending — most-recent
2832        // first, source-agnostic. Addresses without a freshness signal
2833        // sort last. Addresses are hints, not the final word: within a
2834        // single pass the concrete candidates race each other, and a
2835        // `udp:nat` entry only starts background Nostr traversal; it never
2836        // suppresses direct static/LAN attempts later in the list.
2837        let static_addresses = self.static_peer_addresses(peer_config);
2838        let overlay_addresses = self
2839            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
2840            .await;
2841
2842        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
2843        for addr in overlay_addresses.into_iter().chain(static_addresses) {
2844            if !candidates.iter().any(|existing: &PeerAddress| {
2845                existing.transport == addr.transport && existing.addr == addr.addr
2846            }) {
2847                candidates.push(addr);
2848            }
2849        }
2850
2851        // Stable sort: most-recently-observed first (Some > None), tiebreak
2852        // by input order so equal-timestamp addresses keep operator-supplied
2853        // priority and the overlay-then-static merge order survives when
2854        // nothing has a freshness signal.
2855        candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
2856            (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
2857            (Some(_), None) => std::cmp::Ordering::Less,
2858            (None, Some(_)) => std::cmp::Ordering::Greater,
2859            (None, None) => std::cmp::Ordering::Equal,
2860        });
2861
2862        candidates
2863    }
2864
2865    fn active_peer_matches_any_candidate(
2866        &self,
2867        peer_node_addr: &NodeAddr,
2868        candidates: &[PeerAddress],
2869    ) -> bool {
2870        candidates
2871            .iter()
2872            .any(|candidate| self.active_peer_matches_candidate(peer_node_addr, candidate))
2873    }
2874
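    /// True when the peer's current path already appears in `candidates`
    /// and has not been idle for longer than one heartbeat interval
    /// (floored at one second), so re-dialing the same address can be
    /// skipped.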
2875    pub(in crate::node) fn active_peer_candidate_is_fresh_enough_to_skip(
2876        &self,
2877        peer_node_addr: &NodeAddr,
2878        candidates: &[PeerAddress],
2879    ) -> bool {
2880        if !self.active_peer_matches_any_candidate(peer_node_addr, candidates) {
2881            return false;
2882        }
2883        !self.active_peer_needs_same_path_refresh(peer_node_addr)
2884    }
2885
2886    fn active_peer_needs_same_path_refresh(&self, peer_node_addr: &NodeAddr) -> bool {
2887        let Some(peer) = self.peers.get(peer_node_addr) else {
2888            return false;
2889        };
2890        let stale_after_ms = self
2891            .config
2892            .node
2893            .heartbeat_interval_secs
2894            .saturating_mul(1000)
2895            .max(1000);
2896        peer.idle_time(Self::now_ms()) > stale_after_ms
2897    }
2898
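    /// True when `candidate` is exactly the address the peer is currently
    /// using and, where the transport kind is known, the same transport.
    /// Paths on adopted bootstrap transports never match, so a concrete
    /// candidate is still treated as an alternative to them.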
2899    fn active_peer_matches_candidate(
2900        &self,
2901        peer_node_addr: &NodeAddr,
2902        candidate: &PeerAddress,
2903    ) -> bool {
2904        let Some(peer) = self.peers.get(peer_node_addr) else {
2905            return false;
2906        };
2907        let Some(current_addr) = peer.current_addr() else {
2908            return false;
2909        };
2910        if peer
2911            .transport_id()
2912            .map(|id| self.bootstrap_transports.contains(&id))
2913            .unwrap_or(false)
2914        {
2915            return false;
2916        }
2917        let current_addr = current_addr.to_string();
2918        let current_transport = peer
2919            .transport_id()
2920            .and_then(|id| self.transports.get(&id))
2921            .map(|transport| transport.transport_type().name);
2922
2923        candidate.addr == current_addr
2924            && current_transport
2925                .map(|transport| transport == candidate.transport)
2926                .unwrap_or(true)
2927    }
2928
2929    // === Control API methods ===
2930
2931    /// Connect to a peer via the control API.
2932    ///
2933    /// Creates an ephemeral peer connection (not persisted to config, no
2934    /// auto-reconnect). Reuses the same connection path as auto-connect
2935    /// peers. Returns JSON data on success or an error message.
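    ///
    /// A call sketch from the control-API layer (npub and address are
    /// placeholders):
    ///
    /// ```ignore
    /// let reply = node
    ///     .api_connect("npub1exampleplaceholder", "203.0.113.7:51820", "udp")
    ///     .await?;
    /// assert_eq!(reply["transport"], "udp");
    /// ```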
2936    pub(crate) async fn api_connect(
2937        &mut self,
2938        npub: &str,
2939        address: &str,
2940        transport: &str,
2941    ) -> Result<serde_json::Value, String> {
2942        let peer_config = PeerConfig {
2943            npub: npub.to_string(),
2944            alias: None,
2945            addresses: vec![PeerAddress::new(transport, address)],
2946            connect_policy: ConnectPolicy::Manual,
2947            auto_reconnect: false,
2948        };
2949
2950        // Pre-seed identity cache (same as initiate_peer_connections does)
2951        if let Ok(identity) = PeerIdentity::from_npub(npub) {
2952            self.peer_aliases
2953                .insert(*identity.node_addr(), identity.short_npub());
2954            self.register_identity(*identity.node_addr(), identity.pubkey_full());
2955        }
2956
2957        self.initiate_peer_connection(&peer_config)
2958            .await
2959            .map(|()| {
2960                info!(
2961                    npub = %npub,
2962                    address = %address,
2963                    transport = %transport,
2964                    "API connect initiated"
2965                );
2966                serde_json::json!({
2967                    "npub": npub,
2968                    "address": address,
2969                    "transport": transport,
2970                })
2971            })
2972            .map_err(|e| e.to_string())
2973    }
2974
2975    /// Disconnect a peer via the control API.
2976    ///
2977    /// Removes the peer and suppresses auto-reconnect.
2978    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
2979        let peer_identity =
2980            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
2981        let node_addr = *peer_identity.node_addr();
2982
2983        if !self.peers.contains_key(&node_addr) {
2984            return Err(format!("peer not found: {npub}"));
2985        }
2986
2987        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
2988        self.remove_active_peer(&node_addr);
2989
2990        // Suppress any pending auto-reconnect
2991        self.retry_pending.remove(&node_addr);
2992
2993        info!(npub = %npub, "API disconnect completed");
2994
2995        Ok(serde_json::json!({
2996            "npub": npub,
2997            "disconnected": true,
2998        }))
2999    }
3000
3001    /// Adopt an already-established UDP traversal and start the normal FIPS
3002    /// Noise handshake over it.
3003    ///
3004    /// This is intended for integration with an external rendezvous runtime
3005    /// that has already completed relay signaling, STUN observation, and UDP
3006    /// hole punching. After handoff, the adopted socket is owned by FIPS.
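    ///
    /// A usage sketch (the rendezvous runtime supplies the real
    /// `EstablishedTraversal`; names here are placeholders):
    ///
    /// ```ignore
    /// let handoff = node.adopt_established_traversal(traversal).await?;
    /// info!(
    ///     transport_id = %handoff.transport_id,
    ///     remote_addr = %handoff.remote_addr,
    ///     "bootstrap handoff adopted"
    /// );
    /// ```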
3007    pub async fn adopt_established_traversal(
3008        &mut self,
3009        traversal: EstablishedTraversal,
3010    ) -> Result<BootstrapHandoffResult, NodeError> {
3011        debug!(
3012            peer_npub = %traversal.peer_npub,
3013            session_id = %traversal.session_id,
3014            remote_addr = %traversal.remote_addr,
3015            "adopting established traversal socket"
3016        );
3017
3018        if !self.state.is_operational() {
3019            return Err(NodeError::NotStarted);
3020        }
3021
3022        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
3023        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
3024            NodeError::InvalidPeerNpub {
3025                npub: traversal.peer_npub.clone(),
3026                reason: e.to_string(),
3027            }
3028        })?;
3029        let peer_node_addr = *peer_identity.node_addr();
3030        if self.peers.contains_key(&peer_node_addr) {
3031            debug!(
3032                peer_npub = %traversal.peer_npub,
3033                "Adopting NAT traversal handoff as alternate path for already-connected peer"
3034            );
3035        }
3036
3037        self.peer_aliases
3038            .insert(peer_node_addr, peer_identity.short_npub());
3039        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
3040
3041        let transport_id = self.allocate_transport_id();
3042        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
3043        // (and accept_connections / advertise flags) from the operator's
3044        // configured [transports.udp] when the bootstrap runtime doesn't
3045        // pass an explicit override. Lookup tries `transport_name` first
3046        // (covers the `Named` multi-listener variant) and falls back to the
3047        // unnamed `Single` listener, so single- and named-listener configs
3048        // both inherit cleanly.
3049        //
3050        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
3051        // only value guaranteed to survive arbitrary middlebox paths.
3052        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
3053        // initially attempt that MTU and may black-hole on tighter paths
3054        // until reactive `MtuExceeded` recovery kicks in. Operators who
3055        // raise the primary MTU based on known-clean topology accept that
3056        // tradeoff; the silent drop on a too-low default was strictly
3057        // worse for the common case where the primary MTU is reachable.
3058        //
3059        // Bind / external address fields are cleared since the socket is
3060        // already bound.
3061        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
3062            let mut cfg = self
3063                .lookup_udp_config(traversal.transport_name.as_deref())
3064                .or_else(|| self.lookup_udp_config(None))
3065                .cloned()
3066                .unwrap_or_default();
3067            cfg.bind_addr = None;
3068            cfg.external_addr = None;
3069            cfg
3070        });
3071        let mut transport = crate::transport::udp::UdpTransport::new(
3072            transport_id,
3073            traversal.transport_name.clone(),
3074            inherited_config,
3075            packet_tx,
3076        );
3077
3078        transport
3079            .adopt_socket_async(traversal.socket)
3080            .await
3081            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
3082
3083        let local_addr = transport.local_addr().ok_or_else(|| {
3084            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
3085        })?;
3086
3087        self.transports.insert(
3088            transport_id,
3089            crate::transport::TransportHandle::Udp(transport),
3090        );
3091        self.bootstrap_transports.insert(transport_id);
3092        self.bootstrap_transport_npubs
3093            .insert(transport_id, traversal.peer_npub.clone());
3094
3095        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
3096        if let Err(err) = self
3097            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
3098            .await
3099        {
3100            self.bootstrap_transports.remove(&transport_id);
3101            self.bootstrap_transport_npubs.remove(&transport_id);
3102            if let Some(mut handle) = self.transports.remove(&transport_id) {
3103                let _ = handle.stop().await;
3104            }
3105            return Err(err);
3106        }
3107
3108        info!(
3109            peer = %self.peer_display_name(&peer_node_addr),
3110            transport_id = %transport_id,
3111            local_addr = %local_addr,
3112            remote_addr = %traversal.remote_addr,
3113            session_id = %traversal.session_id,
3114            "adopted NAT traversal socket; handshake initiated"
3115        );
3116
3117        Ok(BootstrapHandoffResult {
3118            transport_id,
3119            local_addr,
3120            remote_addr: traversal.remote_addr,
3121            peer_node_addr,
3122            session_id: traversal.session_id,
3123        })
3124    }
3125}