// fips_core/node/mod.rs

//! FIPS Node Entity
//!
//! Top-level structure representing a running FIPS instance. The Node
//! holds all state required for mesh routing: identity, tree state,
//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49    ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50    TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58    Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59    PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
/// Three-second window tied to local send failures.
///
/// NOTE(review): presumably the period after a local-side send failure
/// during which link-dead detection switches to its fast timeout — confirm
/// against the heartbeat check that consumes this constant.
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
70
71fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
72    if !plaintext
73        .first()
74        .is_some_and(|ty| *ty == LinkMessageType::SessionDatagram.to_byte())
75    {
76        return false;
77    }
78    let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
79        return false;
80    };
81    FspCommonPrefix::parse(fsp_payload)
82        .is_some_and(|prefix| prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted())
83}
84
/// Half-range, in seconds, of the symmetric jitter applied to per-session
/// rekey timers.
///
/// Each FMP/FSP session draws an offset uniformly from
/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` seconds at construction and
/// after each cutover. This preserves the configured mean interval while
/// reducing dual-initiation bursts in symmetric-start meshes.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
92
/// Errors related to node operations.
///
/// `Display` text for each variant comes from its `#[error]` attribute.
/// Variants marked `#[from]` are produced automatically by `?` from the
/// wrapped error type.
#[derive(Debug, Error)]
pub enum NodeError {
    /// An operation required a started node.
    #[error("node not started")]
    NotStarted,

    /// A start was requested on a node that is already started.
    #[error("node already started")]
    AlreadyStarted,

    /// A stop was requested on a node that is already stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given ID.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No transport is available for the named transport type.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link is registered under the given ID.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No pending connection exists for the given link.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address is already present.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// The given link already has a connection attached.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub string was rejected; `reason` carries the detail.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// A discovery operation failed.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// The operation was refused by access control.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` would be exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` would be exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` would be exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// The given link has no session available.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` carries the detail.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` carries the detail.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A forwarded packet of `packet_size` bytes is larger than the `mtu`
    /// toward `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Configuration error, converted from [`ConfigError`].
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Identity error, converted from [`IdentityError`].
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// TUN device error, converted from [`TunError`].
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// A session index could not be allocated.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// A handshake failed.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// A transport error, stored as its `Display` text (see
    /// `NodeError::from_transport_error`).
    #[error("transport error: {0}")]
    TransportError(String),

    /// A transport error that the transport classified as "local route
    /// unavailable", stored as its `Display` text.
    #[error("local route unavailable: {0}")]
    LocalRouteUnavailable(String),

    /// A bootstrap handoff failed.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
187
188impl NodeError {
189    pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
190        if error.is_local_route_unavailable() {
191            Self::LocalRouteUnavailable(error.to_string())
192        } else {
193            Self::TransportError(error.to_string())
194        }
195    }
196
197    pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
198        matches!(self, Self::LocalRouteUnavailable(_))
199    }
200}
201
/// Source-attributed packet delivered by a node running without a system TUN.
///
/// This is the item type of the inbound channel exposed through
/// `ExternalPacketIo::inbound_rx`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// FIPS node address that originated the packet.
    pub source_node_addr: NodeAddr,
    /// Source Nostr public key when the node has learned it; `None` otherwise.
    pub source_npub: Option<String>,
    /// Destination FIPS address from the IPv6 packet.
    pub destination: FipsAddress,
    /// Full IPv6 packet after FIPS session decapsulation.
    pub packet: Vec<u8>,
}
214
/// Cached identity data for one peer.
///
/// Stored as the value type of `Node::identity_cache`, which is keyed by a
/// 15-byte address prefix.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached peer.
    node_addr: NodeAddr,
    /// The peer's secp256k1 public key.
    pubkey: secp256k1::PublicKey,
    /// The peer's npub string.
    npub: String,
    /// When the entry was last seen, in milliseconds.
    /// NOTE(review): presumably Unix epoch ms — confirm against callers.
    last_seen_ms: u64,
}

impl IdentityCacheEntry {
    /// Build an entry from its four fields; no validation or derivation
    /// is performed.
    fn new(
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
        npub: String,
        last_seen_ms: u64,
    ) -> Self {
        Self {
            node_addr,
            pubkey,
            npub,
            last_seen_ms,
        }
    }
}
238
/// App-owned packet channels for embedding FIPS without a system TUN.
///
/// The application pushes outbound IPv6 packets through `outbound_tx` and
/// reads delivered packets from `inbound_rx`.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Send outbound IPv6 packets into the node.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receive inbound IPv6 packets delivered by FIPS sessions.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
247
/// App-owned endpoint data channels for embedding FIPS without a daemon.
///
/// Commands flow into the node over the bounded `command_tx`; events flow
/// out over the unbounded `event_rx`. `event_tx` is a sender for that same
/// event queue.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Send endpoint data commands into the node RX loop.
    ///
    /// Bounded with a generous default so normal sender bursts do not
    /// stall on semaphore acquisition. macOS pacing happens at the UDP
    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
    /// constraining this app queue instead caused the inner TCP flow to
    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
    /// default for benches.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receive endpoint data delivered by FIPS sessions.
    ///
    /// Unbounded so the rx_loop's send on inbound packet delivery is a
    /// wait-free push (no semaphore acquire), and so we can drop the
    /// per-packet cross-task relay that previously sat between the node
    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
    /// naturally bounded — the rx_loop both produces here and runs the
    /// same runtime that schedules the consumer, so a stalled consumer
    /// stalls production too.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Clone of the event_tx exposed for in-process loopback (e.g.
    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
    /// event into the same queue without going through the encrypt /
    /// decrypt path, while keeping every consumer reading from a single
    /// channel.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
277
/// Resolve the capacity of the endpoint data command queue.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable is set to a
/// positive integer (surrounding whitespace is ignored), that value is
/// returned unchanged. Otherwise the result is `requested` raised to a
/// floor of 32 768 entries.
///
/// An unset, non-numeric, or zero override is treated as absent.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    /// Smallest capacity ever returned when no override is present.
    const DEFAULT_CAPACITY_FLOOR: usize = 32_768;

    std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0)
        // The floor is far above 1, so no separate non-zero clamp is needed.
        .unwrap_or_else(|| requested.max(DEFAULT_CAPACITY_FLOOR))
}
288
/// Commands accepted by the node endpoint data service.
///
/// Variants carrying a `response_tx` report their result back to the caller
/// over that oneshot channel.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send with an explicit response channel — used by callers that
    /// care whether the local-stack handoff succeeded (e.g.
    /// `blocking_send` waits for the runtime to accept the send).
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
    /// no per-packet result channel. Used by the data-plane fast path
    /// (`FipsEndpoint::send`) where the caller already discards the
    /// result. Saves one oneshot::channel() allocation per outbound
    /// packet on the application's send hot path.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        queued_at: Option<std::time::Instant>,
    },
    /// Request the current peer list; the reply is a `Vec<NodeEndpointPeer>`.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request the current relay states; the reply is a
    /// `Vec<NodeEndpointRelayStatus>`.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Supply new advert and DM relay lists.
    ///
    /// NOTE(review): presumably replaces both relay sets at runtime —
    /// confirm against the command handler.
    UpdateRelays {
        advert_relays: Vec<String>,
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Replace the runtime peer list. Newly added auto-connect peers get
    /// `initiate_peer_connection` immediately; removed peers are dropped
    /// from the retry queue (the regular liveness timeout reaps any active
    /// session). Existing entries are kept and their `addresses` field is
    /// refreshed so the next retry sees the latest hints.
    UpdatePeers {
        peers: Vec<crate::config::PeerConfig>,
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
332
/// Reports what changed in response to `UpdatePeers`.
///
/// All counters default to zero (`Default` is derived).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Number of peers added.
    pub(crate) added: usize,
    /// Number of peers removed.
    pub(crate) removed: usize,
    /// Number of existing peers whose entry was updated.
    pub(crate) updated: usize,
    /// Number of existing peers left unchanged.
    pub(crate) unchanged: usize,
}
341
/// Endpoint data events emitted by the node session receive path.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A payload received from a remote node.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// Sender's npub, when available.
        source_npub: Option<String>,
        /// The received payload bytes.
        payload: Vec<u8>,
        /// Optional enqueue timestamp.
        /// NOTE(review): presumably used for queue-latency measurement —
        /// confirm against the producer and consumer.
        queued_at: Option<std::time::Instant>,
    },
}
352
/// Authenticated peer state exposed to embedded endpoint callers.
///
/// Returned in bulk in reply to `NodeEndpointCommand::PeerSnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// The peer's npub string.
    pub(crate) npub: String,
    /// Whether the peer is reported as connected.
    pub(crate) connected: bool,
    /// Transport address of the peer as a string, when known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when known.
    pub(crate) transport_type: Option<String>,
    /// Numeric ID of the peer's link.
    pub(crate) link_id: u64,
    /// Round-trip estimate in milliseconds, when available.
    /// NOTE(review): presumably a smoothed RTT — confirm against the producer.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent to the peer.
    pub(crate) packets_sent: u64,
    /// Packets received from the peer.
    pub(crate) packets_recv: u64,
    /// Bytes sent to the peer.
    pub(crate) bytes_sent: u64,
    /// Bytes received from the peer.
    pub(crate) bytes_recv: u64,
    /// Whether a direct probe is pending for this peer.
    pub(crate) direct_probe_pending: bool,
    /// Millisecond value associated with the next direct probe.
    /// NOTE(review): unclear from here whether this is a delay or an
    /// absolute timestamp — confirm against the producer.
    pub(crate) direct_probe_after_ms: Option<u64>,
}
369
/// Live Nostr relay state exposed to embedded endpoint callers.
///
/// Returned in bulk in reply to `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text.
    pub(crate) status: String,
}
376
/// Lifecycle phase of a node.
///
/// The `Display` form is the lowercase variant name (`"created"`,
/// `"starting"`, `"running"`, `"stopping"`, `"stopped"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not started.
    Created,
    /// Starting up (initializing transports).
    Starting,
    /// Fully operational.
    Running,
    /// Shutting down.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// Returns `true` only in the [`NodeState::Running`] state.
    pub fn is_operational(&self) -> bool {
        *self == Self::Running
    }

    /// Returns `true` when a start is permitted: the node is either freshly
    /// created or fully stopped.
    pub fn can_start(&self) -> bool {
        match self {
            Self::Created | Self::Stopped => true,
            Self::Starting | Self::Running | Self::Stopping => false,
        }
    }

    /// Returns `true` when a stop is permitted, i.e. only while running.
    pub fn can_stop(&self) -> bool {
        *self == Self::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name; width and fill flags are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Created => "created",
            Self::Starting => "starting",
            Self::Running => "running",
            Self::Stopping => "stopping",
            Self::Stopped => "stopped",
        })
    }
}
421
422/// Recent request tracking for dedup and reverse-path forwarding.
423///
424/// When a LookupRequest is forwarded through a node, the node stores the
425/// request_id and which peer sent it. When the corresponding LookupResponse
426/// arrives, it's forwarded back to that peer (reverse-path forwarding).
427/// The `response_forwarded` flag prevents response routing loops.
428#[derive(Clone, Debug)]
429pub(crate) struct RecentRequest {
430    /// The peer who sent this request to us.
431    pub(crate) from_peer: NodeAddr,
432    /// When we received this request (Unix milliseconds).
433    pub(crate) timestamp_ms: u64,
434    /// Whether we've already forwarded a response for this request.
435    /// Prevents response routing loops when convergent request paths
436    /// create bidirectional entries in recent_requests.
437    pub(crate) response_forwarded: bool,
438}
439
440impl RecentRequest {
441    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
442        Self {
443            from_peer,
444            timestamp_ms,
445            response_forwarded: false,
446        }
447    }
448
449    /// Check if this entry has expired (older than expiry_ms).
450    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
451        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
452    }
453}
454
/// Key for the `addr_to_link` reverse lookup: the transport a packet arrived
/// on paired with the remote address on that transport.
type AddrKey = (TransportId, TransportAddr);
457
/// Per-transport kernel drop tracking for congestion detection.
///
/// Sampled every tick (1s). The `dropping` flag indicates whether new
/// kernel drops were observed since the previous sample. The derived
/// `Default` starts at zero drops and `dropping == false`.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previous `recv_drops` sample (cumulative counter).
    prev_drops: u64,
    /// True if drops increased since the last sample.
    dropping: bool,
}
469
/// State for a link waiting for transport-level connection establishment.
///
/// For connection-oriented transports (TCP, Tor), the transport connect runs
/// asynchronously. This struct holds the data needed to complete the handshake
/// once the connection is ready. Instances are kept in
/// `Node::pending_connects`.
struct PendingConnect {
    /// The link that was created for this connection.
    link_id: LinkId,
    /// Which transport is being used.
    transport_id: TransportId,
    /// The remote address being connected to.
    remote_addr: TransportAddr,
    /// The peer identity (for handshake initiation).
    peer_identity: PeerIdentity,
}
485
/// A running FIPS node instance.
///
/// This is the top-level container holding all node state.
///
/// ## Peer Lifecycle
///
/// Peers go through two phases:
/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
///
/// The `addr_to_link` map enables dispatching incoming packets to the right
/// connection before authentication completes.
// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
pub struct Node {
    // === Identity ===
    /// This node's cryptographic identity.
    identity: Identity,

    /// Random epoch generated at startup for peer restart detection.
    /// Exchanged inside Noise handshake messages so peers can detect restarts.
    startup_epoch: [u8; 8],

    /// Instant when the node was created, for uptime reporting.
    started_at: std::time::Instant,

    // === Configuration ===
    /// Loaded configuration.
    config: Config,

    // === State ===
    /// Node operational state.
    state: NodeState,

    /// Whether this is a leaf-only node.
    is_leaf_only: bool,

    // === Spanning Tree ===
    /// Local spanning tree state.
    tree_state: TreeState,

    // === Bloom Filter ===
    /// Local Bloom filter state.
    bloom_state: BloomState,

    // === Routing ===
    /// Address -> coordinates cache (from session setup and discovery).
    coord_cache: CoordCache,
    /// Locally learned reverse-path next-hop hints.
    learned_routes: LearnedRouteTable,
    /// Recent discovery requests (dedup + reverse-path forwarding).
    /// Maps request_id → RecentRequest.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
    /// the TUN reader/writer threads at TCP MSS clamp time so the
    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
    /// and the learned per-destination path MTU.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // === Transports & Links ===
    /// Active transports (owned by Node).
    transports: HashMap<TransportId, TransportHandle>,
    /// Per-transport kernel drop tracking for congestion detection.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Active links.
    links: HashMap<LinkId, Link>,
    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    // === Packet Channel ===
    /// Packet sender for transports.
    packet_tx: Option<PacketTx>,
    /// Packet receiver (for event loop).
    packet_rx: Option<PacketRx>,

    // === Connections (Handshake Phase) ===
    /// Pending connections (handshake in progress).
    /// Indexed by LinkId since we don't know the peer's identity yet.
    connections: HashMap<LinkId, PeerConnection>,

    // === Peers (Active Phase) ===
    /// Authenticated peers.
    /// Indexed by NodeAddr (verified identity).
    peers: HashMap<NodeAddr, ActivePeer>,

    // === End-to-End Sessions ===
    /// Session table for end-to-end encrypted sessions.
    /// Keyed by remote NodeAddr.
    sessions: HashMap<NodeAddr, SessionEntry>,

    // === Identity Cache ===
    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
    /// Enables reverse lookup from IPv6 destination to session/routing identity.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    // === Pending Outbound Payloads ===
    /// TUN packets queued while waiting for session establishment.
    /// Keyed by destination NodeAddr, bounded per-dest and total.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Endpoint data payloads queued while waiting for session establishment.
    /// Keyed by destination NodeAddr.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,

    // === Pending Discovery Lookups ===
    /// Tracks in-flight discovery lookups, keyed by target NodeAddr.
    /// Each value is a `PendingLookup` record rather than a bare timestamp.
    /// NOTE(review): presumably it still carries the initiation time —
    /// confirm in `handlers::discovery`. Prevents duplicate flood queries.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // === Resource Limits ===
    /// Maximum connections (0 = unlimited).
    max_connections: usize,
    /// Maximum peers (0 = unlimited).
    max_peers: usize,
    /// Maximum links (0 = unlimited).
    max_links: usize,

    // === Counters ===
    /// Next link ID to allocate.
    next_link_id: u64,
    /// Next transport ID to allocate.
    next_transport_id: u32,

    // === Node Statistics ===
    /// Routing, forwarding, discovery, and error signal counters.
    stats: stats::NodeStats,

    /// Time-series history of node-level metrics (1s/1m rings).
    stats_history: stats_history::StatsHistory,

    // === TUN Interface ===
    /// TUN device state.
    tun_state: TunState,
    /// TUN interface name (for cleanup).
    tun_name: Option<String>,
    /// TUN packet sender channel.
    tun_tx: Option<TunTx>,
    /// Receiver for outbound packets from the TUN reader.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// App-owned packet sink used by embedded/no-TUN integrations.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Endpoint data command receiver used by embedded/no-daemon integrations.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Endpoint data event sink used by embedded/no-daemon integrations.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
    /// spawned (set up in `start()` once transports are running).
    /// `Some(pool)` once available; the pool internally holds
    /// per-worker mpsc senders and round-robins jobs across them.
    /// See `node::encrypt_worker` for the rationale and layout.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
    /// `encrypt_workers` for the receive side.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Set of sessions that have been registered with the decrypt
    /// shard worker pool. Used by rx_loop to decide between fast-path
    /// dispatch (worker owns the session) and legacy in-place decrypt
    /// (worker doesn't have it yet). Per the data-plane restructure,
    /// the worker owns its session state directly — there's no shared
    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
    /// this set tracks **whether** the worker has been told about a
    /// given session.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Fallback channel: decrypt worker bounces non-fast-path packets
    /// (anything that's not bulk EndpointData) back here for rx_loop
    /// to handle via the legacy path. Drained by a new rx_loop arm.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sender half of the fallback channel, created together with
    /// `decrypt_fallback_rx` in `Node::new`.
    /// NOTE(review): presumably cloned into the decrypt workers — confirm.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// TUN reader thread handle.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// TUN writer thread handle.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
    /// On Linux, deleting the interface via netlink serves the same purpose.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // === DNS Responder ===
    /// Receiver for resolved identities from the DNS responder.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// DNS responder task handle.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // === Index-Based Session Dispatch ===
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
    /// This maps our session index to the peer that uses it.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Pending outbound handshakes by our sender_idx.
    /// Tracks which LinkId corresponds to which session index.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // === Rate Limiting ===
    /// Rate limiter for msg1 processing (DoS protection).
    msg1_rate_limiter: HandshakeRateLimiter,
    /// Rate limiter for ICMP Packet Too Big messages.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Backoff for failed discovery lookups (originator-side).
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery requests (transit-side).
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // === Pending Transport Connects ===
    /// Links waiting for transport-level connection establishment before
    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
    /// the transport connect runs in the background; the tick handler polls
    /// connection_state() and initiates the handshake when connected.
    pending_connects: Vec<PendingConnect>,

    // === Connection Retry ===
    /// Retry state for peers whose outbound connections have failed.
    /// Keyed by NodeAddr. Entries are created when a handshake times out
    /// or fails, and removed on successful promotion or when max retries
    /// are exhausted.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    // === Discovery ===
    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
    /// Identity is unverified at this layer — the Noise XX handshake
    /// initiated against an mDNS-observed endpoint is what proves the
    /// peer holds the matching private key.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Same-host JSON registry under `~/.fips/instances`. Records are
    /// loopback routing hints only; peer identity is still verified by the
    /// Noise handshake.
    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
    /// Millisecond timestamp tied to the local-instance registry start.
    /// NOTE(review): presumably wall-clock ms, like
    /// `nostr_discovery_started_at_ms` — confirm where it is set.
    local_instance_started_at_ms: Option<u64>,
    /// Millisecond timestamp of the last local-instance publish, if any.
    /// NOTE(review): presumably used to pace re-publishing — confirm.
    last_local_instance_publish_ms: Option<u64>,
    /// Millisecond timestamp of the last local-instance scan, if any.
    /// NOTE(review): presumably used to pace registry scans — confirm.
    last_local_instance_scan_ms: Option<u64>,
    /// Wall-clock ms when Nostr discovery successfully started, used to
    /// schedule the one-shot startup advert sweep after a settle delay.
    /// `None` until discovery comes up; remains `None` if discovery is
    /// disabled or failed to start.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Whether the one-shot startup advert sweep has run. Set to true
    /// after the first sweep fires (under `policy: open`); thereafter
    /// only the per-tick `queue_open_discovery_retries` continues.
    startup_open_discovery_sweep_done: bool,
    /// Per-peer UDP transports adopted from NAT traversal handoff.
    bootstrap_transports: HashSet<TransportId>,
    /// Originating peer npub (bech32) for each adopted bootstrap
    /// transport, captured at `adopt_established_traversal` time.
    /// Populated alongside `bootstrap_transports`; cleared in
    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
    /// route fatal-protocol-mismatch observations back to the
    /// Nostr-discovery `failure_state` for long cooldown application.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    /// Peers that should not be used as reply-learned fallback transit for
    /// other destinations. Direct lookups to the peer are still permitted.
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // === Periodic Parent Re-evaluation ===
    /// Timestamp of last periodic parent re-evaluation (for pacing).
    last_parent_reeval: Option<crate::time::Instant>,

    // === Congestion Logging ===
    /// Timestamp of last congestion detection log (rate-limited to 5s).
    last_congestion_log: Option<std::time::Instant>,

    // === Mesh Size Estimate ===
    /// Cached estimated mesh size (computed once per tick from bloom filters).
    estimated_mesh_size: Option<u64>,
    /// Timestamp of last mesh size log emission.
    last_mesh_size_log: Option<std::time::Instant>,

    // === Bloom Self-Plausibility ===
    /// Rate-limit state for the self-plausibility WARN. Fires at most
    /// once per 60s globally when our own outgoing FilterAnnounce has
    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
    /// aggregation drift or an ingress bypass.
    last_self_warn: Option<std::time::Instant>,

    // === Local Outbound Liveness ===
    /// Set when a `transport.send` returned a local-side io error
    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
    /// cleared on the next successful send. Used by
    /// `check_link_heartbeats` to compress the dead-timeout to
    /// `fast_link_dead_timeout_secs` while our outbound is observed
    /// broken — direct kernel evidence beats waiting on receive-silence.
    last_local_send_failure_at: Option<std::time::Instant>,
    /// Set when the rx loop could not complete its 1s maintenance work
    /// inside the watchdog timeout. Link-dead detection may be valid during
    /// overload, but traversal cooldown should not punish a path just because
    /// our own scheduler/worker queue was late.
    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,

    // === Display Names ===
    /// Human-readable names for configured peers (alias or short npub).
    /// Populated at startup from peer config.
    peer_aliases: HashMap<NodeAddr, String>,
    /// Scheduler weight for explicitly configured peers. Built when config
    /// changes so the packet hot path only does a NodeAddr hash lookup.
    configured_peer_send_weights: HashMap<NodeAddr, u8>,

    /// Reloadable peer ACL state from standard allow/deny files.
    peer_acl: acl::PeerAclReloader,

    // === Host Map ===
    /// Static hostname → npub mapping for DNS resolution.
    /// Built at construction from peer aliases and /etc/fips/hosts.
    host_map: Arc<HostMap>,
}
792
793impl Node {
794    /// Create a new node from configuration.
795    pub fn new(config: Config) -> Result<Self, NodeError> {
796        config.validate()?;
797        let identity = config.create_identity()?;
798        let node_addr = *identity.node_addr();
799        let is_leaf_only = config.is_leaf_only();
800
801        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
802        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
803
804        let mut startup_epoch = [0u8; 8];
805        rand::rng().fill_bytes(&mut startup_epoch);
806
807        let mut bloom_state = if is_leaf_only {
808            BloomState::leaf_only(node_addr)
809        } else {
810            BloomState::new(node_addr)
811        };
812        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
813
814        let tun_state = if config.tun.enabled {
815            TunState::Configured
816        } else {
817            TunState::Disabled
818        };
819
820        // Initialize tree state with signed self-declaration
821        let mut tree_state = TreeState::new(node_addr);
822        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
823        tree_state.set_hold_down(config.node.tree.hold_down_secs);
824        tree_state.set_flap_dampening(
825            config.node.tree.flap_threshold,
826            config.node.tree.flap_window_secs,
827            config.node.tree.flap_dampening_secs,
828        );
829        tree_state
830            .sign_declaration(&identity)
831            .expect("signing own declaration should never fail");
832
833        let coord_cache = CoordCache::new(
834            config.node.cache.coord_size,
835            config.node.cache.coord_ttl_secs * 1000,
836        );
837        let rl = &config.node.rate_limit;
838        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
839            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
840            config.node.limits.max_pending_inbound,
841        );
842
843        let max_connections = config.node.limits.max_connections;
844        let max_peers = config.node.limits.max_peers;
845        let max_links = config.node.limits.max_links;
846        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
847        let backoff_base_secs = config.node.discovery.backoff_base_secs;
848        let backoff_max_secs = config.node.discovery.backoff_max_secs;
849        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
850
851        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
852        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
853
854        Ok(Self {
855            identity,
856            startup_epoch,
857            started_at: std::time::Instant::now(),
858            config,
859            state: NodeState::Created,
860            is_leaf_only,
861            tree_state,
862            bloom_state,
863            coord_cache,
864            learned_routes: LearnedRouteTable::default(),
865            recent_requests: HashMap::new(),
866            transports: HashMap::new(),
867            transport_drops: HashMap::new(),
868            links: HashMap::new(),
869            addr_to_link: HashMap::new(),
870            packet_tx: None,
871            packet_rx: None,
872            connections: HashMap::new(),
873            peers: HashMap::new(),
874            sessions: HashMap::new(),
875            identity_cache: HashMap::new(),
876            pending_tun_packets: HashMap::new(),
877            pending_endpoint_data: HashMap::new(),
878            pending_lookups: HashMap::new(),
879            max_connections,
880            max_peers,
881            max_links,
882            next_link_id: 1,
883            next_transport_id: 1,
884            stats: stats::NodeStats::new(),
885            stats_history: stats_history::StatsHistory::new(),
886            tun_state,
887            tun_name: None,
888            tun_tx: None,
889            tun_outbound_rx: None,
890            external_packet_tx: None,
891            endpoint_command_rx: None,
892            endpoint_event_tx: None,
893            encrypt_workers: None,
894            decrypt_workers: None,
895            decrypt_registered_sessions: std::collections::HashSet::new(),
896            decrypt_fallback_tx,
897            decrypt_fallback_rx,
898            tun_reader_handle: None,
899            tun_writer_handle: None,
900            #[cfg(target_os = "macos")]
901            tun_shutdown_fd: None,
902            dns_identity_rx: None,
903            dns_task: None,
904            index_allocator: IndexAllocator::new(),
905            peers_by_index: HashMap::new(),
906            pending_outbound: HashMap::new(),
907            msg1_rate_limiter,
908            icmp_rate_limiter: IcmpRateLimiter::new(),
909            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
910            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
911                std::time::Duration::from_millis(coords_response_interval_ms),
912            ),
913            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
914            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
915                std::time::Duration::from_secs(forward_min_interval_secs),
916            ),
917            pending_connects: Vec::new(),
918            retry_pending: HashMap::new(),
919            nostr_discovery: None,
920            nostr_discovery_started_at_ms: None,
921            lan_discovery: None,
922            local_instance_registry: None,
923            local_instance_started_at_ms: None,
924            last_local_instance_publish_ms: None,
925            last_local_instance_scan_ms: None,
926            startup_open_discovery_sweep_done: false,
927            bootstrap_transports: HashSet::new(),
928            bootstrap_transport_npubs: HashMap::new(),
929            discovery_fallback_transit_blocked_peers: HashSet::new(),
930            last_parent_reeval: None,
931            last_congestion_log: None,
932            estimated_mesh_size: None,
933            last_mesh_size_log: None,
934            last_self_warn: None,
935            last_local_send_failure_at: None,
936            last_rx_loop_maintenance_timeout_at: None,
937            peer_aliases: HashMap::new(),
938            configured_peer_send_weights,
939            peer_acl,
940            host_map,
941            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
942        })
943    }
944
945    /// Create a node with a specific identity.
946    ///
947    /// This constructor validates cross-field config invariants before
948    /// constructing the node, same as [`Node::new`].
949    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
950        config.validate()?;
951        let node_addr = *identity.node_addr();
952
953        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
954        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
955
956        let mut startup_epoch = [0u8; 8];
957        rand::rng().fill_bytes(&mut startup_epoch);
958
959        let tun_state = if config.tun.enabled {
960            TunState::Configured
961        } else {
962            TunState::Disabled
963        };
964
965        // Initialize tree state with signed self-declaration
966        let mut tree_state = TreeState::new(node_addr);
967        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
968        tree_state.set_hold_down(config.node.tree.hold_down_secs);
969        tree_state.set_flap_dampening(
970            config.node.tree.flap_threshold,
971            config.node.tree.flap_window_secs,
972            config.node.tree.flap_dampening_secs,
973        );
974        tree_state
975            .sign_declaration(&identity)
976            .expect("signing own declaration should never fail");
977
978        let mut bloom_state = BloomState::new(node_addr);
979        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
980
981        let coord_cache = CoordCache::new(
982            config.node.cache.coord_size,
983            config.node.cache.coord_ttl_secs * 1000,
984        );
985        let rl = &config.node.rate_limit;
986        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
987            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
988            config.node.limits.max_pending_inbound,
989        );
990
991        let max_connections = config.node.limits.max_connections;
992        let max_peers = config.node.limits.max_peers;
993        let max_links = config.node.limits.max_links;
994        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
995
996        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
997        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
998
999        Ok(Self {
1000            identity,
1001            startup_epoch,
1002            started_at: std::time::Instant::now(),
1003            config,
1004            state: NodeState::Created,
1005            is_leaf_only: false,
1006            tree_state,
1007            bloom_state,
1008            coord_cache,
1009            learned_routes: LearnedRouteTable::default(),
1010            recent_requests: HashMap::new(),
1011            transports: HashMap::new(),
1012            transport_drops: HashMap::new(),
1013            links: HashMap::new(),
1014            addr_to_link: HashMap::new(),
1015            packet_tx: None,
1016            packet_rx: None,
1017            connections: HashMap::new(),
1018            peers: HashMap::new(),
1019            sessions: HashMap::new(),
1020            identity_cache: HashMap::new(),
1021            pending_tun_packets: HashMap::new(),
1022            pending_endpoint_data: HashMap::new(),
1023            pending_lookups: HashMap::new(),
1024            max_connections,
1025            max_peers,
1026            max_links,
1027            next_link_id: 1,
1028            next_transport_id: 1,
1029            stats: stats::NodeStats::new(),
1030            stats_history: stats_history::StatsHistory::new(),
1031            tun_state,
1032            tun_name: None,
1033            tun_tx: None,
1034            tun_outbound_rx: None,
1035            external_packet_tx: None,
1036            endpoint_command_rx: None,
1037            endpoint_event_tx: None,
1038            encrypt_workers: None,
1039            decrypt_workers: None,
1040            decrypt_registered_sessions: std::collections::HashSet::new(),
1041            decrypt_fallback_tx,
1042            decrypt_fallback_rx,
1043            tun_reader_handle: None,
1044            tun_writer_handle: None,
1045            #[cfg(target_os = "macos")]
1046            tun_shutdown_fd: None,
1047            dns_identity_rx: None,
1048            dns_task: None,
1049            index_allocator: IndexAllocator::new(),
1050            peers_by_index: HashMap::new(),
1051            pending_outbound: HashMap::new(),
1052            msg1_rate_limiter,
1053            icmp_rate_limiter: IcmpRateLimiter::new(),
1054            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1055            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1056                std::time::Duration::from_millis(coords_response_interval_ms),
1057            ),
1058            discovery_backoff: DiscoveryBackoff::new(),
1059            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1060            pending_connects: Vec::new(),
1061            retry_pending: HashMap::new(),
1062            nostr_discovery: None,
1063            nostr_discovery_started_at_ms: None,
1064            lan_discovery: None,
1065            local_instance_registry: None,
1066            local_instance_started_at_ms: None,
1067            last_local_instance_publish_ms: None,
1068            last_local_instance_scan_ms: None,
1069            startup_open_discovery_sweep_done: false,
1070            bootstrap_transports: HashSet::new(),
1071            bootstrap_transport_npubs: HashMap::new(),
1072            discovery_fallback_transit_blocked_peers: HashSet::new(),
1073            last_parent_reeval: None,
1074            last_congestion_log: None,
1075            estimated_mesh_size: None,
1076            last_mesh_size_log: None,
1077            last_self_warn: None,
1078            last_local_send_failure_at: None,
1079            last_rx_loop_maintenance_timeout_at: None,
1080            peer_aliases: HashMap::new(),
1081            configured_peer_send_weights,
1082            peer_acl,
1083            host_map,
1084            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1085        })
1086    }
1087
1088    /// Create a leaf-only node (simplified state).
1089    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1090        let mut node = Self::new(config)?;
1091        node.is_leaf_only = true;
1092        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1093        Ok(node)
1094    }
1095
1096    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1097        let base_host_map = HostMap::from_peer_configs(config.peers());
1098        if !config.node.system_files_enabled {
1099            return (
1100                Arc::new(base_host_map.clone()),
1101                acl::PeerAclReloader::memory_only(base_host_map),
1102            );
1103        }
1104
1105        let mut host_map = base_host_map.clone();
1106        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1107        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1108            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1109        ));
1110        host_map.merge(hosts_file);
1111        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1112            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1113            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1114            base_host_map,
1115            hosts_path,
1116        );
1117        (Arc::new(host_map), peer_acl)
1118    }
1119
1120    fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1121        config
1122            .peers()
1123            .iter()
1124            .filter_map(|peer| {
1125                PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1126                    (
1127                        *identity.node_addr(),
1128                        encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1129                    )
1130                })
1131            })
1132            .collect()
1133    }
1134
1135    fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1136        self.configured_peer_send_weights
1137            .get(peer_addr)
1138            .copied()
1139            .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1140    }
1141
1142    /// Create transport instances from configuration.
1143    ///
1144    /// Returns a vector of TransportHandles for all configured transports.
1145    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1146        let mut transports = Vec::new();
1147
1148        // Collect UDP configs with optional names to avoid borrow conflicts
1149        let udp_instances: Vec<_> = self
1150            .config
1151            .transports
1152            .udp
1153            .iter()
1154            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1155            .collect();
1156
1157        // Create UDP transport instances
1158        for (name, udp_config) in udp_instances {
1159            let transport_id = self.allocate_transport_id();
1160            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1161            transports.push(TransportHandle::Udp(udp));
1162        }
1163
1164        #[cfg(feature = "sim-transport")]
1165        {
1166            let sim_instances: Vec<_> = self
1167                .config
1168                .transports
1169                .sim
1170                .iter()
1171                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1172                .collect();
1173
1174            for (name, sim_config) in sim_instances {
1175                let transport_id = self.allocate_transport_id();
1176                let sim = crate::transport::sim::SimTransport::new(
1177                    transport_id,
1178                    name,
1179                    sim_config,
1180                    packet_tx.clone(),
1181                );
1182                transports.push(TransportHandle::Sim(sim));
1183            }
1184        }
1185
1186        // Create Ethernet transport instances where raw-socket support exists.
1187        #[cfg(any(target_os = "linux", target_os = "macos"))]
1188        {
1189            let eth_instances: Vec<_> = self
1190                .config
1191                .transports
1192                .ethernet
1193                .iter()
1194                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1195                .collect();
1196            let xonly = self.identity.pubkey();
1197            for (name, eth_config) in eth_instances {
1198                let mut eth_config = eth_config;
1199                if eth_config.discovery_scope.is_none() {
1200                    eth_config.discovery_scope = self.lan_discovery_scope();
1201                }
1202                let transport_id = self.allocate_transport_id();
1203                let mut eth =
1204                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1205                eth.set_local_pubkey(xonly);
1206                transports.push(TransportHandle::Ethernet(eth));
1207            }
1208        }
1209
1210        // Create TCP transport instances
1211        let tcp_instances: Vec<_> = self
1212            .config
1213            .transports
1214            .tcp
1215            .iter()
1216            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1217            .collect();
1218
1219        for (name, tcp_config) in tcp_instances {
1220            let transport_id = self.allocate_transport_id();
1221            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1222            transports.push(TransportHandle::Tcp(tcp));
1223        }
1224
1225        // Create Tor transport instances
1226        let tor_instances: Vec<_> = self
1227            .config
1228            .transports
1229            .tor
1230            .iter()
1231            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1232            .collect();
1233
1234        for (name, tor_config) in tor_instances {
1235            let transport_id = self.allocate_transport_id();
1236            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1237            transports.push(TransportHandle::Tor(tor));
1238        }
1239
1240        let webrtc_instances: Vec<_> = self
1241            .config
1242            .transports
1243            .webrtc
1244            .iter()
1245            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1246            .collect();
1247
1248        #[cfg(feature = "webrtc-transport")]
1249        {
1250            for (name, webrtc_config) in webrtc_instances {
1251                let transport_id = self.allocate_transport_id();
1252                match WebRtcTransport::new(
1253                    transport_id,
1254                    name,
1255                    webrtc_config,
1256                    packet_tx.clone(),
1257                    &self.identity,
1258                    &self.config.node.discovery.nostr,
1259                ) {
1260                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1261                    Err(err) => {
1262                        warn!(
1263                            transport_id = %transport_id,
1264                            error = %err,
1265                            "failed to initialize WebRTC transport"
1266                        );
1267                    }
1268                }
1269            }
1270        }
1271        #[cfg(not(feature = "webrtc-transport"))]
1272        if !webrtc_instances.is_empty() {
1273            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1274        }
1275
1276        // Create BLE transport instances
1277        #[cfg(bluer_available)]
1278        {
1279            let ble_instances: Vec<_> = self
1280                .config
1281                .transports
1282                .ble
1283                .iter()
1284                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1285                .collect();
1286
1287            #[cfg(all(bluer_available, not(test)))]
1288            for (name, ble_config) in ble_instances {
1289                let transport_id = self.allocate_transport_id();
1290                let adapter = ble_config.adapter().to_string();
1291                let mtu = ble_config.mtu();
1292                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1293                    Ok(io) => {
1294                        let mut ble = crate::transport::ble::BleTransport::new(
1295                            transport_id,
1296                            name,
1297                            ble_config,
1298                            io,
1299                            packet_tx.clone(),
1300                        );
1301                        ble.set_local_pubkey(self.identity.pubkey().serialize());
1302                        transports.push(TransportHandle::Ble(ble));
1303                    }
1304                    Err(e) => {
1305                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1306                    }
1307                }
1308            }
1309
1310            #[cfg(any(not(bluer_available), test))]
1311            if !ble_instances.is_empty() {
1312                #[cfg(not(test))]
1313                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1314            }
1315        }
1316
1317        transports
1318    }
1319
1320    /// Find an operational transport that matches the given transport type name.
1321    ///
1322    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1323    /// from Nostr/STUN traversal. They must not be reused for ordinary
1324    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1325    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1326    /// with `EINVAL`, and even on platforms that allow it the packet would use
1327    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1328    /// choice deterministic by lowest transport id instead of HashMap order.
1329    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1330        self.transports
1331            .iter()
1332            .filter(|(id, handle)| {
1333                handle.transport_type().name == transport_type
1334                    && handle.is_operational()
1335                    && !self.bootstrap_transports.contains(id)
1336            })
1337            .min_by_key(|(id, _)| id.as_u32())
1338            .map(|(id, _)| *id)
1339    }
1340
1341    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1342    /// and binary TransportAddr.
1343    ///
1344    /// Finds the Ethernet transport instance bound to the named interface
1345    /// and parses the MAC portion into a 6-byte TransportAddr.
1346    #[allow(unused_variables)]
1347    fn resolve_ethernet_addr(
1348        &self,
1349        addr_str: &str,
1350    ) -> Result<(TransportId, TransportAddr), NodeError> {
1351        #[cfg(any(target_os = "linux", target_os = "macos"))]
1352        {
1353            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1354                NodeError::NoTransportForType(format!(
1355                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1356                    addr_str
1357                ))
1358            })?;
1359
1360            // Find the Ethernet transport bound to this interface
1361            let transport_id = self
1362                .transports
1363                .iter()
1364                .find(|(_, handle)| {
1365                    handle.transport_type().name == "ethernet"
1366                        && handle.is_operational()
1367                        && handle.interface_name() == Some(iface)
1368                })
1369                .map(|(id, _)| *id)
1370                .ok_or_else(|| {
1371                    NodeError::NoTransportForType(format!(
1372                        "no operational Ethernet transport for interface '{}'",
1373                        iface
1374                    ))
1375                })?;
1376
1377            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1378                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1379            })?;
1380
1381            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1382        }
1383        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1384        {
1385            Err(NodeError::NoTransportForType(
1386                "Ethernet transport is not supported on this platform".to_string(),
1387            ))
1388        }
1389    }
1390
/// Translate a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) into a
/// `(TransportId, TransportAddr)` pair.
///
/// The adapter name is extracted from the address, an operational BLE
/// transport is looked up, and the address is validated with
/// `BleAddr::parse` before the pair is returned.
///
/// NOTE(review): the transport lookup only checks for type `"ble"` and
/// operational state; it does not compare the adapter name. With more
/// than one BLE transport the selected instance may not belong to the
/// named adapter — confirm whether that is intended.
///
/// # Errors
///
/// Returns `NodeError::NoTransportForType` when the adapter cannot be
/// extracted from the address, when no operational BLE transport exists,
/// or when the address fails `BleAddr::parse`.
#[cfg(bluer_available)]
fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
    let parsed = TransportAddr::from_string(addr_str);
    let Some(adapter) = crate::transport::ble::addr::adapter_from_addr(&parsed) else {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address format '{}': expected 'adapter/mac'",
            addr_str
        )));
    };

    // Look for an operational BLE transport.
    let ble_transport = self.transports.iter().find_map(|(id, handle)| {
        let is_match = handle.transport_type().name == "ble" && handle.is_operational();
        is_match.then_some(*id)
    });
    let Some(transport_id) = ble_transport else {
        return Err(NodeError::NoTransportForType(format!(
            "no operational BLE transport for adapter '{}'",
            adapter
        )));
    };

    // Validate the full address; only the success/failure outcome is used.
    if let Err(e) = crate::transport::ble::addr::BleAddr::parse(addr_str) {
        return Err(NodeError::NoTransportForType(format!(
            "invalid BLE address '{}': {}",
            addr_str, e
        )));
    }

    Ok((transport_id, TransportAddr::from_string(addr_str)))
}
1424
1425    // === Identity Accessors ===
1426
1427    /// Get this node's identity.
1428    pub fn identity(&self) -> &Identity {
1429        &self.identity
1430    }
1431
1432    /// Get this node's NodeAddr.
1433    pub fn node_addr(&self) -> &NodeAddr {
1434        self.identity.node_addr()
1435    }
1436
1437    /// Get this node's npub.
1438    pub fn npub(&self) -> String {
1439        self.identity.npub()
1440    }
1441
1442    /// Return a human-readable display name for a NodeAddr.
1443    ///
1444    /// Lookup order:
1445    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1446    /// 2. Configured peer alias or short npub (from startup map)
1447    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1448    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1449    /// 5. Truncated NodeAddr hex (unknown address)
1450    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1451        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1452            return hostname.to_string();
1453        }
1454        if let Some(name) = self.peer_aliases.get(addr) {
1455            return name.clone();
1456        }
1457        if let Some(peer) = self.peers.get(addr) {
1458            return peer.identity().short_npub();
1459        }
1460        if let Some(entry) = self.sessions.get(addr) {
1461            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1462            return PeerIdentity::from_pubkey(xonly).short_npub();
1463        }
1464        addr.short_hex()
1465    }
1466
1467    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1468    /// decrypt-worker state coherent: removes the same `cache_key`
1469    /// from the registered-sessions tracking set and tells the
1470    /// assigned shard worker to drop its `OwnedSessionState` entry.
1471    ///
1472    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1473    /// at every session-lifecycle teardown site (rekey cross-connection
1474    /// swap, peer disconnect, dispatch session-rotation) so the worker
1475    /// doesn't keep stale ciphers / replay windows around. The
1476    /// follow-up `RegisterSession` for the NEW key (if any) will then
1477    /// install the fresh state on the same shard.
1478    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1479        // Find the peer that owns this index BEFORE removing it from
1480        // the index map, so we can decide whether the deregistration
1481        // also tears down the peer's connected UDP socket.
1482        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1483        self.peers_by_index.remove(&cache_key);
1484        if self.decrypt_registered_sessions.remove(&cache_key)
1485            && let Some(workers) = self.decrypt_workers.as_ref()
1486        {
1487            workers.unregister_session(cache_key);
1488        }
1489        // Tear down the per-peer connected UDP socket *only* if no
1490        // other peers_by_index entry still resolves to this peer.
1491        // Rekey drain calls into this helper with the OLD session
1492        // index while the NEW index is already installed and points
1493        // at the same peer — there the connect()-ed 5-tuple is
1494        // still valid for the new session and we must not close it.
1495        // Peer-teardown sites (CrossConnection swap, stale-index
1496        // fall-through in encrypted.rs, disconnect handler) call
1497        // here when this is the peer's last index, so the connected
1498        // socket goes away with the peer.
1499        if let Some(peer_addr) = owning_peer {
1500            let peer_has_other_index = self
1501                .peers_by_index
1502                .values()
1503                .any(|other| *other == peer_addr);
1504            if !peer_has_other_index {
1505                self.clear_connected_udp_for_peer(&peer_addr);
1506            }
1507        }
1508    }
1509
1510    /// Ensure the current FMP receive index resolves to this peer.
1511    ///
1512    /// Rekey msg1/msg2 handlers pre-register the pending index before
1513    /// cutover, but losing that registration in a debug build used to
1514    /// panic in the cutover path. Repairing the map here is safe: the
1515    /// peer has already promoted the pending session, and the decrypt
1516    /// worker registration immediately after cutover depends on the
1517    /// same `(transport_id, our_index)` key.
1518    pub(in crate::node) fn ensure_current_session_index_registered(
1519        &mut self,
1520        node_addr: &NodeAddr,
1521        context: &'static str,
1522    ) -> bool {
1523        let Some(peer) = self.peers.get(node_addr) else {
1524            return false;
1525        };
1526        let Some(transport_id) = peer.transport_id() else {
1527            warn!(
1528                peer = %self.peer_display_name(node_addr),
1529                context,
1530                "Cannot register current session index without transport id"
1531            );
1532            return false;
1533        };
1534        let Some(our_index) = peer.our_index() else {
1535            warn!(
1536                peer = %self.peer_display_name(node_addr),
1537                context,
1538                "Cannot register current session index without local index"
1539            );
1540            return false;
1541        };
1542
1543        let cache_key = (transport_id, our_index.as_u32());
1544        match self.peers_by_index.get(&cache_key).copied() {
1545            Some(existing) if existing == *node_addr => true,
1546            Some(existing) => {
1547                warn!(
1548                    peer = %self.peer_display_name(node_addr),
1549                    previous_owner = %self.peer_display_name(&existing),
1550                    transport_id = %transport_id,
1551                    our_index = %our_index,
1552                    context,
1553                    "Repairing current session index with stale owner"
1554                );
1555                self.peers_by_index.insert(cache_key, *node_addr);
1556                true
1557            }
1558            None => {
1559                warn!(
1560                    peer = %self.peer_display_name(node_addr),
1561                    transport_id = %transport_id,
1562                    our_index = %our_index,
1563                    context,
1564                    "Repairing missing current session index"
1565                );
1566                self.peers_by_index.insert(cache_key, *node_addr);
1567                true
1568            }
1569        }
1570    }
1571
1572    // === Configuration ===
1573
1574    /// Get the configuration.
1575    pub fn config(&self) -> &Config {
1576        &self.config
1577    }
1578
1579    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1580    ///
1581    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1582    /// transport MTU. Returns the maximum IPv6 packet size (including
1583    /// IPv6 header) that can be transmitted through the FIPS mesh.
1584    pub fn effective_ipv6_mtu(&self) -> u16 {
1585        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1586    }
1587
1588    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1589    ///
1590    /// Returns the **minimum** MTU across all operational transports, or
1591    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1592    /// where a specific egress transport isn't yet known: the resulting
1593    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1594    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1595    /// configured-transport's egress, eliminating PMTU-D black holes that
1596    /// would otherwise occur when a flow's actual egress is smaller than
1597    /// the clamp ceiling assumed at TUN init.
1598    ///
1599    /// Returning the smallest (rather than the first-iterated, which used
1600    /// to vary across HashMap iteration order + async-startup race) makes
1601    /// the clamp deterministic across daemon restarts.
1602    ///
1603    /// See `ISSUE-2026-0011` for the empirical investigation.
1604    pub fn transport_mtu(&self) -> u16 {
1605        let min_operational = self
1606            .transports
1607            .values()
1608            .filter(|h| h.is_operational())
1609            .map(|h| h.mtu())
1610            .min();
1611        if let Some(mtu) = min_operational {
1612            return mtu;
1613        }
1614        // Fallback to config: try UDP first, then Ethernet
1615        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1616            return cfg.mtu();
1617        }
1618        1280
1619    }
1620
1621    // === State ===
1622
1623    /// Get the node state.
1624    pub fn state(&self) -> NodeState {
1625        self.state
1626    }
1627
1628    /// Get the node uptime.
1629    pub fn uptime(&self) -> std::time::Duration {
1630        self.started_at.elapsed()
1631    }
1632
1633    /// Check if node is operational.
1634    pub fn is_running(&self) -> bool {
1635        self.state.is_operational()
1636    }
1637
1638    /// Check if this is a leaf-only node.
1639    pub fn is_leaf_only(&self) -> bool {
1640        self.is_leaf_only
1641    }
1642
1643    // === Tree State ===
1644
1645    /// Get the tree state.
1646    pub fn tree_state(&self) -> &TreeState {
1647        &self.tree_state
1648    }
1649
1650    /// Get mutable tree state.
1651    pub fn tree_state_mut(&mut self) -> &mut TreeState {
1652        &mut self.tree_state
1653    }
1654
1655    // === Bloom State ===
1656
1657    /// Get the Bloom filter state.
1658    pub fn bloom_state(&self) -> &BloomState {
1659        &self.bloom_state
1660    }
1661
1662    /// Get mutable Bloom filter state.
1663    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1664        &mut self.bloom_state
1665    }
1666
1667    // === Mesh Size Estimate ===
1668
1669    /// Get the cached estimated mesh size.
1670    pub fn estimated_mesh_size(&self) -> Option<u64> {
1671        self.estimated_mesh_size
1672    }
1673
1674    /// Compute and cache the estimated mesh size from bloom filters.
1675    ///
1676    /// Uses the spanning tree partition: parent's filter covers nodes reachable
1677    /// upward, children's filters cover disjoint subtrees downward. The sum
1678    /// of estimated entry counts plus one (self) approximates total network size.
1679    pub(crate) fn compute_mesh_size(&mut self) {
1680        let my_addr = *self.tree_state.my_node_addr();
1681        let parent_id = *self.tree_state.my_declaration().parent_id();
1682        let is_root = self.tree_state.is_root();
1683
1684        let max_fpr = self.config.node.bloom.max_inbound_fpr;
1685        let mut total: f64 = 1.0; // count self
1686        let mut child_count: u32 = 0;
1687        let mut has_data = false;
1688
1689        // Parent's filter: nodes reachable upward through the tree.
1690        // If any contributing filter is above the FPR cap, we refuse to
1691        // estimate rather than substitute a partial/biased aggregate —
1692        // Node.estimated_mesh_size is already Option<u64> and consumers
1693        // (control socket, fipstop, periodic debug log) handle None.
1694        if !is_root
1695            && let Some(parent) = self.peers.get(&parent_id)
1696            && let Some(filter) = parent.inbound_filter()
1697        {
1698            match filter.estimated_count(max_fpr) {
1699                Some(n) => {
1700                    total += n;
1701                    has_data = true;
1702                }
1703                None => {
1704                    self.estimated_mesh_size = None;
1705                    return;
1706                }
1707            }
1708        }
1709
1710        // Children's filters: each child's subtree is disjoint
1711        for (peer_addr, peer) in &self.peers {
1712            if peer_addr == &parent_id {
1713                continue;
1714            }
1715            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1716                && *decl.parent_id() == my_addr
1717            {
1718                child_count += 1;
1719                if let Some(filter) = peer.inbound_filter() {
1720                    match filter.estimated_count(max_fpr) {
1721                        Some(n) => {
1722                            total += n;
1723                            has_data = true;
1724                        }
1725                        None => {
1726                            self.estimated_mesh_size = None;
1727                            return;
1728                        }
1729                    }
1730                }
1731            }
1732        }
1733
1734        if !has_data {
1735            self.estimated_mesh_size = None;
1736            return;
1737        }
1738
1739        let size = total.round() as u64;
1740        self.estimated_mesh_size = Some(size);
1741
1742        // Periodic logging (reuse MMP default interval: 30s)
1743        let now = std::time::Instant::now();
1744        let should_log = match self.last_mesh_size_log {
1745            None => true,
1746            Some(last) => {
1747                now.duration_since(last)
1748                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1749            }
1750        };
1751        if should_log {
1752            tracing::debug!(
1753                estimated_mesh_size = size,
1754                peers = self.peers.len(),
1755                children = child_count,
1756                "Mesh size estimate"
1757            );
1758            self.last_mesh_size_log = Some(now);
1759        }
1760    }
1761
1762    // === Coord Cache ===
1763
1764    /// Get the coordinate cache.
1765    pub fn coord_cache(&self) -> &CoordCache {
1766        &self.coord_cache
1767    }
1768
1769    /// Get mutable coordinate cache.
1770    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1771        &mut self.coord_cache
1772    }
1773
1774    // === Node Statistics ===
1775
1776    /// Get the node statistics.
1777    pub fn stats(&self) -> &stats::NodeStats {
1778        &self.stats
1779    }
1780
1781    /// Get mutable node statistics.
1782    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1783        &mut self.stats
1784    }
1785
1786    /// Get the stats history collector.
1787    pub fn stats_history(&self) -> &stats_history::StatsHistory {
1788        &self.stats_history
1789    }
1790
1791    /// Sample the current node state into the stats history ring.
1792    /// Called once per tick from the RX loop.
1793    pub(crate) fn record_stats_history(&mut self) {
1794        let fwd = &self.stats.forwarding;
1795        let peers_with_mmp: Vec<f64> = self
1796            .peers
1797            .values()
1798            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1799            .collect();
1800        let loss_rate = if peers_with_mmp.is_empty() {
1801            0.0
1802        } else {
1803            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1804        };
1805
1806        let snap = stats_history::Snapshot {
1807            mesh_size: self.estimated_mesh_size,
1808            tree_depth: self.tree_state.my_coords().depth() as u32,
1809            peer_count: self.peers.len() as u64,
1810            parent_switches_total: self.stats.tree.parent_switches,
1811            bytes_in_total: fwd.received_bytes,
1812            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1813            packets_in_total: fwd.received_packets,
1814            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1815            loss_rate,
1816            active_sessions: self.sessions.len() as u64,
1817        };
1818
1819        let now = std::time::Instant::now();
1820        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1821            .peers
1822            .values()
1823            .map(|p| {
1824                let stats = p.link_stats();
1825                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1826                    Some(m) => (
1827                        m.metrics.srtt_ms(),
1828                        Some(m.metrics.loss_rate()),
1829                        m.receiver.ecn_ce_count() as u64,
1830                    ),
1831                    None => (None, None, 0),
1832                };
1833                stats_history::PeerSnapshot {
1834                    node_addr: *p.node_addr(),
1835                    last_seen: now,
1836                    srtt_ms,
1837                    loss_rate,
1838                    bytes_in_total: stats.bytes_recv,
1839                    bytes_out_total: stats.bytes_sent,
1840                    packets_in_total: stats.packets_recv,
1841                    packets_out_total: stats.packets_sent,
1842                    ecn_ce_total: ecn_ce,
1843                }
1844            })
1845            .collect();
1846
1847        self.stats_history.tick(now, &snap, &peer_snaps);
1848    }
1849
1850    // === TUN Interface ===
1851
1852    /// Get the TUN state.
1853    pub fn tun_state(&self) -> TunState {
1854        self.tun_state
1855    }
1856
1857    /// Get the TUN interface name, if active.
1858    pub fn tun_name(&self) -> Option<&str> {
1859        self.tun_name.as_deref()
1860    }
1861
1862    // === Resource Limits ===
1863
1864    /// Set the maximum number of connections (handshake phase).
1865    pub fn set_max_connections(&mut self, max: usize) {
1866        self.max_connections = max;
1867    }
1868
1869    /// Set the maximum number of peers (authenticated).
1870    pub fn set_max_peers(&mut self, max: usize) {
1871        self.max_peers = max;
1872    }
1873
1874    /// Returns false when starting more outbound work would exceed a resource
1875    /// cap. A cap of `0` means uncapped.
1876    pub(crate) fn outbound_admission_check(&self) -> bool {
1877        let connection_used = self
1878            .connections
1879            .len()
1880            .saturating_add(self.pending_connects.len());
1881        let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1882        let connection_allowed =
1883            self.max_connections == 0 || connection_used < self.max_connections;
1884        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1885        peer_allowed && connection_allowed && link_allowed
1886    }
1887
1888    /// Admission for public/open-discovery outbound work. This includes the
1889    /// general connection/link caps and, when open Nostr discovery is enabled,
1890    /// the configured non-peer budget.
1891    pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1892        if !self.outbound_admission_check() {
1893            return false;
1894        }
1895
1896        let nostr = &self.config.node.discovery.nostr;
1897        if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1898            return true;
1899        }
1900
1901        let configured_npubs = self
1902            .config
1903            .peers()
1904            .iter()
1905            .map(|peer| peer.npub.clone())
1906            .collect::<HashSet<_>>();
1907        self.open_discovery_enqueue_budget(&configured_npubs) > 0
1908    }
1909
1910    /// Like `outbound_admission_check`, but for racing a better path to a
1911    /// peer that is already authenticated. This may temporarily add a
1912    /// connection/link, but it does not consume a new peer slot.
1913    pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1914        let connection_used = self
1915            .connections
1916            .len()
1917            .saturating_add(self.pending_connects.len());
1918        let connection_allowed =
1919            self.max_connections == 0 || connection_used < self.max_connections;
1920        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1921        connection_allowed && link_allowed
1922    }
1923
1924    /// Set the maximum number of links.
1925    pub fn set_max_links(&mut self, max: usize) {
1926        self.max_links = max;
1927    }
1928
1929    // === Counts ===
1930
1931    /// Number of pending connections (handshake in progress).
1932    pub fn connection_count(&self) -> usize {
1933        self.connections.len()
1934    }
1935
1936    /// Number of authenticated peers.
1937    pub fn peer_count(&self) -> usize {
1938        self.peers.len()
1939    }
1940
1941    /// Number of active links.
1942    pub fn link_count(&self) -> usize {
1943        self.links.len()
1944    }
1945
1946    /// Number of active transports.
1947    pub fn transport_count(&self) -> usize {
1948        self.transports.len()
1949    }
1950
1951    // === Transport Management ===
1952
1953    /// Allocate a new transport ID.
1954    pub fn allocate_transport_id(&mut self) -> TransportId {
1955        let id = TransportId::new(self.next_transport_id);
1956        self.next_transport_id += 1;
1957        id
1958    }
1959
1960    /// Get a transport by ID.
1961    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1962        self.transports.get(id)
1963    }
1964
1965    /// Get mutable transport by ID.
1966    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1967        self.transports.get_mut(id)
1968    }
1969
1970    /// Iterate over transport IDs.
1971    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1972        self.transports.keys()
1973    }
1974
1975    /// Get the packet receiver for the event loop.
1976    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1977        self.packet_rx.as_mut()
1978    }
1979
1980    // === Link Management ===
1981
1982    /// Allocate a new link ID.
1983    pub fn allocate_link_id(&mut self) -> LinkId {
1984        let id = LinkId::new(self.next_link_id);
1985        self.next_link_id += 1;
1986        id
1987    }
1988
1989    /// Add a link.
1990    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1991        if self.max_links > 0 && self.links.len() >= self.max_links {
1992            return Err(NodeError::MaxLinksExceeded {
1993                max: self.max_links,
1994            });
1995        }
1996        let link_id = link.link_id();
1997        let transport_id = link.transport_id();
1998        let remote_addr = link.remote_addr().clone();
1999
2000        self.links.insert(link_id, link);
2001        self.addr_to_link
2002            .insert((transport_id, remote_addr), link_id);
2003        Ok(())
2004    }
2005
2006    /// Get a link by ID.
2007    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2008        self.links.get(link_id)
2009    }
2010
2011    /// Get a mutable link by ID.
2012    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2013        self.links.get_mut(link_id)
2014    }
2015
2016    /// Find link ID by transport address.
2017    pub fn find_link_by_addr(
2018        &self,
2019        transport_id: TransportId,
2020        addr: &TransportAddr,
2021    ) -> Option<LinkId> {
2022        self.addr_to_link
2023            .get(&(transport_id, addr.clone()))
2024            .copied()
2025    }
2026
2027    /// Remove a link.
2028    ///
2029    /// Only removes the addr_to_link reverse lookup if it still points to this
2030    /// link. In cross-connection scenarios, a newer link may have replaced the
2031    /// entry for the same address.
2032    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2033        if let Some(link) = self.links.remove(link_id) {
2034            // Clean up reverse lookup only if it still maps to this link
2035            let key = (link.transport_id(), link.remote_addr().clone());
2036            if self.addr_to_link.get(&key) == Some(link_id) {
2037                self.addr_to_link.remove(&key);
2038            }
2039            Some(link)
2040        } else {
2041            None
2042        }
2043    }
2044
2045    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2046        if !self.bootstrap_transports.contains(&transport_id) {
2047            return;
2048        }
2049
2050        let transport_in_use = self
2051            .links
2052            .values()
2053            .any(|link| link.transport_id() == transport_id)
2054            || self
2055                .connections
2056                .values()
2057                .any(|conn| conn.transport_id() == Some(transport_id))
2058            || self
2059                .peers
2060                .values()
2061                .any(|peer| peer.transport_id() == Some(transport_id))
2062            || self
2063                .pending_connects
2064                .iter()
2065                .any(|pending| pending.transport_id == transport_id);
2066
2067        if transport_in_use {
2068            return;
2069        }
2070
2071        tracing::debug!(
2072            transport_id = %transport_id,
2073            "bootstrap transport has no remaining references; dropping"
2074        );
2075
2076        self.bootstrap_transports.remove(&transport_id);
2077        self.bootstrap_transport_npubs.remove(&transport_id);
2078        self.transport_drops.remove(&transport_id);
2079        self.transports.remove(&transport_id);
2080    }
2081
2082    /// Iterate over all links.
2083    pub fn links(&self) -> impl Iterator<Item = &Link> {
2084        self.links.values()
2085    }
2086
2087    // === Connection Management (Handshake Phase) ===
2088
2089    /// Add a pending connection.
2090    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2091        let link_id = connection.link_id();
2092
2093        if self.connections.contains_key(&link_id) {
2094            return Err(NodeError::ConnectionAlreadyExists(link_id));
2095        }
2096
2097        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2098            return Err(NodeError::MaxConnectionsExceeded {
2099                max: self.max_connections,
2100            });
2101        }
2102
2103        self.connections.insert(link_id, connection);
2104        Ok(())
2105    }
2106
2107    /// Get a connection by LinkId.
2108    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2109        self.connections.get(link_id)
2110    }
2111
2112    /// Get a mutable connection by LinkId.
2113    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2114        self.connections.get_mut(link_id)
2115    }
2116
2117    /// Remove a connection.
2118    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2119        self.connections.remove(link_id)
2120    }
2121
2122    /// Iterate over all connections.
2123    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2124        self.connections.values()
2125    }
2126
2127    // === Peer Management (Active Phase) ===
2128
2129    /// Get a peer by NodeAddr.
2130    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2131        self.peers.get(node_addr)
2132    }
2133
2134    /// Get a mutable peer by NodeAddr.
2135    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2136        self.peers.get_mut(node_addr)
2137    }
2138
2139    /// Remove a peer.
2140    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2141        self.peers.remove(node_addr)
2142    }
2143
2144    /// Iterate over all peers.
2145    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2146        self.peers.values()
2147    }
2148
2149    /// Reference to the Nostr discovery handle if discovery is enabled.
2150    /// Used by control queries (`show_peers` per-peer Nostr-traversal
2151    /// state) to read failure-state without taking shared ownership.
2152    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2153        self.nostr_discovery.as_deref()
2154    }
2155
2156    /// Iterate over all peer node IDs.
2157    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2158        self.peers.keys()
2159    }
2160
2161    /// Iterate over peers that can send traffic.
2162    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2163        self.peers.values().filter(|p| p.can_send())
2164    }
2165
2166    /// Number of peers that can send traffic.
2167    pub fn sendable_peer_count(&self) -> usize {
2168        self.peers.values().filter(|p| p.can_send()).count()
2169    }
2170
2171    pub(crate) fn set_discovery_fallback_transit_allowed(
2172        &mut self,
2173        peer_addr: NodeAddr,
2174        allowed: bool,
2175    ) {
2176        if allowed {
2177            self.discovery_fallback_transit_blocked_peers
2178                .remove(&peer_addr);
2179        } else {
2180            self.discovery_fallback_transit_blocked_peers
2181                .insert(peer_addr);
2182        }
2183    }
2184
2185    pub(crate) fn configured_discovery_fallback_transit(
2186        &self,
2187        peer_addr: &NodeAddr,
2188    ) -> Option<bool> {
2189        self.configured_peer(peer_addr)
2190            .map(|peer| peer.discovery_fallback_transit)
2191    }
2192
2193    pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2194        self.config.peers().iter().find(|peer| {
2195            PeerIdentity::from_npub(&peer.npub)
2196                .ok()
2197                .is_some_and(|identity| identity.node_addr() == peer_addr)
2198        })
2199    }
2200
2201    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2202        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2203            return retry_state.peer_config.discovery_fallback_transit;
2204        }
2205
2206        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2207            return allowed;
2208        }
2209
2210        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2211    }
2212
2213    // === End-to-End Sessions ===
2214
2215    /// Get a session by remote NodeAddr.
2216    /// Disable the discovery forward rate limiter (for tests).
2217    #[cfg(test)]
2218    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2219        self.discovery_forward_limiter
2220            .set_interval(std::time::Duration::ZERO);
2221    }
2222
2223    #[cfg(test)]
2224    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2225        self.sessions.get(remote)
2226    }
2227
2228    /// Get a mutable session by remote NodeAddr.
2229    #[cfg(test)]
2230    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2231        self.sessions.get_mut(remote)
2232    }
2233
2234    /// Remove a session.
2235    #[cfg(test)]
2236    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2237        self.sessions.remove(remote)
2238    }
2239
2240    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2241    #[cfg(test)]
2242    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2243        self.path_mtu_lookup
2244            .read()
2245            .ok()
2246            .and_then(|map| map.get(fips_addr).copied())
2247    }
2248
2249    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2250    #[cfg(test)]
2251    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2252        if let Ok(mut map) = self.path_mtu_lookup.write() {
2253            map.insert(fips_addr, mtu);
2254        }
2255    }
2256
2257    /// Number of end-to-end sessions.
2258    pub fn session_count(&self) -> usize {
2259        self.sessions.len()
2260    }
2261
2262    /// Iterate over all session entries (for control queries).
2263    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2264        self.sessions.iter()
2265    }
2266
2267    // === Identity Cache ===
2268
2269    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2270    pub(crate) fn register_identity(
2271        &mut self,
2272        node_addr: NodeAddr,
2273        pubkey: secp256k1::PublicKey,
2274    ) -> bool {
2275        let mut prefix = [0u8; 15];
2276        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2277        if let Some(entry) = self.identity_cache.get(&prefix)
2278            && entry.node_addr == node_addr
2279            && entry.pubkey == pubkey
2280        {
2281            // Endpoint sends pass the same PeerIdentity on every packet. Once
2282            // validated, avoid re-deriving NodeAddr from the public key in the
2283            // data path; that hash showed up in macOS sender profiles.
2284            return true;
2285        }
2286
2287        let (xonly, _) = pubkey.x_only_public_key();
2288        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2289        if derived_node_addr != node_addr {
2290            debug!(
2291                claimed_node_addr = %node_addr,
2292                derived_node_addr = %derived_node_addr,
2293                "Rejected identity cache entry with mismatched public key"
2294            );
2295            return false;
2296        }
2297
2298        let now_ms = Self::now_ms();
2299        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2300            && entry.node_addr == node_addr
2301        {
2302            entry.pubkey = pubkey;
2303            entry.last_seen_ms = now_ms;
2304            return true;
2305        }
2306
2307        let npub = encode_npub(&xonly);
2308        self.identity_cache.insert(
2309            prefix,
2310            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2311        );
2312        // LRU eviction
2313        let max = self.config.node.cache.identity_size;
2314        if self.identity_cache.len() > max
2315            && let Some(oldest_key) = self
2316                .identity_cache
2317                .iter()
2318                .min_by_key(|(_, entry)| entry.last_seen_ms)
2319                .map(|(k, _)| *k)
2320        {
2321            self.identity_cache.remove(&oldest_key);
2322        }
2323        true
2324    }
2325
2326    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2327    pub(crate) fn lookup_by_fips_prefix(
2328        &mut self,
2329        prefix: &[u8; 15],
2330    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2331        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2332            entry.last_seen_ms = Self::now_ms(); // LRU touch
2333            Some((entry.node_addr, entry.pubkey))
2334        } else {
2335            None
2336        }
2337    }
2338
2339    /// Check if a node's identity is in the cache (without LRU touch).
2340    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2341        let mut prefix = [0u8; 15];
2342        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2343        self.identity_cache.contains_key(&prefix)
2344    }
2345
    /// Number of entries currently held in the identity cache.
    ///
    /// This is the live entry count, not the configured limit.
    pub fn identity_cache_len(&self) -> usize {
        self.identity_cache.len()
    }
2350
2351    /// Iterate over identity cache entries.
2352    ///
2353    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2354    /// Used by the `show_identity_cache` control query.
2355    pub fn identity_cache_iter(
2356        &self,
2357    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2358        self.identity_cache
2359            .values()
2360            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2361    }
2362
    /// Configured maximum identity cache size.
    ///
    /// Read straight from `config.node.cache.identity_size`.
    pub fn identity_cache_max(&self) -> usize {
        self.config.node.cache.identity_size
    }
2367
    /// Number of pending discovery lookups.
    ///
    /// Reports the current size of `pending_lookups`.
    pub fn pending_lookup_count(&self) -> usize {
        self.pending_lookups.len()
    }
2372
    /// Iterate over pending discovery lookups for diagnostics.
    ///
    /// Yields each `NodeAddr` key of `pending_lookups` together with its
    /// `PendingLookup` state, both borrowed.
    pub fn pending_lookups_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
        self.pending_lookups.iter()
    }
2379
    /// Number of recent discovery requests tracked.
    ///
    /// Reports the current size of `recent_requests`.
    pub fn recent_request_count(&self) -> usize {
        self.recent_requests.len()
    }
2384
    /// Count of destinations with queued TUN packets awaiting session setup.
    ///
    /// This counts entries of `pending_tun_packets` (one per destination), not
    /// the packets inside them.
    pub fn pending_tun_destinations(&self) -> usize {
        self.pending_tun_packets.len()
    }
2389
2390    /// Total TUN packets queued across all destinations.
2391    pub fn pending_tun_total_packets(&self) -> usize {
2392        self.pending_tun_packets.values().map(|q| q.len()).sum()
2393    }
2394
    /// Iterate over retry state for diagnostics.
    ///
    /// Yields each `NodeAddr` key of `retry_pending` together with its
    /// `RetryState`, both borrowed.
    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
        self.retry_pending.iter()
    }
2399
2400    // === Routing ===
2401
2402    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2403    ///
2404    /// Returns true if the peer is our current tree parent, or if the peer
2405    /// has declared us as their parent (making them our child).
2406    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2407        // Peer is our parent
2408        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2409            return true;
2410        }
2411        // Peer is our child (their declaration names us as parent)
2412        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2413            && decl.parent_id() == self.node_addr()
2414        {
2415            return true;
2416        }
2417        false
2418    }
2419
    /// Find next hop for a destination node address.
    ///
    /// Routing priority:
    /// 1. Destination is self → `None` (local delivery)
    /// 2. Destination is a direct peer that reports `is_healthy()` → that peer
    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
    ///    observed reverse paths, selected with weighted multipath plus
    ///    periodic coordinate/tree exploration. This step is skipped on calls
    ///    where `should_explore_fallback` asks for exploration.
    /// 4. Bloom filter candidates with cached dest coords → among peers whose
    ///    bloom filter contains the destination, pick the one that minimizes
    ///    tree distance to the destination, with
    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
    ///    The self-distance check ensures only peers strictly closer to the
    ///    destination than us are considered (prevents routing loops).
    /// 5. Greedy tree routing fallback (requires cached dest coords; the
    ///    chosen peer must pass `can_send()`)
    /// 6. Reply-learned route once more in `reply_learned` mode, so that an
    ///    exploration attempt that found nothing still uses a learned route
    /// 7. Direct peer that is not healthy but can still send
    /// 8. No route → `None`
    ///
    /// Both the bloom filter and tree routing paths require cached destination
    /// coordinates (checked in `coord_cache`). Without coordinates, the node
    /// cannot make loop-free forwarding decisions, so steps 4-5 are skipped
    /// and only steps 6-7 are tried. The caller should signal
    /// `CoordsRequired` back to the source when `None` is returned for a
    /// non-local destination.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // 1. Local delivery
        if dest_node_addr == self.node_addr() {
            return None;
        }

        // 2. Healthy direct peer. Stale direct links remain usable, but in
        // reply-learned mode they must not hide a live mesh route learned from
        // recent traffic or discovery.
        // `direct_peer_can_send` is remembered for the last-resort fallbacks
        // further down.
        let direct_peer_can_send = self
            .peers
            .get(dest_node_addr)
            .is_some_and(|peer| peer.can_send());
        if let Some(peer) = self.peers.get(dest_node_addr)
            && peer.is_healthy()
        {
            return Some(peer);
        }

        // One timestamp shared by every cache and route lookup in this call.
        let now_ms = Self::now_ms();

        // Addresses of peers that can currently send; `Some` only in
        // reply-learned mode, so it doubles as the "learned routing enabled"
        // switch for the rest of the function.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(_, peer)| peer.can_send())
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // 3. Optional reply-learned routing. These entries are not peer
        // claims; they are local observations of which peer carried traffic
        // or a verified lookup response back from the destination. Most
        // packets use weighted multipath over learned routes, but periodic
        // fallback exploration lets coord/bloom/tree routes discover better
        // candidates.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Look up cached destination coordinates (required by both bloom and tree paths).
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: steps 4-5 cannot run, so settle for a learned
            // route, then for the (possibly stale) direct link.
            if let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            if direct_peer_can_send {
                return self.peers.get(dest_node_addr);
            }
            return None;
        };

        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
        //    If no candidate is strictly closer, fall through to tree routing.
        //    Only the chosen address leaves this scope, so the candidate
        //    borrows end before `self.peers` is queried for the return value.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr {
            return self.peers.get(&next_hop_addr);
        }

        // 5. Greedy tree routing fallback
        let tree_route_addr = self
            .tree_state
            .find_next_hop(&dest_coords)
            .filter(|next_hop_id| {
                self.peers
                    .get(next_hop_id)
                    .is_some_and(|peer| peer.can_send())
            });
        if let Some(next_hop_addr) = tree_route_addr {
            return self.peers.get(&next_hop_addr);
        }
        // Exploration found nothing via bloom/tree: use the learned route that
        // step 3 deliberately skipped.
        // NOTE(review): this branch returns without consulting
        // `direct_peer_can_send`, unlike the fall-through path below — confirm
        // that skipping the direct-peer fallback here is intended.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // Not exploring: ask the learned-route table again before giving up
        // on mesh routes.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Last resort: a direct peer that is not healthy but can still send.
        if direct_peer_can_send {
            return self.peers.get(dest_node_addr);
        }

        None
    }
2566
2567    pub(in crate::node) fn learn_reverse_route(
2568        &mut self,
2569        destination: NodeAddr,
2570        next_hop: NodeAddr,
2571    ) {
2572        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2573            || destination == *self.node_addr()
2574        {
2575            return;
2576        }
2577        let now_ms = Self::now_ms();
2578        self.learned_routes.learn(
2579            destination,
2580            next_hop,
2581            now_ms,
2582            self.config.node.routing.learned_ttl_secs,
2583            self.config.node.routing.max_learned_routes_per_dest,
2584        );
2585    }
2586
2587    pub(in crate::node) fn record_route_failure(
2588        &mut self,
2589        destination: NodeAddr,
2590        next_hop: NodeAddr,
2591    ) {
2592        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2593            return;
2594        }
2595        self.learned_routes.record_failure(&destination, &next_hop);
2596    }
2597
    /// Produce a snapshot of the learned-route table as of `now_ms`.
    ///
    /// Delegates to `LearnedRouteTable::snapshot`; the node itself is not
    /// modified.
    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
        self.learned_routes.snapshot(now_ms)
    }
2601
    /// Drop learned routes that have expired as of `now_ms`.
    ///
    /// Delegates to `LearnedRouteTable::purge_expired`.
    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
        self.learned_routes.purge_expired(now_ms);
    }
2605
2606    /// Select the best peer from a set of bloom filter candidates.
2607    ///
2608    /// Uses distance from each candidate's tree coordinates to the destination
2609    /// as the primary metric (after link_cost). Only selects peers that are
2610    /// strictly closer to the destination than we are (self-distance check
2611    /// prevents routing loops).
2612    ///
2613    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2614    fn select_best_candidate<'a>(
2615        &'a self,
2616        candidates: &[&'a ActivePeer],
2617        dest_coords: &crate::tree::TreeCoordinate,
2618    ) -> Option<&'a ActivePeer> {
2619        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2620
2621        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2622
2623        for &candidate in candidates {
2624            if !candidate.can_send() {
2625                continue;
2626            }
2627
2628            let cost = candidate.link_cost();
2629
2630            let dist = self
2631                .tree_state
2632                .peer_coords(candidate.node_addr())
2633                .map(|pc| pc.distance_to(dest_coords))
2634                .unwrap_or(usize::MAX);
2635
2636            // Self-distance check: only consider peers strictly closer
2637            // to the destination than we are (prevents routing loops)
2638            if dist >= my_distance {
2639                continue;
2640            }
2641
2642            let dominated = match &best {
2643                None => true,
2644                Some((_, best_cost, best_dist)) => {
2645                    cost < *best_cost
2646                        || (cost == *best_cost && dist < *best_dist)
2647                        || (cost == *best_cost
2648                            && dist == *best_dist
2649                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2650                }
2651            };
2652
2653            if dominated {
2654                best = Some((candidate, cost, dist));
2655            }
2656        }
2657
2658        best.map(|(peer, _, _)| peer)
2659    }
2660
2661    /// Check if a destination is in any peer's bloom filter.
2662    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2663        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2664    }
2665
    /// Get the TUN packet sender channel.
    ///
    /// Returns a borrow of the stored sender, or `None` if TUN is not active
    /// or the node hasn't been started.
    pub fn tun_tx(&self) -> Option<&TunTx> {
        self.tun_tx.as_ref()
    }
2672
2673    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2674    ///
2675    /// This must be called before [`Node::start`] and requires `tun.enabled =
2676    /// false`. Outbound packets sent to the returned sender are processed by the
2677    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2678    /// sent to the returned receiver with source attribution.
2679    pub fn attach_external_packet_io(
2680        &mut self,
2681        capacity: usize,
2682    ) -> Result<ExternalPacketIo, NodeError> {
2683        if self.state != NodeState::Created {
2684            return Err(NodeError::Config(ConfigError::Validation(
2685                "external packet I/O must be attached before node start".to_string(),
2686            )));
2687        }
2688        if self.config.tun.enabled {
2689            return Err(NodeError::Config(ConfigError::Validation(
2690                "external packet I/O requires tun.enabled=false".to_string(),
2691            )));
2692        }
2693
2694        let capacity = capacity.max(1);
2695        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2696        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2697        self.tun_outbound_rx = Some(outbound_rx);
2698        self.external_packet_tx = Some(inbound_tx);
2699
2700        Ok(ExternalPacketIo {
2701            outbound_tx,
2702            inbound_rx,
2703        })
2704    }
2705
2706    /// Attach app-owned endpoint data I/O for embedded operation.
2707    ///
2708    /// Commands sent to the returned sender are processed by the node RX loop.
2709    /// Incoming endpoint data is emitted as source-attributed events.
2710    pub(crate) fn attach_endpoint_data_io(
2711        &mut self,
2712        capacity: usize,
2713    ) -> Result<EndpointDataIo, NodeError> {
2714        if self.state != NodeState::Created {
2715            return Err(NodeError::Config(ConfigError::Validation(
2716                "endpoint data I/O must be attached before node start".to_string(),
2717            )));
2718        }
2719
2720        let command_capacity = endpoint_data_command_capacity(capacity);
2721        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2722        // Inbound endpoint-data events use an unbounded channel — see
2723        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2724        // per-packet semaphore + the cross-task relay task that used to
2725        // sit on top of this channel).
2726        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2727        self.endpoint_command_rx = Some(command_rx);
2728        self.endpoint_event_tx = Some(event_tx.clone());
2729
2730        Ok(EndpointDataIo {
2731            command_tx,
2732            event_rx,
2733            event_tx,
2734        })
2735    }
2736
2737    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2738        let mut prefix = [0u8; 15];
2739        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2740        self.identity_cache
2741            .get(&prefix)
2742            .filter(|entry| &entry.node_addr == addr)
2743            .map(|entry| entry.pubkey)
2744    }
2745
2746    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2747        let mut prefix = [0u8; 15];
2748        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2749        self.identity_cache
2750            .get(&prefix)
2751            .filter(|entry| &entry.node_addr == addr)
2752            .map(|entry| entry.npub.clone())
2753    }
2754
2755    pub(in crate::node) fn deliver_external_ipv6_packet(
2756        &self,
2757        src_addr: &NodeAddr,
2758        packet: Vec<u8>,
2759    ) {
2760        let Some(external_packet_tx) = &self.external_packet_tx else {
2761            return;
2762        };
2763        if packet.len() < 40 {
2764            return;
2765        }
2766        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2767            return;
2768        };
2769        let delivered = NodeDeliveredPacket {
2770            source_node_addr: *src_addr,
2771            source_npub: self.npub_for_node_addr(src_addr),
2772            destination,
2773            packet,
2774        };
2775        if let Err(error) = external_packet_tx.try_send(delivered) {
2776            debug!(error = %error, "Failed to deliver packet to external app sink");
2777        }
2778    }
2779
2780    // === Sending ===
2781
    /// Encrypt and send a link-layer message to an authenticated peer.
    ///
    /// The plaintext should include the message type byte followed by the
    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
    ///
    /// The send path prepends a 4-byte session-relative timestamp (inner
    /// header) before encryption. The full 16-byte outer header is used
    /// as AAD for the AEAD construction.
    ///
    /// This is the standard path for sending any link-layer control message
    /// to a peer over their encrypted Noise session. It is a thin wrapper
    /// around `send_encrypted_link_message_with_ce` with the CE flag cleared,
    /// and returns whatever that call returns.
    pub(super) async fn send_encrypted_link_message(
        &mut self,
        node_addr: &NodeAddr,
        plaintext: &[u8],
    ) -> Result<(), NodeError> {
        // `false`: do not set the congestion (CE) flag on this message.
        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
            .await
    }
2801
2802    /// Update the local-outbound-broken signal from a `transport.send`
2803    /// outcome. Sets `last_local_send_failure_at` on local-side io
2804    /// errors (NetworkUnreachable / HostUnreachable / AddrNotAvailable);
2805    /// clears it on success. The reaper consults this in
2806    /// `check_link_heartbeats` to switch to `fast_link_dead_timeout_secs`.
2807    pub(in crate::node) fn note_local_send_outcome(
2808        &mut self,
2809        result: &Result<usize, TransportError>,
2810    ) {
2811        match result {
2812            Ok(_) => {
2813                if self.last_local_send_failure_at.is_some() {
2814                    self.last_local_send_failure_at = None;
2815                }
2816            }
2817            Err(error) if error.is_local_route_unavailable() => {
2818                self.last_local_send_failure_at = Some(std::time::Instant::now());
2819            }
2820            Err(_) => {}
2821        }
2822    }
2823
2824    /// Return the active dead-timeout after considering recent local route
2825    /// failures. The fast-dead signal is intentionally short-lived: on the
2826    /// UDP worker path a send call can return before the kernel result is
2827    /// observed, so a single stale route error must not compress liveness for
2828    /// the whole normal dead-timeout window.
2829    pub(in crate::node) fn local_send_failure_dead_timeout(
2830        &mut self,
2831        now: std::time::Instant,
2832        dead_timeout: std::time::Duration,
2833        fast_dead_timeout: std::time::Duration,
2834    ) -> std::time::Duration {
2835        match self.last_local_send_failure_at {
2836            Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
2837                fast_dead_timeout.min(dead_timeout)
2838            }
2839            Some(_) => {
2840                self.last_local_send_failure_at = None;
2841                dead_timeout
2842            }
2843            None => dead_timeout,
2844        }
2845    }
2846
    /// Record that an RX-loop maintenance timeout happened just now.
    ///
    /// Stores the current `Instant` in `last_rx_loop_maintenance_timeout_at`,
    /// overwriting any earlier mark.
    pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
        self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
    }
2850
2851    pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
2852        let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
2853            return false;
2854        };
2855        let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
2856        std::time::Instant::now().duration_since(t) <= grace
2857    }
2858
2859    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
2860    ///
2861    /// Used by the forwarding path to relay congestion signals hop-by-hop.
2862    pub(super) async fn send_encrypted_link_message_with_ce(
2863        &mut self,
2864        node_addr: &NodeAddr,
2865        plaintext: &[u8],
2866        ce_flag: bool,
2867    ) -> Result<(), NodeError> {
2868        let peer = self
2869            .peers
2870            .get_mut(node_addr)
2871            .ok_or(NodeError::PeerNotFound(*node_addr))?;
2872
2873        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2874            node_addr: *node_addr,
2875            reason: "no their_index".into(),
2876        })?;
2877        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2878            node_addr: *node_addr,
2879            reason: "no transport_id".into(),
2880        })?;
2881        let remote_addr = peer
2882            .current_addr()
2883            .cloned()
2884            .ok_or_else(|| NodeError::SendFailed {
2885                node_addr: *node_addr,
2886                reason: "no current_addr".into(),
2887            })?;
2888        #[cfg(any(target_os = "linux", target_os = "macos"))]
2889        let connected_socket = peer.connected_udp();
2890
2891        // Prepend 4-byte session-relative timestamp (inner header)
2892        let timestamp_ms = peer.session_elapsed_ms();
2893
2894        // MMP: read spin bit value before entering session borrow
2895        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2896        let mut flags = if sp_flag { FLAG_SP } else { 0 };
2897        if ce_flag {
2898            flags |= FLAG_CE;
2899        }
2900        if peer.current_k_bit() {
2901            flags |= FLAG_KEY_EPOCH;
2902        }
2903
2904        let session = peer
2905            .noise_session_mut()
2906            .ok_or_else(|| NodeError::SendFailed {
2907                node_addr: *node_addr,
2908                reason: "no noise session".into(),
2909            })?;
2910
2911        // Build 16-byte outer header upfront. The inner-plaintext
2912        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
2913        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
2914        // just to measure it. The worker path uses this length to size
2915        // the wire buffer directly; the legacy path below still
2916        // materialises a separate `inner_plaintext` Vec for the inline
2917        // encrypt-and-send call.
2918        const INNER_TS_LEN: usize = 4;
2919        let counter = session.current_send_counter();
2920        let inner_len = INNER_TS_LEN + plaintext.len();
2921        let payload_len = inner_len as u16;
2922        let header = build_established_header(their_index, counter, flags, payload_len);
2923
2924        // **UDP send fast path.** The encrypt-worker pool is always
2925        // spawned at lifecycle start (workers = num_cpus) in
2926        // production, so this branch is taken for every authentic
2927        // send on every UDP-transported established session. The
2928        // AEAD work + sendmsg syscall run on a dedicated OS thread;
2929        // the rx_loop only builds the wire buffer + reserves the
2930        // counter inline.
2931        //
2932        // Other transport kinds (BLE, TCP, sim, ethernet) fall
2933        // through to the inline encrypt + transport.send path
2934        // below — those don't have raw-fd / sendmmsg / UDP_GSO
2935        // benefits to expose through the worker pool, so the simpler
2936        // synchronous send is the right shape for them.
2937        //
2938        // The `encrypt_workers.is_some()` check below is true in
2939        // production (lifecycle::start spawns the pool); it stays
2940        // checked rather than `expect()`-ed because unit tests
2941        // construct `Node` without calling `start()`.
2942        let transport_for_send = self
2943            .transports
2944            .get(&transport_id)
2945            .ok_or(NodeError::TransportNotFound(transport_id))?;
2946        match transport_for_send.connection_state(&remote_addr) {
2947            ConnectionState::Connected => {}
2948            other => {
2949                if matches!(other, ConnectionState::None) {
2950                    let _ = transport_for_send.connect(&remote_addr).await;
2951                }
2952                return Err(NodeError::SendFailed {
2953                    node_addr: *node_addr,
2954                    reason: format!("transport connection not ready: {:?}", other),
2955                });
2956            }
2957        }
2958        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2959        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2960            && is_udp
2961            && let Some(cipher_clone) = session.send_cipher_clone()
2962        {
2963            {
2964                // Reserve the counter on the session so subsequent
2965                // sends don't reuse it. `current_send_counter` only
2966                // peeks; we advance via `take_send_counter`.
2967                let reserved_counter =
2968                    session
2969                        .take_send_counter()
2970                        .map_err(|e| NodeError::SendFailed {
2971                            node_addr: *node_addr,
2972                            reason: format!("counter reservation failed: {}", e),
2973                        })?;
2974                debug_assert_eq!(reserved_counter, counter);
2975                // Re-derive the header with the now-locked-in counter
2976                // value (same value, but the call sequence is more
2977                // explicit).
2978                let header =
2979                    build_established_header(their_index, reserved_counter, flags, payload_len);
2980                let transport = transport_for_send;
2981                // Snapshot the per-peer connected UDP socket before
2982                // resolving the fallback address. On the established
2983                // steady-state path this socket already carries the
2984                // kernel peer address, so re-parsing the configured
2985                // transport address and touching the DNS cache on every
2986                // packet is pure overhead on the sender hot path.
2987                let send_target = {
2988                    if let TransportHandle::Udp(udp) = transport {
2989                        let socket_addr = {
2990                            #[cfg(any(target_os = "linux", target_os = "macos"))]
2991                            {
2992                                match connected_socket.as_ref() {
2993                                    Some(socket) => Some(socket.peer_addr()),
2994                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2995                                }
2996                            }
2997                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2998                            {
2999                                udp.resolve_for_off_task(&remote_addr).await.ok()
3000                            }
3001                        };
3002                        match (udp.async_socket(), socket_addr) {
3003                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3004                            _ => None,
3005                        }
3006                    } else {
3007                        None
3008                    }
3009                };
3010                if let Some((socket, socket_addr)) = send_target {
3011                    // Build the wire buffer **directly** from
3012                    // `plaintext` with a single allocation:
3013                    //   `[16 header][4 ts][plaintext...]` with
3014                    // +16 trailing capacity for the AEAD tag.
3015                    // The worker seals `wire_buf[16..]` in
3016                    // place and appends the tag — no second
3017                    // alloc, no second memcpy.
3018                    //
3019                    // Previous design built `inner_plaintext`
3020                    // via `prepend_inner_header` (1 alloc + 1
3021                    // copy) and then let the worker memcpy
3022                    // header + plaintext into a fresh Vec
3023                    // (another alloc + copy). At ~100 kpps the
3024                    // saved alloc/copy is ~150 MB/sec of memory
3025                    // bandwidth on the hot rx_loop + worker.
3026                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3027                    let mut wire_buf = Vec::with_capacity(wire_capacity);
3028                    wire_buf.extend_from_slice(&header);
3029                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
3030                    wire_buf.extend_from_slice(plaintext);
3031                    let predicted_bytes = wire_capacity;
3032                    // Stats / MMP update inline — predicted size
3033                    // is exact for ChaCha20-Poly1305 (tag is
3034                    // constant 16 bytes). When `connected_socket` is
3035                    // `Some`, the worker sends on it without a
3036                    // destination sockaddr — the kernel skips the
3037                    // per-packet sockaddr + route + neighbor resolve.
3038                    if let Some(peer) = self.peers.get_mut(node_addr) {
3039                        peer.link_stats_mut().record_sent(predicted_bytes);
3040                        if let Some(mmp) = peer.mmp_mut() {
3041                            mmp.sender
3042                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3043                        }
3044                    }
3045                    let scheduling_weight = self.send_weight_for_peer(node_addr);
3046                    workers.dispatch(self::encrypt_worker::FmpSendJob {
3047                        cipher: cipher_clone,
3048                        counter: reserved_counter,
3049                        wire_buf,
3050                        fsp_seal: None,
3051                        socket,
3052                        dest_addr: socket_addr,
3053                        #[cfg(any(target_os = "linux", target_os = "macos"))]
3054                        connected_socket,
3055                        drop_on_backpressure: fmp_plaintext_is_bulk_session_datagram(plaintext),
3056                        scheduling_weight,
3057                        queued_at: crate::perf_profile::stamp(),
3058                    });
3059                    return Ok(());
3060                }
3061            }
3062        }
3063
3064        // Inline (legacy) path: encrypt + send on the rx_loop.
3065        // Build the inner plaintext lazily here — the worker path
3066        // above never reaches this point, so the prepend_inner_header
3067        // alloc is avoided in the fast path.
3068        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3069        // Encrypt with AAD binding to the outer header
3070        let ciphertext = {
3071            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3072            session
3073                .encrypt_with_aad(&inner_plaintext, &header)
3074                .map_err(|e| NodeError::SendFailed {
3075                    node_addr: *node_addr,
3076                    reason: format!("encryption failed: {}", e),
3077                })?
3078        };
3079
3080        let wire_packet = build_encrypted(&header, &ciphertext);
3081
3082        // Re-borrow peer for stats update after sending
3083        let send_result = {
3084            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3085            let transport = self
3086                .transports
3087                .get(&transport_id)
3088                .ok_or(NodeError::TransportNotFound(transport_id))?;
3089            transport.send(&remote_addr, &wire_packet).await
3090        };
3091        self.note_local_send_outcome(&send_result);
3092        let bytes_sent = send_result.map_err(|e| match e {
3093            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3094                node_addr: *node_addr,
3095                packet_size,
3096                mtu,
3097            },
3098            other => NodeError::SendFailed {
3099                node_addr: *node_addr,
3100                reason: format!("transport send: {}", other),
3101            },
3102        })?;
3103
3104        // Update send statistics
3105        if let Some(peer) = self.peers.get_mut(node_addr) {
3106            peer.link_stats_mut().record_sent(bytes_sent);
3107            // MMP: record sent frame for sender report generation
3108            if let Some(mmp) = peer.mmp_mut() {
3109                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3110            }
3111        }
3112
3113        Ok(())
3114    }
3115}
3116
3117impl fmt::Debug for Node {
3118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3119        f.debug_struct("Node")
3120            .field("node_addr", self.node_addr())
3121            .field("state", &self.state)
3122            .field("is_leaf_only", &self.is_leaf_only)
3123            .field("connections", &self.connection_count())
3124            .field("peers", &self.peer_count())
3125            .field("links", &self.link_count())
3126            .field("transports", &self.transport_count())
3127            .finish()
3128    }
3129}