Skip to main content

fips_core/node/
lifecycle.rs

1//! Node lifecycle management: start, stop, and peer connection initiation.
2
3use super::{Node, NodeError, NodeState};
4use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
5use crate::discovery::nostr::{
6    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
7    OverlayEndpointAdvert, OverlayTransportKind,
8};
9use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
10use crate::node::acl::PeerAclContext;
11use crate::node::wire::build_msg1;
12use crate::peer::PeerConnection;
13use crate::protocol::{Disconnect, DisconnectReason};
14use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
15use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
16use crate::{NodeAddr, PeerIdentity};
17use std::collections::HashSet;
18use std::net::IpAddr;
19use std::thread;
20use std::time::Duration;
21use tracing::{debug, info, warn};
22
/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN.
///
/// IPv4: RFC 1918 private, loopback, link-local, unspecified, multicast,
/// limited broadcast, documentation (TEST-NET-1/2/3), CGNAT
/// (100.64.0.0/10, RFC 6598) and benchmarking (198.18.0.0/15, RFC 2544).
///
/// IPv6: loopback, unspecified, unique-local (fc00::/7), multicast and
/// link-local (fe80::/10). IPv4-mapped addresses (`::ffff:a.b.c.d`) are
/// judged by their embedded IPv4 address, so a mapped RFC 1918 address is
/// rejected exactly like the bare one.
///
/// We never publish these as the peer's primary `runtime_endpoint`. An
/// off-LAN consumer can't route to them, and latching onto one (and then
/// falling back to a slow overlay relay) is the original bug this guard
/// exists to prevent.
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            let [a, b, ..] = v4.octets();
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the
                // public internet; behaves like an extra NAT layer.
                || (a == 100 && (b & 0xc0) == 0x40)
                // 198.18.0.0/15 — benchmarking, RFC 2544. Not globally
                // reachable; std's `Ipv4Addr::is_benchmarking` is unstable.
                || (a == 198 && (b & 0xfe) == 18)
        }
        IpAddr::V6(v6) => {
            // `::ffff:a.b.c.d` is just an IPv4 address in IPv6 clothing;
            // classify it by the IPv4 rules above.
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_unroutable_advert_ip(IpAddr::V4(v4));
            }
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // IPv6 link-local: fe80::/10
                || (v6.segments()[0] & 0xffc0) == 0xfe80
        }
    }
}
54
/// Multiplier used by the open-discovery retry logic.
///
/// NOTE(review): not referenced in the visible part of this file. The name
/// suggests retries stay eligible for this many multiples of some advert /
/// discovery lifetime; confirm against its call site before relying on
/// that reading.
const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;
56
57impl Node {
58    /// Initiate connections to configured static peers.
59    ///
60    /// For each peer configured with AutoConnect policy, creates a link and
61    /// peer entry, then starts the Noise handshake by sending the first message.
62    pub(super) async fn initiate_peer_connections(&mut self) {
63        // Build display name map from all configured peers (alias or short npub),
64        // and pre-seed the identity cache from each peer's npub so that TUN packets
65        // addressed to a configured peer can be dispatched (and trigger session
66        // initiation) immediately on startup — without waiting for the link-layer
67        // handshake to complete first.
68        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
69            .config
70            .peers()
71            .iter()
72            .filter_map(|pc| {
73                PeerIdentity::from_npub(&pc.npub)
74                    .ok()
75                    .map(|id| (id, pc.alias.clone()))
76            })
77            .collect();
78
79        for (identity, alias) in peer_identities {
80            let name = alias.unwrap_or_else(|| identity.short_npub());
81            self.peer_aliases.insert(*identity.node_addr(), name);
82            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
83            // but will be corrected to the real value when the peer is promoted
84            // after a successful Noise handshake.
85            self.register_identity(*identity.node_addr(), identity.pubkey_full());
86        }
87
88        // Collect peer configs to avoid borrow conflicts
89        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();
90
91        if peer_configs.is_empty() {
92            debug!("No static peers configured");
93            return;
94        }
95
96        debug!(
97            count = peer_configs.len(),
98            "Initiating static peer connections"
99        );
100
101        for peer_config in peer_configs {
102            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
103                warn!(
104                    npub = %peer_config.npub,
105                    alias = ?peer_config.alias,
106                    error = %e,
107                    "Failed to initiate peer connection"
108                );
109                // Schedule a retry so transient address-resolution failures
110                // (e.g. cached endpoints stale, NAT rebinds, all addresses
111                // currently unreachable) recover without a daemon restart.
112                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
113                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
114                }
115                // No-transport failures most often mean the cached overlay
116                // advert is pointing at a dead post-NAT-rebind address. The
117                // advert cache is read-only inside fetch_advert, so retries
118                // would loop on the same dead address until expiry. Force a
119                // re-fetch so the next retry tick picks up fresh endpoints.
120                if matches!(e, crate::node::NodeError::NoTransportForType(_))
121                    && let Some(bootstrap) = self.nostr_discovery.clone()
122                {
123                    let npub = peer_config.npub.clone();
124                    tokio::spawn(async move {
125                        let _ = bootstrap.refetch_advert_for_stale_check(&npub).await;
126                    });
127                }
128            }
129        }
130    }
131
132    /// Initiate a connection to a single peer.
133    ///
134    /// Creates a link, starts the Noise handshake, and sends the first message.
135    pub(super) async fn initiate_peer_connection(
136        &mut self,
137        peer_config: &crate::config::PeerConfig,
138    ) -> Result<(), NodeError> {
139        // Parse the peer's npub to get their identity
140        let peer_identity =
141            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
142                npub: peer_config.npub.clone(),
143                reason: e.to_string(),
144            })?;
145
146        let peer_node_addr = *peer_identity.node_addr();
147
148        // Check if peer already exists (fully authenticated)
149        if self.peers.contains_key(&peer_node_addr) {
150            debug!(
151                npub = %peer_config.npub,
152                "Peer already exists, skipping"
153            );
154            return Ok(());
155        }
156
157        // Check if connection already in progress to this peer
158        let already_connecting = self.connections.values().any(|conn| {
159            conn.expected_identity()
160                .map(|id| id.node_addr() == &peer_node_addr)
161                .unwrap_or(false)
162        });
163        if already_connecting {
164            debug!(
165                npub = %peer_config.npub,
166                "Connection already in progress, skipping"
167            );
168            return Ok(());
169        }
170
171        self.try_peer_addresses(peer_config, peer_identity, true)
172            .await
173    }
174
175    /// Initiate a connection to a peer on a specific transport and address.
176    ///
177    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
178    /// the Noise IK handshake, sends msg1, and registers the connection for
179    /// msg2 dispatch.
180    ///
181    /// For connection-oriented transports (TCP, Tor): allocates a link and
182    /// starts a non-blocking transport connect. The handshake is deferred
183    /// until the transport connection is established — the tick handler
184    /// polls `connection_state()` and initiates the handshake when ready.
185    pub(super) async fn initiate_connection(
186        &mut self,
187        transport_id: TransportId,
188        remote_addr: TransportAddr,
189        peer_identity: PeerIdentity,
190    ) -> Result<(), NodeError> {
191        let peer_node_addr = *peer_identity.node_addr();
192
193        self.authorize_peer(
194            &peer_identity,
195            PeerAclContext::OutboundConnect,
196            transport_id,
197            &remote_addr,
198        )?;
199
200        let is_connection_oriented = self
201            .transports
202            .get(&transport_id)
203            .map(|t| t.transport_type().connection_oriented)
204            .unwrap_or(false);
205
206        // Allocate link ID and create link
207        let link_id = self.allocate_link_id();
208
209        let link = if is_connection_oriented {
210            Link::new(
211                link_id,
212                transport_id,
213                remote_addr.clone(),
214                LinkDirection::Outbound,
215                Duration::from_millis(self.config.node.base_rtt_ms),
216            )
217        } else {
218            Link::connectionless(
219                link_id,
220                transport_id,
221                remote_addr.clone(),
222                LinkDirection::Outbound,
223                Duration::from_millis(self.config.node.base_rtt_ms),
224            )
225        };
226
227        self.links.insert(link_id, link);
228
229        // Add reverse lookup for packet dispatch
230        self.addr_to_link
231            .insert((transport_id, remote_addr.clone()), link_id);
232
233        if is_connection_oriented {
234            // Connection-oriented: start non-blocking connect, defer handshake
235            if let Some(transport) = self.transports.get(&transport_id) {
236                match transport.connect(&remote_addr).await {
237                    Ok(()) => {
238                        debug!(
239                            peer = %self.peer_display_name(&peer_node_addr),
240                            transport_id = %transport_id,
241                            remote_addr = %remote_addr,
242                            link_id = %link_id,
243                            "Transport connect initiated (non-blocking)"
244                        );
245                        self.pending_connects.push(super::PendingConnect {
246                            link_id,
247                            transport_id,
248                            remote_addr,
249                            peer_identity,
250                        });
251                    }
252                    Err(e) => {
253                        // Clean up link
254                        self.links.remove(&link_id);
255                        self.addr_to_link.remove(&(transport_id, remote_addr));
256                        return Err(NodeError::TransportError(e.to_string()));
257                    }
258                }
259            }
260            Ok(())
261        } else {
262            // Connectionless: proceed with immediate handshake
263            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
264                .await
265        }
266    }
267
    /// Start the Noise handshake on an already-registered link and send msg1.
    ///
    /// Called directly by `initiate_connection` for connectionless
    /// transports, and by `poll_pending_connects` once a connection-oriented
    /// transport reports the connection as established.
    ///
    /// Steps:
    /// 1. Create an outbound `PeerConnection` that expects `peer_identity`.
    /// 2. Allocate a session index and produce Noise msg1.
    /// 3. Wrap msg1 in the wire format and store it for resend.
    /// 4. Register the connection in `pending_outbound` (keyed by
    ///    `(transport_id, our_index)`) and in `connections`.
    /// 5. Send msg1.
    ///
    /// # Errors
    ///
    /// * `NodeError::IndexAllocationFailed` when no session index is free.
    /// * `NodeError::HandshakeFailed` when the Noise state machine fails to
    ///   produce msg1. The allocated index is freed first.
    ///
    /// In both error cases the link and its `addr_to_link` entry are removed
    /// here, before returning.
    ///
    /// A failed *send* is not reported as an error: the connection is marked
    /// failed but left in `connections`, and `Ok(())` is returned. If
    /// `transport_id` is not in `transports`, nothing is sent and `Ok(())` is
    /// still returned. NOTE(review): in that case cleanup presumably relies on
    /// the resend/timeout handling elsewhere — confirm.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Create connection in handshake phase (outbound knows expected identity)
        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Allocate a session index for this handshake
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Clean up the link the caller registered for this attempt
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        // Start the Noise handshake and get message 1
        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Clean up the index and link; the index was allocated
                    // above and would otherwise leak.
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        // Set index and transport info on the connection
        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Store msg1 for resend and schedule first resend
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Track in pending_outbound for msg2 dispatch.
        // Registered before sending so a fast msg2 reply can be matched.
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send the wire format handshake message.
        // `None` means the transport id is unknown and nothing was sent.
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        if let Some(send_result) = send_result {
            self.note_local_send_outcome(&send_result);
            match send_result {
                Ok(bytes) => {
                    debug!(
                        link_id = %link_id,
                        our_index = %our_index,
                        bytes,
                        "Sent Noise handshake message 1 (wire format)"
                    );
                }
                Err(e) => {
                    warn!(
                        link_id = %link_id,
                        transport_id = %transport_id,
                        remote_addr = %remote_addr,
                        our_index = %our_index,
                        error = %e,
                        "Failed to send handshake message"
                    );
                    // Mark connection as failed but don't remove it yet
                    // The event loop can handle retry logic
                    if let Some(conn) = self.connections.get_mut(&link_id) {
                        conn.mark_failed();
                    }
                }
            }
        }

        Ok(())
    }
372
373    /// Poll all transports for discovered peers and auto-connect.
374    ///
375    /// Called from the tick handler. Iterates operational transports,
376    /// drains their discovery buffers, and initiates connections to
377    /// newly discovered peers (if auto_connect is enabled).
378    pub(super) async fn poll_transport_discovery(&mut self) {
379        // Collect discoveries first to avoid borrow conflict with self
380        let mut to_connect = Vec::new();
381
382        for (transport_id, transport) in &self.transports {
383            if !transport.is_operational() {
384                continue;
385            }
386            if !transport.auto_connect() {
387                // Still drain the buffer so it doesn't grow unbounded
388                let _ = transport.discover();
389                continue;
390            }
391            let discovered = match transport.discover() {
392                Ok(peers) => peers,
393                Err(_) => continue,
394            };
395            for peer in discovered {
396                let pubkey = match peer.pubkey_hint {
397                    Some(pk) => pk,
398                    None => continue,
399                };
400                let identity = PeerIdentity::from_pubkey(pubkey);
401                let node_addr = *identity.node_addr();
402
403                // Skip self
404                if node_addr == *self.identity.node_addr() {
405                    continue;
406                }
407                // Skip if already connected
408                if self.peers.contains_key(&node_addr) {
409                    continue;
410                }
411                // Skip if connection already in progress
412                let connecting = self.connections.values().any(|c| {
413                    c.expected_identity()
414                        .map(|id| id.node_addr() == &node_addr)
415                        .unwrap_or(false)
416                });
417                if connecting {
418                    continue;
419                }
420
421                to_connect.push((*transport_id, peer.addr, identity));
422            }
423        }
424
425        for (transport_id, remote_addr, identity) in to_connect {
426            info!(
427                peer = %self.peer_display_name(identity.node_addr()),
428                transport_id = %transport_id,
429                remote_addr = %remote_addr,
430                "Auto-connecting to discovered peer"
431            );
432            if let Err(e) = self
433                .initiate_connection(transport_id, remote_addr, identity)
434                .await
435            {
436                warn!(error = %e, "Failed to auto-connect to discovered peer");
437            }
438        }
439    }
440
    /// Drive Nostr-based discovery and NAT traversal for one tick.
    ///
    /// Does nothing when `nostr_discovery` is not configured. Otherwise:
    /// 1. Refreshes our own overlay advert. A failure is only logged at
    ///    debug level.
    /// 2. Drains bootstrap events:
    ///    * `Established`: adopts the traversal socket. If adoption fails, a
    ///      retry for that peer is scheduled.
    ///    * `Failed`: records the failure with the discovery layer, which
    ///      returns whether to warn (rate-limited), the failure streak, and
    ///      an optional cooldown deadline. When the streak crosses its
    ///      threshold, a background stale-advert re-fetch is spawned. Then
    ///      the peer's configured addresses are tried. If that also fails,
    ///      a retry is scheduled, and it is not allowed to fire before the
    ///      cooldown deadline.
    /// 3. Calls `maybe_run_startup_open_discovery_sweep` and
    ///    `queue_open_discovery_retries`.
    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    let peer_npub = traversal.peer_npub.clone();
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let now_ms = Self::now_ms();
                    // The discovery layer tracks the per-peer failure streak
                    // and decides logging level and cooldown.
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // B6: stale-advert eviction on the streak-threshold
                    // crossing. Fire-and-forget; the outcome is logged so
                    // operators can see when peers get cleaned up.
                    if decision.crossed_threshold {
                        let bootstrap = bootstrap.clone();
                        let npub = peer_config.npub.clone();
                        tokio::spawn(async move {
                            let outcome = bootstrap.refetch_advert_for_stale_check(&npub).await;
                            match outcome {
                                crate::discovery::nostr::NostrRefetchOutcome::Evicted => info!(
                                    npub = %npub,
                                    "stale-advert sweep: peer evicted from advert cache"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::Refreshed => info!(
                                    npub = %npub,
                                    "stale-advert sweep: peer republished, cache refreshed and streak reset"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::SameAdvert => debug!(
                                    npub = %npub,
                                    "stale-advert sweep: advert unchanged, cooldown stands"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::Skipped => debug!(
                                    npub = %npub,
                                    "stale-advert sweep: skipped (relay error or no advert_relays)"
                                ),
                            }
                        });
                    }

                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };

                    // Fall back to the peer's other addresses. NOTE(review):
                    // `initiate_peer_connection` passes `true` as the third
                    // argument; its meaning is defined in `try_peer_addresses`
                    // (not visible here).
                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    let node_addr = *peer_identity.node_addr();
                    self.schedule_retry(node_addr, now_ms);
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        // Push the next retry past the cooldown so the
                        // open-discovery sweep doesn't re-enqueue and the
                        // per-attempt backoff doesn't fire sooner.
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }
551
552    /// Extract the bare discovery scope from the Nostr discovery app tag,
553    /// which is encoded as `fips-overlay-v1:<scope>` by
554    /// `apply_default_scoped_discovery`. Returns `None` when no scope is
555    /// set, so the LAN browser surfaces every advert it sees.
556    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
557        let app = self.config.node.discovery.nostr.app.trim();
558        if app.is_empty() {
559            return None;
560        }
561        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
562            let scope = rest.trim();
563            if scope.is_empty() {
564                None
565            } else {
566                Some(scope.to_string())
567            }
568        } else {
569            Some(app.to_string())
570        }
571    }
572
573    /// Drain mDNS-discovered peers and initiate Noise XX handshakes for
574    /// any whose npub we don't already have a connection or active peer
575    /// for. The handshake itself is the authentication — a spoofed
576    /// mDNS advert with someone else's npub fails the XX exchange and
577    /// is dropped.
578    pub(super) async fn poll_lan_discovery(&mut self) {
579        let Some(runtime) = self.lan_discovery.clone() else {
580            return;
581        };
582        let events = runtime.drain_events().await;
583        if events.is_empty() {
584            return;
585        }
586        // Snapshot the operational UDP transport set up front; mDNS
587        // discovery only yields UDP endpoints today.
588        let udp_transport_id = self.find_transport_for_type("udp");
589        let Some(transport_id) = udp_transport_id else {
590            debug!("lan: no operational UDP transport, skipping discovered peers");
591            return;
592        };
593        for event in events {
594            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
595            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
596                Ok(id) => id,
597                Err(err) => {
598                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
599                    continue;
600                }
601            };
602            let peer_node_addr = *identity.node_addr();
603            if self.peers.contains_key(&peer_node_addr) {
604                continue;
605            }
606            let already_connecting = self.connections.values().any(|conn| {
607                conn.expected_identity()
608                    .map(|id| id.node_addr() == &peer_node_addr)
609                    .unwrap_or(false)
610            });
611            if already_connecting {
612                continue;
613            }
614            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
615            info!(
616                npub = %identity.short_npub(),
617                addr = %peer.addr,
618                "lan: initiating handshake to discovered peer"
619            );
620            if let Err(err) = self
621                .initiate_connection(transport_id, remote_addr, identity)
622                .await
623            {
624                debug!(
625                    npub = %peer.npub,
626                    error = %err,
627                    "lan: failed to initiate connection to discovered peer"
628                );
629            }
630        }
631    }
632
633    /// Poll pending transport connects and initiate handshakes for ready ones.
634    ///
635    /// Called from the tick handler. For each pending connect, queries the
636    /// transport's connection state. When a connection is established,
637    /// marks the link as Connected and starts the Noise handshake.
638    /// Failed connections are cleaned up and scheduled for retry.
639    pub(super) async fn poll_pending_connects(&mut self) {
640        if self.pending_connects.is_empty() {
641            return;
642        }
643
644        let mut completed = Vec::new();
645
646        for (i, pending) in self.pending_connects.iter().enumerate() {
647            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
648                transport.connection_state(&pending.remote_addr)
649            } else {
650                crate::transport::ConnectionState::Failed("transport removed".into())
651            };
652
653            match state {
654                crate::transport::ConnectionState::Connected => {
655                    completed.push((i, true, None));
656                }
657                crate::transport::ConnectionState::Failed(reason) => {
658                    completed.push((i, false, Some(reason)));
659                }
660                crate::transport::ConnectionState::Connecting => {
661                    // Still in progress, check on next tick
662                }
663                crate::transport::ConnectionState::None => {
664                    // Shouldn't happen — treat as failure
665                    completed.push((i, false, Some("no connection attempt found".into())));
666                }
667            }
668        }
669
670        // Process completions in reverse order to preserve indices
671        for (i, success, reason) in completed.into_iter().rev() {
672            let pending = self.pending_connects.remove(i);
673
674            if success {
675                // Mark link as Connected
676                if let Some(link) = self.links.get_mut(&pending.link_id) {
677                    link.set_connected();
678                }
679
680                debug!(
681                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
682                    transport_id = %pending.transport_id,
683                    remote_addr = %pending.remote_addr,
684                    link_id = %pending.link_id,
685                    "Transport connected, starting handshake"
686                );
687
688                // Start the handshake now that the transport is connected
689                if let Err(e) = self
690                    .start_handshake(
691                        pending.link_id,
692                        pending.transport_id,
693                        pending.remote_addr.clone(),
694                        pending.peer_identity,
695                    )
696                    .await
697                {
698                    warn!(
699                        link_id = %pending.link_id,
700                        error = %e,
701                        "Failed to start handshake after transport connect"
702                    );
703                    // Clean up link on handshake failure
704                    self.remove_link(&pending.link_id);
705                }
706            } else {
707                let reason = reason.unwrap_or_default();
708                warn!(
709                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
710                    transport_id = %pending.transport_id,
711                    remote_addr = %pending.remote_addr,
712                    link_id = %pending.link_id,
713                    reason = %reason,
714                    "Transport connect failed"
715                );
716
717                // Clean up link and schedule retry
718                self.remove_link(&pending.link_id);
719                self.links.remove(&pending.link_id);
720                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
721            }
722        }
723    }
724
725    // === State Transitions ===
726
727    /// Start the node.
728    ///
729    /// Initializes the TUN interface (if configured), spawns I/O threads,
730    /// and transitions to the Running state.
731    pub async fn start(&mut self) -> Result<(), NodeError> {
732        if !self.state.can_start() {
733            return Err(NodeError::AlreadyStarted);
734        }
735        self.state = NodeState::Starting;
736
737        // Create packet channel for transport -> Node communication
738        let packet_buffer_size = self.config.node.buffers.packet_channel;
739        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
740        self.packet_tx = Some(packet_tx.clone());
741        self.packet_rx = Some(packet_rx);
742
743        // Initialize transports first (before TUN, before Nostr discovery).
744        let transport_handles = self.create_transports(&packet_tx).await;
745
746        for mut handle in transport_handles {
747            let transport_id = handle.transport_id();
748            let transport_type = handle.transport_type().name;
749            let name = handle.name().map(|s| s.to_string());
750
751            match handle.start().await {
752                Ok(()) => {
753                    self.transports.insert(transport_id, handle);
754                }
755                Err(e) => {
756                    if let Some(ref n) = name {
757                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
758                    } else {
759                        warn!(transport_type, error = %e, "Transport failed to start");
760                    }
761                }
762            }
763        }
764
765        if !self.transports.is_empty() {
766            info!(count = self.transports.len(), "Transports initialized");
767        }
768
769        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
770        // worker pools. **Unix only** — both pools issue direct
771        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
772        // raw file descriptors via `AsRawFd`, which is a unix-only
773        // trait. On Windows the rx_loop's tokio-based send/recv
774        // remain the canonical path; the perf overhaul lands its
775        // gains on unix.
776        //
777        // Worker count defaults to the number of CPUs, overridable
778        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
779        // for debug / benchmarking. Hash-by-destination means a
780        // single TCP flow pins to one worker (preserves wire
781        // ordering); additional workers light up under multi-flow
782        // / multi-peer load. See `node::encrypt_worker` /
783        // `node::decrypt_worker` for full rationale.
784        #[cfg(unix)]
785        {
786            let cpu_default = std::thread::available_parallelism()
787                .map(|n| n.get())
788                .unwrap_or(1)
789                .max(1);
790            let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
791                .ok()
792                .and_then(|s| s.parse().ok())
793                .unwrap_or(cpu_default)
794                .max(1);
795            self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
796                encrypt_worker_count,
797            ));
798            info!(
799                workers = encrypt_worker_count,
800                "Spawned FMP-encrypt worker pool"
801            );
802
803            // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
804            // falls through to the in-line rx_loop decrypt path (the
805            // "test-mode" branch in `handle_encrypted_frame`, which is
806            // in fact a fully functional synchronous decrypt). Useful
807            // as an A/B against the worker pipeline when chasing
808            // scheduling/queueing regressions on the native macOS
809            // path. Any non-zero value (env or default) spawns the
810            // pool as before.
811            let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
812                .ok()
813                .and_then(|s| s.parse().ok())
814                .unwrap_or(cpu_default);
815            if decrypt_worker_count == 0 {
816                info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
817            } else {
818                self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
819                    decrypt_worker_count,
820                ));
821                info!(
822                    workers = decrypt_worker_count,
823                    "Spawned FMP+FSP-decrypt worker pool"
824                );
825            }
826        }
827
828        if self.config.node.discovery.nostr.enabled {
829            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
830                .await
831            {
832                Ok(runtime) => {
833                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
834                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
835                    }
836                    self.nostr_discovery = Some(runtime);
837                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
838                    info!("Nostr overlay discovery enabled");
839                }
840                Err(err) => {
841                    warn!(error = %err, "Failed to start Nostr overlay discovery");
842                }
843            }
844        }
845
846        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
847        // when Nostr is disabled, since it gives us sub-second pairing
848        // on the same link without any relay or NAT-traversal roundtrip.
849        if self.config.node.discovery.lan.enabled {
850            let advertised_udp_port = self
851                .transports
852                .values()
853                .filter(|h| h.is_operational())
854                .filter(|h| h.transport_type().name == "udp")
855                .find_map(|h| h.local_addr().map(|addr| addr.port()))
856                .unwrap_or(0);
857            let scope = self.lan_discovery_scope();
858            match crate::discovery::lan::LanDiscovery::start(
859                &self.identity,
860                scope,
861                advertised_udp_port,
862                self.config.node.discovery.lan.clone(),
863            )
864            .await
865            {
866                Ok(runtime) => {
867                    self.lan_discovery = Some(runtime);
868                    info!("LAN mDNS discovery enabled");
869                }
870                Err(err) => {
871                    debug!(error = %err, "LAN mDNS discovery not started");
872                }
873            }
874        }
875
876        // Connect to static peers before TUN is active
877        // This allows handshake messages to be sent before we start accepting packets
878        self.initiate_peer_connections().await;
879
880        // Initialize TUN interface last, after transports and peers are ready
881        if self.config.tun.enabled {
882            let address = *self.identity.address();
883            match TunDevice::create(&self.config.tun, address).await {
884                Ok(device) => {
885                    let mtu = device.mtu();
886                    let name = device.name().to_string();
887                    let our_addr = *device.address();
888
889                    info!("TUN device active:");
890                    info!("     name: {}", name);
891                    info!("  address: {}", device.address());
892                    info!("      mtu: {}", mtu);
893
894                    // Calculate max MSS for TCP clamping
895                    let effective_mtu = self.effective_ipv6_mtu();
896                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
897
898                    info!("effective MTU: {} bytes", effective_mtu);
899                    debug!("   max TCP MSS: {} bytes", max_mss);
900
901                    // On macOS, create a shutdown pipe. Writing to it unblocks the
902                    // reader thread's select() loop without closing the TUN fd
903                    // (which would cause a double-close when TunDevice drops).
904                    #[cfg(target_os = "macos")]
905                    let (shutdown_read_fd, shutdown_write_fd) = {
906                        let mut fds = [0i32; 2];
907                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
908                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
909                                "failed to create shutdown pipe".into(),
910                            )));
911                        }
912                        (fds[0], fds[1])
913                    };
914
915                    // Create writer (dups the fd for independent write access).
916                    // Pass path_mtu_lookup so inbound SYN-ACK clamp can read
917                    // per-destination path MTU learned via discovery.
918                    let (writer, tun_tx) =
919                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;
920
921                    // Spawn writer thread
922                    let writer_handle = thread::spawn(move || {
923                        writer.run();
924                    });
925
926                    // Clone tun_tx for the reader
927                    let reader_tun_tx = tun_tx.clone();
928
929                    // Create outbound channel for TUN reader → Node
930                    let tun_channel_size = self.config.node.buffers.tun_channel;
931                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);
932
933                    // Spawn reader thread
934                    let transport_mtu = self.transport_mtu();
935                    let path_mtu_lookup = self.path_mtu_lookup.clone();
936                    #[cfg(target_os = "macos")]
937                    let reader_handle = thread::spawn(move || {
938                        run_tun_reader(
939                            device,
940                            mtu,
941                            our_addr,
942                            reader_tun_tx,
943                            outbound_tx,
944                            transport_mtu,
945                            path_mtu_lookup,
946                            shutdown_read_fd,
947                        );
948                    });
949                    #[cfg(not(target_os = "macos"))]
950                    let reader_handle = thread::spawn(move || {
951                        run_tun_reader(
952                            device,
953                            mtu,
954                            our_addr,
955                            reader_tun_tx,
956                            outbound_tx,
957                            transport_mtu,
958                            path_mtu_lookup,
959                        );
960                    });
961
962                    self.tun_state = TunState::Active;
963                    self.tun_name = Some(name);
964                    self.tun_tx = Some(tun_tx);
965                    self.tun_outbound_rx = Some(outbound_rx);
966                    self.tun_reader_handle = Some(reader_handle);
967                    self.tun_writer_handle = Some(writer_handle);
968                    #[cfg(target_os = "macos")]
969                    {
970                        self.tun_shutdown_fd = Some(shutdown_write_fd);
971                    }
972                }
973                Err(e) => {
974                    self.tun_state = TunState::Failed;
975                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
976                }
977            }
978        }
979
980        // Initialize DNS responder (independent of TUN).
981        //
982        // Default bind_addr is "::1" (IPv6 loopback). The shipped
983        // fips-dns-setup configures systemd-resolved via a global
984        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
985        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
986        // where self-destined traffic to fips0's address is attributed
987        // to fips0 in PKTINFO and gets silently dropped by the
988        // mesh-interface filter in src/upper/dns.rs.
989        //
990        // For mesh-reachable resolution (rare), set bind_addr: "::"
991        // in fips.yaml. The mesh-interface filter remains active to
992        // prevent hosts-file alias enumeration in that mode.
993        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
994        // 127.0.0.1 still reach us regardless of kernel sysctl
995        // defaults — but only when bind is on a wildcard / IPv6 path.
996        if self.config.dns.enabled {
997            let addr_str = self.config.dns.bind_addr();
998            match addr_str.parse::<std::net::IpAddr>() {
999                Ok(ip) => {
1000                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
1001                    match Self::bind_dns_socket(bind) {
1002                        Ok(socket) => {
1003                            let dns_channel_size = self.config.node.buffers.dns_channel;
1004                            let (identity_tx, identity_rx) =
1005                                tokio::sync::mpsc::channel(dns_channel_size);
1006                            let dns_ttl = self.config.dns.ttl();
1007                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
1008                                self.config.peers(),
1009                            );
1010                            let reloader = if self.config.node.system_files_enabled {
1011                                let hosts_path = std::path::PathBuf::from(
1012                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
1013                                );
1014                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
1015                            } else {
1016                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
1017                            };
1018                            // Resolve the TUN ifindex so the responder can
1019                            // drop queries arriving on the mesh interface
1020                            // (fips0). Without this, the `::` bind exposes
1021                            // /etc/fips/hosts alias probing to any mesh peer.
1022                            // When TUN isn't enabled or the name can't be
1023                            // resolved, `None` disables the filter (there
1024                            // is no mesh surface to defend anyway).
1025                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
1026                            info!(
1027                                bind = %bind,
1028                                hosts = reloader.hosts().len(),
1029                                mesh_ifindex = ?mesh_ifindex,
1030                                "DNS responder started for .fips domain (auto-reload enabled)"
1031                            );
1032                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
1033                                socket,
1034                                identity_tx,
1035                                dns_ttl,
1036                                reloader,
1037                                mesh_ifindex,
1038                            ));
1039                            self.dns_identity_rx = Some(identity_rx);
1040                            self.dns_task = Some(handle);
1041                        }
1042                        Err(e) => {
1043                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
1044                        }
1045                    }
1046                }
1047                Err(e) => {
1048                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
1049                }
1050            }
1051        }
1052
1053        self.state = NodeState::Running;
1054        info!("Node started:");
1055        info!("       state: {}", self.state);
1056        info!("  transports: {}", self.transports.len());
1057        info!(" connections: {}", self.connections.len());
1058        Ok(())
1059    }
1060
1061    /// Bind a UDP socket for the DNS responder.
1062    ///
1063    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
1064    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
1065    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
1066    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
1067    /// land on the same socket.
1068    ///
1069    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
1070    /// can learn the arrival interface per packet. The responder uses that
1071    /// to drop queries arriving on the mesh TUN, closing the hosts-file
1072    /// probing side-channel created by the `::` bind.
1073    fn bind_dns_socket(
1074        addr: std::net::SocketAddr,
1075    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
1076        use socket2::{Domain, Protocol, Socket, Type};
1077        let domain = if addr.is_ipv4() {
1078            Domain::IPV4
1079        } else {
1080            Domain::IPV6
1081        };
1082        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
1083        if addr.is_ipv6() {
1084            sock.set_only_v6(false)?;
1085            #[cfg(unix)]
1086            Self::set_recv_pktinfo_v6(&sock)?;
1087        }
1088        sock.set_nonblocking(true)?;
1089        sock.bind(&addr.into())?;
1090        tokio::net::UdpSocket::from_std(sock.into())
1091    }
1092
1093    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
1094    ///
1095    /// After this setsockopt, each `recvmsg()` call on the socket receives
1096    /// an `IPV6_PKTINFO` control message containing the arrival interface
1097    /// index, which the DNS responder uses for its mesh-interface filter.
1098    #[cfg(unix)]
1099    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
1100        use std::os::fd::AsRawFd;
1101        let enable: libc::c_int = 1;
1102        let ret = unsafe {
1103            libc::setsockopt(
1104                sock.as_raw_fd(),
1105                libc::IPPROTO_IPV6,
1106                libc::IPV6_RECVPKTINFO,
1107                &enable as *const _ as *const libc::c_void,
1108                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
1109            )
1110        };
1111        if ret < 0 {
1112            return Err(std::io::Error::last_os_error());
1113        }
1114        Ok(())
1115    }
1116
1117    /// Resolve the mesh TUN interface index by name.
1118    ///
1119    /// Returns `None` if the interface does not exist (e.g. TUN disabled
1120    /// or not yet created). A `None` result disables the DNS responder's
1121    /// mesh-interface filter — safe, because if there is no fips0 there
1122    /// is no mesh exposure to defend against.
1123    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
1124        #[cfg(unix)]
1125        {
1126            let c_name = std::ffi::CString::new(name).ok()?;
1127            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
1128            if idx == 0 { None } else { Some(idx) }
1129        }
1130        #[cfg(not(unix))]
1131        {
1132            let _ = name;
1133            None
1134        }
1135    }
1136
1137    /// Stop the node.
1138    ///
1139    /// Shuts down TUN interface, stops I/O threads, and transitions to
1140    /// the Stopped state.
1141    pub async fn stop(&mut self) -> Result<(), NodeError> {
1142        if !self.state.can_stop() {
1143            return Err(NodeError::NotStarted);
1144        }
1145        self.state = NodeState::Stopping;
1146        info!(state = %self.state, "Node stopping");
1147
1148        // Stop DNS responder
1149        if let Some(handle) = self.dns_task.take() {
1150            handle.abort();
1151            debug!("DNS responder stopped");
1152        }
1153
1154        // Send disconnect notifications to all active peers before closing transports
1155        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
1156            .await;
1157
1158        // Stop Nostr overlay discovery background work and withdraw any advert.
1159        if let Some(bootstrap) = self.nostr_discovery.take()
1160            && let Err(e) = bootstrap.shutdown().await
1161        {
1162            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
1163        }
1164
1165        // Tear down LAN mDNS responder + browser. Best-effort: the
1166        // OS will eventually time the advert out via its TTL even if
1167        // we don't get a clean unregister out before the daemon exits.
1168        if let Some(lan) = self.lan_discovery.take() {
1169            lan.shutdown().await;
1170        }
1171
1172        // Shutdown transports (they're packet producers)
1173        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
1174        for transport_id in transport_ids {
1175            if let Some(mut handle) = self.transports.remove(&transport_id) {
1176                let transport_type = handle.transport_type().name;
1177                match handle.stop().await {
1178                    Ok(()) => {
1179                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
1180                    }
1181                    Err(e) => {
1182                        warn!(
1183                            transport_id = %transport_id,
1184                            transport_type,
1185                            error = %e,
1186                            "Transport stop failed"
1187                        );
1188                    }
1189                }
1190            }
1191        }
1192
1193        // Drop packet channels
1194        self.packet_tx.take();
1195        self.packet_rx.take();
1196
1197        // Shutdown TUN interface
1198        if let Some(name) = self.tun_name.take() {
1199            info!(name = %name, "Shutting down TUN interface");
1200
1201            // Drop the tun_tx to signal the writer to stop
1202            self.tun_tx.take();
1203
1204            // Delete the interface (on Linux, causes reader to get EFAULT)
1205            if let Err(e) = shutdown_tun_interface(&name).await {
1206                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
1207            }
1208
1209            // On macOS, signal the reader thread to exit by writing to the
1210            // shutdown pipe. The reader's select() will wake up and break.
1211            #[cfg(target_os = "macos")]
1212            if let Some(fd) = self.tun_shutdown_fd.take() {
1213                unsafe {
1214                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
1215                    libc::close(fd);
1216                }
1217            }
1218
1219            // Wait for threads to finish
1220            if let Some(handle) = self.tun_reader_handle.take() {
1221                let _ = handle.join();
1222            }
1223            if let Some(handle) = self.tun_writer_handle.take() {
1224                let _ = handle.join();
1225            }
1226
1227            self.tun_state = TunState::Disabled;
1228        }
1229
1230        self.state = NodeState::Stopped;
1231        info!(state = %self.state, "Node stopped");
1232        Ok(())
1233    }
1234
1235    /// Send disconnect notifications to all active peers.
1236    ///
1237    /// Best-effort: send failures are logged and ignored since the transport
1238    /// may already be degraded. This runs before transports are shut down.
1239    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
1240        let disconnect = Disconnect::new(reason);
1241        let plaintext = disconnect.encode();
1242
1243        // Collect node_addrs to avoid borrow conflict with send helper
1244        let peer_addrs: Vec<NodeAddr> = self
1245            .peers
1246            .iter()
1247            .filter(|(_, peer)| peer.can_send() && peer.has_session())
1248            .map(|(addr, _)| *addr)
1249            .collect();
1250
1251        if peer_addrs.is_empty() {
1252            debug!(
1253                total_peers = self.peers.len(),
1254                "No sendable peers for disconnect notification"
1255            );
1256            return;
1257        }
1258
1259        let mut sent = 0usize;
1260        for node_addr in &peer_addrs {
1261            match self
1262                .send_encrypted_link_message(node_addr, &plaintext)
1263                .await
1264            {
1265                Ok(()) => sent += 1,
1266                Err(e) => {
1267                    debug!(
1268                        peer = %self.peer_display_name(node_addr),
1269                        error = %e,
1270                        "Failed to send disconnect (transport may be down)"
1271                    );
1272                }
1273            }
1274        }
1275
1276        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
1277    }
1278
1279    fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
1280        peer_config
1281            .addresses_by_priority()
1282            .into_iter()
1283            .cloned()
1284            .collect()
1285    }
1286
1287    async fn nostr_peer_fallback_addresses(
1288        &self,
1289        peer_config: &PeerConfig,
1290        existing: &[PeerAddress],
1291    ) -> Vec<PeerAddress> {
1292        if !self.config.node.discovery.nostr.enabled
1293            || self.config.node.discovery.nostr.policy
1294                == crate::config::NostrDiscoveryPolicy::Disabled
1295        {
1296            return Vec::new();
1297        }
1298
1299        let Some(bootstrap) = self.nostr_discovery.clone() else {
1300            return Vec::new();
1301        };
1302        let endpoints = match bootstrap.advert_endpoints_for_peer(&peer_config.npub).await {
1303            Ok(endpoints) => endpoints,
1304            Err(err) => {
1305                debug!(
1306                    npub = %peer_config.npub,
1307                    error = %err,
1308                    "Failed to resolve Nostr advert endpoints for configured peer"
1309                );
1310                return Vec::new();
1311            }
1312        };
1313
1314        let mut fallback = Vec::new();
1315        let mut next_priority = existing
1316            .iter()
1317            .map(|addr| addr.priority)
1318            .max()
1319            .unwrap_or(100)
1320            .saturating_add(1);
1321        for endpoint in endpoints {
1322            let Some(candidate) = Self::overlay_endpoint_to_peer_address(&endpoint, next_priority)
1323            else {
1324                continue;
1325            };
1326            if existing
1327                .iter()
1328                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
1329                || fallback.iter().any(|addr: &PeerAddress| {
1330                    addr.transport == candidate.transport && addr.addr == candidate.addr
1331                })
1332            {
1333                continue;
1334            }
1335            fallback.push(candidate);
1336            next_priority = next_priority.saturating_add(1);
1337        }
1338        fallback
1339    }
1340
1341    fn overlay_endpoint_to_peer_address(
1342        endpoint: &OverlayEndpointAdvert,
1343        priority: u8,
1344    ) -> Option<PeerAddress> {
1345        let transport = match endpoint.transport {
1346            OverlayTransportKind::Udp => "udp",
1347            OverlayTransportKind::Tcp => "tcp",
1348            OverlayTransportKind::Tor => "tor",
1349        };
1350        Some(PeerAddress::with_priority(
1351            transport,
1352            endpoint.addr.clone(),
1353            priority,
1354        ))
1355    }
1356
    /// Try `addresses` in order until one connection attempt is initiated.
    ///
    /// Per-address handling:
    /// - `udp` addresses whose value is `nat` (case-insensitive) are skipped
    ///   unless `allow_bootstrap_nat` is set. When allowed and a Nostr
    ///   discovery runtime exists, the peer is handed to Nostr UDP NAT
    ///   traversal (`request_connect`) and the method returns `Ok(())`
    ///   immediately without trying later addresses.
    /// - `ethernet` and `ble` addresses are resolved with their
    ///   transport-specific resolvers. BLE is always skipped on builds
    ///   without `bluer_available`.
    /// - Any other transport name is mapped to an operational transport via
    ///   `find_transport_for_type`, and the address string is used as-is.
    ///
    /// Resolution failures and non-ACL connection errors are logged at debug
    /// level, and the next address is tried.
    ///
    /// # Errors
    ///
    /// - `NodeError::AccessDenied` from `initiate_connection` is returned
    ///   immediately; no further addresses are tried.
    /// - `NodeError::NoTransportForType` if every address was skipped or
    ///   failed.
    async fn attempt_peer_address_list(
        &mut self,
        peer_config: &PeerConfig,
        peer_identity: PeerIdentity,
        allow_bootstrap_nat: bool,
        addresses: &[PeerAddress],
    ) -> Result<(), NodeError> {
        for addr in addresses {
            // `udp:nat` is a placeholder meaning "reach this peer via Nostr
            // NAT traversal" rather than a dialable address.
            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
                if !allow_bootstrap_nat {
                    continue;
                }
                let Some(bootstrap) = self.nostr_discovery.clone() else {
                    debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
                    continue;
                };
                // Traversal continues in the background; handing off counts
                // as success for this address list.
                bootstrap.request_connect(peer_config.clone()).await;
                info!(npub = %peer_config.npub, "Started Nostr UDP NAT traversal attempt");
                return Ok(());
            }

            // Resolve (transport, remote address); any failure skips to the
            // next candidate.
            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
                match self.resolve_ethernet_addr(&addr.addr) {
                    Ok(result) => result,
                    Err(e) => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            error = %e,
                            "Failed to resolve Ethernet address"
                        );
                        continue;
                    }
                }
            } else if addr.transport == "ble" {
                #[cfg(bluer_available)]
                {
                    match self.resolve_ble_addr(&addr.addr) {
                        Ok(result) => result,
                        Err(e) => {
                            debug!(
                                transport = %addr.transport,
                                addr = %addr.addr,
                                error = %e,
                                "Failed to resolve BLE address"
                            );
                            continue;
                        }
                    }
                }
                #[cfg(not(bluer_available))]
                {
                    debug!(transport = %addr.transport, "BLE transport not available on this build");
                    continue;
                }
            } else {
                let tid = match self.find_transport_for_type(&addr.transport) {
                    Some(id) => id,
                    None => {
                        debug!(
                            transport = %addr.transport,
                            addr = %addr.addr,
                            "No operational transport for address type"
                        );
                        continue;
                    }
                };
                (tid, TransportAddr::from_string(&addr.addr))
            };

            match self
                .initiate_connection(transport_id, remote_addr, peer_identity)
                .await
            {
                Ok(()) => return Ok(()),
                // ACL rejections apply to the peer, not the address, so
                // trying other addresses would not help.
                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
                Err(e) => {
                    debug!(
                        npub = %peer_config.npub,
                        transport_id = %transport_id,
                        error = %e,
                        "Connection attempt failed, trying next address"
                    );
                }
            }
        }

        Err(NodeError::NoTransportForType(format!(
            "no operational transport for any of {}'s addresses",
            peer_config.npub
        )))
    }
1449
1450    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1451        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
1452            .await;
1453    }
1454
    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
    /// queues retries for non-configured, not-yet-connected peers.
    ///
    /// No-op unless `node.discovery.nostr.enabled` is set and the policy is
    /// `NostrDiscoveryPolicy::Open`. The number of retries queued by one call
    /// is bounded by `open_discovery_enqueue_budget`, computed once up front.
    ///
    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
    ///
    /// `caller` is a short label included in log lines so per-tick and
    /// startup sweeps are distinguishable in operator-facing logs. The value
    /// `"startup"` additionally forces the end-of-sweep `info!` summary.
    ///
    /// Each queued peer gets a `RetryState` built from the advert's endpoints
    /// (priorities starting at 120, deduplicated by transport + address),
    /// with `reconnect = false`, `retry_after_ms = now`, and an expiry from
    /// `open_discovery_retry_expires_at_ms`.
    pub(in crate::node) async fn run_open_discovery_sweep(
        &mut self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
        max_age_secs: Option<u64>,
        caller: &'static str,
    ) {
        if !self.config.node.discovery.nostr.enabled
            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
        {
            return;
        }

        // Statically configured peers are skipped below; collect their npubs once.
        let configured_npubs = self
            .config
            .peers()
            .iter()
            .map(|peer| peer.npub.clone())
            .collect::<HashSet<_>>();
        let now_ms = Self::now_ms();
        let now_secs = now_ms / 1000;
        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
        if enqueue_budget == 0 {
            // NOTE(review): this return precedes the summary below, so a
            // startup sweep with no budget is only visible at debug level.
            debug!(
                caller = %caller,
                "open-discovery sweep: enqueue budget is 0, skipping"
            );
            return;
        }

        // Ask the cache for up to 64 candidates (presumably a cap — confirm in
        // `cached_open_discovery_candidates`).
        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
        let cached_count = candidates.len();
        let mut enqueued = 0usize;
        let mut skipped_age = 0usize;
        let mut skipped_configured = 0usize;
        let mut skipped_self = 0usize;
        let mut skipped_connected = 0usize;
        let mut skipped_retry_pending = 0usize;
        let mut skipped_connecting = 0usize;
        let mut skipped_no_endpoints = 0usize;
        let mut skipped_invalid_npub = 0usize;
        let mut skipped_cooldown = 0usize;

        // The skip checks below run in a fixed order; a skipped candidate
        // bumps exactly one counter (the first check that applies).
        for (npub, endpoints, created_at_secs) in candidates {
            if enqueue_budget == 0 {
                break;
            }

            // Future-dated adverts saturate to age 0 and are kept.
            if let Some(max_age) = max_age_secs
                && now_secs.saturating_sub(created_at_secs) > max_age
            {
                skipped_age = skipped_age.saturating_add(1);
                continue;
            }

            if configured_npubs.contains(&npub) {
                skipped_configured = skipped_configured.saturating_add(1);
                continue;
            }

            let peer_identity = match PeerIdentity::from_npub(&npub) {
                Ok(identity) => identity,
                Err(_) => {
                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
                    continue;
                }
            };
            let node_addr = *peer_identity.node_addr();
            if node_addr == *self.identity.node_addr() {
                skipped_self = skipped_self.saturating_add(1);
                continue;
            }
            if self.peers.contains_key(&node_addr) {
                skipped_connected = skipped_connected.saturating_add(1);
                continue;
            }
            if self.retry_pending.contains_key(&node_addr) {
                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
                continue;
            }
            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
                skipped_cooldown = skipped_cooldown.saturating_add(1);
                continue;
            }
            // A connection whose expected identity is this peer already exists.
            let connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &node_addr)
                    .unwrap_or(false)
            });
            if connecting {
                skipped_connecting = skipped_connecting.saturating_add(1);
                continue;
            }

            // Convert advertised endpoints to dialable addresses, dropping
            // unconvertible ones and exact (transport, addr) duplicates. The
            // priority only advances for entries that are kept.
            let mut addresses = Vec::new();
            let mut priority = 120u8;
            for endpoint in endpoints {
                let Some(candidate) = Self::overlay_endpoint_to_peer_address(&endpoint, priority)
                else {
                    continue;
                };
                if addresses.iter().any(|existing: &PeerAddress| {
                    existing.transport == candidate.transport && existing.addr == candidate.addr
                }) {
                    continue;
                }
                addresses.push(candidate);
                priority = priority.saturating_add(1);
            }
            if addresses.is_empty() {
                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
                continue;
            }

            // Pre-seed alias and identity caches before queueing the retry.
            self.peer_aliases
                .entry(node_addr)
                .or_insert_with(|| peer_identity.short_npub());
            self.register_identity(node_addr, peer_identity.pubkey_full());

            // `reconnect = false` presumably marks first contact rather than a
            // reconnect (confirm in retry.rs); the retry is due immediately and
            // carries an expiry so a stale advert does not keep it queued.
            let mut state = super::retry::RetryState::new(PeerConfig {
                npub: npub.clone(),
                alias: None,
                addresses,
                connect_policy: ConnectPolicy::AutoConnect,
                auto_reconnect: true,
            });
            state.reconnect = false;
            state.retry_after_ms = now_ms;
            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
            self.retry_pending.insert(node_addr, state);
            info!(
                caller = %caller,
                peer = %peer_identity.short_npub(),
                advert_age_secs = now_secs.saturating_sub(created_at_secs),
                "open-discovery sweep: queued retry for cached advert"
            );
            enqueue_budget = enqueue_budget.saturating_sub(1);
            enqueued = enqueued.saturating_add(1);
        }

        // Always log a one-line summary on the startup sweep so operators
        // can verify it ran. Per-tick sweeps are noisier; only summarize
        // when something happened.
        let total_skipped = skipped_age
            + skipped_configured
            + skipped_self
            + skipped_connected
            + skipped_retry_pending
            + skipped_connecting
            + skipped_no_endpoints
            + skipped_invalid_npub
            + skipped_cooldown;
        let should_summarize = caller == "startup" || enqueued > 0;
        if should_summarize {
            info!(
                caller = %caller,
                cached = cached_count,
                queued = enqueued,
                skipped_age = skipped_age,
                skipped_configured = skipped_configured,
                skipped_self = skipped_self,
                skipped_connected = skipped_connected,
                skipped_retry_pending = skipped_retry_pending,
                skipped_connecting = skipped_connecting,
                skipped_no_endpoints = skipped_no_endpoints,
                skipped_invalid_npub = skipped_invalid_npub,
                skipped_cooldown = skipped_cooldown,
                skipped_total = total_skipped,
                "open-discovery sweep complete"
            );
        }
    }
1636
1637    /// One-shot startup sweep: runs once after the configured settle
1638    /// delay, iterating the cached overlay adverts and queueing retries
1639    /// for any peer with a recent enough advert that we haven't already
1640    /// configured statically or established a link to.
1641    ///
1642    /// Gated identically to [`run_open_discovery_sweep`]: requires
1643    /// `node.discovery.nostr.enabled` and `policy == open`.
1644    async fn maybe_run_startup_open_discovery_sweep(
1645        &mut self,
1646        bootstrap: &std::sync::Arc<NostrDiscovery>,
1647    ) {
1648        if self.startup_open_discovery_sweep_done {
1649            return;
1650        }
1651        if !self.config.node.discovery.nostr.enabled
1652            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
1653        {
1654            // Mark done so we don't keep re-checking on every tick.
1655            self.startup_open_discovery_sweep_done = true;
1656            return;
1657        }
1658        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
1659            return;
1660        };
1661        let now_ms = Self::now_ms();
1662        let delay_ms = self
1663            .config
1664            .node
1665            .discovery
1666            .nostr
1667            .startup_sweep_delay_secs
1668            .saturating_mul(1000);
1669        if now_ms < started_at_ms.saturating_add(delay_ms) {
1670            return;
1671        }
1672
1673        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
1674        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
1675            .await;
1676        self.startup_open_discovery_sweep_done = true;
1677    }
1678
1679    fn available_outbound_slots(&self) -> usize {
1680        let connection_used = self
1681            .connections
1682            .len()
1683            .saturating_add(self.pending_connects.len());
1684        let connection_slots = if self.max_connections == 0 {
1685            usize::MAX
1686        } else {
1687            self.max_connections.saturating_sub(connection_used)
1688        };
1689
1690        let peer_slots = if self.max_peers == 0 {
1691            usize::MAX
1692        } else {
1693            self.max_peers.saturating_sub(self.peers.len())
1694        };
1695
1696        connection_slots.min(peer_slots)
1697    }
1698
1699    fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
1700        let current_open_discovery_pending = self
1701            .retry_pending
1702            .values()
1703            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
1704            .count();
1705
1706        let cap_remaining = self
1707            .config
1708            .node
1709            .discovery
1710            .nostr
1711            .open_discovery_max_pending
1712            .saturating_sub(current_open_discovery_pending);
1713
1714        cap_remaining.min(self.available_outbound_slots())
1715    }
1716
1717    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
1718        now_ms.saturating_add(
1719            self.config
1720                .node
1721                .discovery
1722                .nostr
1723                .advert_ttl_secs
1724                .saturating_mul(1000)
1725                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
1726        )
1727    }
1728
    /// Build the overlay advert this node publishes through Nostr discovery.
    ///
    /// Returns `None` when `node.discovery.nostr.enabled` is off, or when no
    /// operational transport yields an advertisable endpoint. Only transports
    /// whose config has `advertise_on_nostr()` set contribute:
    /// - UDP with `is_public()`: `external_advert_addr()`, else a routable
    ///   bound address, else a STUN-learned public address (awaits
    ///   `bootstrap.learn_public_udp_addr`), else nothing (with a warning).
    /// - UDP without `is_public()`: a literal `"nat"` endpoint. This also
    ///   causes `signal_relays` (`dm_relays`) and `stun_servers` to be set.
    /// - TCP: `external_advert_addr()`, else a routable bound address, else
    ///   nothing (with a warning). There is no TCP STUN fallback.
    /// - Tor: the onion address joined with `advertised_port()`, when present.
    ///
    /// Endpoints appear in `self.transports.values()` iteration order.
    async fn build_overlay_advert(
        &self,
        bootstrap: &std::sync::Arc<NostrDiscovery>,
    ) -> Option<OverlayAdvert> {
        if !self.config.node.discovery.nostr.enabled {
            return None;
        }

        let mut endpoints = Vec::new();
        // Set when any UDP transport advertises the "nat" placeholder.
        let mut has_udp_nat = false;

        for handle in self.transports.values() {
            if !handle.is_operational() {
                continue;
            }

            // Each arm looks up the transport's config and requires it to
            // opt in via `advertise_on_nostr()`.
            match handle.transport_type().name {
                "udp" => {
                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    if cfg.is_public() {
                        // Precedence:
                        // 1. operator-supplied `external_addr` (skips STUN)
                        // 2. non-wildcard *public* `local_addr` (operator
                        //    bound to a specific public IP directly)
                        // 3. STUN auto-discovery against ephemeral socket
                        //    (also taken when bind is wildcard *or* private —
                        //    a private bind is not peer-reachable, so we
                        //    must publish the public reflexive instead)
                        // 4. loud warn + omit endpoint
                        if let Some(explicit) = cfg.external_advert_addr() {
                            endpoints.push(OverlayEndpointAdvert {
                                transport: OverlayTransportKind::Udp,
                                addr: explicit.to_string(),
                            });
                        } else {
                            match handle.local_addr() {
                                // `is_unroutable_advert_ip` already rejects
                                // unspecified addresses; the explicit wildcard
                                // check just spells the intent out.
                                Some(addr)
                                    if !addr.ip().is_unspecified()
                                        && !is_unroutable_advert_ip(addr.ip()) =>
                                {
                                    endpoints.push(OverlayEndpointAdvert {
                                        transport: OverlayTransportKind::Udp,
                                        addr: addr.to_string(),
                                    });
                                }
                                // Wildcard or unroutable bind: ask for a
                                // STUN-observed public address for this
                                // transport (keyed by transport id) and port.
                                Some(addr) => {
                                    let key = handle.transport_id().as_u32();
                                    let port = addr.port();
                                    if let Some(public) =
                                        bootstrap.learn_public_udp_addr(key, port).await
                                    {
                                        endpoints.push(OverlayEndpointAdvert {
                                            transport: OverlayTransportKind::Udp,
                                            addr: public.to_string(),
                                        });
                                    } else {
                                        warn!(
                                            transport_id = key,
                                            bind_addr = %addr,
                                            "advert: udp public=true but bind is wildcard \
                                            or private and STUN observation failed; \
                                            advertising no UDP endpoint. Either set \
                                            transports.udp.external_addr, bind to a \
                                            specific *public* IP, or ensure \
                                            node.discovery.nostr.stun_servers is reachable"
                                        );
                                    }
                                }
                                // No local address reported: nothing to advertise.
                                None => {}
                            }
                        }
                    } else {
                        // Not public: publish the "nat" placeholder and flag
                        // that relays/STUN servers must accompany the advert
                        // (presumably so peers can attempt NAT traversal).
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Udp,
                            addr: "nat".to_string(),
                        });
                        has_udp_nat = true;
                    }
                }
                "tcp" => {
                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Precedence:
                    // 1. operator-supplied `external_addr` (only path that
                    //    works on cloud-NAT setups where the public IP is
                    //    not on a host interface).
                    // 2. non-wildcard *public* `local_addr` (operator bound
                    //    to a specific public IP directly).
                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
                    //
                    // A wildcard *or* private bind is never advertised as-is
                    // — peers off-LAN can't reach a private bind, and there
                    // is no TCP STUN to discover a public reflexive.
                    if let Some(explicit) = cfg.external_advert_addr() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tcp,
                            addr: explicit.to_string(),
                        });
                    } else {
                        match handle.local_addr() {
                            Some(addr)
                                if !addr.ip().is_unspecified()
                                    && !is_unroutable_advert_ip(addr.ip()) =>
                            {
                                endpoints.push(OverlayEndpointAdvert {
                                    transport: OverlayTransportKind::Tcp,
                                    addr: addr.to_string(),
                                });
                            }
                            Some(addr) => {
                                warn!(
                                    bind_addr = %addr,
                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
                                    or private IP and no transports.tcp.external_addr set; \
                                    advertising no TCP endpoint. Either set external_addr \
                                    to the public IP (recommended for cloud 1:1-NAT setups) \
                                    or bind explicitly to the public IP"
                                );
                            }
                            None => {}
                        }
                    }
                }
                "tor" => {
                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
                        continue;
                    };
                    if !cfg.advertise_on_nostr() {
                        continue;
                    }
                    // Skipped when the handle reports no onion address.
                    if let Some(addr) = handle.onion_address() {
                        endpoints.push(OverlayEndpointAdvert {
                            transport: OverlayTransportKind::Tor,
                            addr: format!("{}:{}", addr, cfg.advertised_port()),
                        });
                    }
                }
                // Other transport types are never advertised.
                _ => {}
            }
        }

        // Nothing advertisable: publish no advert at all.
        if endpoints.is_empty() {
            return None;
        }

        // Signal relays and STUN servers are included only when at least one
        // UDP endpoint is the "nat" placeholder.
        Some(OverlayAdvert {
            identifier: ADVERT_IDENTIFIER.to_string(),
            version: ADVERT_VERSION,
            endpoints,
            signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
            stun_servers: has_udp_nat
                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
        })
    }
1892
1893    async fn refresh_overlay_advert(
1894        &self,
1895        bootstrap: &std::sync::Arc<NostrDiscovery>,
1896    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
1897        let advert = self.build_overlay_advert(bootstrap).await;
1898        bootstrap.update_local_advert(advert).await
1899    }
1900
1901    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
1902        match (&self.config.transports.udp, transport_name) {
1903            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1904            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1905            _ => None,
1906        }
1907    }
1908
1909    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
1910        match (&self.config.transports.tcp, transport_name) {
1911            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1912            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1913            _ => None,
1914        }
1915    }
1916
1917    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
1918        match (&self.config.transports.tor, transport_name) {
1919            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1920            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1921            _ => None,
1922        }
1923    }
1924
1925    pub(in crate::node) async fn try_peer_addresses(
1926        &mut self,
1927        peer_config: &PeerConfig,
1928        peer_identity: PeerIdentity,
1929        allow_bootstrap_nat: bool,
1930    ) -> Result<(), NodeError> {
1931        // Static-first dialing: avoid delaying configured address attempts on
1932        // advert fetch/network latency.
1933        let static_addresses = self.static_peer_addresses(peer_config);
1934        if self
1935            .attempt_peer_address_list(
1936                peer_config,
1937                peer_identity,
1938                allow_bootstrap_nat,
1939                &static_addresses,
1940            )
1941            .await
1942            .is_ok()
1943        {
1944            return Ok(());
1945        }
1946
1947        {
1948            let fallback = self
1949                .nostr_peer_fallback_addresses(peer_config, &static_addresses)
1950                .await;
1951            if !fallback.is_empty()
1952                && self
1953                    .attempt_peer_address_list(
1954                        peer_config,
1955                        peer_identity,
1956                        allow_bootstrap_nat,
1957                        &fallback,
1958                    )
1959                    .await
1960                    .is_ok()
1961            {
1962                return Ok(());
1963            }
1964        }
1965
1966        Err(NodeError::NoTransportForType(format!(
1967            "no operational transport for any of {}'s addresses",
1968            peer_config.npub
1969        )))
1970    }
1971
1972    // === Control API methods ===
1973
1974    /// Connect to a peer via the control API.
1975    ///
1976    /// Creates an ephemeral peer connection (not persisted to config, no
1977    /// auto-reconnect). Reuses the same connection path as auto-connect
1978    /// peers. Returns JSON data on success or an error message.
1979    pub(crate) async fn api_connect(
1980        &mut self,
1981        npub: &str,
1982        address: &str,
1983        transport: &str,
1984    ) -> Result<serde_json::Value, String> {
1985        let peer_config = PeerConfig {
1986            npub: npub.to_string(),
1987            alias: None,
1988            addresses: vec![PeerAddress::new(transport, address)],
1989            connect_policy: ConnectPolicy::Manual,
1990            auto_reconnect: false,
1991        };
1992
1993        // Pre-seed identity cache (same as initiate_peer_connections does)
1994        if let Ok(identity) = PeerIdentity::from_npub(npub) {
1995            self.peer_aliases
1996                .insert(*identity.node_addr(), identity.short_npub());
1997            self.register_identity(*identity.node_addr(), identity.pubkey_full());
1998        }
1999
2000        self.initiate_peer_connection(&peer_config)
2001            .await
2002            .map(|()| {
2003                info!(
2004                    npub = %npub,
2005                    address = %address,
2006                    transport = %transport,
2007                    "API connect initiated"
2008                );
2009                serde_json::json!({
2010                    "npub": npub,
2011                    "address": address,
2012                    "transport": transport,
2013                })
2014            })
2015            .map_err(|e| e.to_string())
2016    }
2017
2018    /// Disconnect a peer via the control API.
2019    ///
2020    /// Removes the peer and suppresses auto-reconnect.
2021    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
2022        let peer_identity =
2023            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
2024        let node_addr = *peer_identity.node_addr();
2025
2026        if !self.peers.contains_key(&node_addr) {
2027            return Err(format!("peer not found: {npub}"));
2028        }
2029
2030        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
2031        self.remove_active_peer(&node_addr);
2032
2033        // Suppress any pending auto-reconnect
2034        self.retry_pending.remove(&node_addr);
2035
2036        info!(npub = %npub, "API disconnect completed");
2037
2038        Ok(serde_json::json!({
2039            "npub": npub,
2040            "disconnected": true,
2041        }))
2042    }
2043
    /// Adopt an already-established UDP traversal and start the normal FIPS
    /// Noise handshake over it.
    ///
    /// This is intended for integration with an external rendezvous runtime
    /// that has already completed relay signaling, STUN observation, and UDP
    /// hole punching. After handoff, the adopted socket is owned by FIPS.
    ///
    /// On success, the new transport is registered in `transports` and tracked
    /// as a bootstrap transport for `traversal.peer_npub`.
    ///
    /// # Errors
    ///
    /// - [`NodeError::NotStarted`] if the node is not operational or has no
    ///   packet channel.
    /// - [`NodeError::InvalidPeerNpub`] if `traversal.peer_npub` does not parse.
    /// - [`NodeError::BootstrapHandoff`] if the socket cannot be adopted, or if
    ///   the adopted transport reports no local address.
    /// - Any error from `initiate_connection`. In that case the transport is
    ///   unregistered and stopped before the error is returned.
    pub async fn adopt_established_traversal(
        &mut self,
        traversal: EstablishedTraversal,
    ) -> Result<BootstrapHandoffResult, NodeError> {
        debug!(
            peer_npub = %traversal.peer_npub,
            session_id = %traversal.session_id,
            remote_addr = %traversal.remote_addr,
            "adopting established traversal socket"
        );

        if !self.state.is_operational() {
            return Err(NodeError::NotStarted);
        }

        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
            NodeError::InvalidPeerNpub {
                npub: traversal.peer_npub.clone(),
                reason: e.to_string(),
            }
        })?;
        let peer_node_addr = *peer_identity.node_addr();

        // Seed alias/identity caches. These are not rolled back if a later
        // step in this function fails.
        self.peer_aliases
            .insert(peer_node_addr, peer_identity.short_npub());
        self.register_identity(peer_node_addr, peer_identity.pubkey_full());

        let transport_id = self.allocate_transport_id();
        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
        // (and accept_connections / advertise flags) from the operator's
        // configured [transports.udp] when the bootstrap runtime doesn't
        // pass an explicit override. Lookup tries `transport_name` first
        // (covers the `Named` multi-listener variant) and falls back to the
        // unnamed `Single` listener, so single- and named-listener configs
        // both inherit cleanly.
        //
        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
        // only value guaranteed to survive arbitrary middlebox paths.
        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
        // initially attempt that MTU and may black-hole on tighter paths
        // until reactive `MtuExceeded` recovery kicks in. Operators who
        // raise the primary MTU based on known-clean topology accept that
        // tradeoff; the silent drop on a too-low default was strictly
        // worse for the common case where the primary MTU is reachable.
        //
        // Bind / external address fields are cleared since the socket is
        // already bound.
        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
            let mut cfg = self
                .lookup_udp_config(traversal.transport_name.as_deref())
                .or_else(|| self.lookup_udp_config(None))
                .cloned()
                .unwrap_or_default();
            cfg.bind_addr = None;
            cfg.external_addr = None;
            cfg
        });
        let mut transport = crate::transport::udp::UdpTransport::new(
            transport_id,
            traversal.transport_name.clone(),
            inherited_config,
            packet_tx,
        );

        transport
            .adopt_socket_async(traversal.socket)
            .await
            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;

        // NOTE(review): on this error path `transport` (which already holds
        // the adopted socket) is dropped without `stop()`, unlike the
        // `initiate_connection` failure path below. Confirm that dropping it
        // is sufficient cleanup.
        let local_addr = transport.local_addr().ok_or_else(|| {
            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
        })?;

        // Register the transport, mark it as bootstrap-owned, and remember
        // which peer it was adopted for.
        self.transports.insert(
            transport_id,
            crate::transport::TransportHandle::Udp(transport),
        );
        self.bootstrap_transports.insert(transport_id);
        self.bootstrap_transport_npubs
            .insert(transport_id, traversal.peer_npub.clone());

        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
        if let Err(err) = self
            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
            .await
        {
            // Undo the registration so a failed handshake start doesn't leave
            // an orphaned transport. Errors from `stop()` are ignored
            // (best-effort cleanup).
            self.bootstrap_transports.remove(&transport_id);
            self.bootstrap_transport_npubs.remove(&transport_id);
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let _ = handle.stop().await;
            }
            return Err(err);
        }

        info!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            local_addr = %local_addr,
            remote_addr = %traversal.remote_addr,
            session_id = %traversal.session_id,
            "adopted NAT traversal socket; handshake initiated"
        );

        Ok(BootstrapHandoffResult {
            transport_id,
            local_addr,
            remote_addr: traversal.remote_addr,
            peer_node_addr,
            session_id: traversal.session_id,
        })
    }
2162}