// fips_core/node/lifecycle.rs
//! Node lifecycle management: start, stop, and peer connection initiation.

use super::{Node, NodeError, NodeState};
use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
use crate::discovery::nostr::{
    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
    OverlayEndpointAdvert, OverlayTransportKind,
};
use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
use crate::node::acl::PeerAclContext;
use crate::node::wire::build_msg1;
use crate::peer::PeerConnection;
use crate::protocol::{Disconnect, DisconnectReason};
use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
use crate::{NodeAddr, PeerIdentity};
use std::collections::HashSet;
use std::net::IpAddr;
use std::thread;
use std::time::Duration;
use tracing::{debug, info, warn};

/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN. Covers RFC1918, loopback, link-local, IPv4
/// CGNAT (100.64/10), unspecified, multicast/broadcast, documentation
/// (TEST-NET), benchmarking (198.18/15), and IPv6
/// unique-local/loopback/unspecified/link-local. We never publish these
/// as the peer's primary `runtime_endpoint`; an off-LAN consumer can't
/// route to them, and latching one in onto a slow overlay-relay
/// fallback is the original bug this guard exists to prevent.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let o = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the
                // public internet; behaves like an extra NAT layer.
                || (o[0] == 100 && (o[1] & 0xc0) == 64)
                // 198.18.0.0/15 — benchmarking, RFC 2544. Reserved for
                // inter-network device testing, never publicly routable.
                // Checked manually: `Ipv4Addr::is_benchmarking` is not
                // yet stable.
                || (o[0] == 198 && (o[1] & 0xfe) == 18)
        }
        IpAddr::V6(v6) => {
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // IPv6 link-local: fe80::/10
                || (v6.segments()[0] & 0xffc0) == 0xfe80
        }
    }
}

/// Multiplier applied to an advert lifetime when bounding open-discovery
/// retries. NOTE(review): the consumer is outside this view — confirm the
/// exact semantics (e.g. "retry for 2× the advert lifetime") at the use
/// site before relying on this description.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;

impl Node {
    /// Replace the runtime peer list. Newly added auto-connect peers get
    /// `initiate_peer_connection` immediately; removed peers are dropped from
    /// the retry queue (the regular liveness timeout reaps any active session
    /// — we don't proactively disconnect, since the same npub might be on its
    /// way back via a fresh advert). Existing entries get their `addresses`
    /// field refreshed so the next retry sees the latest hints.
    ///
    /// Returns per-category counts (added/removed/updated/unchanged). Fails
    /// with `InvalidPeerNpub` — before touching any state — if an incoming
    /// entry's npub doesn't parse.
    pub(super) async fn update_peers(
        &mut self,
        new_peers: Vec<crate::config::PeerConfig>,
    ) -> Result<crate::node::UpdatePeersOutcome, crate::node::NodeError> {
        use std::collections::{HashMap, HashSet};

        // Index incoming entries by derived node address; reject the whole
        // update on the first unparseable npub (validate-then-apply).
        let mut new_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> =
            HashMap::with_capacity(new_peers.len());
        for peer in new_peers {
            let identity = match PeerIdentity::from_npub(&peer.npub) {
                Ok(id) => id,
                Err(e) => {
                    return Err(crate::node::NodeError::InvalidPeerNpub {
                        npub: peer.npub.clone(),
                        reason: e.to_string(),
                    });
                }
            };
            // Last-write-wins on duplicates so callers passing a multi-source
            // candidate list (e.g. operator hints + recent-peers cache for
            // the same npub) get the merge they meant.
            new_by_addr.insert(*identity.node_addr(), peer);
        }

        // Snapshot current config peers the same way; entries whose npub no
        // longer parses are skipped (they couldn't be dialed anyway).
        let current_by_addr: HashMap<crate::identity::NodeAddr, crate::config::PeerConfig> = self
            .config
            .peers()
            .iter()
            .filter_map(|pc| {
                PeerIdentity::from_npub(&pc.npub)
                    .ok()
                    .map(|id| (*id.node_addr(), pc.clone()))
            })
            .collect();

        let new_addrs: HashSet<_> = new_by_addr.keys().copied().collect();
        let current_addrs: HashSet<_> = current_by_addr.keys().copied().collect();

        let removed: Vec<_> = current_addrs.difference(&new_addrs).copied().collect();
        let added: Vec<_> = new_addrs.difference(&current_addrs).copied().collect();
        let kept: Vec<_> = new_addrs.intersection(&current_addrs).copied().collect();

        let mut outcome = crate::node::UpdatePeersOutcome::default();

        // Removed peers: drop retry state and alias only. Any live session
        // is left for the liveness timeout (see doc comment above).
        for node_addr in &removed {
            if self.retry_pending.remove(node_addr).is_some() {
                debug!(
                    peer = %self.peer_display_name(node_addr),
                    "Dropping retry entry for peer removed from runtime peer list"
                );
            }
            self.peer_aliases.remove(node_addr);
            outcome.removed += 1;
        }

        // Kept peers: detect material field changes and refresh the retry
        // entry / alias accordingly.
        for node_addr in &kept {
            let new_pc = &new_by_addr[node_addr];
            let current_pc = &current_by_addr[node_addr];
            if new_pc.addresses != current_pc.addresses
                || new_pc.alias != current_pc.alias
                || new_pc.connect_policy != current_pc.connect_policy
                || new_pc.auto_reconnect != current_pc.auto_reconnect
            {
                outcome.updated += 1;
                // Refresh retry state so the next attempt dials the latest
                // address hints.
                if let Some(state) = self.retry_pending.get_mut(node_addr) {
                    state.peer_config = new_pc.clone();
                }
                // NOTE(review): an alias changed *to* None leaves the old
                // alias in `peer_aliases` — confirm that is intentional.
                if let Some(alias) = new_pc.alias.clone() {
                    self.peer_aliases.insert(*node_addr, alias);
                }
            } else {
                outcome.unchanged += 1;
            }
        }

        let added_configs: Vec<crate::config::PeerConfig> =
            added.iter().map(|addr| new_by_addr[addr].clone()).collect();

        // Replace the live config peer list before initiating connections so
        // any helper that consults `self.config.peers()` during the dial
        // (alias lookup, retry-state seeding) sees the new entries.
        self.config.peers = new_by_addr.into_values().collect();

        // Added peers: seed alias + identity caches, then dial auto-connect
        // entries immediately; failures fall back to the retry queue.
        for peer_config in added_configs {
            outcome.added += 1;
            let Ok(identity) = PeerIdentity::from_npub(&peer_config.npub) else {
                continue;
            };
            let name = peer_config
                .alias
                .clone()
                .unwrap_or_else(|| identity.short_npub());
            self.peer_aliases.insert(*identity.node_addr(), name);
            self.register_identity(*identity.node_addr(), identity.pubkey_full());

            if !peer_config.is_auto_connect() {
                continue;
            }

            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
                warn!(
                    npub = %peer_config.npub,
                    error = %e,
                    "Failed to initiate connection for newly added peer"
                );
                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                }
                // No-transport failures usually mean a stale cached advert;
                // force a re-fetch so the next retry sees fresh endpoints.
                if matches!(e, crate::node::NodeError::NoTransportForType(_))
                    && let Some(bootstrap) = self.nostr_discovery.clone()
                {
                    let npub = peer_config.npub.clone();
                    tokio::spawn(async move {
                        let _ = bootstrap.refetch_advert_for_stale_check(&npub).await;
                    });
                }
            }
        }

        Ok(outcome)
    }

    /// Initiate connections to all configured static peers at startup.
    ///
    /// Seeds the alias map and identity cache for *every* configured peer
    /// (not just auto-connect ones), then dials each auto-connect peer.
    /// Dial failures are logged and queued for retry, never propagated.
    pub(super) async fn initiate_peer_connections(&mut self) {
        // Build display name map from all configured peers (alias or short npub),
        // and pre-seed the identity cache from each peer's npub so that TUN packets
        // addressed to a configured peer can be dispatched (and trigger session
        // initiation) immediately on startup — without waiting for the link-layer
        // handshake to complete first.
        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
            .config
            .peers()
            .iter()
            .filter_map(|pc| {
                PeerIdentity::from_npub(&pc.npub)
                    .ok()
                    .map(|id| (id, pc.alias.clone()))
            })
            .collect();

        for (identity, alias) in peer_identities {
            let name = alias.unwrap_or_else(|| identity.short_npub());
            self.peer_aliases.insert(*identity.node_addr(), name);
            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
            // but will be corrected to the real value when the peer is promoted
            // after a successful Noise handshake.
            self.register_identity(*identity.node_addr(), identity.pubkey_full());
        }

        // Collect peer configs to avoid borrow conflicts
        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();

        if peer_configs.is_empty() {
            debug!("No static peers configured");
            return;
        }

        debug!(
            count = peer_configs.len(),
            "Initiating static peer connections"
        );

        for peer_config in peer_configs {
            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
                warn!(
                    npub = %peer_config.npub,
                    alias = ?peer_config.alias,
                    error = %e,
                    "Failed to initiate peer connection"
                );
                // Schedule a retry so transient address-resolution failures
                // (e.g. cached endpoints stale, NAT rebinds, all addresses
                // currently unreachable) recover without a daemon restart.
                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                }
                // No-transport failures most often mean the cached overlay
                // advert is pointing at a dead post-NAT-rebind address. The
                // advert cache is read-only inside fetch_advert, so retries
                // would loop on the same dead address until expiry. Force a
                // re-fetch so the next retry tick picks up fresh endpoints.
                if matches!(e, crate::node::NodeError::NoTransportForType(_))
                    && let Some(bootstrap) = self.nostr_discovery.clone()
                {
                    let npub = peer_config.npub.clone();
                    tokio::spawn(async move {
                        let _ = bootstrap.refetch_advert_for_stale_check(&npub).await;
                    });
                }
            }
        }
    }

    /// Initiate a connection to a single peer.
    ///
    /// Creates a link, starts the Noise handshake, and sends the first
    /// message. Thin wrapper over the shared inner implementation; kept
    /// separate from the retry entry point so callers can be distinguished
    /// in tracing.
    pub(super) async fn initiate_peer_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }

    /// Initiate a connection from the retry path. Identical to
    /// [`initiate_peer_connection`] today — both paths fan out across every
    /// known address (overlay-fresh first, then operator/cache hints) in a
    /// single pass. The two entry points stay separate so callers can be
    /// distinguished in tracing.
    pub(super) async fn initiate_peer_retry_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        self.initiate_peer_connection_inner(peer_config).await
    }

282    async fn initiate_peer_connection_inner(
283        &mut self,
284        peer_config: &crate::config::PeerConfig,
285    ) -> Result<(), NodeError> {
286        // Parse the peer's npub to get their identity
287        let peer_identity =
288            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
289                npub: peer_config.npub.clone(),
290                reason: e.to_string(),
291            })?;
292
293        let peer_node_addr = *peer_identity.node_addr();
294
295        // Check if peer already exists (fully authenticated)
296        if self.peers.contains_key(&peer_node_addr) {
297            debug!(
298                npub = %peer_config.npub,
299                "Peer already exists, skipping"
300            );
301            return Ok(());
302        }
303
304        // Check if connection already in progress to this peer
305        let already_connecting = self.connections.values().any(|conn| {
306            conn.expected_identity()
307                .map(|id| id.node_addr() == &peer_node_addr)
308                .unwrap_or(false)
309        });
310        if already_connecting {
311            debug!(
312                npub = %peer_config.npub,
313                "Connection already in progress, skipping"
314            );
315            return Ok(());
316        }
317
318        self.try_peer_addresses(peer_config, peer_identity, true)
319            .await
320    }
321
322    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
323        self.connections.values().any(|conn| {
324            conn.expected_identity()
325                .map(|id| id.node_addr() == peer_node_addr)
326                .unwrap_or(false)
327        })
328    }
329
    /// Initiate a connection to a peer on a specific transport and address.
    ///
    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
    /// the Noise IK handshake, sends msg1, and registers the connection for
    /// msg2 dispatch.
    ///
    /// For connection-oriented transports (TCP, Tor): allocates a link and
    /// starts a non-blocking transport connect. The handshake is deferred
    /// until the transport connection is established — the tick handler
    /// polls `connection_state()` and initiates the handshake when ready.
    ///
    /// # Errors
    /// ACL rejection from `authorize_peer`, transport connect failure
    /// (connection-oriented), or handshake-start failure (connectionless).
    pub(super) async fn initiate_connection(
        &mut self,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Enforce the peer ACL before allocating any link state.
        self.authorize_peer(
            &peer_identity,
            PeerAclContext::OutboundConnect,
            transport_id,
            &remote_addr,
        )?;

        // NOTE(review): an unknown transport_id falls through here as
        // "connectionless" and the connect/send below silently no-ops —
        // confirm callers can't reach this with a stale transport_id.
        let is_connection_oriented = self
            .transports
            .get(&transport_id)
            .map(|t| t.transport_type().connection_oriented)
            .unwrap_or(false);

        // Allocate link ID and create link
        let link_id = self.allocate_link_id();

        let link = if is_connection_oriented {
            Link::new(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        } else {
            Link::connectionless(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        };

        self.links.insert(link_id, link);

        // Add reverse lookup for packet dispatch
        self.addr_to_link
            .insert((transport_id, remote_addr.clone()), link_id);

        if is_connection_oriented {
            // Connection-oriented: start non-blocking connect, defer handshake
            if let Some(transport) = self.transports.get(&transport_id) {
                match transport.connect(&remote_addr).await {
                    Ok(()) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            link_id = %link_id,
                            "Transport connect initiated (non-blocking)"
                        );
                        // Parked until the tick handler observes the
                        // transport-level connection as established.
                        self.pending_connects.push(super::PendingConnect {
                            link_id,
                            transport_id,
                            remote_addr,
                            peer_identity,
                        });
                    }
                    Err(e) => {
                        // Clean up link
                        self.links.remove(&link_id);
                        self.addr_to_link.remove(&(transport_id, remote_addr));
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            Ok(())
        } else {
            // Connectionless: proceed with immediate handshake
            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
                .await
        }
    }

    /// Start the Noise handshake on a link and send msg1.
    ///
    /// Called immediately for connectionless transports, or after the
    /// transport connection is established for connection-oriented transports.
    ///
    /// On index-allocation or handshake-start failure the freshly created
    /// link (and, where applicable, the session index) is torn down and an
    /// error returned. A *send* failure, by contrast, only marks the
    /// connection failed — the event loop's retry/resend logic takes over.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Create connection in handshake phase (outbound knows expected identity)
        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Allocate a session index for this handshake
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Clean up the link we just created
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        // Start the Noise handshake and get message 1
        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Clean up the index and link
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        // Set index and transport info on the connection
        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Store msg1 for resend and schedule first resend
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Track in pending_outbound for msg2 dispatch
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send the wire format handshake message. NOTE(review): if the
        // transport has vanished from the map, nothing is sent and we still
        // return Ok — presumably the scheduled resend covers that; confirm.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        if let Some(send_result) = send_result {
            self.note_local_send_outcome(&send_result);
            match send_result {
                Ok(bytes) => {
                    debug!(
                        link_id = %link_id,
                        our_index = %our_index,
                        bytes,
                        "Sent Noise handshake message 1 (wire format)"
                    );
                }
                Err(e) => {
                    warn!(
                        link_id = %link_id,
                        transport_id = %transport_id,
                        remote_addr = %remote_addr,
                        our_index = %our_index,
                        error = %e,
                        "Failed to send handshake message"
                    );
                    // Mark connection as failed but don't remove it yet
                    // The event loop can handle retry logic
                    if let Some(conn) = self.connections.get_mut(&link_id) {
                        conn.mark_failed();
                    }
                }
            }
        }

        Ok(())
    }

528    /// Poll all transports for discovered peers and auto-connect.
529    ///
530    /// Called from the tick handler. Iterates operational transports,
531    /// drains their discovery buffers, and initiates connections to
532    /// newly discovered peers (if auto_connect is enabled).
533    pub(super) async fn poll_transport_discovery(&mut self) {
534        // Collect discoveries first to avoid borrow conflict with self
535        let mut to_connect = Vec::new();
536
537        for (transport_id, transport) in &self.transports {
538            if !transport.is_operational() {
539                continue;
540            }
541            if !transport.auto_connect() {
542                // Still drain the buffer so it doesn't grow unbounded
543                let _ = transport.discover();
544                continue;
545            }
546            let discovered = match transport.discover() {
547                Ok(peers) => peers,
548                Err(_) => continue,
549            };
550            for peer in discovered {
551                let pubkey = match peer.pubkey_hint {
552                    Some(pk) => pk,
553                    None => continue,
554                };
555                let identity = PeerIdentity::from_pubkey(pubkey);
556                let node_addr = *identity.node_addr();
557
558                // Skip self
559                if node_addr == *self.identity.node_addr() {
560                    continue;
561                }
562                // Skip if already connected
563                if self.peers.contains_key(&node_addr) {
564                    continue;
565                }
566                // Skip if connection already in progress
567                let connecting = self.connections.values().any(|c| {
568                    c.expected_identity()
569                        .map(|id| id.node_addr() == &node_addr)
570                        .unwrap_or(false)
571                });
572                if connecting {
573                    continue;
574                }
575
576                to_connect.push((*transport_id, peer.addr, identity));
577            }
578        }
579
580        for (transport_id, remote_addr, identity) in to_connect {
581            info!(
582                peer = %self.peer_display_name(identity.node_addr()),
583                transport_id = %transport_id,
584                remote_addr = %remote_addr,
585                "Auto-connecting to discovered peer"
586            );
587            if let Err(e) = self
588                .initiate_connection(transport_id, remote_addr, identity)
589                .await
590            {
591                warn!(error = %e, "Failed to auto-connect to discovered peer");
592            }
593        }
594    }
595
    /// Drain pending Nostr bootstrap events and act on them.
    ///
    /// Refreshes our own overlay advert, then handles each drained event:
    /// `Established` traversals are adopted as live sockets (unless the peer
    /// is already connected or mid-handshake); `Failed` traversals feed the
    /// failure-streak tracker, may trigger a stale-advert sweep, and fall
    /// back to dialing the peer's known addresses before scheduling a retry.
    /// Finally runs the startup open-discovery sweep and retry queueing.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        // Best-effort: a failed advert refresh is logged, not fatal.
        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    let peer_npub = traversal.peer_npub.clone();
                    // Skip adoption when a session or handshake already
                    // exists for this peer.
                    if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                        let peer_addr = *peer_identity.node_addr();
                        if self.peers.contains_key(&peer_addr) {
                            debug!(
                                peer_npub = %peer_npub,
                                "Ignoring established NAT traversal for already-connected peer"
                            );
                            continue;
                        }
                        if self.is_connecting_to_peer(&peer_addr) {
                            debug!(
                                peer_npub = %peer_npub,
                                "Ignoring established NAT traversal while peer handshake is already in progress"
                            );
                            continue;
                        }
                    }
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            // Adoption failure still gets a retry slot.
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    // Record the failure; the decision controls warn-level
                    // logging, cooldown, and the stale-advert sweep below.
                    let now_ms = Self::now_ms();
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // B6: stale-advert eviction on the streak-threshold
                    // crossing. Fire-and-forget; the outcome is logged so
                    // operators can see when peers get cleaned up.
                    if decision.crossed_threshold {
                        let bootstrap = bootstrap.clone();
                        let npub = peer_config.npub.clone();
                        tokio::spawn(async move {
                            let outcome = bootstrap.refetch_advert_for_stale_check(&npub).await;
                            match outcome {
                                crate::discovery::nostr::NostrRefetchOutcome::Evicted => info!(
                                    npub = %npub,
                                    "stale-advert sweep: peer evicted from advert cache"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::Refreshed => info!(
                                    npub = %npub,
                                    "stale-advert sweep: peer republished, cache refreshed and streak reset"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::SameAdvert => debug!(
                                    npub = %npub,
                                    "stale-advert sweep: advert unchanged, cooldown stands"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::Skipped => debug!(
                                    npub = %npub,
                                    "stale-advert sweep: skipped (relay error or no advert_relays)"
                                ),
                            }
                        });
                    }

                    // Traversal failed — try the peer's known addresses
                    // directly; a successful dial makes the retry moot.
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        // Push the next retry past the cooldown so the
                        // open-discovery sweep doesn't re-enqueue and the
                        // per-attempt backoff doesn't fire sooner.
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }

740    /// Extract the bare discovery scope from the Nostr discovery app tag,
741    /// which is encoded as `fips-overlay-v1:<scope>` by
742    /// `apply_default_scoped_discovery`. Returns `None` when no scope is
743    /// set, so the LAN browser surfaces every advert it sees.
744    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
745        let app = self.config.node.discovery.nostr.app.trim();
746        if app.is_empty() {
747            return None;
748        }
749        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
750            let scope = rest.trim();
751            if scope.is_empty() {
752                None
753            } else {
754                Some(scope.to_string())
755            }
756        } else {
757            Some(app.to_string())
758        }
759    }
760
761    /// Drain mDNS-discovered peers and initiate Noise XX handshakes for
762    /// any whose npub we don't already have a connection or active peer
763    /// for. The handshake itself is the authentication — a spoofed
764    /// mDNS advert with someone else's npub fails the XX exchange and
765    /// is dropped.
766    pub(super) async fn poll_lan_discovery(&mut self) {
767        let Some(runtime) = self.lan_discovery.clone() else {
768            return;
769        };
770        let events = runtime.drain_events().await;
771        if events.is_empty() {
772            return;
773        }
774        // Snapshot the operational UDP transport set up front; mDNS
775        // discovery only yields UDP endpoints today.
776        let udp_transport_id = self.find_transport_for_type("udp");
777        let Some(transport_id) = udp_transport_id else {
778            debug!("lan: no operational UDP transport, skipping discovered peers");
779            return;
780        };
781        for event in events {
782            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
783            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
784                Ok(id) => id,
785                Err(err) => {
786                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
787                    continue;
788                }
789            };
790            let peer_node_addr = *identity.node_addr();
791            if self.peers.contains_key(&peer_node_addr) {
792                continue;
793            }
794            let already_connecting = self.connections.values().any(|conn| {
795                conn.expected_identity()
796                    .map(|id| id.node_addr() == &peer_node_addr)
797                    .unwrap_or(false)
798            });
799            if already_connecting {
800                continue;
801            }
802            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
803            info!(
804                npub = %identity.short_npub(),
805                addr = %peer.addr,
806                "lan: initiating handshake to discovered peer"
807            );
808            if let Err(err) = self
809                .initiate_connection(transport_id, remote_addr, identity)
810                .await
811            {
812                debug!(
813                    npub = %peer.npub,
814                    error = %err,
815                    "lan: failed to initiate connection to discovered peer"
816                );
817            }
818        }
819    }
820
821    /// Poll pending transport connects and initiate handshakes for ready ones.
822    ///
823    /// Called from the tick handler. For each pending connect, queries the
824    /// transport's connection state. When a connection is established,
825    /// marks the link as Connected and starts the Noise handshake.
826    /// Failed connections are cleaned up and scheduled for retry.
827    pub(super) async fn poll_pending_connects(&mut self) {
828        if self.pending_connects.is_empty() {
829            return;
830        }
831
832        let mut completed = Vec::new();
833
834        for (i, pending) in self.pending_connects.iter().enumerate() {
835            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
836                transport.connection_state(&pending.remote_addr)
837            } else {
838                crate::transport::ConnectionState::Failed("transport removed".into())
839            };
840
841            match state {
842                crate::transport::ConnectionState::Connected => {
843                    completed.push((i, true, None));
844                }
845                crate::transport::ConnectionState::Failed(reason) => {
846                    completed.push((i, false, Some(reason)));
847                }
848                crate::transport::ConnectionState::Connecting => {
849                    // Still in progress, check on next tick
850                }
851                crate::transport::ConnectionState::None => {
852                    // Shouldn't happen — treat as failure
853                    completed.push((i, false, Some("no connection attempt found".into())));
854                }
855            }
856        }
857
858        // Process completions in reverse order to preserve indices
859        for (i, success, reason) in completed.into_iter().rev() {
860            let pending = self.pending_connects.remove(i);
861
862            if success {
863                // Mark link as Connected
864                if let Some(link) = self.links.get_mut(&pending.link_id) {
865                    link.set_connected();
866                }
867
868                debug!(
869                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
870                    transport_id = %pending.transport_id,
871                    remote_addr = %pending.remote_addr,
872                    link_id = %pending.link_id,
873                    "Transport connected, starting handshake"
874                );
875
876                // Start the handshake now that the transport is connected
877                if let Err(e) = self
878                    .start_handshake(
879                        pending.link_id,
880                        pending.transport_id,
881                        pending.remote_addr.clone(),
882                        pending.peer_identity,
883                    )
884                    .await
885                {
886                    warn!(
887                        link_id = %pending.link_id,
888                        error = %e,
889                        "Failed to start handshake after transport connect"
890                    );
891                    // Clean up link on handshake failure
892                    self.remove_link(&pending.link_id);
893                }
894            } else {
895                let reason = reason.unwrap_or_default();
896                warn!(
897                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
898                    transport_id = %pending.transport_id,
899                    remote_addr = %pending.remote_addr,
900                    link_id = %pending.link_id,
901                    reason = %reason,
902                    "Transport connect failed"
903                );
904
905                // Clean up link and schedule retry
906                self.remove_link(&pending.link_id);
907                self.links.remove(&pending.link_id);
908                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
909            }
910        }
911    }
912
913    // === State Transitions ===
914
    /// Start the node.
    ///
    /// Initializes the TUN interface (if configured), spawns I/O threads,
    /// and transitions to the Running state.
    ///
    /// Startup order is deliberate:
    ///   1. transports (the packet producers),
    ///   2. encrypt/decrypt worker pools (unix only),
    ///   3. Nostr + LAN discovery,
    ///   4. static peer connections,
    ///   5. TUN (last, so user traffic is only accepted once the mesh
    ///      plumbing is up),
    ///   6. DNS responder (independent of TUN).
    ///
    /// Returns `Err(NodeError::AlreadyStarted)` when the node is not in a
    /// startable state. Most per-component failures (a transport, Nostr,
    /// LAN discovery, TUN, DNS) are logged and tolerated rather than
    /// aborting startup.
    pub async fn start(&mut self) -> Result<(), NodeError> {
        if !self.state.can_start() {
            return Err(NodeError::AlreadyStarted);
        }
        self.state = NodeState::Starting;

        // Create packet channel for transport -> Node communication
        let packet_buffer_size = self.config.node.buffers.packet_channel;
        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
        self.packet_tx = Some(packet_tx.clone());
        self.packet_rx = Some(packet_rx);

        // Initialize transports first (before TUN, before Nostr discovery).
        let transport_handles = self.create_transports(&packet_tx).await;

        for mut handle in transport_handles {
            let transport_id = handle.transport_id();
            let transport_type = handle.transport_type().name;
            let name = handle.name().map(|s| s.to_string());

            match handle.start().await {
                Ok(()) => {
                    // Only successfully-started transports are retained;
                    // a failed one is logged and dropped.
                    self.transports.insert(transport_id, handle);
                }
                Err(e) => {
                    if let Some(ref n) = name {
                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
                    } else {
                        warn!(transport_type, error = %e, "Transport failed to start");
                    }
                }
            }
        }

        if !self.transports.is_empty() {
            info!(count = self.transports.len(), "Transports initialized");
        }

        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
        // worker pools. **Unix only** — both pools issue direct
        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
        // raw file descriptors via `AsRawFd`, which is a unix-only
        // trait. On Windows the rx_loop's tokio-based send/recv
        // remain the canonical path; the perf overhaul lands its
        // gains on unix.
        //
        // Worker count defaults to the number of CPUs, overridable
        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
        // for debug / benchmarking. Hash-by-destination means a
        // single TCP flow pins to one worker (preserves wire
        // ordering); additional workers light up under multi-flow
        // / multi-peer load. See `node::encrypt_worker` /
        // `node::decrypt_worker` for full rationale.
        #[cfg(unix)]
        {
            let cpu_default = std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
                .max(1);
            // Unparseable env values silently fall back to the CPU
            // default; a floor of 1 guards against an explicit 0.
            let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default)
                .max(1);
            self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
                encrypt_worker_count,
            ));
            info!(
                workers = encrypt_worker_count,
                "Spawned FMP-encrypt worker pool"
            );

            // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
            // falls through to the in-line rx_loop decrypt path (the
            // "test-mode" branch in `handle_encrypted_frame`, which is
            // in fact a fully functional synchronous decrypt). Useful
            // as an A/B against the worker pipeline when chasing
            // scheduling/queueing regressions on the native macOS
            // path. Any non-zero value (env or default) spawns the
            // pool as before.
            let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default);
            if decrypt_worker_count == 0 {
                info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
            } else {
                self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
                    decrypt_worker_count,
                ));
                info!(
                    workers = decrypt_worker_count,
                    "Spawned FMP+FSP-decrypt worker pool"
                );
            }
        }

        // Nostr overlay discovery: publish our advert immediately so
        // peers can find us, then keep the runtime for background work.
        // Failure here is non-fatal — the node runs without discovery.
        if self.config.node.discovery.nostr.enabled {
            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
                .await
            {
                Ok(runtime) => {
                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
                    }
                    self.nostr_discovery = Some(runtime);
                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
                    info!("Nostr overlay discovery enabled");
                }
                Err(err) => {
                    warn!(error = %err, "Failed to start Nostr overlay discovery");
                }
            }
        }

        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
        // when Nostr is disabled, since it gives us sub-second pairing
        // on the same link without any relay or NAT-traversal roundtrip.
        if self.config.node.discovery.lan.enabled {
            // Advertise the first operational UDP transport's local
            // port; 0 when none is up (the advert is then port-less in
            // effect).
            let advertised_udp_port = self
                .transports
                .values()
                .filter(|h| h.is_operational())
                .filter(|h| h.transport_type().name == "udp")
                .find_map(|h| h.local_addr().map(|addr| addr.port()))
                .unwrap_or(0);
            let scope = self.lan_discovery_scope();
            match crate::discovery::lan::LanDiscovery::start(
                &self.identity,
                scope,
                advertised_udp_port,
                self.config.node.discovery.lan.clone(),
            )
            .await
            {
                Ok(runtime) => {
                    self.lan_discovery = Some(runtime);
                    info!("LAN mDNS discovery enabled");
                }
                Err(err) => {
                    debug!(error = %err, "LAN mDNS discovery not started");
                }
            }
        }

        // Connect to static peers before TUN is active
        // This allows handshake messages to be sent before we start accepting packets
        self.initiate_peer_connections().await;

        // Initialize TUN interface last, after transports and peers are ready
        if self.config.tun.enabled {
            let address = *self.identity.address();
            match TunDevice::create(&self.config.tun, address).await {
                Ok(device) => {
                    let mtu = device.mtu();
                    let name = device.name().to_string();
                    let our_addr = *device.address();

                    info!("TUN device active:");
                    info!("     name: {}", name);
                    info!("  address: {}", device.address());
                    info!("      mtu: {}", mtu);

                    // Calculate max MSS for TCP clamping
                    let effective_mtu = self.effective_ipv6_mtu();
                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers

                    info!("effective MTU: {} bytes", effective_mtu);
                    debug!("   max TCP MSS: {} bytes", max_mss);

                    // On macOS, create a shutdown pipe. Writing to it unblocks the
                    // reader thread's select() loop without closing the TUN fd
                    // (which would cause a double-close when TunDevice drops).
                    #[cfg(target_os = "macos")]
                    let (shutdown_read_fd, shutdown_write_fd) = {
                        let mut fds = [0i32; 2];
                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
                                "failed to create shutdown pipe".into(),
                            )));
                        }
                        (fds[0], fds[1])
                    };

                    // Create writer (dups the fd for independent write access).
                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
                    // per-destination path MTU learned via discovery.
                    //
                    // NOTE(review): on macOS, if `create_writer` fails the `?`
                    // below returns before the shutdown pipe fds are closed —
                    // they leak for the process lifetime. Confirm whether a
                    // failed start() is treated as fatal before tightening.
                    let (writer, tun_tx) =
                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;

                    // Spawn writer thread
                    let writer_handle = thread::spawn(move || {
                        writer.run();
                    });

                    // Clone tun_tx for the reader
                    let reader_tun_tx = tun_tx.clone();

                    // Create outbound channel for TUN reader → Node
                    let tun_channel_size = self.config.node.buffers.tun_channel;
                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);

                    // Spawn reader thread
                    let transport_mtu = self.transport_mtu();
                    let path_mtu_lookup = self.path_mtu_lookup.clone();
                    // macOS variant passes the shutdown pipe's read end so the
                    // reader's select() loop can be woken for teardown.
                    #[cfg(target_os = "macos")]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                            shutdown_read_fd,
                        );
                    });
                    #[cfg(not(target_os = "macos"))]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                        );
                    });

                    // Stash all handles/channels; `stop()` consumes these
                    // in the reverse order during teardown.
                    self.tun_state = TunState::Active;
                    self.tun_name = Some(name);
                    self.tun_tx = Some(tun_tx);
                    self.tun_outbound_rx = Some(outbound_rx);
                    self.tun_reader_handle = Some(reader_handle);
                    self.tun_writer_handle = Some(writer_handle);
                    #[cfg(target_os = "macos")]
                    {
                        self.tun_shutdown_fd = Some(shutdown_write_fd);
                    }
                }
                Err(e) => {
                    // Non-fatal: the node keeps running without a TUN.
                    self.tun_state = TunState::Failed;
                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
                }
            }
        }

        // Initialize DNS responder (independent of TUN).
        //
        // Default bind_addr is "::1" (IPv6 loopback). The shipped
        // fips-dns-setup configures systemd-resolved via a global
        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
        // where self-destined traffic to fips0's address is attributed
        // to fips0 in PKTINFO and gets silently dropped by the
        // mesh-interface filter in src/upper/dns.rs.
        //
        // For mesh-reachable resolution (rare), set bind_addr: "::"
        // in fips.yaml. The mesh-interface filter remains active to
        // prevent hosts-file alias enumeration in that mode.
        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
        // 127.0.0.1 still reach us regardless of kernel sysctl
        // defaults — but only when bind is on a wildcard / IPv6 path.
        if self.config.dns.enabled {
            let addr_str = self.config.dns.bind_addr();
            match addr_str.parse::<std::net::IpAddr>() {
                Ok(ip) => {
                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
                    match Self::bind_dns_socket(bind) {
                        Ok(socket) => {
                            let dns_channel_size = self.config.node.buffers.dns_channel;
                            let (identity_tx, identity_rx) =
                                tokio::sync::mpsc::channel(dns_channel_size);
                            let dns_ttl = self.config.dns.ttl();
                            // Static hosts come from peer configs; the
                            // reloader optionally layers /etc/fips/hosts
                            // on top when system files are enabled.
                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
                                self.config.peers(),
                            );
                            let reloader = if self.config.node.system_files_enabled {
                                let hosts_path = std::path::PathBuf::from(
                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
                                );
                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
                            } else {
                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
                            };
                            // Resolve the TUN ifindex so the responder can
                            // drop queries arriving on the mesh interface
                            // (fips0). Without this, the `::` bind exposes
                            // /etc/fips/hosts alias probing to any mesh peer.
                            // When TUN isn't enabled or the name can't be
                            // resolved, `None` disables the filter (there
                            // is no mesh surface to defend anyway).
                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
                            info!(
                                bind = %bind,
                                hosts = reloader.hosts().len(),
                                mesh_ifindex = ?mesh_ifindex,
                                "DNS responder started for .fips domain (auto-reload enabled)"
                            );
                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
                                socket,
                                identity_tx,
                                dns_ttl,
                                reloader,
                                mesh_ifindex,
                            ));
                            self.dns_identity_rx = Some(identity_rx);
                            self.dns_task = Some(handle);
                        }
                        Err(e) => {
                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
                        }
                    }
                }
                Err(e) => {
                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
                }
            }
        }

        self.state = NodeState::Running;
        info!("Node started:");
        info!("       state: {}", self.state);
        info!("  transports: {}", self.transports.len());
        info!(" connections: {}", self.connections.len());
        Ok(())
    }
1248
1249    /// Bind a UDP socket for the DNS responder.
1250    ///
1251    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
1252    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
1253    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
1254    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
1255    /// land on the same socket.
1256    ///
1257    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
1258    /// can learn the arrival interface per packet. The responder uses that
1259    /// to drop queries arriving on the mesh TUN, closing the hosts-file
1260    /// probing side-channel created by the `::` bind.
1261    fn bind_dns_socket(
1262        addr: std::net::SocketAddr,
1263    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1264        use socket2::{Domain, Protocol, Socket, Type};
1265        let domain = if addr.is_ipv4() {
1266            Domain::IPV4
1267        } else {
1268            Domain::IPV6
1269        };
1270        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1271        if addr.is_ipv6() {
1272            sock.set_only_v6(false)?;
1273            #[cfg(unix)]
1274            Self::set_recv_pktinfo_v6(&sock)?;
1275        }
1276        sock.set_nonblocking(true)?;
1277        sock.bind(&addr.into())?;
1278        tokio::net::UdpSocket::from_std(sock.into())
1279    }
1280
1281    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
1282    ///
1283    /// After this setsockopt, each `recvmsg()` call on the socket receives
1284    /// an `IPV6_PKTINFO` control message containing the arrival interface
1285    /// index, which the DNS responder uses for its mesh-interface filter.
1286    #[cfg(unix)]
1287    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
1288        use std::os::fd::AsRawFd;
1289        let enable: libc::c_int = 1;
1290        let ret = unsafe {
1291            libc::setsockopt(
1292                sock.as_raw_fd(),
1293                libc::IPPROTO_IPV6,
1294                libc::IPV6_RECVPKTINFO,
1295                &enable as *const _ as *const libc::c_void,
1296                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
1297            )
1298        };
1299        if ret < 0 {
1300            return Err(std::io::Error::last_os_error());
1301        }
1302        Ok(())
1303    }
1304
1305    /// Resolve the mesh TUN interface index by name.
1306    ///
1307    /// Returns `None` if the interface does not exist (e.g. TUN disabled
1308    /// or not yet created). A `None` result disables the DNS responder's
1309    /// mesh-interface filter — safe, because if there is no fips0 there
1310    /// is no mesh exposure to defend against.
1311    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1312        #[cfg(unix)]
1313        {
1314            let c_name = std::ffi::CString::new(name).ok()?;
1315            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1316            if idx == 0 { None } else { Some(idx) }
1317        }
1318        #[cfg(not(unix))]
1319        {
1320            let _ = name;
1321            None
1322        }
1323    }
1324
    /// Stop the node.
    ///
    /// Shuts down TUN interface, stops I/O threads, and transitions to
    /// the Stopped state.
    ///
    /// Teardown order is load-bearing: peer disconnect notifications go
    /// out while transports are still up, discovery runtimes withdraw
    /// their adverts before the sockets vanish, and the TUN reader is
    /// unblocked (interface delete on Linux, shutdown-pipe write on
    /// macOS) before its thread handle is joined.
    ///
    /// Returns `Err(NodeError::NotStarted)` when the node is not in a
    /// stoppable state.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        // Stop DNS responder
        if let Some(handle) = self.dns_task.take() {
            // abort() is fine here: the responder holds no state that
            // needs a graceful flush.
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Send disconnect notifications to all active peers before closing transports
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        // Stop Nostr overlay discovery background work and withdraw any advert.
        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        // Tear down LAN mDNS responder + browser. Best-effort: the
        // OS will eventually time the advert out via its TTL even if
        // we don't get a clean unregister out before the daemon exits.
        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Shutdown transports (they're packet producers)
        // Keys are collected first to avoid mutating the map while
        // iterating it.
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop packet channels
        self.packet_tx.take();
        self.packet_rx.take();

        // Shutdown TUN interface
        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Drop the tun_tx to signal the writer to stop
            self.tun_tx.take();

            // Delete the interface (on Linux, causes reader to get EFAULT)
            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // On macOS, signal the reader thread to exit by writing to the
            // shutdown pipe. The reader's select() will wake up and break.
            // Best-effort: the write's return value is intentionally
            // ignored.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // Wait for threads to finish
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }
1422
1423    /// Send disconnect notifications to all active peers.
1424    ///
1425    /// Best-effort: send failures are logged and ignored since the transport
1426    /// may already be degraded. This runs before transports are shut down.
1427    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
1428        let disconnect = Disconnect::new(reason);
1429        let plaintext = disconnect.encode();
1430
1431        // Collect node_addrs to avoid borrow conflict with send helper
1432        let peer_addrs: Vec<NodeAddr> = self
1433            .peers
1434            .iter()
1435            .filter(|(_, peer)| peer.can_send() && peer.has_session())
1436            .map(|(addr, _)| *addr)
1437            .collect();
1438
1439        if peer_addrs.is_empty() {
1440            debug!(
1441                total_peers = self.peers.len(),
1442                "No sendable peers for disconnect notification"
1443            );
1444            return;
1445        }
1446
1447        let mut sent = 0usize;
1448        for node_addr in &peer_addrs {
1449            match self
1450                .send_encrypted_link_message(node_addr, &plaintext)
1451                .await
1452            {
1453                Ok(()) => sent += 1,
1454                Err(e) => {
1455                    debug!(
1456                        peer = %self.peer_display_name(node_addr),
1457                        error = %e,
1458                        "Failed to send disconnect (transport may be down)"
1459                    );
1460                }
1461            }
1462        }
1463
1464        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
1465    }
1466
1467    fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
1468        peer_config
1469            .addresses_by_priority()
1470            .into_iter()
1471            .cloned()
1472            .collect()
1473    }
1474
1475    async fn nostr_peer_fallback_addresses(
1476        &self,
1477        peer_config: &PeerConfig,
1478        existing: &[PeerAddress],
1479    ) -> Vec<PeerAddress> {
1480        if !self.config.node.discovery.nostr.enabled
1481            || self.config.node.discovery.nostr.policy
1482                == crate::config::NostrDiscoveryPolicy::Disabled
1483        {
1484            return Vec::new();
1485        }
1486
1487        let Some(bootstrap) = self.nostr_discovery.clone() else {
1488            return Vec::new();
1489        };
1490        let endpoints = match bootstrap.advert_endpoints_for_peer(&peer_config.npub).await {
1491            Ok(endpoints) => endpoints,
1492            Err(err) => {
1493                debug!(
1494                    npub = %peer_config.npub,
1495                    error = %err,
1496                    "Failed to resolve Nostr advert endpoints for configured peer"
1497                );
1498                return Vec::new();
1499            }
1500        };
1501
1502        let mut fallback = Vec::new();
1503        let mut next_priority = existing
1504            .iter()
1505            .map(|addr| addr.priority)
1506            .max()
1507            .unwrap_or(100)
1508            .saturating_add(1);
1509        // Stamp every overlay-derived candidate with the current wall clock
1510        // so the dialer ranks it ahead of unstamped operator hints. We use a
1511        // single timestamp per fetch (rather than per-endpoint) because all
1512        // candidates in a single advert are equally fresh.
1513        let seen_at_ms = Self::now_ms();
1514        for endpoint in endpoints {
1515            let Some(candidate) =
1516                Self::overlay_endpoint_to_peer_address(&endpoint, next_priority, seen_at_ms)
1517            else {
1518                continue;
1519            };
1520            if existing
1521                .iter()
1522                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
1523                || fallback.iter().any(|addr: &PeerAddress| {
1524                    addr.transport == candidate.transport && addr.addr == candidate.addr
1525                })
1526            {
1527                continue;
1528            }
1529            fallback.push(candidate);
1530            next_priority = next_priority.saturating_add(1);
1531        }
1532        fallback
1533    }
1534
1535    fn overlay_endpoint_to_peer_address(
1536        endpoint: &OverlayEndpointAdvert,
1537        priority: u8,
1538        seen_at_ms: u64,
1539    ) -> Option<PeerAddress> {
1540        let transport = match endpoint.transport {
1541            OverlayTransportKind::Udp => "udp",
1542            OverlayTransportKind::Tcp => "tcp",
1543            OverlayTransportKind::Tor => "tor",
1544        };
1545        Some(
1546            PeerAddress::with_priority(transport, endpoint.addr.clone(), priority)
1547                .with_seen_at_ms(seen_at_ms),
1548        )
1549    }
1550
    /// Try each address in `addresses`, in order, until one connection
    /// attempt starts successfully.
    ///
    /// Special cases:
    /// - `udp` with the literal address "nat": hands the peer to the
    ///   Nostr overlay for NAT traversal (only when `allow_bootstrap_nat`
    ///   is set) and returns `Ok` as soon as the attempt is started.
    /// - `ethernet` / `ble`: addresses go through transport-specific
    ///   resolution helpers; BLE is compiled out when `bluer` is absent.
    ///
    /// An ACL rejection (`NodeError::AccessDenied`) aborts the whole list
    /// immediately; any other per-address failure just falls through to
    /// the next address. If no address succeeds, returns
    /// `NodeError::NoTransportForType`.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        for addr in addresses {
            // "udp:nat" is a sentinel, not a dialable address: it asks the
            // overlay runtime to run NAT traversal for this peer.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                let Some(bootstrap) = self.nostr_discovery.clone() else {
                    debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                    continue;
                };
                bootstrap.request_connect(peer_config.clone()).await;
                info!(npub = %peer_config.npub, "Started Nostr UDP NAT traversal attempt");
                return Ok(());
            }

            // Resolve (transport_id, remote_addr); resolution failure for
            // this address skips to the next candidate.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                // Generic transports: pick any operational instance of the
                // matching type and parse the address string as-is.
                let tid = match self.find_transport_for_type(&addr.transport) {
                    Some(id) => id,
                    None => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            "No operational transport for address type"
                        );
                        continue;
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => return Ok(()),
                // ACL denial is final — don't keep probing other addresses.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
1643
1644    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1645        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
1646            .await;
1647    }
1648
1649    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
1650    /// queues retries for non-configured, not-yet-connected peers.
1651    ///
1652    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
1653    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
1654    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
1655    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
1656    ///
1657    /// `caller` is a short label included in log lines so per-tick and
1658    /// startup sweeps are distinguishable in operator-facing logs.
1659    pub(in crate::node) async fn run_open_discovery_sweep(
1660        &mut self,
1661        bootstrap: &std::sync::Arc<NostrDiscovery>,
1662        max_age_secs: Option<u64>,
1663        caller: &'static str,
1664    ) {
1665        if !self.config.node.discovery.nostr.enabled
1666            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
1667        {
1668            return;
1669        }
1670
1671        let configured_npubs = self
1672            .config
1673            .peers()
1674            .iter()
1675            .map(|peer| peer.npub.clone())
1676            .collect::<HashSet<_>>();
1677        let now_ms = Self::now_ms();
1678        let now_secs = now_ms / 1000;
1679        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
1680        if enqueue_budget == 0 {
1681            debug!(
1682                caller = %caller,
1683                "open-discovery sweep: enqueue budget is 0, skipping"
1684            );
1685            return;
1686        }
1687
1688        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
1689        let cached_count = candidates.len();
1690        let mut enqueued = 0usize;
1691        let mut skipped_age = 0usize;
1692        let mut skipped_configured = 0usize;
1693        let mut skipped_self = 0usize;
1694        let mut skipped_connected = 0usize;
1695        let mut skipped_retry_pending = 0usize;
1696        let mut skipped_connecting = 0usize;
1697        let mut skipped_no_endpoints = 0usize;
1698        let mut skipped_invalid_npub = 0usize;
1699        let mut skipped_cooldown = 0usize;
1700
1701        for (npub, endpoints, created_at_secs) in candidates {
1702            if enqueue_budget == 0 {
1703                break;
1704            }
1705
1706            if let Some(max_age) = max_age_secs
1707                && now_secs.saturating_sub(created_at_secs) > max_age
1708            {
1709                skipped_age = skipped_age.saturating_add(1);
1710                continue;
1711            }
1712
1713            if configured_npubs.contains(&npub) {
1714                // Configured peers don't go through the open-discovery
1715                // enqueue path — their `PeerConfig` is already in
1716                // `self.config.peers()`, so the regular retry queue is
1717                // what drives their reconnect. But on cold start with
1718                // NAT'd peers, every initial `initiate_peer_connection`
1719                // fails (no overlay data yet, static cache hints empty
1720                // or stale), each pushes the peer into `retry_pending`
1721                // with exponential backoff (5/10/20/40/80s), and by the
1722                // time the next backoff slot fires the Nostr advert is
1723                // already cached — we just don't act on it for ~80s.
1724                //
1725                // The arrival of an advert (which this sweep sees) means
1726                // we now have a path to dial. If the peer's retry is
1727                // scheduled in the future, pull it forward to "now" so
1728                // the next `process_pending_retries` tick fires it
1729                // immediately. The retry path (`initiate_peer_retry_
1730                // connection` → `try_peer_addresses`) then refetches
1731                // the advert and dials it — no behavioral change
1732                // beyond schedule timing.
1733                if let Ok(identity) = PeerIdentity::from_npub(&npub) {
1734                    let configured_addr = *identity.node_addr();
1735                    if let Some(state) = self.retry_pending.get_mut(&configured_addr)
1736                        && state.retry_after_ms > now_ms
1737                    {
1738                        state.retry_after_ms = now_ms;
1739                        debug!(
1740                            caller = %caller,
1741                            peer = %self.peer_display_name(&configured_addr),
1742                            advert_age_secs = now_secs.saturating_sub(created_at_secs),
1743                            "Expediting configured-peer retry after fresh overlay advert"
1744                        );
1745                    }
1746                }
1747                skipped_configured = skipped_configured.saturating_add(1);
1748                continue;
1749            }
1750
1751            let peer_identity = match PeerIdentity::from_npub(&npub) {
1752                Ok(identity) => identity,
1753                Err(_) => {
1754                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
1755                    continue;
1756                }
1757            };
1758            let node_addr = *peer_identity.node_addr();
1759            if node_addr == *self.identity.node_addr() {
1760                skipped_self = skipped_self.saturating_add(1);
1761                continue;
1762            }
1763            if self.peers.contains_key(&node_addr) {
1764                skipped_connected = skipped_connected.saturating_add(1);
1765                continue;
1766            }
1767            if self.retry_pending.contains_key(&node_addr) {
1768                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
1769                continue;
1770            }
1771            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
1772                skipped_cooldown = skipped_cooldown.saturating_add(1);
1773                continue;
1774            }
1775            let connecting = self.connections.values().any(|conn| {
1776                conn.expected_identity()
1777                    .map(|id| id.node_addr() == &node_addr)
1778                    .unwrap_or(false)
1779            });
1780            if connecting {
1781                skipped_connecting = skipped_connecting.saturating_add(1);
1782                continue;
1783            }
1784
1785            let mut addresses = Vec::new();
1786            let mut priority = 120u8;
1787            let seen_at_ms = Self::now_ms();
1788            for endpoint in endpoints {
1789                let Some(candidate) =
1790                    Self::overlay_endpoint_to_peer_address(&endpoint, priority, seen_at_ms)
1791                else {
1792                    continue;
1793                };
1794                if addresses.iter().any(|existing: &PeerAddress| {
1795                    existing.transport == candidate.transport && existing.addr == candidate.addr
1796                }) {
1797                    continue;
1798                }
1799                addresses.push(candidate);
1800                priority = priority.saturating_add(1);
1801            }
1802            if addresses.is_empty() {
1803                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
1804                continue;
1805            }
1806
1807            self.peer_aliases
1808                .entry(node_addr)
1809                .or_insert_with(|| peer_identity.short_npub());
1810            self.register_identity(node_addr, peer_identity.pubkey_full());
1811
1812            let mut state = super::retry::RetryState::new(PeerConfig {
1813                npub: npub.clone(),
1814                alias: None,
1815                addresses,
1816                connect_policy: ConnectPolicy::AutoConnect,
1817                auto_reconnect: true,
1818            });
1819            state.reconnect = false;
1820            state.retry_after_ms = now_ms;
1821            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
1822            self.retry_pending.insert(node_addr, state);
1823            info!(
1824                caller = %caller,
1825                peer = %peer_identity.short_npub(),
1826                advert_age_secs = now_secs.saturating_sub(created_at_secs),
1827                "open-discovery sweep: queued retry for cached advert"
1828            );
1829            enqueue_budget = enqueue_budget.saturating_sub(1);
1830            enqueued = enqueued.saturating_add(1);
1831        }
1832
1833        // Always log a one-line summary on the startup sweep so operators
1834        // can verify it ran. Per-tick sweeps are noisier; only summarize
1835        // when something happened.
1836        let total_skipped = skipped_age
1837            + skipped_configured
1838            + skipped_self
1839            + skipped_connected
1840            + skipped_retry_pending
1841            + skipped_connecting
1842            + skipped_no_endpoints
1843            + skipped_invalid_npub
1844            + skipped_cooldown;
1845        let should_summarize = caller == "startup" || enqueued > 0;
1846        if should_summarize {
1847            info!(
1848                caller = %caller,
1849                cached = cached_count,
1850                queued = enqueued,
1851                skipped_age = skipped_age,
1852                skipped_configured = skipped_configured,
1853                skipped_self = skipped_self,
1854                skipped_connected = skipped_connected,
1855                skipped_retry_pending = skipped_retry_pending,
1856                skipped_connecting = skipped_connecting,
1857                skipped_no_endpoints = skipped_no_endpoints,
1858                skipped_invalid_npub = skipped_invalid_npub,
1859                skipped_cooldown = skipped_cooldown,
1860                skipped_total = total_skipped,
1861                "open-discovery sweep complete"
1862            );
1863        }
1864    }
1865
1866    /// One-shot startup sweep: runs once after the configured settle
1867    /// delay, iterating the cached overlay adverts and queueing retries
1868    /// for any peer with a recent enough advert that we haven't already
1869    /// configured statically or established a link to.
1870    ///
1871    /// Gated identically to [`run_open_discovery_sweep`]: requires
1872    /// `node.discovery.nostr.enabled` and `policy == open`.
1873    async fn maybe_run_startup_open_discovery_sweep(
1874        &mut self,
1875        bootstrap: &std::sync::Arc<NostrDiscovery>,
1876    ) {
1877        if self.startup_open_discovery_sweep_done {
1878            return;
1879        }
1880        if !self.config.node.discovery.nostr.enabled
1881            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
1882        {
1883            // Mark done so we don't keep re-checking on every tick.
1884            self.startup_open_discovery_sweep_done = true;
1885            return;
1886        }
1887        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
1888            return;
1889        };
1890        let now_ms = Self::now_ms();
1891        let delay_ms = self
1892            .config
1893            .node
1894            .discovery
1895            .nostr
1896            .startup_sweep_delay_secs
1897            .saturating_mul(1000);
1898        if now_ms < started_at_ms.saturating_add(delay_ms) {
1899            return;
1900        }
1901
1902        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
1903        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
1904            .await;
1905        self.startup_open_discovery_sweep_done = true;
1906    }
1907
1908    fn available_outbound_slots(&self) -> usize {
1909        let connection_used = self
1910            .connections
1911            .len()
1912            .saturating_add(self.pending_connects.len());
1913        let connection_slots = if self.max_connections == 0 {
1914            usize::MAX
1915        } else {
1916            self.max_connections.saturating_sub(connection_used)
1917        };
1918
1919        let peer_slots = if self.max_peers == 0 {
1920            usize::MAX
1921        } else {
1922            self.max_peers.saturating_sub(self.peers.len())
1923        };
1924
1925        connection_slots.min(peer_slots)
1926    }
1927
1928    fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
1929        let current_open_discovery_pending = self
1930            .retry_pending
1931            .values()
1932            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
1933            .count();
1934
1935        let cap_remaining = self
1936            .config
1937            .node
1938            .discovery
1939            .nostr
1940            .open_discovery_max_pending
1941            .saturating_sub(current_open_discovery_pending);
1942
1943        cap_remaining.min(self.available_outbound_slots())
1944    }
1945
1946    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
1947        now_ms.saturating_add(
1948            self.config
1949                .node
1950                .discovery
1951                .nostr
1952                .advert_ttl_secs
1953                .saturating_mul(1000)
1954                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
1955        )
1956    }
1957
    /// Assemble this node's overlay advert from its operational transports.
    ///
    /// Returns `None` when Nostr discovery is disabled or when no endpoint
    /// survives the filters: a transport must be operational, have
    /// `advertise_on_nostr` set, and yield an address reachable off-LAN
    /// (see the per-transport precedence comments below). `signal_relays`
    /// and `stun_servers` are attached only when a NAT'd UDP endpoint was
    /// advertised, since only the traversal path consumes them.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Set when a non-public UDP transport advertises the "nat"
        // sentinel; gates the relay/STUN hints in the final advert.
        let mut has_udp_nat = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Precedence:
                        // 1. operator-supplied `external_addr` (skips STUN)
                        // 2. non-wildcard *public* `local_addr` (operator
                        //    bound to a specific public IP directly)
                        // 3. STUN auto-discovery against ephemeral socket
                        //    (also taken when bind is wildcard *or* private —
                        //    a private bind is not peer-reachable, so we
                        //    must publish the public reflexive instead)
                        // 4. loud warn + omit endpoint
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                            or private and STUN observation failed; \
                                            advertising no UDP endpoint. Either set \
                                            transports.udp.external_addr, bind to a \
                                            specific *public* IP, or ensure \
                                            node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                None => {}
                            }
                        }
                    } else {
                        // Not publicly reachable: advertise the "nat"
                        // sentinel. The dial path treats udp + "nat" as a
                        // request for overlay NAT traversal rather than a
                        // literal address.
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Precedence:
                    // 1. operator-supplied `external_addr` (only path that
                    //    works on cloud-NAT setups where the public IP is
                    //    not on a host interface).
                    // 2. non-wildcard *public* `local_addr` (operator bound
                    //    to a specific public IP directly).
                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
                    //
                    // A wildcard *or* private bind is never advertised as-is
                    // — peers off-LAN can't reach a private bind, and there
                    // is no TCP STUN to discover a public reflexive.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                    or private IP and no transports.tcp.external_addr set; \
                                    advertising no TCP endpoint. Either set external_addr \
                                    to the public IP (recommended for cloud 1:1-NAT setups) \
                                    or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Only advertise once the onion address is available.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                _ => {}
            }
        }

        if endpoints.is_empty() {
            return None;
        }

        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: has_udp_nat
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
2121
2122    async fn refresh_overlay_advert(
2123        &self,
2124        bootstrap: &std::sync::Arc<NostrDiscovery>,
2125    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
2126        let advert = self.build_overlay_advert(bootstrap).await;
2127        bootstrap.update_local_advert(advert).await
2128    }
2129
2130    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
2131        match (&self.config.transports.udp, transport_name) {
2132            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2133            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2134            _ => None,
2135        }
2136    }
2137
2138    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
2139        match (&self.config.transports.tcp, transport_name) {
2140            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2141            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2142            _ => None,
2143        }
2144    }
2145
2146    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
2147        match (&self.config.transports.tor, transport_name) {
2148            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
2149            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
2150            _ => None,
2151        }
2152    }
2153
2154    pub(in crate::node) async fn try_peer_addresses(
2155        &mut self,
2156        peer_config: &PeerConfig,
2157        peer_identity: PeerIdentity,
2158        allow_bootstrap_nat: bool,
2159    ) -> Result<(), NodeError> {
2160        let peer_node_addr = *peer_identity.node_addr();
2161        if self.peers.contains_key(&peer_node_addr) {
2162            debug!(
2163                npub = %peer_config.npub,
2164                "Peer already exists, skipping address attempts"
2165            );
2166            return Ok(());
2167        }
2168        if self.is_connecting_to_peer(&peer_node_addr) {
2169            debug!(
2170                npub = %peer_config.npub,
2171                "Connection already in progress, skipping address attempts"
2172            );
2173            return Ok(());
2174        }
2175
2176        // Merge every candidate from every source we have for this peer
2177        // (operator-configured static addresses, freshly fetched overlay
2178        // adverts, callers' recent-peers caches via `update_peers`) and
2179        // try them in order of `seen_at_ms` descending — most-recent
2180        // first, source-agnostic. Addresses without a freshness signal
2181        // sort last. Addresses are hints, not the final word; we try them
2182        // all in one pass and stop at the first success.
2183        let static_addresses = self.static_peer_addresses(peer_config);
2184        let overlay_addresses = self
2185            .nostr_peer_fallback_addresses(peer_config, &static_addresses)
2186            .await;
2187
2188        let mut candidates = Vec::with_capacity(overlay_addresses.len() + static_addresses.len());
2189        for addr in overlay_addresses.into_iter().chain(static_addresses) {
2190            if !candidates.iter().any(|existing: &PeerAddress| {
2191                existing.transport == addr.transport && existing.addr == addr.addr
2192            }) {
2193                candidates.push(addr);
2194            }
2195        }
2196
2197        // Stable sort: most-recently-observed first (Some > None), tiebreak
2198        // by input order so equal-timestamp addresses keep operator-supplied
2199        // priority and the overlay-then-static merge order survives when
2200        // nothing has a freshness signal.
2201        candidates.sort_by(|a, b| match (a.seen_at_ms, b.seen_at_ms) {
2202            (Some(a_ts), Some(b_ts)) => b_ts.cmp(&a_ts),
2203            (Some(_), None) => std::cmp::Ordering::Less,
2204            (None, Some(_)) => std::cmp::Ordering::Greater,
2205            (None, None) => std::cmp::Ordering::Equal,
2206        });
2207
2208        if candidates.is_empty() {
2209            return Err(NodeError::NoTransportForType(format!(
2210                "no addresses known for {}",
2211                peer_config.npub
2212            )));
2213        }
2214
2215        if self
2216            .attempt_peer_address_list(peer_config, peer_identity, allow_bootstrap_nat, &candidates)
2217            .await
2218            .is_ok()
2219        {
2220            return Ok(());
2221        }
2222
2223        Err(NodeError::NoTransportForType(format!(
2224            "no operational transport for any of {}'s addresses",
2225            peer_config.npub
2226        )))
2227    }
2228
2229    // === Control API methods ===
2230
2231    /// Connect to a peer via the control API.
2232    ///
2233    /// Creates an ephemeral peer connection (not persisted to config, no
2234    /// auto-reconnect). Reuses the same connection path as auto-connect
2235    /// peers. Returns JSON data on success or an error message.
2236    pub(crate) async fn api_connect(
2237        &mut self,
2238        npub: &str,
2239        address: &str,
2240        transport: &str,
2241    ) -> Result<serde_json::Value, String> {
2242        let peer_config = PeerConfig {
2243            npub: npub.to_string(),
2244            alias: None,
2245            addresses: vec![PeerAddress::new(transport, address)],
2246            connect_policy: ConnectPolicy::Manual,
2247            auto_reconnect: false,
2248        };
2249
2250        // Pre-seed identity cache (same as initiate_peer_connections does)
2251        if let Ok(identity) = PeerIdentity::from_npub(npub) {
2252            self.peer_aliases
2253                .insert(*identity.node_addr(), identity.short_npub());
2254            self.register_identity(*identity.node_addr(), identity.pubkey_full());
2255        }
2256
2257        self.initiate_peer_connection(&peer_config)
2258            .await
2259            .map(|()| {
2260                info!(
2261                    npub = %npub,
2262                    address = %address,
2263                    transport = %transport,
2264                    "API connect initiated"
2265                );
2266                serde_json::json!({
2267                    "npub": npub,
2268                    "address": address,
2269                    "transport": transport,
2270                })
2271            })
2272            .map_err(|e| e.to_string())
2273    }
2274
2275    /// Disconnect a peer via the control API.
2276    ///
2277    /// Removes the peer and suppresses auto-reconnect.
2278    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
2279        let peer_identity =
2280            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
2281        let node_addr = *peer_identity.node_addr();
2282
2283        if !self.peers.contains_key(&node_addr) {
2284            return Err(format!("peer not found: {npub}"));
2285        }
2286
2287        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
2288        self.remove_active_peer(&node_addr);
2289
2290        // Suppress any pending auto-reconnect
2291        self.retry_pending.remove(&node_addr);
2292
2293        info!(npub = %npub, "API disconnect completed");
2294
2295        Ok(serde_json::json!({
2296            "npub": npub,
2297            "disconnected": true,
2298        }))
2299    }
2300
    /// Adopt an already-established UDP traversal and start the normal FIPS
    /// Noise handshake over it.
    ///
    /// This is intended for integration with an external rendezvous runtime
    /// that has already completed relay signaling, STUN observation, and UDP
    /// hole punching. After handoff, the adopted socket is owned by FIPS.
    ///
    /// # Errors
    ///
    /// - [`NodeError::NotStarted`] if the node is not operational or has no
    ///   packet channel yet.
    /// - [`NodeError::InvalidPeerNpub`] if `traversal.peer_npub` fails to
    ///   parse.
    /// - [`NodeError::PeerAlreadyExists`] if the peer is already connected or
    ///   a handshake with it is in progress.
    /// - [`NodeError::BootstrapHandoff`] if adopting the socket fails or the
    ///   adopted transport reports no local address.
    /// - Any error from `initiate_connection`; in that case the transport and
    ///   bootstrap bookkeeping registered here are rolled back first.
    pub async fn adopt_established_traversal(
        &mut self,
        traversal: EstablishedTraversal,
    ) -> Result<BootstrapHandoffResult, NodeError> {
        debug!(
            peer_npub = %traversal.peer_npub,
            session_id = %traversal.session_id,
            remote_addr = %traversal.remote_addr,
            "adopting established traversal socket"
        );

        if !self.state.is_operational() {
            return Err(NodeError::NotStarted);
        }

        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
            NodeError::InvalidPeerNpub {
                npub: traversal.peer_npub.clone(),
                reason: e.to_string(),
            }
        })?;
        let peer_node_addr = *peer_identity.node_addr();
        // Duplicate guards: a handoff for a peer we already talk to — or are
        // mid-handshake with — is rejected rather than disturbing the
        // existing connection.
        if self.peers.contains_key(&peer_node_addr) {
            debug!(
                peer_npub = %traversal.peer_npub,
                "Ignoring NAT traversal handoff for already-connected peer"
            );
            return Err(NodeError::PeerAlreadyExists(peer_node_addr));
        }
        if self.is_connecting_to_peer(&peer_node_addr) {
            debug!(
                peer_npub = %traversal.peer_npub,
                "Ignoring NAT traversal handoff while peer handshake is already in progress"
            );
            return Err(NodeError::PeerAlreadyExists(peer_node_addr));
        }

        // Pre-seed alias/identity caches so subsequent log lines and lookups
        // can refer to the peer by name before the handshake completes.
        self.peer_aliases
            .insert(peer_node_addr, peer_identity.short_npub());
        self.register_identity(peer_node_addr, peer_identity.pubkey_full());

        let transport_id = self.allocate_transport_id();
        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
        // (and accept_connections / advertise flags) from the operator's
        // configured [transports.udp] when the bootstrap runtime doesn't
        // pass an explicit override. Lookup tries `transport_name` first
        // (covers the `Named` multi-listener variant) and falls back to the
        // unnamed `Single` listener, so single- and named-listener configs
        // both inherit cleanly.
        //
        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
        // only value guaranteed to survive arbitrary middlebox paths.
        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
        // initially attempt that MTU and may black-hole on tighter paths
        // until reactive `MtuExceeded` recovery kicks in. Operators who
        // raise the primary MTU based on known-clean topology accept that
        // tradeoff; the silent drop on a too-low default was strictly
        // worse for the common case where the primary MTU is reachable.
        //
        // Bind / external address fields are cleared since the socket is
        // already bound.
        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
            let mut cfg = self
                .lookup_udp_config(traversal.transport_name.as_deref())
                .or_else(|| self.lookup_udp_config(None))
                .cloned()
                .unwrap_or_default();
            cfg.bind_addr = None;
            cfg.external_addr = None;
            cfg
        });
        let mut transport = crate::transport::udp::UdpTransport::new(
            transport_id,
            traversal.transport_name.clone(),
            inherited_config,
            packet_tx,
        );

        transport
            .adopt_socket_async(traversal.socket)
            .await
            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;

        let local_addr = transport.local_addr().ok_or_else(|| {
            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
        })?;

        // Register the transport and mark it bootstrap-owned, recording which
        // peer npub it was adopted for.
        self.transports.insert(
            transport_id,
            crate::transport::TransportHandle::Udp(transport),
        );
        self.bootstrap_transports.insert(transport_id);
        self.bootstrap_transport_npubs
            .insert(transport_id, traversal.peer_npub.clone());

        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
        if let Err(err) = self
            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
            .await
        {
            // Roll back the registrations above: a failed handshake must not
            // leave an orphaned transport or stale bootstrap bookkeeping.
            self.bootstrap_transports.remove(&transport_id);
            self.bootstrap_transport_npubs.remove(&transport_id);
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let _ = handle.stop().await;
            }
            return Err(err);
        }

        info!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            local_addr = %local_addr,
            remote_addr = %traversal.remote_addr,
            session_id = %traversal.session_id,
            "adopted NAT traversal socket; handshake initiated"
        );

        Ok(BootstrapHandoffResult {
            transport_id,
            local_addr,
            remote_addr: traversal.remote_addr,
            peer_node_addr,
            session_id: traversal.session_id,
        })
    }
2433}