
fips_core/node/lifecycle.rs

//! Node lifecycle management: start, stop, and peer connection initiation.

use super::{Node, NodeError, NodeState};
use crate::config::{ConnectPolicy, PeerAddress, PeerConfig};
use crate::discovery::nostr::{
    ADVERT_IDENTIFIER, ADVERT_VERSION, BootstrapEvent, NostrDiscovery, OverlayAdvert,
    OverlayEndpointAdvert, OverlayTransportKind,
};
use crate::discovery::{BootstrapHandoffResult, EstablishedTraversal};
use crate::node::acl::PeerAclContext;
use crate::node::wire::build_msg1;
use crate::peer::PeerConnection;
use crate::protocol::{Disconnect, DisconnectReason};
use crate::transport::{Link, LinkDirection, LinkId, TransportAddr, TransportId, packet_channel};
use crate::upper::tun::{TunDevice, TunState, run_tun_reader, shutdown_tun_interface};
use crate::{NodeAddr, PeerIdentity};
use std::collections::HashSet;
use std::net::IpAddr;
use std::thread;
use std::time::Duration;
use tracing::{debug, info, warn};

/// True if `ip` is not a viable canonical advert endpoint for peers off
/// the publisher's own LAN. Covers RFC1918, loopback, link-local, IPv4
/// CGNAT (100.64/10), unspecified, multicast/broadcast/documentation,
/// and IPv6 unique-local/loopback/unspecified. We never publish these
/// as the peer's primary `runtime_endpoint`: an off-LAN consumer can't
/// route to them, and a consumer latching onto one and then falling back
/// to the slow overlay relay is the original bug this guard exists to
/// prevent.
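///
/// A rough sketch of the intended classification (illustrative only, not a
/// compiled doctest, since this function is private):
///
/// ```ignore
/// use std::net::IpAddr;
///
/// // LAN, CGNAT, and link-local addresses are rejected as advert endpoints.
/// assert!(is_unroutable_advert_ip("192.168.1.10".parse::<IpAddr>().unwrap()));
/// assert!(is_unroutable_advert_ip("100.64.0.1".parse::<IpAddr>().unwrap()));
/// assert!(is_unroutable_advert_ip("fe80::1".parse::<IpAddr>().unwrap()));
/// // A globally routable address passes through.
/// assert!(!is_unroutable_advert_ip("8.8.8.8".parse::<IpAddr>().unwrap()));
/// ```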
fn is_unroutable_advert_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            v4.is_private()
                || v4.is_loopback()
                || v4.is_link_local()
                || v4.is_unspecified()
                || v4.is_multicast()
                || v4.is_broadcast()
                || v4.is_documentation()
                // 100.64.0.0/10 — CGNAT, RFC 6598. Not routable on the
                // public internet; behaves like an extra NAT layer.
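                // The 0xc0 mask keeps the top two bits of the second octet,
                // so this matches 100.64.0.0 through 100.127.255.255 (a /10).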
                || (v4.octets()[0] == 100 && (v4.octets()[1] & 0xc0) == 64)
        }
        IpAddr::V6(v6) => {
            v6.is_loopback()
                || v6.is_unspecified()
                || v6.is_unique_local()
                || v6.is_multicast()
                // IPv6 link-local: fe80::/10
                || (v6.segments()[0] & 0xffc0) == 0xfe80
        }
    }
}

const OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER: u64 = 2;

impl Node {
    /// Initiate connections to configured static peers.
    ///
    /// For each peer configured with AutoConnect policy, creates a link and
    /// peer entry, then starts the Noise handshake by sending the first message.
    pub(super) async fn initiate_peer_connections(&mut self) {
        // Build display name map from all configured peers (alias or short npub),
        // and pre-seed the identity cache from each peer's npub so that TUN packets
        // addressed to a configured peer can be dispatched (and trigger session
        // initiation) immediately on startup — without waiting for the link-layer
        // handshake to complete first.
        let peer_identities: Vec<(PeerIdentity, Option<String>)> = self
            .config
            .peers()
            .iter()
            .filter_map(|pc| {
                PeerIdentity::from_npub(&pc.npub)
                    .ok()
                    .map(|id| (id, pc.alias.clone()))
            })
            .collect();

        for (identity, alias) in peer_identities {
            let name = alias.unwrap_or_else(|| identity.short_npub());
            self.peer_aliases.insert(*identity.node_addr(), name);
            // Pre-seed identity cache. The parity may be wrong (npub is x-only)
            // but will be corrected to the real value when the peer is promoted
            // after a successful Noise handshake.
            self.register_identity(*identity.node_addr(), identity.pubkey_full());
        }

        // Collect peer configs to avoid borrow conflicts
        let peer_configs: Vec<_> = self.config.auto_connect_peers().cloned().collect();

        if peer_configs.is_empty() {
            debug!("No static peers configured");
            return;
        }

        debug!(
            count = peer_configs.len(),
            "Initiating static peer connections"
        );

        for peer_config in peer_configs {
            if let Err(e) = self.initiate_peer_connection(&peer_config).await {
                warn!(
                    npub = %peer_config.npub,
                    alias = ?peer_config.alias,
                    error = %e,
                    "Failed to initiate peer connection"
                );
                // Schedule a retry so transient address-resolution failures
                // (e.g. cached endpoints stale, NAT rebinds, all addresses
                // currently unreachable) recover without a daemon restart.
                if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_config.npub) {
                    self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                }
                // No-transport failures most often mean the cached overlay
                // advert is pointing at a dead post-NAT-rebind address. The
                // advert cache is read-only inside fetch_advert, so retries
                // would loop on the same dead address until expiry. Force a
                // re-fetch so the next retry tick picks up fresh endpoints.
                if matches!(e, crate::node::NodeError::NoTransportForType(_))
                    && let Some(bootstrap) = self.nostr_discovery.clone()
                {
                    let npub = peer_config.npub.clone();
                    tokio::spawn(async move {
                        let _ = bootstrap.refetch_advert_for_stale_check(&npub).await;
                    });
                }
            }
        }
    }

    /// Initiate a connection to a single peer.
    ///
    /// Creates a link, starts the Noise handshake, and sends the first message.
    pub(super) async fn initiate_peer_connection(
        &mut self,
        peer_config: &crate::config::PeerConfig,
    ) -> Result<(), NodeError> {
        // Parse the peer's npub to get their identity
        let peer_identity =
            PeerIdentity::from_npub(&peer_config.npub).map_err(|e| NodeError::InvalidPeerNpub {
                npub: peer_config.npub.clone(),
                reason: e.to_string(),
            })?;

        let peer_node_addr = *peer_identity.node_addr();

        // Check if peer already exists (fully authenticated)
        if self.peers.contains_key(&peer_node_addr) {
            debug!(
                npub = %peer_config.npub,
                "Peer already exists, skipping"
            );
            return Ok(());
        }

        // Check if connection already in progress to this peer
        let already_connecting = self.connections.values().any(|conn| {
            conn.expected_identity()
                .map(|id| id.node_addr() == &peer_node_addr)
                .unwrap_or(false)
        });
        if already_connecting {
            debug!(
                npub = %peer_config.npub,
                "Connection already in progress, skipping"
            );
            return Ok(());
        }

        self.try_peer_addresses(peer_config, peer_identity, true)
            .await
    }

    fn is_connecting_to_peer(&self, peer_node_addr: &NodeAddr) -> bool {
        self.connections.values().any(|conn| {
            conn.expected_identity()
                .map(|id| id.node_addr() == peer_node_addr)
                .unwrap_or(false)
        })
    }

    /// Initiate a connection to a peer on a specific transport and address.
    ///
    /// For connectionless transports (UDP, Ethernet): allocates a link, starts
    /// the Noise IK handshake, sends msg1, and registers the connection for
    /// msg2 dispatch.
    ///
    /// For connection-oriented transports (TCP, Tor): allocates a link and
    /// starts a non-blocking transport connect. The handshake is deferred
    /// until the transport connection is established — the tick handler
    /// polls `connection_state()` and initiates the handshake when ready.
    pub(super) async fn initiate_connection(
        &mut self,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        self.authorize_peer(
            &peer_identity,
            PeerAclContext::OutboundConnect,
            transport_id,
            &remote_addr,
        )?;

        let is_connection_oriented = self
            .transports
            .get(&transport_id)
            .map(|t| t.transport_type().connection_oriented)
            .unwrap_or(false);

        // Allocate link ID and create link
        let link_id = self.allocate_link_id();

        let link = if is_connection_oriented {
            Link::new(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        } else {
            Link::connectionless(
                link_id,
                transport_id,
                remote_addr.clone(),
                LinkDirection::Outbound,
                Duration::from_millis(self.config.node.base_rtt_ms),
            )
        };

        self.links.insert(link_id, link);

        // Add reverse lookup for packet dispatch
        self.addr_to_link
            .insert((transport_id, remote_addr.clone()), link_id);

        if is_connection_oriented {
            // Connection-oriented: start non-blocking connect, defer handshake
            if let Some(transport) = self.transports.get(&transport_id) {
                match transport.connect(&remote_addr).await {
                    Ok(()) => {
                        debug!(
                            peer = %self.peer_display_name(&peer_node_addr),
                            transport_id = %transport_id,
                            remote_addr = %remote_addr,
                            link_id = %link_id,
                            "Transport connect initiated (non-blocking)"
                        );
                        self.pending_connects.push(super::PendingConnect {
                            link_id,
                            transport_id,
                            remote_addr,
                            peer_identity,
                        });
                    }
                    Err(e) => {
                        // Clean up link
                        self.links.remove(&link_id);
                        self.addr_to_link.remove(&(transport_id, remote_addr));
                        return Err(NodeError::TransportError(e.to_string()));
                    }
                }
            }
            Ok(())
        } else {
            // Connectionless: proceed with immediate handshake
            self.start_handshake(link_id, transport_id, remote_addr, peer_identity)
                .await
        }
    }

    /// Start the Noise handshake on a link and send msg1.
    ///
    /// Called immediately for connectionless transports, or after the
    /// transport connection is established for connection-oriented transports.
    pub(super) async fn start_handshake(
        &mut self,
        link_id: LinkId,
        transport_id: TransportId,
        remote_addr: TransportAddr,
        peer_identity: PeerIdentity,
    ) -> Result<(), NodeError> {
        let peer_node_addr = *peer_identity.node_addr();

        // Create connection in handshake phase (outbound knows expected identity)
        let current_time_ms = Self::now_ms();
        let mut connection = PeerConnection::outbound(link_id, peer_identity, current_time_ms);

        // Allocate a session index for this handshake
        let our_index = match self.index_allocator.allocate() {
            Ok(idx) => idx,
            Err(e) => {
                // Clean up the link we just created
                self.links.remove(&link_id);
                self.addr_to_link.remove(&(transport_id, remote_addr));
                return Err(NodeError::IndexAllocationFailed(e.to_string()));
            }
        };

        // Start the Noise handshake and get message 1
        let our_keypair = self.identity.keypair();
        let noise_msg1 =
            match connection.start_handshake(our_keypair, self.startup_epoch, current_time_ms) {
                Ok(msg) => msg,
                Err(e) => {
                    // Clean up the index and link
                    let _ = self.index_allocator.free(our_index);
                    self.links.remove(&link_id);
                    self.addr_to_link.remove(&(transport_id, remote_addr));
                    return Err(NodeError::HandshakeFailed(e.to_string()));
                }
            };

        // Set index and transport info on the connection
        connection.set_our_index(our_index);
        connection.set_transport_id(transport_id);
        connection.set_source_addr(remote_addr.clone());

        // Build wire format msg1: [0x01][sender_idx:4 LE][noise_msg1:82]
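        // Byte layout, taking the sizes in the comment above at face value:
        //   [0]     message type 0x01
        //   [1..5]  our session index, little-endian u32
        //   [5..87] Noise IK handshake message 1 (82 bytes)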
        let wire_msg1 = build_msg1(our_index, &noise_msg1);

        debug!(
            peer = %self.peer_display_name(&peer_node_addr),
            transport_id = %transport_id,
            remote_addr = %remote_addr,
            link_id = %link_id,
            our_index = %our_index,
            "Connection initiated"
        );

        // Store msg1 for resend and schedule first resend
        let resend_interval = self.config.node.rate_limit.handshake_resend_interval_ms;
        connection.set_handshake_msg1(wire_msg1.clone(), current_time_ms + resend_interval);

        // Track in pending_outbound for msg2 dispatch
        self.pending_outbound
            .insert((transport_id, our_index.as_u32()), link_id);
        self.connections.insert(link_id, connection);

        // Send the wire format handshake message
        let send_result = match self.transports.get(&transport_id) {
            Some(transport) => Some(transport.send(&remote_addr, &wire_msg1).await),
            None => None,
        };
        if let Some(send_result) = send_result {
            self.note_local_send_outcome(&send_result);
            match send_result {
                Ok(bytes) => {
                    debug!(
                        link_id = %link_id,
                        our_index = %our_index,
                        bytes,
                        "Sent Noise handshake message 1 (wire format)"
                    );
                }
                Err(e) => {
                    warn!(
                        link_id = %link_id,
                        transport_id = %transport_id,
                        remote_addr = %remote_addr,
                        our_index = %our_index,
                        error = %e,
                        "Failed to send handshake message"
                    );
                    // Mark connection as failed but don't remove it yet
                    // The event loop can handle retry logic
                    if let Some(conn) = self.connections.get_mut(&link_id) {
                        conn.mark_failed();
                    }
                }
            }
        }

        Ok(())
    }

    /// Poll all transports for discovered peers and auto-connect.
    ///
    /// Called from the tick handler. Iterates operational transports,
    /// drains their discovery buffers, and initiates connections to
    /// newly discovered peers (if auto_connect is enabled).
    pub(super) async fn poll_transport_discovery(&mut self) {
        // Collect discoveries first to avoid borrow conflict with self
        let mut to_connect = Vec::new();

        for (transport_id, transport) in &self.transports {
            if !transport.is_operational() {
                continue;
            }
            if !transport.auto_connect() {
                // Still drain the buffer so it doesn't grow unbounded
                let _ = transport.discover();
                continue;
            }
            let discovered = match transport.discover() {
                Ok(peers) => peers,
                Err(_) => continue,
            };
            for peer in discovered {
                let pubkey = match peer.pubkey_hint {
                    Some(pk) => pk,
                    None => continue,
                };
                let identity = PeerIdentity::from_pubkey(pubkey);
                let node_addr = *identity.node_addr();

                // Skip self
                if node_addr == *self.identity.node_addr() {
                    continue;
                }
                // Skip if already connected
                if self.peers.contains_key(&node_addr) {
                    continue;
                }
                // Skip if connection already in progress
                let connecting = self.connections.values().any(|c| {
                    c.expected_identity()
                        .map(|id| id.node_addr() == &node_addr)
                        .unwrap_or(false)
                });
                if connecting {
                    continue;
                }

                to_connect.push((*transport_id, peer.addr, identity));
            }
        }

        for (transport_id, remote_addr, identity) in to_connect {
            info!(
                peer = %self.peer_display_name(identity.node_addr()),
                transport_id = %transport_id,
                remote_addr = %remote_addr,
                "Auto-connecting to discovered peer"
            );
            if let Err(e) = self
                .initiate_connection(transport_id, remote_addr, identity)
                .await
            {
                warn!(error = %e, "Failed to auto-connect to discovered peer");
            }
        }
    }

    pub(super) async fn poll_nostr_discovery(&mut self) {
        let Some(bootstrap) = self.nostr_discovery.clone() else {
            return;
        };

        if let Err(err) = self.refresh_overlay_advert(&bootstrap).await {
            debug!(error = %err, "Failed to refresh local Nostr overlay advert");
        }

        for event in bootstrap.drain_events().await {
            match event {
                BootstrapEvent::Established { traversal } => {
                    let peer_npub = traversal.peer_npub.clone();
                    if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                        let peer_addr = *peer_identity.node_addr();
                        if self.peers.contains_key(&peer_addr) {
                            debug!(
                                peer_npub = %peer_npub,
                                "Ignoring established NAT traversal for already-connected peer"
                            );
                            continue;
                        }
                        if self.is_connecting_to_peer(&peer_addr) {
                            debug!(
                                peer_npub = %peer_npub,
                                "Ignoring established NAT traversal while peer handshake is already in progress"
                            );
                            continue;
                        }
                    }
                    match self.adopt_established_traversal(traversal).await {
                        Ok(_) => {
                            info!(peer_npub = %peer_npub, "Adopted NAT traversal socket");
                        }
                        Err(err) => {
                            warn!(peer_npub = %peer_npub, error = %err, "Failed to adopt NAT traversal");
                            if let Ok(peer_identity) = PeerIdentity::from_npub(&peer_npub) {
                                self.schedule_retry(*peer_identity.node_addr(), Self::now_ms());
                            }
                        }
                    }
                }
                BootstrapEvent::Failed {
                    peer_config,
                    reason,
                } => {
                    let peer_identity = match PeerIdentity::from_npub(&peer_config.npub) {
                        Ok(identity) => identity,
                        Err(_) => continue,
                    };
                    let node_addr = *peer_identity.node_addr();
                    if self.peers.contains_key(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal for already-connected peer"
                        );
                        continue;
                    }
                    if self.is_connecting_to_peer(&node_addr) {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            "Ignoring failed NAT traversal while peer handshake is already in progress"
                        );
                        continue;
                    }

                    let now_ms = Self::now_ms();
                    let decision = bootstrap.record_traversal_failure(&peer_config.npub, now_ms);
                    if decision.should_warn {
                        warn!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            cooldown_secs = decision
                                .cooldown_until_ms
                                .map(|t| t.saturating_sub(now_ms) / 1000),
                            "NAT traversal failed"
                        );
                    } else {
                        debug!(
                            npub = %peer_config.npub,
                            error = %reason,
                            consecutive_failures = decision.consecutive_failures,
                            "NAT traversal failed (suppressed by warn-rate-limit)"
                        );
                    }

                    // B6: stale-advert eviction on the streak-threshold
                    // crossing. Fire-and-forget; the outcome is logged so
                    // operators can see when peers get cleaned up.
                    if decision.crossed_threshold {
                        let bootstrap = bootstrap.clone();
                        let npub = peer_config.npub.clone();
                        tokio::spawn(async move {
                            let outcome = bootstrap.refetch_advert_for_stale_check(&npub).await;
                            match outcome {
                                crate::discovery::nostr::NostrRefetchOutcome::Evicted => info!(
                                    npub = %npub,
                                    "stale-advert sweep: peer evicted from advert cache"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::Refreshed => info!(
                                    npub = %npub,
                                    "stale-advert sweep: peer republished, cache refreshed and streak reset"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::SameAdvert => debug!(
                                    npub = %npub,
                                    "stale-advert sweep: advert unchanged, cooldown stands"
                                ),
                                crate::discovery::nostr::NostrRefetchOutcome::Skipped => debug!(
                                    npub = %npub,
                                    "stale-advert sweep: skipped (relay error or no advert_relays)"
                                ),
                            }
                        });
                    }

                    if self
                        .try_peer_addresses(&peer_config, peer_identity, false)
                        .await
                        .is_ok()
                    {
                        continue;
                    }

                    self.schedule_retry(node_addr, now_ms);
                    if let Some(cooldown_until_ms) = decision.cooldown_until_ms
                        && let Some(state) = self.retry_pending.get_mut(&node_addr)
                    {
                        // Push the next retry past the cooldown so the
                        // open-discovery sweep doesn't re-enqueue and the
                        // per-attempt backoff doesn't fire sooner.
                        state.retry_after_ms = state.retry_after_ms.max(cooldown_until_ms);
                    }
                }
            }
        }

        self.maybe_run_startup_open_discovery_sweep(&bootstrap)
            .await;
        self.queue_open_discovery_retries(&bootstrap).await;
    }

    /// Extract the bare discovery scope from the Nostr discovery app tag,
    /// which is encoded as `fips-overlay-v1:<scope>` by
    /// `apply_default_scoped_discovery`. Returns `None` when no scope is
    /// set, so the LAN browser surfaces every advert it sees.
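    ///
    /// Illustrative mapping, assuming the `fips-overlay-v1:` prefix named
    /// above (the scope values shown are made up):
    ///
    /// ```text
    /// "fips-overlay-v1:home-lab"  =>  Some("home-lab")
    /// "my-custom-tag"             =>  Some("my-custom-tag")
    /// ""                          =>  None
    /// ```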
    pub(super) fn lan_discovery_scope(&self) -> Option<String> {
        let app = self.config.node.discovery.nostr.app.trim();
        if app.is_empty() {
            return None;
        }
        if let Some(rest) = app.strip_prefix("fips-overlay-v1:") {
            let scope = rest.trim();
            if scope.is_empty() {
                None
            } else {
                Some(scope.to_string())
            }
        } else {
            Some(app.to_string())
        }
    }

    /// Drain mDNS-discovered peers and initiate Noise XX handshakes for
    /// any whose npub we don't already have a connection or active peer
    /// for. The handshake itself is the authentication — a spoofed
    /// mDNS advert with someone else's npub fails the XX exchange and
    /// is dropped.
    pub(super) async fn poll_lan_discovery(&mut self) {
        let Some(runtime) = self.lan_discovery.clone() else {
            return;
        };
        let events = runtime.drain_events().await;
        if events.is_empty() {
            return;
        }
        // Look up the operational UDP transport up front; mDNS
        // discovery only yields UDP endpoints today.
        let udp_transport_id = self.find_transport_for_type("udp");
        let Some(transport_id) = udp_transport_id else {
            debug!("lan: no operational UDP transport, skipping discovered peers");
            return;
        };
        for event in events {
            let crate::discovery::lan::LanEvent::Discovered(peer) = event;
            let identity = match crate::PeerIdentity::from_npub(&peer.npub) {
                Ok(id) => id,
                Err(err) => {
                    debug!(npub = %peer.npub, error = %err, "lan: skip bad npub");
                    continue;
                }
            };
            let peer_node_addr = *identity.node_addr();
            if self.peers.contains_key(&peer_node_addr) {
                continue;
            }
            let already_connecting = self.connections.values().any(|conn| {
                conn.expected_identity()
                    .map(|id| id.node_addr() == &peer_node_addr)
                    .unwrap_or(false)
            });
            if already_connecting {
                continue;
            }
            let remote_addr = crate::transport::TransportAddr::from_string(&peer.addr.to_string());
            info!(
                npub = %identity.short_npub(),
                addr = %peer.addr,
                "lan: initiating handshake to discovered peer"
            );
            if let Err(err) = self
                .initiate_connection(transport_id, remote_addr, identity)
                .await
            {
                debug!(
                    npub = %peer.npub,
                    error = %err,
                    "lan: failed to initiate connection to discovered peer"
                );
            }
        }
    }

    /// Poll pending transport connects and initiate handshakes for ready ones.
    ///
    /// Called from the tick handler. For each pending connect, queries the
    /// transport's connection state. When a connection is established,
    /// marks the link as Connected and starts the Noise handshake.
    /// Failed connections are cleaned up and scheduled for retry.
    pub(super) async fn poll_pending_connects(&mut self) {
        if self.pending_connects.is_empty() {
            return;
        }

        let mut completed = Vec::new();

        for (i, pending) in self.pending_connects.iter().enumerate() {
            let state = if let Some(transport) = self.transports.get(&pending.transport_id) {
                transport.connection_state(&pending.remote_addr)
            } else {
                crate::transport::ConnectionState::Failed("transport removed".into())
            };

            match state {
                crate::transport::ConnectionState::Connected => {
                    completed.push((i, true, None));
                }
                crate::transport::ConnectionState::Failed(reason) => {
                    completed.push((i, false, Some(reason)));
                }
                crate::transport::ConnectionState::Connecting => {
                    // Still in progress, check on next tick
                }
                crate::transport::ConnectionState::None => {
                    // Shouldn't happen — treat as failure
                    completed.push((i, false, Some("no connection attempt found".into())));
                }
            }
        }

        // Process completions in reverse order to preserve indices
        for (i, success, reason) in completed.into_iter().rev() {
            let pending = self.pending_connects.remove(i);

            if success {
                // Mark link as Connected
                if let Some(link) = self.links.get_mut(&pending.link_id) {
                    link.set_connected();
                }

                debug!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    "Transport connected, starting handshake"
                );

                // Start the handshake now that the transport is connected
                if let Err(e) = self
                    .start_handshake(
                        pending.link_id,
                        pending.transport_id,
                        pending.remote_addr.clone(),
                        pending.peer_identity,
                    )
                    .await
                {
                    warn!(
                        link_id = %pending.link_id,
                        error = %e,
                        "Failed to start handshake after transport connect"
                    );
                    // Clean up link on handshake failure
                    self.remove_link(&pending.link_id);
                }
            } else {
                let reason = reason.unwrap_or_default();
                warn!(
                    peer = %self.peer_display_name(pending.peer_identity.node_addr()),
                    transport_id = %pending.transport_id,
                    remote_addr = %pending.remote_addr,
                    link_id = %pending.link_id,
                    reason = %reason,
                    "Transport connect failed"
                );

                // Clean up link and schedule retry
                self.remove_link(&pending.link_id);
                self.links.remove(&pending.link_id);
                self.schedule_retry(*pending.peer_identity.node_addr(), Self::now_ms());
            }
        }
    }

    // === State Transitions ===

    /// Start the node.
    ///
    /// Initializes the TUN interface (if configured), spawns I/O threads,
    /// and transitions to the Running state.
    pub async fn start(&mut self) -> Result<(), NodeError> {
        if !self.state.can_start() {
            return Err(NodeError::AlreadyStarted);
        }
        self.state = NodeState::Starting;

        // Create packet channel for transport -> Node communication
        let packet_buffer_size = self.config.node.buffers.packet_channel;
        let (packet_tx, packet_rx) = packet_channel(packet_buffer_size);
        self.packet_tx = Some(packet_tx.clone());
        self.packet_rx = Some(packet_rx);

        // Initialize transports first (before TUN, before Nostr discovery).
        let transport_handles = self.create_transports(&packet_tx).await;

        for mut handle in transport_handles {
            let transport_id = handle.transport_id();
            let transport_type = handle.transport_type().name;
            let name = handle.name().map(|s| s.to_string());

            match handle.start().await {
                Ok(()) => {
                    self.transports.insert(transport_id, handle);
                }
                Err(e) => {
                    if let Some(ref n) = name {
                        warn!(transport_type, name = %n, error = %e, "Transport failed to start");
                    } else {
                        warn!(transport_type, error = %e, "Transport failed to start");
                    }
                }
            }
        }

        if !self.transports.is_empty() {
            info!(count = self.transports.len(), "Transports initialized");
        }

        // Spawn the off-task FMP-encrypt + UDP-send + FMP-decrypt
        // worker pools. **Unix only** — both pools issue direct
        // sendmmsg(2) / sendmsg(2)+UDP_GSO / recvmmsg(2) calls on
        // raw file descriptors via `AsRawFd`, which is a unix-only
        // trait. On Windows the rx_loop's tokio-based send/recv
        // remain the canonical path; the perf overhaul lands its
        // gains on unix.
        //
        // Worker count defaults to the number of CPUs, overridable
        // via `FIPS_ENCRYPT_WORKERS=N` / `FIPS_DECRYPT_WORKERS=N`
        // for debug / benchmarking. Hash-by-destination means a
        // single TCP flow pins to one worker (preserves wire
        // ordering); additional workers light up under multi-flow
        // / multi-peer load. See `node::encrypt_worker` /
        // `node::decrypt_worker` for full rationale.
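        //
        // For example (the daemon invocation is only a sketch, the binary
        // name is whatever launches this node):
        //   FIPS_ENCRYPT_WORKERS=4 FIPS_DECRYPT_WORKERS=0 <fips daemon>
        // pins the encrypt pool to four workers and disables the decrypt
        // pool entirely (see the `== 0` branch below).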
        #[cfg(unix)]
        {
            let cpu_default = std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1)
                .max(1);
            let encrypt_worker_count: usize = std::env::var("FIPS_ENCRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default)
                .max(1);
            self.encrypt_workers = Some(super::encrypt_worker::EncryptWorkerPool::spawn(
                encrypt_worker_count,
            ));
            info!(
                workers = encrypt_worker_count,
                "Spawned FMP-encrypt worker pool"
            );

            // `FIPS_DECRYPT_WORKERS=0` disables the pool entirely and
            // falls through to the in-line rx_loop decrypt path (the
            // "test-mode" branch in `handle_encrypted_frame`, which is
            // in fact a fully functional synchronous decrypt). Useful
            // as an A/B against the worker pipeline when chasing
            // scheduling/queueing regressions on the native macOS
            // path. Any non-zero value (env or default) spawns the
            // pool as before.
            let decrypt_worker_count: usize = std::env::var("FIPS_DECRYPT_WORKERS")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(cpu_default);
            if decrypt_worker_count == 0 {
                info!("FIPS_DECRYPT_WORKERS=0 → in-line decrypt in rx_loop (no worker pool)");
            } else {
                self.decrypt_workers = Some(super::decrypt_worker::DecryptWorkerPool::spawn(
                    decrypt_worker_count,
                ));
                info!(
                    workers = decrypt_worker_count,
                    "Spawned FMP+FSP-decrypt worker pool"
                );
            }
        }

        if self.config.node.discovery.nostr.enabled {
            match NostrDiscovery::start(&self.identity, self.config.node.discovery.nostr.clone())
                .await
            {
                Ok(runtime) => {
                    if let Err(err) = self.refresh_overlay_advert(&runtime).await {
                        warn!(error = %err, "Failed to publish initial Nostr overlay advert");
                    }
                    self.nostr_discovery = Some(runtime);
                    self.nostr_discovery_started_at_ms = Some(Self::now_ms());
                    info!("Nostr overlay discovery enabled");
                }
                Err(err) => {
                    warn!(error = %err, "Failed to start Nostr overlay discovery");
                }
            }
        }

        // mDNS / DNS-SD LAN discovery. Independent of Nostr — runs even
        // when Nostr is disabled, since it gives us sub-second pairing
        // on the same link without any relay or NAT-traversal roundtrip.
        if self.config.node.discovery.lan.enabled {
            let advertised_udp_port = self
                .transports
                .values()
                .filter(|h| h.is_operational())
                .filter(|h| h.transport_type().name == "udp")
                .find_map(|h| h.local_addr().map(|addr| addr.port()))
                .unwrap_or(0);
            let scope = self.lan_discovery_scope();
            match crate::discovery::lan::LanDiscovery::start(
                &self.identity,
                scope,
                advertised_udp_port,
                self.config.node.discovery.lan.clone(),
            )
            .await
            {
                Ok(runtime) => {
                    self.lan_discovery = Some(runtime);
                    info!("LAN mDNS discovery enabled");
                }
                Err(err) => {
                    debug!(error = %err, "LAN mDNS discovery not started");
                }
            }
        }

        // Connect to static peers before TUN is active
        // This allows handshake messages to be sent before we start accepting packets
        self.initiate_peer_connections().await;

        // Initialize TUN interface last, after transports and peers are ready
        if self.config.tun.enabled {
            let address = *self.identity.address();
            match TunDevice::create(&self.config.tun, address).await {
                Ok(device) => {
                    let mtu = device.mtu();
                    let name = device.name().to_string();
                    let our_addr = *device.address();

                    info!("TUN device active:");
                    info!("     name: {}", name);
                    info!("  address: {}", device.address());
                    info!("      mtu: {}", mtu);

                    // Calculate max MSS for TCP clamping
                    let effective_mtu = self.effective_ipv6_mtu();
                    let max_mss = effective_mtu.saturating_sub(40).saturating_sub(20); // IPv6 + TCP headers
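                    // e.g. an effective MTU of 1420 would clamp MSS to
                    // 1360 (1420 - 40 IPv6 - 20 TCP); the 1420 figure is
                    // only an illustration, not the configured default.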

                    info!("effective MTU: {} bytes", effective_mtu);
                    debug!("   max TCP MSS: {} bytes", max_mss);

                    // On macOS, create a shutdown pipe. Writing to it unblocks the
                    // reader thread's select() loop without closing the TUN fd
                    // (which would cause a double-close when TunDevice drops).
                    #[cfg(target_os = "macos")]
                    let (shutdown_read_fd, shutdown_write_fd) = {
                        let mut fds = [0i32; 2];
                        if unsafe { libc::pipe(fds.as_mut_ptr()) } < 0 {
                            return Err(NodeError::Tun(crate::upper::tun::TunError::Configure(
                                "failed to create shutdown pipe".into(),
                            )));
                        }
                        (fds[0], fds[1])
                    };

                    // Create writer (dups the fd for independent write access).
                    // Pass path_mtu_lookup so the inbound SYN-ACK clamp can read
                    // the per-destination path MTU learned via discovery.
                    let (writer, tun_tx) =
                        device.create_writer(max_mss, self.path_mtu_lookup.clone())?;

                    // Spawn writer thread
                    let writer_handle = thread::spawn(move || {
                        writer.run();
                    });

                    // Clone tun_tx for the reader
                    let reader_tun_tx = tun_tx.clone();

                    // Create outbound channel for TUN reader → Node
                    let tun_channel_size = self.config.node.buffers.tun_channel;
                    let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(tun_channel_size);

                    // Spawn reader thread
                    let transport_mtu = self.transport_mtu();
                    let path_mtu_lookup = self.path_mtu_lookup.clone();
                    #[cfg(target_os = "macos")]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                            shutdown_read_fd,
                        );
                    });
                    #[cfg(not(target_os = "macos"))]
                    let reader_handle = thread::spawn(move || {
                        run_tun_reader(
                            device,
                            mtu,
                            our_addr,
                            reader_tun_tx,
                            outbound_tx,
                            transport_mtu,
                            path_mtu_lookup,
                        );
                    });

                    self.tun_state = TunState::Active;
                    self.tun_name = Some(name);
                    self.tun_tx = Some(tun_tx);
                    self.tun_outbound_rx = Some(outbound_rx);
                    self.tun_reader_handle = Some(reader_handle);
                    self.tun_writer_handle = Some(writer_handle);
                    #[cfg(target_os = "macos")]
                    {
                        self.tun_shutdown_fd = Some(shutdown_write_fd);
                    }
                }
                Err(e) => {
                    self.tun_state = TunState::Failed;
                    warn!(error = %e, "Failed to initialize TUN, continuing without it");
                }
            }
        }

        // Initialize DNS responder (independent of TUN).
        //
        // Default bind_addr is "::1" (IPv6 loopback). The shipped
        // fips-dns-setup configures systemd-resolved via a global
        // /etc/systemd/resolved.conf.d/fips.conf drop-in pointing at
        // [::1]:5354, which sidesteps a Linux IPV6_PKTINFO behaviour
        // where self-destined traffic to fips0's address is attributed
        // to fips0 in PKTINFO and gets silently dropped by the
        // mesh-interface filter in src/upper/dns.rs.
        //
        // For mesh-reachable resolution (rare), set bind_addr: "::"
        // in fips.yaml. The mesh-interface filter remains active to
        // prevent hosts-file alias enumeration in that mode.
        // `IPV6_V6ONLY=0` is set explicitly so IPv4 clients on
        // 127.0.0.1 still reach us regardless of kernel sysctl
        // defaults — but only when bind is on a wildcard / IPv6 path.
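        //
        // A minimal fips.yaml sketch of the mesh-reachable mode described
        // above (field names inferred from the config accessors used below,
        // so treat them as illustrative):
        //
        //   dns:
        //     enabled: true
        //     bind_addr: "::"
        //     port: 5354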
        if self.config.dns.enabled {
            let addr_str = self.config.dns.bind_addr();
            match addr_str.parse::<std::net::IpAddr>() {
                Ok(ip) => {
                    let bind = std::net::SocketAddr::new(ip, self.config.dns.port());
                    match Self::bind_dns_socket(bind) {
                        Ok(socket) => {
                            let dns_channel_size = self.config.node.buffers.dns_channel;
                            let (identity_tx, identity_rx) =
                                tokio::sync::mpsc::channel(dns_channel_size);
                            let dns_ttl = self.config.dns.ttl();
                            let base_hosts = crate::upper::hosts::HostMap::from_peer_configs(
                                self.config.peers(),
                            );
                            let reloader = if self.config.node.system_files_enabled {
                                let hosts_path = std::path::PathBuf::from(
                                    crate::upper::hosts::DEFAULT_HOSTS_PATH,
                                );
                                crate::upper::hosts::HostMapReloader::new(base_hosts, hosts_path)
                            } else {
                                crate::upper::hosts::HostMapReloader::memory_only(base_hosts)
                            };
                            // Resolve the TUN ifindex so the responder can
                            // drop queries arriving on the mesh interface
                            // (fips0). Without this, the `::` bind exposes
                            // /etc/fips/hosts alias probing to any mesh peer.
                            // When TUN isn't enabled or the name can't be
                            // resolved, `None` disables the filter (there
                            // is no mesh surface to defend anyway).
                            let mesh_ifindex = Self::lookup_mesh_ifindex(self.config.tun.name());
                            info!(
                                bind = %bind,
                                hosts = reloader.hosts().len(),
                                mesh_ifindex = ?mesh_ifindex,
                                "DNS responder started for .fips domain (auto-reload enabled)"
                            );
                            let handle = tokio::spawn(crate::upper::dns::run_dns_responder(
                                socket,
                                identity_tx,
                                dns_ttl,
                                reloader,
                                mesh_ifindex,
                            ));
                            self.dns_identity_rx = Some(identity_rx);
                            self.dns_task = Some(handle);
                        }
                        Err(e) => {
                            warn!(bind = %bind, error = %e, "Failed to start DNS responder");
                        }
                    }
                }
                Err(e) => {
                    warn!(addr = %addr_str, error = %e, "Invalid dns.bind_addr; DNS responder not started");
                }
            }
        }

        self.state = NodeState::Running;
        info!("Node started:");
        info!("       state: {}", self.state);
        info!("  transports: {}", self.transports.len());
        info!(" connections: {}", self.connections.len());
        Ok(())
    }

    /// Bind a UDP socket for the DNS responder.
    ///
    /// For IPv6 binds (including `::`), sets `IPV6_V6ONLY=0` so the socket
    /// also accepts IPv4-mapped addresses. This guarantees dual-stack
    /// delivery regardless of `net.ipv6.bindv6only` sysctl on the host —
    /// v4 clients on 127.0.0.1 and v6 clients on the fips0 address both
    /// land on the same socket.
    ///
    /// Also enables `IPV6_RECVPKTINFO` on IPv6 sockets so the responder
    /// can learn the arrival interface per packet. The responder uses that
    /// to drop queries arriving on the mesh TUN, closing the hosts-file
    /// probing side-channel created by the `::` bind.
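    ///
    /// Sketch of a call site (not a compiled doctest; the helper is private,
    /// and the port shown is the [::1]:5354 default mentioned above):
    ///
    /// ```ignore
    /// let socket = Node::bind_dns_socket("[::1]:5354".parse().unwrap())?;
    /// ```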
    fn bind_dns_socket(
        addr: std::net::SocketAddr,
    ) -> Result<tokio::net::UdpSocket, std::io::Error> {
        use socket2::{Domain, Protocol, Socket, Type};
        let domain = if addr.is_ipv4() {
            Domain::IPV4
        } else {
            Domain::IPV6
        };
        let sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
        if addr.is_ipv6() {
            sock.set_only_v6(false)?;
            #[cfg(unix)]
            Self::set_recv_pktinfo_v6(&sock)?;
        }
        sock.set_nonblocking(true)?;
        sock.bind(&addr.into())?;
        tokio::net::UdpSocket::from_std(sock.into())
    }

    /// Enable `IPV6_RECVPKTINFO` on an IPv6 UDP socket.
    ///
    /// After this setsockopt, each `recvmsg()` call on the socket receives
    /// an `IPV6_PKTINFO` control message containing the arrival interface
    /// index, which the DNS responder uses for its mesh-interface filter.
    #[cfg(unix)]
    fn set_recv_pktinfo_v6(sock: &socket2::Socket) -> Result<(), std::io::Error> {
        use std::os::fd::AsRawFd;
        let enable: libc::c_int = 1;
        let ret = unsafe {
            libc::setsockopt(
                sock.as_raw_fd(),
                libc::IPPROTO_IPV6,
                libc::IPV6_RECVPKTINFO,
                &enable as *const _ as *const libc::c_void,
                std::mem::size_of::<libc::c_int>() as libc::socklen_t,
            )
        };
        if ret < 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(())
    }

    /// Resolve the mesh TUN interface index by name.
    ///
    /// Returns `None` if the interface does not exist (e.g. TUN disabled
    /// or not yet created). A `None` result disables the DNS responder's
    /// mesh-interface filter — safe, because if there is no fips0 there
    /// is no mesh exposure to defend against.
    fn lookup_mesh_ifindex(name: &str) -> Option<u32> {
        #[cfg(unix)]
        {
            let c_name = std::ffi::CString::new(name).ok()?;
            let idx = unsafe { libc::if_nametoindex(c_name.as_ptr()) };
            if idx == 0 { None } else { Some(idx) }
        }
        #[cfg(not(unix))]
        {
            let _ = name;
            None
        }
    }

    /// Stop the node.
    ///
    /// Shuts down TUN interface, stops I/O threads, and transitions to
    /// the Stopped state.
    pub async fn stop(&mut self) -> Result<(), NodeError> {
        if !self.state.can_stop() {
            return Err(NodeError::NotStarted);
        }
        self.state = NodeState::Stopping;
        info!(state = %self.state, "Node stopping");

        // Stop DNS responder
        if let Some(handle) = self.dns_task.take() {
            handle.abort();
            debug!("DNS responder stopped");
        }

        // Send disconnect notifications to all active peers before closing transports
        self.send_disconnect_to_all_peers(DisconnectReason::Shutdown)
            .await;

        // Stop Nostr overlay discovery background work and withdraw any advert.
        if let Some(bootstrap) = self.nostr_discovery.take()
            && let Err(e) = bootstrap.shutdown().await
        {
            warn!(error = %e, "Failed to shutdown Nostr overlay discovery");
        }

        // Tear down the LAN mDNS responder and browser. Best-effort: the
        // OS will eventually expire the advert via its TTL even if we
        // don't manage a clean unregister before the daemon exits.
        if let Some(lan) = self.lan_discovery.take() {
            lan.shutdown().await;
        }

        // Shutdown transports (they're packet producers)
        let transport_ids: Vec<_> = self.transports.keys().cloned().collect();
        for transport_id in transport_ids {
            if let Some(mut handle) = self.transports.remove(&transport_id) {
                let transport_type = handle.transport_type().name;
                match handle.stop().await {
                    Ok(()) => {
                        info!(transport_id = %transport_id, transport_type, "Transport stopped");
                    }
                    Err(e) => {
                        warn!(
                            transport_id = %transport_id,
                            transport_type,
                            error = %e,
                            "Transport stop failed"
                        );
                    }
                }
            }
        }

        // Drop packet channels
        self.packet_tx.take();
        self.packet_rx.take();

        // Shutdown TUN interface
        if let Some(name) = self.tun_name.take() {
            info!(name = %name, "Shutting down TUN interface");

            // Drop the tun_tx to signal the writer to stop
            self.tun_tx.take();

            // Delete the interface (on Linux, causes reader to get EFAULT)
            if let Err(e) = shutdown_tun_interface(&name).await {
                warn!(name = %name, error = %e, "Failed to shutdown TUN interface");
            }

            // On macOS, signal the reader thread to exit by writing to the
            // shutdown pipe. The reader's select() will wake up and break.
            #[cfg(target_os = "macos")]
            if let Some(fd) = self.tun_shutdown_fd.take() {
                unsafe {
                    libc::write(fd, b"x".as_ptr() as *const libc::c_void, 1);
                    libc::close(fd);
                }
            }

            // Wait for threads to finish
            if let Some(handle) = self.tun_reader_handle.take() {
                let _ = handle.join();
            }
            if let Some(handle) = self.tun_writer_handle.take() {
                let _ = handle.join();
            }

            self.tun_state = TunState::Disabled;
        }

        self.state = NodeState::Stopped;
        info!(state = %self.state, "Node stopped");
        Ok(())
    }

    /// Send disconnect notifications to all active peers.
    ///
    /// Best-effort: send failures are logged and ignored since the transport
    /// may already be degraded. This runs before transports are shut down.
    async fn send_disconnect_to_all_peers(&mut self, reason: DisconnectReason) {
        let disconnect = Disconnect::new(reason);
        let plaintext = disconnect.encode();

        // Collect node_addrs to avoid borrow conflict with send helper
        let peer_addrs: Vec<NodeAddr> = self
            .peers
            .iter()
            .filter(|(_, peer)| peer.can_send() && peer.has_session())
            .map(|(addr, _)| *addr)
            .collect();

        if peer_addrs.is_empty() {
            debug!(
                total_peers = self.peers.len(),
                "No sendable peers for disconnect notification"
            );
            return;
        }

        let mut sent = 0usize;
        for node_addr in &peer_addrs {
            match self
                .send_encrypted_link_message(node_addr, &plaintext)
                .await
            {
                Ok(()) => sent += 1,
                Err(e) => {
                    debug!(
                        peer = %self.peer_display_name(node_addr),
                        error = %e,
                        "Failed to send disconnect (transport may be down)"
                    );
                }
            }
        }

        info!(sent, total = peer_addrs.len(), reason = %reason, "Sent disconnect notifications");
    }

    fn static_peer_addresses(&self, peer_config: &PeerConfig) -> Vec<PeerAddress> {
        peer_config
            .addresses_by_priority()
            .into_iter()
            .cloned()
            .collect()
    }

    async fn nostr_peer_fallback_addresses(
        &self,
        peer_config: &PeerConfig,
        existing: &[PeerAddress],
    ) -> Vec<PeerAddress> {
1333        if !self.config.node.discovery.nostr.enabled
1334            || self.config.node.discovery.nostr.policy
1335                == crate::config::NostrDiscoveryPolicy::Disabled
1336        {
1337            return Vec::new();
1338        }
1339
1340        let Some(bootstrap) = self.nostr_discovery.clone() else {
1341            return Vec::new();
1342        };
1343        let endpoints = match bootstrap.advert_endpoints_for_peer(&peer_config.npub).await {
1344            Ok(endpoints) => endpoints,
1345            Err(err) => {
1346                debug!(
1347                    npub = %peer_config.npub,
1348                    error = %err,
1349                    "Failed to resolve Nostr advert endpoints for configured peer"
1350                );
1351                return Vec::new();
1352            }
1353        };
1354
1355        let mut fallback = Vec::new();
1356        let mut next_priority = existing
1357            .iter()
1358            .map(|addr| addr.priority)
1359            .max()
1360            .unwrap_or(100)
1361            .saturating_add(1);
1362        for endpoint in endpoints {
1363            let Some(candidate) = Self::overlay_endpoint_to_peer_address(&endpoint, next_priority)
1364            else {
1365                continue;
1366            };
1367            if existing
1368                .iter()
1369                .any(|addr| addr.transport == candidate.transport && addr.addr == candidate.addr)
1370                || fallback.iter().any(|addr: &PeerAddress| {
1371                    addr.transport == candidate.transport && addr.addr == candidate.addr
1372                })
1373            {
1374                continue;
1375            }
1376            fallback.push(candidate);
1377            next_priority = next_priority.saturating_add(1);
1378        }
1379        fallback
1380    }
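    // Priority numbering sketch (priorities hypothetical): with static
    // addresses configured at priorities 10 and 20, advert-derived fallbacks
    // are appended at 21, 22, and so on; with no static addresses at all the
    // first fallback lands at 101 (the 100 default above plus one), so Nostr
    // endpoints always sort strictly after everything the operator wrote down.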
1381
1382    fn overlay_endpoint_to_peer_address(
1383        endpoint: &OverlayEndpointAdvert,
1384        priority: u8,
1385    ) -> Option<PeerAddress> {
1386        let transport = match endpoint.transport {
1387            OverlayTransportKind::Udp => "udp",
1388            OverlayTransportKind::Tcp => "tcp",
1389            OverlayTransportKind::Tor => "tor",
1390        };
1391        Some(PeerAddress::with_priority(
1392            transport,
1393            endpoint.addr.clone(),
1394            priority,
1395        ))
1396    }
1397
1398    async fn attempt_peer_address_list(
1399        &mut self,
1400        peer_config: &PeerConfig,
1401        peer_identity: PeerIdentity,
1402        allow_bootstrap_nat: bool,
1403        addresses: &[PeerAddress],
1404    ) -> Result<(), NodeError> {
1405        for addr in addresses {
1406            if addr.transport == "udp" && addr.addr.eq_ignore_ascii_case("nat") {
1407                if !allow_bootstrap_nat {
1408                    continue;
1409                }
1410                let Some(bootstrap) = self.nostr_discovery.clone() else {
1411                    debug!(npub = %peer_config.npub, "No Nostr overlay runtime for udp:nat address");
1412                    continue;
1413                };
1414                bootstrap.request_connect(peer_config.clone()).await;
1415                info!(npub = %peer_config.npub, "Started Nostr UDP NAT traversal attempt");
1416                return Ok(());
1417            }
1418
1419            let (transport_id, remote_addr) = if addr.transport == "ethernet" {
1420                match self.resolve_ethernet_addr(&addr.addr) {
1421                    Ok(result) => result,
1422                    Err(e) => {
1423                        debug!(
1424                            transport = %addr.transport,
1425                            addr = %addr.addr,
1426                            error = %e,
1427                            "Failed to resolve Ethernet address"
1428                        );
1429                        continue;
1430                    }
1431                }
1432            } else if addr.transport == "ble" {
1433                #[cfg(bluer_available)]
1434                {
1435                    match self.resolve_ble_addr(&addr.addr) {
1436                        Ok(result) => result,
1437                        Err(e) => {
1438                            debug!(
1439                                transport = %addr.transport,
1440                                addr = %addr.addr,
1441                                error = %e,
1442                                "Failed to resolve BLE address"
1443                            );
1444                            continue;
1445                        }
1446                    }
1447                }
1448                #[cfg(not(bluer_available))]
1449                {
1450                    debug!(transport = %addr.transport, "BLE transport not available on this build");
1451                    continue;
1452                }
1453            } else {
1454                let tid = match self.find_transport_for_type(&addr.transport) {
1455                    Some(id) => id,
1456                    None => {
1457                        debug!(
1458                            transport = %addr.transport,
1459                            addr = %addr.addr,
1460                            "No operational transport for address type"
1461                        );
1462                        continue;
1463                    }
1464                };
1465                (tid, TransportAddr::from_string(&addr.addr))
1466            };
1467
1468            match self
1469                .initiate_connection(transport_id, remote_addr, peer_identity)
1470                .await
1471            {
1472                Ok(()) => return Ok(()),
1473                Err(e @ NodeError::AccessDenied(_)) => return Err(e),
1474                Err(e) => {
1475                    debug!(
1476                        npub = %peer_config.npub,
1477                        transport_id = %transport_id,
1478                        error = %e,
1479                        "Connection attempt failed, trying next address"
1480                    );
1481                }
1482            }
1483        }
1484
1485        Err(NodeError::NoTransportForType(format!(
1486            "no operational transport for any of {}'s addresses",
1487            peer_config.npub
1488        )))
1489    }
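    // Illustrative static address list for a peer that exposes a reachable TCP
    // endpoint and also supports Nostr-signalled UDP hole punching (values
    // hypothetical):
    //
    //     addresses: vec![
    //         PeerAddress::with_priority("tcp", "203.0.113.9:4422", 10),
    //         PeerAddress::with_priority("udp", "nat", 20),
    //     ],
    //
    // The TCP address is dialed first; the special "udp"/"nat" entry is only
    // consulted when `allow_bootstrap_nat` is true, and it hands the attempt
    // to the Nostr overlay runtime instead of dialing anything itself.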
1490
1491    async fn queue_open_discovery_retries(&mut self, bootstrap: &std::sync::Arc<NostrDiscovery>) {
1492        self.run_open_discovery_sweep(bootstrap, None, "per-tick")
1493            .await;
1494    }
1495
1496    /// Open-discovery cache sweep. Iterates the cached overlay adverts and
1497    /// queues retries for non-configured, not-yet-connected peers.
1498    ///
1499    /// `max_age_secs`, if set, filters out adverts whose `created_at` is
1500    /// older than `now - max_age_secs`. The per-tick sweep passes `None`
1501    /// (relies on the cache's own `valid_until_ms` filter); the one-shot
1502    /// startup sweep passes `Some(startup_sweep_max_age_secs)`.
1503    ///
1504    /// `caller` is a short label included in log lines so per-tick and
1505    /// startup sweeps are distinguishable in operator-facing logs.
1506    pub(in crate::node) async fn run_open_discovery_sweep(
1507        &mut self,
1508        bootstrap: &std::sync::Arc<NostrDiscovery>,
1509        max_age_secs: Option<u64>,
1510        caller: &'static str,
1511    ) {
1512        if !self.config.node.discovery.nostr.enabled
1513            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
1514        {
1515            return;
1516        }
1517
1518        let configured_npubs = self
1519            .config
1520            .peers()
1521            .iter()
1522            .map(|peer| peer.npub.clone())
1523            .collect::<HashSet<_>>();
1524        let now_ms = Self::now_ms();
1525        let now_secs = now_ms / 1000;
1526        let mut enqueue_budget = self.open_discovery_enqueue_budget(&configured_npubs);
1527        if enqueue_budget == 0 {
1528            debug!(
1529                caller = %caller,
1530                "open-discovery sweep: enqueue budget is 0, skipping"
1531            );
1532            return;
1533        }
1534
1535        let candidates = bootstrap.cached_open_discovery_candidates(64).await;
1536        let cached_count = candidates.len();
1537        let mut enqueued = 0usize;
1538        let mut skipped_age = 0usize;
1539        let mut skipped_configured = 0usize;
1540        let mut skipped_self = 0usize;
1541        let mut skipped_connected = 0usize;
1542        let mut skipped_retry_pending = 0usize;
1543        let mut skipped_connecting = 0usize;
1544        let mut skipped_no_endpoints = 0usize;
1545        let mut skipped_invalid_npub = 0usize;
1546        let mut skipped_cooldown = 0usize;
1547
1548        for (npub, endpoints, created_at_secs) in candidates {
1549            if enqueue_budget == 0 {
1550                break;
1551            }
1552
1553            if let Some(max_age) = max_age_secs
1554                && now_secs.saturating_sub(created_at_secs) > max_age
1555            {
1556                skipped_age = skipped_age.saturating_add(1);
1557                continue;
1558            }
1559
1560            if configured_npubs.contains(&npub) {
1561                skipped_configured = skipped_configured.saturating_add(1);
1562                continue;
1563            }
1564
1565            let peer_identity = match PeerIdentity::from_npub(&npub) {
1566                Ok(identity) => identity,
1567                Err(_) => {
1568                    skipped_invalid_npub = skipped_invalid_npub.saturating_add(1);
1569                    continue;
1570                }
1571            };
1572            let node_addr = *peer_identity.node_addr();
1573            if node_addr == *self.identity.node_addr() {
1574                skipped_self = skipped_self.saturating_add(1);
1575                continue;
1576            }
1577            if self.peers.contains_key(&node_addr) {
1578                skipped_connected = skipped_connected.saturating_add(1);
1579                continue;
1580            }
1581            if self.retry_pending.contains_key(&node_addr) {
1582                skipped_retry_pending = skipped_retry_pending.saturating_add(1);
1583                continue;
1584            }
1585            if bootstrap.cooldown_until(&npub, now_ms).is_some() {
1586                skipped_cooldown = skipped_cooldown.saturating_add(1);
1587                continue;
1588            }
1589            let connecting = self.connections.values().any(|conn| {
1590                conn.expected_identity()
1591                    .map(|id| id.node_addr() == &node_addr)
1592                    .unwrap_or(false)
1593            });
1594            if connecting {
1595                skipped_connecting = skipped_connecting.saturating_add(1);
1596                continue;
1597            }
1598
1599            let mut addresses = Vec::new();
1600            let mut priority = 120u8;
1601            for endpoint in endpoints {
1602                let Some(candidate) = Self::overlay_endpoint_to_peer_address(&endpoint, priority)
1603                else {
1604                    continue;
1605                };
1606                if addresses.iter().any(|existing: &PeerAddress| {
1607                    existing.transport == candidate.transport && existing.addr == candidate.addr
1608                }) {
1609                    continue;
1610                }
1611                addresses.push(candidate);
1612                priority = priority.saturating_add(1);
1613            }
1614            if addresses.is_empty() {
1615                skipped_no_endpoints = skipped_no_endpoints.saturating_add(1);
1616                continue;
1617            }
1618
1619            self.peer_aliases
1620                .entry(node_addr)
1621                .or_insert_with(|| peer_identity.short_npub());
1622            self.register_identity(node_addr, peer_identity.pubkey_full());
1623
1624            let mut state = super::retry::RetryState::new(PeerConfig {
1625                npub: npub.clone(),
1626                alias: None,
1627                addresses,
1628                connect_policy: ConnectPolicy::AutoConnect,
1629                auto_reconnect: true,
1630            });
1631            state.reconnect = false;
1632            state.retry_after_ms = now_ms;
1633            state.expires_at_ms = Some(self.open_discovery_retry_expires_at_ms(now_ms));
1634            self.retry_pending.insert(node_addr, state);
1635            info!(
1636                caller = %caller,
1637                peer = %peer_identity.short_npub(),
1638                advert_age_secs = now_secs.saturating_sub(created_at_secs),
1639                "open-discovery sweep: queued retry for cached advert"
1640            );
1641            enqueue_budget = enqueue_budget.saturating_sub(1);
1642            enqueued = enqueued.saturating_add(1);
1643        }
1644
1645        // Always log a one-line summary on the startup sweep so operators
1646        // can verify it ran. Per-tick sweeps are noisier; only summarize
1647        // when something happened.
1648        let total_skipped = skipped_age
1649            + skipped_configured
1650            + skipped_self
1651            + skipped_connected
1652            + skipped_retry_pending
1653            + skipped_connecting
1654            + skipped_no_endpoints
1655            + skipped_invalid_npub
1656            + skipped_cooldown;
1657        let should_summarize = caller == "startup" || enqueued > 0;
1658        if should_summarize {
1659            info!(
1660                caller = %caller,
1661                cached = cached_count,
1662                queued = enqueued,
1663                skipped_age = skipped_age,
1664                skipped_configured = skipped_configured,
1665                skipped_self = skipped_self,
1666                skipped_connected = skipped_connected,
1667                skipped_retry_pending = skipped_retry_pending,
1668                skipped_connecting = skipped_connecting,
1669                skipped_no_endpoints = skipped_no_endpoints,
1670                skipped_invalid_npub = skipped_invalid_npub,
1671                skipped_cooldown = skipped_cooldown,
1672                skipped_total = total_skipped,
1673                "open-discovery sweep complete"
1674            );
1675        }
1676    }
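    // With a plain fmt subscriber the startup summary reads roughly as below
    // (exact layout depends on the tracing subscriber; counts hypothetical):
    //
    //     INFO open-discovery sweep complete caller=startup cached=12 queued=3
    //          skipped_configured=4 skipped_connected=2 skipped_cooldown=3 skipped_total=9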
1677
1678    /// One-shot startup sweep: runs once after the configured settle
1679    /// delay, iterating the cached overlay adverts and queueing retries
1680    /// for any peer with a recent enough advert that we haven't already
1681    /// configured statically or established a link to.
1682    ///
1683    /// Gated identically to [`Self::run_open_discovery_sweep`]: requires
1684    /// `node.discovery.nostr.enabled` and `policy == open`.
1685    async fn maybe_run_startup_open_discovery_sweep(
1686        &mut self,
1687        bootstrap: &std::sync::Arc<NostrDiscovery>,
1688    ) {
1689        if self.startup_open_discovery_sweep_done {
1690            return;
1691        }
1692        if !self.config.node.discovery.nostr.enabled
1693            || self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
1694        {
1695            // Mark done so we don't keep re-checking on every tick.
1696            self.startup_open_discovery_sweep_done = true;
1697            return;
1698        }
1699        let Some(started_at_ms) = self.nostr_discovery_started_at_ms else {
1700            return;
1701        };
1702        let now_ms = Self::now_ms();
1703        let delay_ms = self
1704            .config
1705            .node
1706            .discovery
1707            .nostr
1708            .startup_sweep_delay_secs
1709            .saturating_mul(1000);
1710        if now_ms < started_at_ms.saturating_add(delay_ms) {
1711            return;
1712        }
1713
1714        let max_age_secs = self.config.node.discovery.nostr.startup_sweep_max_age_secs;
1715        self.run_open_discovery_sweep(bootstrap, Some(max_age_secs), "startup")
1716            .await;
1717        self.startup_open_discovery_sweep_done = true;
1718    }
1719
1720    fn available_outbound_slots(&self) -> usize {
1721        let connection_used = self
1722            .connections
1723            .len()
1724            .saturating_add(self.pending_connects.len());
1725        let connection_slots = if self.max_connections == 0 {
1726            usize::MAX
1727        } else {
1728            self.max_connections.saturating_sub(connection_used)
1729        };
1730
1731        let peer_slots = if self.max_peers == 0 {
1732            usize::MAX
1733        } else {
1734            self.max_peers.saturating_sub(self.peers.len())
1735        };
1736
1737        connection_slots.min(peer_slots)
1738    }
1739
1740    fn open_discovery_enqueue_budget(&self, configured_npubs: &HashSet<String>) -> usize {
1741        let current_open_discovery_pending = self
1742            .retry_pending
1743            .values()
1744            .filter(|state| !configured_npubs.contains(&state.peer_config.npub))
1745            .count();
1746
1747        let cap_remaining = self
1748            .config
1749            .node
1750            .discovery
1751            .nostr
1752            .open_discovery_max_pending
1753            .saturating_sub(current_open_discovery_pending);
1754
1755        cap_remaining.min(self.available_outbound_slots())
1756    }
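    // Worked example (limits hypothetical): open_discovery_max_pending = 8
    // with 5 open-discovery retries already queued leaves a cap of 3; if
    // max_connections = 32 with 30 connections live or pending (and no peer
    // cap), only 2 outbound slots remain, so a sweep may enqueue at most
    // min(3, 2) = 2 new candidates this tick. A configured limit of 0 means
    // "unbounded" for both the connection and peer caps.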
1757
1758    fn open_discovery_retry_expires_at_ms(&self, now_ms: u64) -> u64 {
1759        now_ms.saturating_add(
1760            self.config
1761                .node
1762                .discovery
1763                .nostr
1764                .advert_ttl_secs
1765                .saturating_mul(1000)
1766                .saturating_mul(OPEN_DISCOVERY_RETRY_LIFETIME_MULTIPLIER),
1767        )
1768    }
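    // Worked example (TTL value hypothetical): with advert_ttl_secs = 3600 and
    // the 2x retry-lifetime multiplier, a retry queued at now_ms stays
    // eligible for 3600 * 1000 * 2 = 7_200_000 ms, i.e. two hours.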
1769
1770    async fn build_overlay_advert(
1771        &self,
1772        bootstrap: &std::sync::Arc<NostrDiscovery>,
1773    ) -> Option<OverlayAdvert> {
1774        if !self.config.node.discovery.nostr.enabled {
1775            return None;
1776        }
1777
1778        let mut endpoints = Vec::new();
1779        let mut has_udp_nat = false;
1780
1781        for handle in self.transports.values() {
1782            if !handle.is_operational() {
1783                continue;
1784            }
1785
1786            match handle.transport_type().name {
1787                "udp" => {
1788                    let Some(cfg) = self.lookup_udp_config(handle.name()) else {
1789                        continue;
1790                    };
1791                    if !cfg.advertise_on_nostr() {
1792                        continue;
1793                    }
1794                    if cfg.is_public() {
1795                        // Precedence:
1796                        // 1. operator-supplied `external_addr` (skips STUN)
1797                        // 2. non-wildcard *public* `local_addr` (operator
1798                        //    bound to a specific public IP directly)
1799                        // 3. STUN auto-discovery against ephemeral socket
1800                        //    (also taken when bind is wildcard *or* private —
1801                        //    a private bind is not peer-reachable, so we
1802                        //    must publish the public reflexive instead)
1803                        // 4. loud warn + omit endpoint
1804                        if let Some(explicit) = cfg.external_advert_addr() {
1805                            endpoints.push(OverlayEndpointAdvert {
1806                                transport: OverlayTransportKind::Udp,
1807                                addr: explicit.to_string(),
1808                            });
1809                        } else {
1810                            match handle.local_addr() {
1811                                Some(addr)
1812                                    if !addr.ip().is_unspecified()
1813                                        && !is_unroutable_advert_ip(addr.ip()) =>
1814                                {
1815                                    endpoints.push(OverlayEndpointAdvert {
1816                                        transport: OverlayTransportKind::Udp,
1817                                        addr: addr.to_string(),
1818                                    });
1819                                }
1820                                Some(addr) => {
1821                                    let key = handle.transport_id().as_u32();
1822                                    let port = addr.port();
1823                                    if let Some(public) =
1824                                        bootstrap.learn_public_udp_addr(key, port).await
1825                                    {
1826                                        endpoints.push(OverlayEndpointAdvert {
1827                                            transport: OverlayTransportKind::Udp,
1828                                            addr: public.to_string(),
1829                                        });
1830                                    } else {
1831                                        warn!(
1832                                            transport_id = key,
1833                                            bind_addr = %addr,
1834                                            "advert: udp public=true but bind is wildcard \
1835                                            or private and STUN observation failed; \
1836                                            advertising no UDP endpoint. Either set \
1837                                            transports.udp.external_addr, bind to a \
1838                                            specific *public* IP, or ensure \
1839                                            node.discovery.nostr.stun_servers is reachable"
1840                                        );
1841                                    }
1842                                }
1843                                None => {}
1844                            }
1845                        }
1846                    } else {
1847                        endpoints.push(OverlayEndpointAdvert {
1848                            transport: OverlayTransportKind::Udp,
1849                            addr: "nat".to_string(),
1850                        });
1851                        has_udp_nat = true;
1852                    }
1853                }
1854                "tcp" => {
1855                    let Some(cfg) = self.lookup_tcp_config(handle.name()) else {
1856                        continue;
1857                    };
1858                    if !cfg.advertise_on_nostr() {
1859                        continue;
1860                    }
1861                    // Precedence:
1862                    // 1. operator-supplied `external_addr` (only path that
1863                    //    works on cloud-NAT setups where the public IP is
1864                    //    not on a host interface).
1865                    // 2. non-wildcard *public* `local_addr` (operator bound
1866                    //    to a specific public IP directly).
1867                    // 3. loud warn + omit endpoint (no TCP STUN equivalent).
1868                    //
1869                    // A wildcard *or* private bind is never advertised as-is
1870                    // — peers off-LAN can't reach a private bind, and there
1871                    // is no TCP STUN to discover a public reflexive.
1872                    if let Some(explicit) = cfg.external_advert_addr() {
1873                        endpoints.push(OverlayEndpointAdvert {
1874                            transport: OverlayTransportKind::Tcp,
1875                            addr: explicit.to_string(),
1876                        });
1877                    } else {
1878                        match handle.local_addr() {
1879                            Some(addr)
1880                                if !addr.ip().is_unspecified()
1881                                    && !is_unroutable_advert_ip(addr.ip()) =>
1882                            {
1883                                endpoints.push(OverlayEndpointAdvert {
1884                                    transport: OverlayTransportKind::Tcp,
1885                                    addr: addr.to_string(),
1886                                });
1887                            }
1888                            Some(addr) => {
1889                                warn!(
1890                                    bind_addr = %addr,
1891                                    "advert: tcp advertise_on_nostr=true bound to wildcard \
1892                                    or private IP and no transports.tcp.external_addr set; \
1893                                    advertising no TCP endpoint. Either set external_addr \
1894                                    to the public IP (recommended for cloud 1:1-NAT setups) \
1895                                    or bind explicitly to the public IP"
1896                                );
1897                            }
1898                            None => {}
1899                        }
1900                    }
1901                }
1902                "tor" => {
1903                    let Some(cfg) = self.lookup_tor_config(handle.name()) else {
1904                        continue;
1905                    };
1906                    if !cfg.advertise_on_nostr() {
1907                        continue;
1908                    }
1909                    if let Some(addr) = handle.onion_address() {
1910                        endpoints.push(OverlayEndpointAdvert {
1911                            transport: OverlayTransportKind::Tor,
1912                            addr: format!("{}:{}", addr, cfg.advertised_port()),
1913                        });
1914                    }
1915                }
1916                _ => {}
1917            }
1918        }
1919
1920        if endpoints.is_empty() {
1921            return None;
1922        }
1923
1924        Some(OverlayAdvert {
1925            identifier: ADVERT_IDENTIFIER.to_string(),
1926            version: ADVERT_VERSION,
1927            endpoints,
1928            signal_relays: has_udp_nat.then(|| self.config.node.discovery.nostr.dm_relays.clone()),
1929            stun_servers: has_udp_nat
1930                .then(|| self.config.node.discovery.nostr.stun_servers.clone()),
1931        })
1932    }
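    // Shape of the advert this builds for a node with one public TCP listener
    // and one NAT'd UDP listener (addresses hypothetical; the relay and STUN
    // lists come straight from the Nostr discovery config):
    //
    //     OverlayAdvert {
    //         identifier: ADVERT_IDENTIFIER.to_string(),
    //         version: ADVERT_VERSION,
    //         endpoints: vec![
    //             OverlayEndpointAdvert { transport: OverlayTransportKind::Tcp, addr: "203.0.113.9:4422".into() },
    //             OverlayEndpointAdvert { transport: OverlayTransportKind::Udp, addr: "nat".into() },
    //         ],
    //         signal_relays: Some(dm_relays),   // present only because a "nat" UDP endpoint exists
    //         stun_servers: Some(stun_servers), // likewise gated on has_udp_nat
    //     }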
1933
1934    async fn refresh_overlay_advert(
1935        &self,
1936        bootstrap: &std::sync::Arc<NostrDiscovery>,
1937    ) -> Result<(), crate::discovery::nostr::BootstrapError> {
1938        let advert = self.build_overlay_advert(bootstrap).await;
1939        bootstrap.update_local_advert(advert).await
1940    }
1941
1942    fn lookup_udp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::UdpConfig> {
1943        match (&self.config.transports.udp, transport_name) {
1944            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1945            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1946            _ => None,
1947        }
1948    }
1949
1950    fn lookup_tcp_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TcpConfig> {
1951        match (&self.config.transports.tcp, transport_name) {
1952            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1953            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1954            _ => None,
1955        }
1956    }
1957
1958    fn lookup_tor_config(&self, transport_name: Option<&str>) -> Option<&crate::config::TorConfig> {
1959        match (&self.config.transports.tor, transport_name) {
1960            (crate::config::TransportInstances::Single(cfg), None) => Some(cfg),
1961            (crate::config::TransportInstances::Named(configs), Some(name)) => configs.get(name),
1962            _ => None,
1963        }
1964    }
1965
1966    pub(in crate::node) async fn try_peer_addresses(
1967        &mut self,
1968        peer_config: &PeerConfig,
1969        peer_identity: PeerIdentity,
1970        allow_bootstrap_nat: bool,
1971    ) -> Result<(), NodeError> {
1972        let peer_node_addr = *peer_identity.node_addr();
1973        if self.peers.contains_key(&peer_node_addr) {
1974            debug!(
1975                npub = %peer_config.npub,
1976                "Peer already exists, skipping address attempts"
1977            );
1978            return Ok(());
1979        }
1980        if self.is_connecting_to_peer(&peer_node_addr) {
1981            debug!(
1982                npub = %peer_config.npub,
1983                "Connection already in progress, skipping address attempts"
1984            );
1985            return Ok(());
1986        }
1987
1988        // Static-first dialing: try the operator-configured addresses before
1989        // anything that waits on advert fetches over the relay network.
1990        let static_addresses = self.static_peer_addresses(peer_config);
1991        if self
1992            .attempt_peer_address_list(
1993                peer_config,
1994                peer_identity,
1995                allow_bootstrap_nat,
1996                &static_addresses,
1997            )
1998            .await
1999            .is_ok()
2000        {
2001            return Ok(());
2002        }
2003
2004        {
2005            let fallback = self
2006                .nostr_peer_fallback_addresses(peer_config, &static_addresses)
2007                .await;
2008            if !fallback.is_empty()
2009                && self
2010                    .attempt_peer_address_list(
2011                        peer_config,
2012                        peer_identity,
2013                        allow_bootstrap_nat,
2014                        &fallback,
2015                    )
2016                    .await
2017                    .is_ok()
2018            {
2019                return Ok(());
2020            }
2021        }
2022
2023        Err(NodeError::NoTransportForType(format!(
2024            "no operational transport for any of {}'s addresses",
2025            peer_config.npub
2026        )))
2027    }
2028
2029    // === Control API methods ===
2030
2031    /// Connect to a peer via the control API.
2032    ///
2033    /// Creates an ephemeral peer connection (not persisted to config, no
2034    /// auto-reconnect). Reuses the same connection path as auto-connect
2035    /// peers. Returns JSON data on success or an error message.
2036    pub(crate) async fn api_connect(
2037        &mut self,
2038        npub: &str,
2039        address: &str,
2040        transport: &str,
2041    ) -> Result<serde_json::Value, String> {
2042        let peer_config = PeerConfig {
2043            npub: npub.to_string(),
2044            alias: None,
2045            addresses: vec![PeerAddress::new(transport, address)],
2046            connect_policy: ConnectPolicy::Manual,
2047            auto_reconnect: false,
2048        };
2049
2050        // Pre-seed identity cache (same as initiate_peer_connections does)
2051        if let Ok(identity) = PeerIdentity::from_npub(npub) {
2052            self.peer_aliases
2053                .insert(*identity.node_addr(), identity.short_npub());
2054            self.register_identity(*identity.node_addr(), identity.pubkey_full());
2055        }
2056
2057        self.initiate_peer_connection(&peer_config)
2058            .await
2059            .map(|()| {
2060                info!(
2061                    npub = %npub,
2062                    address = %address,
2063                    transport = %transport,
2064                    "API connect initiated"
2065                );
2066                serde_json::json!({
2067                    "npub": npub,
2068                    "address": address,
2069                    "transport": transport,
2070                })
2071            })
2072            .map_err(|e| e.to_string())
2073    }
2074
2075    /// Disconnect a peer via the control API.
2076    ///
2077    /// Removes the peer and suppresses auto-reconnect.
2078    pub(crate) fn api_disconnect(&mut self, npub: &str) -> Result<serde_json::Value, String> {
2079        let peer_identity =
2080            PeerIdentity::from_npub(npub).map_err(|e| format!("invalid npub '{npub}': {e}"))?;
2081        let node_addr = *peer_identity.node_addr();
2082
2083        if !self.peers.contains_key(&node_addr) {
2084            return Err(format!("peer not found: {npub}"));
2085        }
2086
2087        // Remove the peer (full cleanup: sessions, indices, links, tree, bloom)
2088        self.remove_active_peer(&node_addr);
2089
2090        // Suppress any pending auto-reconnect
2091        self.retry_pending.remove(&node_addr);
2092
2093        info!(npub = %npub, "API disconnect completed");
2094
2095        Ok(serde_json::json!({
2096            "npub": npub,
2097            "disconnected": true,
2098        }))
2099    }
2100
2101    /// Adopt an already-established UDP traversal and start the normal FIPS
2102    /// Noise handshake over it.
2103    ///
2104    /// This is intended for integration with an external rendezvous runtime
2105    /// that has already completed relay signaling, STUN observation, and UDP
2106    /// hole punching. After handoff, the adopted socket is owned by FIPS.
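    ///
    /// A minimal call sketch (variable names illustrative; `traversal` is
    /// whatever the rendezvous runtime hands over once the hole punch
    /// succeeds):
    ///
    /// ```ignore
    /// let handoff = node.adopt_established_traversal(traversal).await?;
    /// info!(
    ///     peer = %handoff.peer_node_addr,
    ///     remote = %handoff.remote_addr,
    ///     "traversal adopted, Noise handshake in flight"
    /// );
    /// ```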
2107    pub async fn adopt_established_traversal(
2108        &mut self,
2109        traversal: EstablishedTraversal,
2110    ) -> Result<BootstrapHandoffResult, NodeError> {
2111        debug!(
2112            peer_npub = %traversal.peer_npub,
2113            session_id = %traversal.session_id,
2114            remote_addr = %traversal.remote_addr,
2115            "adopting established traversal socket"
2116        );
2117
2118        if !self.state.is_operational() {
2119            return Err(NodeError::NotStarted);
2120        }
2121
2122        let packet_tx = self.packet_tx.clone().ok_or(NodeError::NotStarted)?;
2123        let peer_identity = PeerIdentity::from_npub(&traversal.peer_npub).map_err(|e| {
2124            NodeError::InvalidPeerNpub {
2125                npub: traversal.peer_npub.clone(),
2126                reason: e.to_string(),
2127            }
2128        })?;
2129        let peer_node_addr = *peer_identity.node_addr();
2130        if self.peers.contains_key(&peer_node_addr) {
2131            debug!(
2132                peer_npub = %traversal.peer_npub,
2133                "Ignoring NAT traversal handoff for already-connected peer"
2134            );
2135            return Err(NodeError::PeerAlreadyExists(peer_node_addr));
2136        }
2137        if self.is_connecting_to_peer(&peer_node_addr) {
2138            debug!(
2139                peer_npub = %traversal.peer_npub,
2140                "Ignoring NAT traversal handoff while peer handshake is already in progress"
2141            );
2142            return Err(NodeError::PeerAlreadyExists(peer_node_addr));
2143        }
2144
2145        self.peer_aliases
2146            .insert(peer_node_addr, peer_identity.short_npub());
2147        self.register_identity(peer_node_addr, peer_identity.pubkey_full());
2148
2149        let transport_id = self.allocate_transport_id();
2150        // Adopted ephemeral UDP transports inherit MTU + socket-buffer sizing
2151        // (and accept_connections / advertise flags) from the operator's
2152        // configured [transports.udp] when the bootstrap runtime doesn't
2153        // pass an explicit override. Lookup tries `transport_name` first
2154        // (covers the `Named` multi-listener variant) and falls back to the
2155        // unnamed `Single` listener, so single- and named-listener configs
2156        // both inherit cleanly.
2157        //
2158        // Tradeoff: `UdpConfig::default()` sets MTU 1280 (IPv6 minimum), the
2159        // only value guaranteed to survive arbitrary middlebox paths.
2160        // Inheriting a higher operator-chosen MTU means NAT-traversed flows
2161        // initially attempt that MTU and may black-hole on tighter paths
2162        // until reactive `MtuExceeded` recovery kicks in. Operators who
2163        // raise the primary MTU based on known-clean topology accept that
2164        // tradeoff; the silent drop on a too-low default was strictly
2165        // worse for the common case where the primary MTU is reachable.
2166        //
2167        // Bind / external address fields are cleared since the socket is
2168        // already bound.
2169        let inherited_config = traversal.transport_config.clone().unwrap_or_else(|| {
2170            let mut cfg = self
2171                .lookup_udp_config(traversal.transport_name.as_deref())
2172                .or_else(|| self.lookup_udp_config(None))
2173                .cloned()
2174                .unwrap_or_default();
2175            cfg.bind_addr = None;
2176            cfg.external_addr = None;
2177            cfg
2178        });
2179        let mut transport = crate::transport::udp::UdpTransport::new(
2180            transport_id,
2181            traversal.transport_name.clone(),
2182            inherited_config,
2183            packet_tx,
2184        );
2185
2186        transport
2187            .adopt_socket_async(traversal.socket)
2188            .await
2189            .map_err(|e| NodeError::BootstrapHandoff(e.to_string()))?;
2190
2191        let local_addr = transport.local_addr().ok_or_else(|| {
2192            NodeError::BootstrapHandoff("adopted UDP transport has no local address".into())
2193        })?;
2194
2195        self.transports.insert(
2196            transport_id,
2197            crate::transport::TransportHandle::Udp(transport),
2198        );
2199        self.bootstrap_transports.insert(transport_id);
2200        self.bootstrap_transport_npubs
2201            .insert(transport_id, traversal.peer_npub.clone());
2202
2203        let remote_addr = TransportAddr::from_string(&traversal.remote_addr.to_string());
2204        if let Err(err) = self
2205            .initiate_connection(transport_id, remote_addr.clone(), peer_identity)
2206            .await
2207        {
2208            self.bootstrap_transports.remove(&transport_id);
2209            self.bootstrap_transport_npubs.remove(&transport_id);
2210            if let Some(mut handle) = self.transports.remove(&transport_id) {
2211                let _ = handle.stop().await;
2212            }
2213            return Err(err);
2214        }
2215
2216        info!(
2217            peer = %self.peer_display_name(&peer_node_addr),
2218            transport_id = %transport_id,
2219            local_addr = %local_addr,
2220            remote_addr = %traversal.remote_addr,
2221            session_id = %traversal.session_id,
2222            "adopted NAT traversal socket; handshake initiated"
2223        );
2224
2225        Ok(BootstrapHandoffResult {
2226            transport_id,
2227            local_addr,
2228            remote_addr: traversal.remote_addr,
2229            peer_node_addr,
2230            session_id: traversal.session_id,
2231        })
2232    }
2233}