// fips_core/node/mod.rs
1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45use crate::transport::{
46    Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError, TransportHandle, TransportId,
47};
48use crate::tree::TreeState;
49use crate::upper::hosts::HostMap;
50use crate::upper::icmp_rate_limit::IcmpRateLimiter;
51use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
52use crate::utils::index::IndexAllocator;
53use crate::{
54    Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
55    SessionMessageType, encode_npub,
56};
57use rand::Rng;
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt;
60use std::sync::Arc;
61use std::thread::JoinHandle;
62use thiserror::Error;
63use tracing::{debug, warn};
64
/// Half-range of the symmetric jitter applied to per-session rekey timers.
///
/// Each FMP/FSP session draws an offset uniformly from
/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` seconds at construction and
/// after each cutover. This preserves the configured mean interval while
/// reducing dual-initiation bursts in symmetric-start meshes.
///
/// Signed (`i64`) because the drawn offset may be negative.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
72
/// Errors related to node operations.
///
/// The `#[error("...")]` attribute on each variant supplies its
/// `Display` text (via `thiserror`); `#[from]` variants convert
/// automatically with `?`.
#[derive(Debug, Error)]
pub enum NodeError {
    // --- Lifecycle state violations ---
    #[error("node not started")]
    NotStarted,

    #[error("node already started")]
    AlreadyStarted,

    #[error("node already stopped")]
    AlreadyStopped,

    // --- Lookup failures for transports, links, connections, peers ---
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    // --- Duplicate-entity conflicts ---
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    // --- Input validation / policy ---
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    #[error("access denied: {0}")]
    AccessDenied(String),

    // --- Resource limits (configured maximums hit) ---
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    // --- Session / handshake / data-plane failures ---
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promoting a handshaking connection to an active peer failed.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet to forward exceeded the next hop's MTU.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    // --- Wrapped errors from subsystems (auto-converted via `?`) ---
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    #[error("transport error: {0}")]
    TransportError(String),

    /// Adopting a NAT-traversal bootstrap transport into the node failed.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
161
/// Source-attributed packet delivered by a node running without a system TUN.
///
/// Handed to the embedding application through
/// `ExternalPacketIo::inbound_rx`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// FIPS node address that originated the packet.
    pub source_node_addr: NodeAddr,
    /// Source Nostr public key when the node has learned it.
    pub source_npub: Option<String>,
    /// Destination FIPS address from the IPv6 packet.
    pub destination: FipsAddress,
    /// Full IPv6 packet after FIPS session decapsulation.
    pub packet: Vec<u8>,
}
174
/// Cached identity material for a remote peer.
///
/// Stored in `Node::identity_cache`, keyed by FipsAddress prefix bytes,
/// to enable reverse lookup from an IPv6 destination to the peer's
/// session/routing identity.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Peer's FIPS node address.
    node_addr: NodeAddr,
    /// Peer's secp256k1 public key.
    pubkey: secp256k1::PublicKey,
    /// Peer's npub (encoded public key string).
    npub: String,
    /// Last time this identity was observed, in milliseconds
    /// (presumably Unix epoch ms, matching `RecentRequest` — confirm at
    /// the insertion sites).
    last_seen_ms: u64,
}

impl IdentityCacheEntry {
    /// Bundle the fields into a cache entry; no validation is performed.
    fn new(
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
        npub: String,
        last_seen_ms: u64,
    ) -> Self {
        Self {
            node_addr,
            pubkey,
            npub,
            last_seen_ms,
        }
    }
}
198
/// App-owned packet channels for embedding FIPS without a system TUN.
///
/// The application keeps both halves: it writes outbound IPv6 packets
/// into `outbound_tx` and reads source-attributed inbound packets
/// (`NodeDeliveredPacket`) from `inbound_rx`.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Send outbound IPv6 packets into the node.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receive inbound IPv6 packets delivered by FIPS sessions.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
207
/// App-owned endpoint data channels for embedding FIPS without a daemon.
///
/// Commands flow into the node via `command_tx`; delivered data flows
/// back out as `NodeEndpointEvent`s on `event_rx`.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Send endpoint data commands into the node RX loop.
    ///
    /// Bounded with a generous default so normal sender bursts do not
    /// stall on semaphore acquisition. macOS pacing happens at the UDP
    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
    /// constraining this app queue instead caused the inner TCP flow to
    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
    /// default for benches.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receive endpoint data delivered by FIPS sessions.
    ///
    /// Unbounded so the rx_loop's send on inbound packet delivery is a
    /// wait-free push (no semaphore acquire), and so we can drop the
    /// per-packet cross-task relay that previously sat between the node
    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
    /// naturally bounded — the rx_loop both produces here and runs the
    /// same runtime that schedules the consumer, so a stalled consumer
    /// stalls production too.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Clone of the event_tx exposed for in-process loopback (e.g.
    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
    /// event into the same queue without going through the encrypt /
    /// decrypt path, while keeping every consumer reading from a single
    /// channel.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
237
/// Resolve the capacity of the endpoint data command queue.
///
/// Precedence:
/// 1. `FIPS_ENDPOINT_DATA_QUEUE_CAP` env var, when set to a positive
///    integer (bench override — see `EndpointDataIo::command_tx`).
/// 2. The caller-requested capacity, floored at 32_768 so normal sender
///    bursts do not stall on semaphore acquisition.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    // Env override: any positive integer wins outright. Non-numeric,
    // zero, or unset values fall through to the default path.
    let override_cap = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&cap| cap > 0);
    if let Some(cap) = override_cap {
        return cap;
    }

    // The previous `requested.max(1).max(32_768)` made `.max(1)` dead
    // code: the 32_768 floor already guarantees a non-zero capacity.
    requested.max(32_768)
}
248
/// Commands accepted by the node endpoint data service.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send with an explicit response channel — used by callers that
    /// care whether the local-stack handoff succeeded (e.g.
    /// `blocking_send` waits for the runtime to accept the send).
    Send {
        /// Destination peer.
        remote: PeerIdentity,
        /// Application payload bytes to deliver.
        payload: Vec<u8>,
        /// When the command was enqueued; `None` when the caller does
        /// not track queueing delay — TODO confirm at the use sites.
        queued_at: Option<std::time::Instant>,
        /// Receives the handoff result once the node accepts/rejects the send.
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
    /// no per-packet result channel. Used by the data-plane fast path
    /// (`FipsEndpoint::send`) where the caller already discards the
    /// result. Saves one oneshot::channel() allocation per outbound
    /// packet on the application's send hot path.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        queued_at: Option<std::time::Instant>,
    },
    /// Request a snapshot of the currently authenticated peers,
    /// answered as a `Vec<NodeEndpointPeer>` on the oneshot.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
}
275
/// Endpoint data events emitted by the node session receive path.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// Application payload delivered from a remote FIPS session.
    Data {
        /// FIPS node address of the sender.
        source_node_addr: NodeAddr,
        /// Sender's Nostr public key when the node has learned it.
        source_npub: Option<String>,
        /// Delivered payload bytes.
        payload: Vec<u8>,
        /// When the event was enqueued; `None` when not tracked —
        /// TODO confirm against the rx_loop producer.
        queued_at: Option<std::time::Instant>,
    },
}
286
/// Authenticated peer state exposed to embedded endpoint callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    // Peer's Nostr public key (npub string).
    pub(crate) npub: String,
    // Remote transport address in string form, when known.
    pub(crate) transport_addr: Option<String>,
    // Transport type label — presumably e.g. "udp"/"tcp"; confirm at
    // the snapshot producer.
    pub(crate) transport_type: Option<String>,
    // Identifier of the link carrying this peer.
    pub(crate) link_id: u64,
    // Smoothed round-trip time in milliseconds, when measured.
    pub(crate) srtt_ms: Option<u64>,
    // Cumulative traffic counters for this peer.
    pub(crate) packets_sent: u64,
    pub(crate) packets_recv: u64,
    pub(crate) bytes_sent: u64,
    pub(crate) bytes_recv: u64,
}
300
/// Node operational state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not started.
    Created,
    /// Starting up (initializing transports).
    Starting,
    /// Fully operational.
    Running,
    /// Shutting down.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// Check if node is operational.
    ///
    /// Only `Running` counts as operational.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// Check if node can be started.
    ///
    /// A start is valid from a fresh (`Created`) or fully stopped
    /// (`Stopped`) node; all in-between states reject it.
    pub fn can_start(&self) -> bool {
        [NodeState::Created, NodeState::Stopped].contains(self)
    }

    /// Check if node can be stopped.
    ///
    /// Only a `Running` node accepts a stop request.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Render the state as a lowercase word (e.g. "running").
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
345
/// Recent request tracking for dedup and reverse-path forwarding.
///
/// When a LookupRequest is forwarded through a node, the node stores the
/// request_id and which peer sent it. When the corresponding LookupResponse
/// arrives, it's forwarded back to that peer (reverse-path forwarding).
/// The `response_forwarded` flag prevents response routing loops.
///
/// Entries live in `Node::recent_requests`, keyed by request_id.
#[derive(Clone, Debug)]
pub(crate) struct RecentRequest {
    /// The peer who sent this request to us.
    pub(crate) from_peer: NodeAddr,
    /// When we received this request (Unix milliseconds).
    pub(crate) timestamp_ms: u64,
    /// Whether we've already forwarded a response for this request.
    /// Prevents response routing loops when convergent request paths
    /// create bidirectional entries in recent_requests.
    pub(crate) response_forwarded: bool,
}
363
364impl RecentRequest {
365    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
366        Self {
367            from_peer,
368            timestamp_ms,
369            response_forwarded: false,
370        }
371    }
372
373    /// Check if this entry has expired (older than expiry_ms).
374    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
375        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
376    }
377}
378
/// Key for addr_to_link reverse lookup.
///
/// A remote endpoint is only unique per transport, so the key pairs the
/// transport with the remote address.
type AddrKey = (TransportId, TransportAddr);
381
/// Per-transport kernel drop tracking for congestion detection.
///
/// Sampled every tick (1s). The `dropping` flag indicates whether new
/// kernel drops were observed since the previous sample.
///
/// `Default` yields a zeroed baseline (`prev_drops == 0`, not dropping).
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previous `recv_drops` sample (cumulative counter).
    prev_drops: u64,
    /// True if drops increased since the last sample.
    dropping: bool,
}
393
/// State for a link waiting for transport-level connection establishment.
///
/// For connection-oriented transports (TCP, Tor), the transport connect runs
/// asynchronously. This struct holds the data needed to complete the handshake
/// once the connection is ready. Instances are queued on
/// `Node::pending_connects` and polled by the tick handler.
struct PendingConnect {
    /// The link that was created for this connection.
    link_id: LinkId,
    /// Which transport is being used.
    transport_id: TransportId,
    /// The remote address being connected to.
    remote_addr: TransportAddr,
    /// The peer identity (for handshake initiation).
    peer_identity: PeerIdentity,
}
409
/// A running FIPS node instance.
///
/// This is the top-level container holding all node state.
///
/// ## Peer Lifecycle
///
/// Peers go through two phases:
/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
///
/// The `addr_to_link` map enables dispatching incoming packets to the right
/// connection before authentication completes.
// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
pub struct Node {
    // === Identity ===
    /// This node's cryptographic identity.
    identity: Identity,

    /// Random epoch generated at startup for peer restart detection.
    /// Exchanged inside Noise handshake messages so peers can detect restarts.
    startup_epoch: [u8; 8],

    /// Instant when the node was created, for uptime reporting.
    started_at: std::time::Instant,

    // === Configuration ===
    /// Loaded configuration.
    config: Config,

    // === State ===
    /// Node operational state.
    state: NodeState,

    /// Whether this is a leaf-only node.
    is_leaf_only: bool,

    // === Spanning Tree ===
    /// Local spanning tree state.
    tree_state: TreeState,

    // === Bloom Filter ===
    /// Local Bloom filter state.
    bloom_state: BloomState,

    // === Routing ===
    /// Address -> coordinates cache (from session setup and discovery).
    coord_cache: CoordCache,
    /// Locally learned reverse-path next-hop hints.
    learned_routes: LearnedRouteTable,
    /// Recent discovery requests (dedup + reverse-path forwarding).
    /// Maps request_id → RecentRequest.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
    /// the TUN reader/writer threads at TCP MSS clamp time so the
    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
    /// and the learned per-destination path MTU.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // === Transports & Links ===
    /// Active transports (owned by Node).
    transports: HashMap<TransportId, TransportHandle>,
    /// Per-transport kernel drop tracking for congestion detection.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Active links.
    links: HashMap<LinkId, Link>,
    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    // === Packet Channel ===
    /// Packet sender for transports.
    packet_tx: Option<PacketTx>,
    /// Packet receiver (for event loop).
    packet_rx: Option<PacketRx>,

    // === Connections (Handshake Phase) ===
    /// Pending connections (handshake in progress).
    /// Indexed by LinkId since we don't know the peer's identity yet.
    connections: HashMap<LinkId, PeerConnection>,

    // === Peers (Active Phase) ===
    /// Authenticated peers.
    /// Indexed by NodeAddr (verified identity).
    peers: HashMap<NodeAddr, ActivePeer>,

    // === End-to-End Sessions ===
    /// Session table for end-to-end encrypted sessions.
    /// Keyed by remote NodeAddr.
    sessions: HashMap<NodeAddr, SessionEntry>,

    // === Identity Cache ===
    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
    /// Enables reverse lookup from IPv6 destination to session/routing identity.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    // === Pending TUN Packets ===
    /// Packets queued while waiting for session establishment.
    /// Keyed by destination NodeAddr, bounded per-dest and total.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Endpoint data payloads queued while waiting for session establishment.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    // === Pending Discovery Lookups ===
    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // === Resource Limits ===
    /// Maximum connections (0 = unlimited).
    max_connections: usize,
    /// Maximum peers (0 = unlimited).
    max_peers: usize,
    /// Maximum links (0 = unlimited).
    max_links: usize,

    // === Counters ===
    /// Next link ID to allocate.
    next_link_id: u64,
    /// Next transport ID to allocate.
    next_transport_id: u32,

    // === Node Statistics ===
    /// Routing, forwarding, discovery, and error signal counters.
    stats: stats::NodeStats,

    /// Time-series history of node-level metrics (1s/1m rings).
    stats_history: stats_history::StatsHistory,

    // === TUN Interface ===
    /// TUN device state.
    tun_state: TunState,
    /// TUN interface name (for cleanup).
    tun_name: Option<String>,
    /// TUN packet sender channel.
    tun_tx: Option<TunTx>,
    /// Receiver for outbound packets from the TUN reader.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// App-owned packet sink used by embedded/no-TUN integrations.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Endpoint data command receiver used by embedded/no-daemon integrations.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Endpoint data event sink used by embedded/no-daemon integrations.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
    /// spawned (set up in `start()` once transports are running).
    /// `Some(pool)` once available; the pool internally holds
    /// per-worker mpsc senders and round-robins jobs across them.
    /// See `node::encrypt_worker` for the rationale and layout.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
    /// `encrypt_workers` for the receive side.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Set of sessions that have been registered with the decrypt
    /// shard worker pool. Used by rx_loop to decide between fast-path
    /// dispatch (worker owns the session) and legacy in-place decrypt
    /// (worker doesn't have it yet). Per the data-plane restructure,
    /// the worker owns its session state directly — there's no shared
    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
    /// this set tracks **whether** the worker has been told about a
    /// given session.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Fallback channel: decrypt worker bounces non-fast-path packets
    /// (anything that's not bulk EndpointData) back here for rx_loop
    /// to handle via the legacy path. Drained by a new rx_loop arm.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sender half paired with `decrypt_fallback_rx`; handed to decrypt
    /// workers so bounced packets can reach the rx_loop drain arm.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// TUN reader thread handle.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// TUN writer thread handle.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
    /// On Linux, deleting the interface via netlink serves the same purpose.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // === DNS Responder ===
    /// Receiver for resolved identities from the DNS responder.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// DNS responder task handle.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // === Index-Based Session Dispatch ===
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
    /// This maps our session index to the peer that uses it.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Pending outbound handshakes by our sender_idx.
    /// Tracks which LinkId corresponds to which session index.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // === Rate Limiting ===
    /// Rate limiter for msg1 processing (DoS protection).
    msg1_rate_limiter: HandshakeRateLimiter,
    /// Rate limiter for ICMP Packet Too Big messages.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Backoff for failed discovery lookups (originator-side).
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery requests (transit-side).
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // === Pending Transport Connects ===
    /// Links waiting for transport-level connection establishment before
    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
    /// the transport connect runs in the background; the tick handler polls
    /// connection_state() and initiates the handshake when connected.
    pending_connects: Vec<PendingConnect>,

    // === Connection Retry ===
    /// Retry state for peers whose outbound connections have failed.
    /// Keyed by NodeAddr. Entries are created when a handshake times out
    /// or fails, and removed on successful promotion or when max retries
    /// are exhausted.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
    /// Identity is unverified at this layer — the Noise XX handshake
    /// initiated against an mDNS-observed endpoint is what proves the
    /// peer holds the matching private key.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Wall-clock ms when Nostr discovery successfully started, used to
    /// schedule the one-shot startup advert sweep after a settle delay.
    /// `None` until discovery comes up; remains `None` if discovery is
    /// disabled or failed to start.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Whether the one-shot startup advert sweep has run. Set to true
    /// after the first sweep fires (under `policy: open`); thereafter
    /// only the per-tick `queue_open_discovery_retries` continues.
    startup_open_discovery_sweep_done: bool,
    /// Per-peer UDP transports adopted from NAT traversal handoff.
    bootstrap_transports: HashSet<TransportId>,
    /// Originating peer npub (bech32) for each adopted bootstrap
    /// transport, captured at `adopt_established_traversal` time.
    /// Populated alongside `bootstrap_transports`; cleared in
    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
    /// route fatal-protocol-mismatch observations back to the
    /// Nostr-discovery `failure_state` for long cooldown application.
    bootstrap_transport_npubs: HashMap<TransportId, String>,

    // === Periodic Parent Re-evaluation ===
    /// Timestamp of last periodic parent re-evaluation (for pacing).
    last_parent_reeval: Option<crate::time::Instant>,

    // === Congestion Logging ===
    /// Timestamp of last congestion detection log (rate-limited to 5s).
    last_congestion_log: Option<std::time::Instant>,

    // === Mesh Size Estimate ===
    /// Cached estimated mesh size (computed once per tick from bloom filters).
    estimated_mesh_size: Option<u64>,
    /// Timestamp of last mesh size log emission.
    last_mesh_size_log: Option<std::time::Instant>,

    // === Bloom Self-Plausibility ===
    /// Rate-limit state for the self-plausibility WARN. Fires at most
    /// once per 60s globally when our own outgoing FilterAnnounce has
    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
    /// aggregation drift or an ingress bypass.
    last_self_warn: Option<std::time::Instant>,

    // === Local Outbound Liveness ===
    /// Set when a `transport.send` returned a local-side io error
    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
    /// cleared on the next successful send. Used by
    /// `check_link_heartbeats` to compress the dead-timeout to
    /// `fast_link_dead_timeout_secs` while our outbound is observed
    /// broken — direct kernel evidence beats waiting on receive-silence.
    last_local_send_failure_at: Option<std::time::Instant>,

    // === Display Names ===
    /// Human-readable names for configured peers (alias or short npub).
    /// Populated at startup from peer config.
    peer_aliases: HashMap<NodeAddr, String>,

    /// Reloadable peer ACL state from standard allow/deny files.
    peer_acl: acl::PeerAclReloader,

    // === Host Map ===
    /// Static hostname → npub mapping for DNS resolution.
    /// Built at construction from peer aliases and /etc/fips/hosts.
    host_map: Arc<HostMap>,
}
698
699impl Node {
700    /// Create a new node from configuration.
701    pub fn new(config: Config) -> Result<Self, NodeError> {
702        config.validate()?;
703        let identity = config.create_identity()?;
704        let node_addr = *identity.node_addr();
705        let is_leaf_only = config.is_leaf_only();
706
707        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
708        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
709
710        let mut startup_epoch = [0u8; 8];
711        rand::rng().fill_bytes(&mut startup_epoch);
712
713        let mut bloom_state = if is_leaf_only {
714            BloomState::leaf_only(node_addr)
715        } else {
716            BloomState::new(node_addr)
717        };
718        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
719
720        let tun_state = if config.tun.enabled {
721            TunState::Configured
722        } else {
723            TunState::Disabled
724        };
725
726        // Initialize tree state with signed self-declaration
727        let mut tree_state = TreeState::new(node_addr);
728        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
729        tree_state.set_hold_down(config.node.tree.hold_down_secs);
730        tree_state.set_flap_dampening(
731            config.node.tree.flap_threshold,
732            config.node.tree.flap_window_secs,
733            config.node.tree.flap_dampening_secs,
734        );
735        tree_state
736            .sign_declaration(&identity)
737            .expect("signing own declaration should never fail");
738
739        let coord_cache = CoordCache::new(
740            config.node.cache.coord_size,
741            config.node.cache.coord_ttl_secs * 1000,
742        );
743        let rl = &config.node.rate_limit;
744        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
745            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
746            config.node.limits.max_pending_inbound,
747        );
748
749        let max_connections = config.node.limits.max_connections;
750        let max_peers = config.node.limits.max_peers;
751        let max_links = config.node.limits.max_links;
752        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
753        let backoff_base_secs = config.node.discovery.backoff_base_secs;
754        let backoff_max_secs = config.node.discovery.backoff_max_secs;
755        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
756
757        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
758
759        Ok(Self {
760            identity,
761            startup_epoch,
762            started_at: std::time::Instant::now(),
763            config,
764            state: NodeState::Created,
765            is_leaf_only,
766            tree_state,
767            bloom_state,
768            coord_cache,
769            learned_routes: LearnedRouteTable::default(),
770            recent_requests: HashMap::new(),
771            transports: HashMap::new(),
772            transport_drops: HashMap::new(),
773            links: HashMap::new(),
774            addr_to_link: HashMap::new(),
775            packet_tx: None,
776            packet_rx: None,
777            connections: HashMap::new(),
778            peers: HashMap::new(),
779            sessions: HashMap::new(),
780            identity_cache: HashMap::new(),
781            pending_tun_packets: HashMap::new(),
782            pending_endpoint_data: HashMap::new(),
783            pending_lookups: HashMap::new(),
784            max_connections,
785            max_peers,
786            max_links,
787            next_link_id: 1,
788            next_transport_id: 1,
789            stats: stats::NodeStats::new(),
790            stats_history: stats_history::StatsHistory::new(),
791            tun_state,
792            tun_name: None,
793            tun_tx: None,
794            tun_outbound_rx: None,
795            external_packet_tx: None,
796            endpoint_command_rx: None,
797            endpoint_event_tx: None,
798            encrypt_workers: None,
799            decrypt_workers: None,
800            decrypt_registered_sessions: std::collections::HashSet::new(),
801            decrypt_fallback_tx,
802            decrypt_fallback_rx,
803            tun_reader_handle: None,
804            tun_writer_handle: None,
805            #[cfg(target_os = "macos")]
806            tun_shutdown_fd: None,
807            dns_identity_rx: None,
808            dns_task: None,
809            index_allocator: IndexAllocator::new(),
810            peers_by_index: HashMap::new(),
811            pending_outbound: HashMap::new(),
812            msg1_rate_limiter,
813            icmp_rate_limiter: IcmpRateLimiter::new(),
814            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
815            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
816                std::time::Duration::from_millis(coords_response_interval_ms),
817            ),
818            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
819            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
820                std::time::Duration::from_secs(forward_min_interval_secs),
821            ),
822            pending_connects: Vec::new(),
823            retry_pending: HashMap::new(),
824            nostr_discovery: None,
825            nostr_discovery_started_at_ms: None,
826            lan_discovery: None,
827            startup_open_discovery_sweep_done: false,
828            bootstrap_transports: HashSet::new(),
829            bootstrap_transport_npubs: HashMap::new(),
830            last_parent_reeval: None,
831            last_congestion_log: None,
832            estimated_mesh_size: None,
833            last_mesh_size_log: None,
834            last_self_warn: None,
835            last_local_send_failure_at: None,
836            peer_aliases: HashMap::new(),
837            peer_acl,
838            host_map,
839            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
840        })
841    }
842
843    /// Create a node with a specific identity.
844    ///
845    /// This constructor validates cross-field config invariants before
846    /// constructing the node, same as [`Node::new`].
847    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
848        config.validate()?;
849        let node_addr = *identity.node_addr();
850
851        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
852        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
853
854        let mut startup_epoch = [0u8; 8];
855        rand::rng().fill_bytes(&mut startup_epoch);
856
857        let tun_state = if config.tun.enabled {
858            TunState::Configured
859        } else {
860            TunState::Disabled
861        };
862
863        // Initialize tree state with signed self-declaration
864        let mut tree_state = TreeState::new(node_addr);
865        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
866        tree_state.set_hold_down(config.node.tree.hold_down_secs);
867        tree_state.set_flap_dampening(
868            config.node.tree.flap_threshold,
869            config.node.tree.flap_window_secs,
870            config.node.tree.flap_dampening_secs,
871        );
872        tree_state
873            .sign_declaration(&identity)
874            .expect("signing own declaration should never fail");
875
876        let mut bloom_state = BloomState::new(node_addr);
877        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
878
879        let coord_cache = CoordCache::new(
880            config.node.cache.coord_size,
881            config.node.cache.coord_ttl_secs * 1000,
882        );
883        let rl = &config.node.rate_limit;
884        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
885            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
886            config.node.limits.max_pending_inbound,
887        );
888
889        let max_connections = config.node.limits.max_connections;
890        let max_peers = config.node.limits.max_peers;
891        let max_links = config.node.limits.max_links;
892        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
893
894        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
895
896        Ok(Self {
897            identity,
898            startup_epoch,
899            started_at: std::time::Instant::now(),
900            config,
901            state: NodeState::Created,
902            is_leaf_only: false,
903            tree_state,
904            bloom_state,
905            coord_cache,
906            learned_routes: LearnedRouteTable::default(),
907            recent_requests: HashMap::new(),
908            transports: HashMap::new(),
909            transport_drops: HashMap::new(),
910            links: HashMap::new(),
911            addr_to_link: HashMap::new(),
912            packet_tx: None,
913            packet_rx: None,
914            connections: HashMap::new(),
915            peers: HashMap::new(),
916            sessions: HashMap::new(),
917            identity_cache: HashMap::new(),
918            pending_tun_packets: HashMap::new(),
919            pending_endpoint_data: HashMap::new(),
920            pending_lookups: HashMap::new(),
921            max_connections,
922            max_peers,
923            max_links,
924            next_link_id: 1,
925            next_transport_id: 1,
926            stats: stats::NodeStats::new(),
927            stats_history: stats_history::StatsHistory::new(),
928            tun_state,
929            tun_name: None,
930            tun_tx: None,
931            tun_outbound_rx: None,
932            external_packet_tx: None,
933            endpoint_command_rx: None,
934            endpoint_event_tx: None,
935            encrypt_workers: None,
936            decrypt_workers: None,
937            decrypt_registered_sessions: std::collections::HashSet::new(),
938            decrypt_fallback_tx,
939            decrypt_fallback_rx,
940            tun_reader_handle: None,
941            tun_writer_handle: None,
942            #[cfg(target_os = "macos")]
943            tun_shutdown_fd: None,
944            dns_identity_rx: None,
945            dns_task: None,
946            index_allocator: IndexAllocator::new(),
947            peers_by_index: HashMap::new(),
948            pending_outbound: HashMap::new(),
949            msg1_rate_limiter,
950            icmp_rate_limiter: IcmpRateLimiter::new(),
951            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
952            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
953                std::time::Duration::from_millis(coords_response_interval_ms),
954            ),
955            discovery_backoff: DiscoveryBackoff::new(),
956            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
957            pending_connects: Vec::new(),
958            retry_pending: HashMap::new(),
959            nostr_discovery: None,
960            nostr_discovery_started_at_ms: None,
961            lan_discovery: None,
962            startup_open_discovery_sweep_done: false,
963            bootstrap_transports: HashSet::new(),
964            bootstrap_transport_npubs: HashMap::new(),
965            last_parent_reeval: None,
966            last_congestion_log: None,
967            estimated_mesh_size: None,
968            last_mesh_size_log: None,
969            last_self_warn: None,
970            last_local_send_failure_at: None,
971            peer_aliases: HashMap::new(),
972            peer_acl,
973            host_map,
974            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
975        })
976    }
977
978    /// Create a leaf-only node (simplified state).
979    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
980        let mut node = Self::new(config)?;
981        node.is_leaf_only = true;
982        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
983        Ok(node)
984    }
985
986    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
987        let base_host_map = HostMap::from_peer_configs(config.peers());
988        if !config.node.system_files_enabled {
989            return (
990                Arc::new(base_host_map.clone()),
991                acl::PeerAclReloader::memory_only(base_host_map),
992            );
993        }
994
995        let mut host_map = base_host_map.clone();
996        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
997        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
998            crate::upper::hosts::DEFAULT_HOSTS_PATH,
999        ));
1000        host_map.merge(hosts_file);
1001        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1002            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1003            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1004            base_host_map,
1005            hosts_path,
1006        );
1007        (Arc::new(host_map), peer_acl)
1008    }
1009
    /// Create transport instances from configuration.
    ///
    /// Returns a vector of TransportHandles for all configured transports.
    ///
    /// Each instance gets a fresh id from `allocate_transport_id()` and a
    /// clone of `packet_tx`, so every transport funnels received packets
    /// into the node's single inbound channel. Config entries are first
    /// snapshotted into owned `Vec`s because `allocate_transport_id`
    /// needs `&mut self` while iterating.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Collect UDP configs with optional names to avoid borrow conflicts
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        // Create UDP transport instances
        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Simulated transport, only in builds with the sim-transport feature.
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Create Ethernet transport instances where raw-socket support exists.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            // Ethernet frames carry our x-only pubkey for link identification.
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        // Create TCP transport instances
        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        // Create Tor transport instances
        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // Create BLE transport instances
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Inside the outer `cfg(bluer_available)` this reduces to
            // `not(test)`: real adapters are only opened outside test builds,
            // because BluerIo touches the live BlueZ stack.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        // Best-effort: a failed adapter is skipped, not fatal.
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // Inside `cfg(bluer_available)` this branch only compiles under
            // `test`, where it consumes `ble_instances` so the binding isn't
            // unused. NOTE(review): the inner `cfg(not(test))` makes the warn
            // unreachable here, and its message ("lacks BlueZ support") seems
            // intended for non-bluer builds — confirm whether this was meant
            // to live outside the `cfg(bluer_available)` block.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1147
1148    /// Find an operational transport that matches the given transport type name.
1149    ///
1150    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1151    /// from Nostr/STUN traversal. They must not be reused for ordinary
1152    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1153    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1154    /// with `EINVAL`, and even on platforms that allow it the packet would use
1155    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1156    /// choice deterministic by lowest transport id instead of HashMap order.
1157    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1158        self.transports
1159            .iter()
1160            .filter(|(id, handle)| {
1161                handle.transport_type().name == transport_type
1162                    && handle.is_operational()
1163                    && !self.bootstrap_transports.contains(id)
1164            })
1165            .min_by_key(|(id, _)| id.as_u32())
1166            .map(|(id, _)| *id)
1167    }
1168
1169    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1170    /// and binary TransportAddr.
1171    ///
1172    /// Finds the Ethernet transport instance bound to the named interface
1173    /// and parses the MAC portion into a 6-byte TransportAddr.
1174    #[allow(unused_variables)]
1175    fn resolve_ethernet_addr(
1176        &self,
1177        addr_str: &str,
1178    ) -> Result<(TransportId, TransportAddr), NodeError> {
1179        #[cfg(any(target_os = "linux", target_os = "macos"))]
1180        {
1181            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1182                NodeError::NoTransportForType(format!(
1183                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1184                    addr_str
1185                ))
1186            })?;
1187
1188            // Find the Ethernet transport bound to this interface
1189            let transport_id = self
1190                .transports
1191                .iter()
1192                .find(|(_, handle)| {
1193                    handle.transport_type().name == "ethernet"
1194                        && handle.is_operational()
1195                        && handle.interface_name() == Some(iface)
1196                })
1197                .map(|(id, _)| *id)
1198                .ok_or_else(|| {
1199                    NodeError::NoTransportForType(format!(
1200                        "no operational Ethernet transport for interface '{}'",
1201                        iface
1202                    ))
1203                })?;
1204
1205            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1206                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1207            })?;
1208
1209            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1210        }
1211        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1212        {
1213            Err(NodeError::NoTransportForType(
1214                "Ethernet transport is not supported on this platform".to_string(),
1215            ))
1216        }
1217    }
1218
1219    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1220    /// (TransportId, TransportAddr) pair by finding the BLE transport
1221    /// instance matching the adapter name.
1222    #[cfg(bluer_available)]
1223    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1224        let ta = TransportAddr::from_string(addr_str);
1225        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1226            NodeError::NoTransportForType(format!(
1227                "invalid BLE address format '{}': expected 'adapter/mac'",
1228                addr_str
1229            ))
1230        })?;
1231
1232        // Find the BLE transport for this adapter
1233        let transport_id = self
1234            .transports
1235            .iter()
1236            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1237            .map(|(id, _)| *id)
1238            .ok_or_else(|| {
1239                NodeError::NoTransportForType(format!(
1240                    "no operational BLE transport for adapter '{}'",
1241                    adapter
1242                ))
1243            })?;
1244
1245        // Validate the address format
1246        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1247            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1248        })?;
1249
1250        Ok((transport_id, TransportAddr::from_string(addr_str)))
1251    }
1252
1253    // === Identity Accessors ===
1254
    /// Get this node's identity (keypair + derived addresses).
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1259
    /// Get this node's NodeAddr (derived from the identity).
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
1264
    /// Get this node's npub (bech32-encoded public key string).
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1269
1270    /// Return a human-readable display name for a NodeAddr.
1271    ///
1272    /// Lookup order:
1273    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1274    /// 2. Configured peer alias or short npub (from startup map)
1275    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1276    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1277    /// 5. Truncated NodeAddr hex (unknown address)
1278    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1279        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1280            return hostname.to_string();
1281        }
1282        if let Some(name) = self.peer_aliases.get(addr) {
1283            return name.clone();
1284        }
1285        if let Some(peer) = self.peers.get(addr) {
1286            return peer.identity().short_npub();
1287        }
1288        if let Some(entry) = self.sessions.get(addr) {
1289            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1290            return PeerIdentity::from_pubkey(xonly).short_npub();
1291        }
1292        addr.short_hex()
1293    }
1294
1295    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1296    /// decrypt-worker state coherent: removes the same `cache_key`
1297    /// from the registered-sessions tracking set and tells the
1298    /// assigned shard worker to drop its `OwnedSessionState` entry.
1299    ///
1300    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1301    /// at every session-lifecycle teardown site (rekey cross-connection
1302    /// swap, peer disconnect, dispatch session-rotation) so the worker
1303    /// doesn't keep stale ciphers / replay windows around. The
1304    /// follow-up `RegisterSession` for the NEW key (if any) will then
1305    /// install the fresh state on the same shard.
1306    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1307        // Find the peer that owns this index BEFORE removing it from
1308        // the index map, so we can decide whether the deregistration
1309        // also tears down the peer's connected UDP socket.
1310        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1311        self.peers_by_index.remove(&cache_key);
1312        if self.decrypt_registered_sessions.remove(&cache_key)
1313            && let Some(workers) = self.decrypt_workers.as_ref()
1314        {
1315            workers.unregister_session(cache_key);
1316        }
1317        // Tear down the per-peer connected UDP socket *only* if no
1318        // other peers_by_index entry still resolves to this peer.
1319        // Rekey drain calls into this helper with the OLD session
1320        // index while the NEW index is already installed and points
1321        // at the same peer — there the connect()-ed 5-tuple is
1322        // still valid for the new session and we must not close it.
1323        // Peer-teardown sites (CrossConnection swap, stale-index
1324        // fall-through in encrypted.rs, disconnect handler) call
1325        // here when this is the peer's last index, so the connected
1326        // socket goes away with the peer.
1327        if let Some(peer_addr) = owning_peer {
1328            let peer_has_other_index = self
1329                .peers_by_index
1330                .values()
1331                .any(|other| *other == peer_addr);
1332            if !peer_has_other_index {
1333                self.clear_connected_udp_for_peer(&peer_addr);
1334            }
1335        }
1336    }
1337
1338    /// Ensure the current FMP receive index resolves to this peer.
1339    ///
1340    /// Rekey msg1/msg2 handlers pre-register the pending index before
1341    /// cutover, but losing that registration in a debug build used to
1342    /// panic in the cutover path. Repairing the map here is safe: the
1343    /// peer has already promoted the pending session, and the decrypt
1344    /// worker registration immediately after cutover depends on the
1345    /// same `(transport_id, our_index)` key.
1346    pub(in crate::node) fn ensure_current_session_index_registered(
1347        &mut self,
1348        node_addr: &NodeAddr,
1349        context: &'static str,
1350    ) -> bool {
1351        let Some(peer) = self.peers.get(node_addr) else {
1352            return false;
1353        };
1354        let Some(transport_id) = peer.transport_id() else {
1355            warn!(
1356                peer = %self.peer_display_name(node_addr),
1357                context,
1358                "Cannot register current session index without transport id"
1359            );
1360            return false;
1361        };
1362        let Some(our_index) = peer.our_index() else {
1363            warn!(
1364                peer = %self.peer_display_name(node_addr),
1365                context,
1366                "Cannot register current session index without local index"
1367            );
1368            return false;
1369        };
1370
1371        let cache_key = (transport_id, our_index.as_u32());
1372        match self.peers_by_index.get(&cache_key).copied() {
1373            Some(existing) if existing == *node_addr => true,
1374            Some(existing) => {
1375                warn!(
1376                    peer = %self.peer_display_name(node_addr),
1377                    previous_owner = %self.peer_display_name(&existing),
1378                    transport_id = %transport_id,
1379                    our_index = %our_index,
1380                    context,
1381                    "Repairing current session index with stale owner"
1382                );
1383                self.peers_by_index.insert(cache_key, *node_addr);
1384                true
1385            }
1386            None => {
1387                warn!(
1388                    peer = %self.peer_display_name(node_addr),
1389                    transport_id = %transport_id,
1390                    our_index = %our_index,
1391                    context,
1392                    "Repairing missing current session index"
1393                );
1394                self.peers_by_index.insert(cache_key, *node_addr);
1395                true
1396            }
1397        }
1398    }
1399
1400    // === Configuration ===
1401
    /// Get the node's configuration (as supplied at construction).
    pub fn config(&self) -> &Config {
        &self.config
    }
1406
    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
    ///
    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
    /// transport MTU (minimum across operational transports). Returns the
    /// maximum IPv6 packet size (including IPv6 header) that can be
    /// transmitted through the FIPS mesh.
    pub fn effective_ipv6_mtu(&self) -> u16 {
        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
    }
1415
1416    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1417    ///
1418    /// Returns the **minimum** MTU across all operational transports, or
1419    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1420    /// where a specific egress transport isn't yet known: the resulting
1421    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1422    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1423    /// configured-transport's egress, eliminating PMTU-D black holes that
1424    /// would otherwise occur when a flow's actual egress is smaller than
1425    /// the clamp ceiling assumed at TUN init.
1426    ///
1427    /// Returning the smallest (rather than the first-iterated, which used
1428    /// to vary across HashMap iteration order + async-startup race) makes
1429    /// the clamp deterministic across daemon restarts.
1430    ///
1431    /// See `ISSUE-2026-0011` for the empirical investigation.
1432    pub fn transport_mtu(&self) -> u16 {
1433        let min_operational = self
1434            .transports
1435            .values()
1436            .filter(|h| h.is_operational())
1437            .map(|h| h.mtu())
1438            .min();
1439        if let Some(mtu) = min_operational {
1440            return mtu;
1441        }
1442        // Fallback to config: try UDP first, then Ethernet
1443        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1444            return cfg.mtu();
1445        }
1446        1280
1447    }
1448
1449    // === State ===
1450
    /// Get the node's lifecycle state (Created/…; see `NodeState`).
    pub fn state(&self) -> NodeState {
        self.state
    }
1455
    /// Get the node uptime, measured from construction (`started_at`).
    pub fn uptime(&self) -> std::time::Duration {
        self.started_at.elapsed()
    }
1460
    /// Check if node is operational (delegates to `NodeState::is_operational`).
    pub fn is_running(&self) -> bool {
        self.state.is_operational()
    }
1465
    /// Check if this is a leaf-only node (set by [`Node::leaf_only`]).
    pub fn is_leaf_only(&self) -> bool {
        self.is_leaf_only
    }
1470
1471    // === Tree State ===
1472
    /// Get the spanning-tree state (read-only).
    pub fn tree_state(&self) -> &TreeState {
        &self.tree_state
    }
1477
    /// Get mutable access to the spanning-tree state.
    pub fn tree_state_mut(&mut self) -> &mut TreeState {
        &mut self.tree_state
    }
1482
1483    // === Bloom State ===
1484
    /// Get the Bloom filter state (read-only).
    pub fn bloom_state(&self) -> &BloomState {
        &self.bloom_state
    }
1489
    /// Get mutable access to the Bloom filter state.
    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
        &mut self.bloom_state
    }
1494
1495    // === Mesh Size Estimate ===
1496
    /// Get the cached estimated mesh size, or `None` when no reliable
    /// estimate is available (see `compute_mesh_size`).
    pub fn estimated_mesh_size(&self) -> Option<u64> {
        self.estimated_mesh_size
    }
1501
    /// Compute and cache the estimated mesh size from bloom filters.
    ///
    /// Uses the spanning tree partition: parent's filter covers nodes reachable
    /// upward, children's filters cover disjoint subtrees downward. The sum
    /// of estimated entry counts plus one (self) approximates total network size.
    pub(crate) fn compute_mesh_size(&mut self) {
        let my_addr = *self.tree_state.my_node_addr();
        let parent_id = *self.tree_state.my_declaration().parent_id();
        let is_root = self.tree_state.is_root();

        let max_fpr = self.config.node.bloom.max_inbound_fpr;
        let mut total: f64 = 1.0; // count self
        let mut child_count: u32 = 0;
        // Stays false when neither the parent nor any child contributed a
        // usable filter estimate (e.g. a node with no tree neighbors).
        let mut has_data = false;

        // Parent's filter: nodes reachable upward through the tree.
        // If any contributing filter is above the FPR cap, we refuse to
        // estimate rather than substitute a partial/biased aggregate —
        // Node.estimated_mesh_size is already Option<u64> and consumers
        // (control socket, fipstop, periodic debug log) handle None.
        if !is_root
            && let Some(parent) = self.peers.get(&parent_id)
            && let Some(filter) = parent.inbound_filter()
        {
            match filter.estimated_count(max_fpr) {
                Some(n) => {
                    total += n;
                    has_data = true;
                }
                None => {
                    // Filter over the FPR cap: abandon the whole estimate.
                    self.estimated_mesh_size = None;
                    return;
                }
            }
        }

        // Children's filters: each child's subtree is disjoint
        for (peer_addr, peer) in &self.peers {
            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
                && *decl.parent_id() == my_addr
            {
                // A child without an inbound filter still counts toward
                // child_count but contributes nothing to the total.
                child_count += 1;
                if let Some(filter) = peer.inbound_filter() {
                    match filter.estimated_count(max_fpr) {
                        Some(n) => {
                            total += n;
                            has_data = true;
                        }
                        None => {
                            self.estimated_mesh_size = None;
                            return;
                        }
                    }
                }
            }
        }

        if !has_data {
            self.estimated_mesh_size = None;
            return;
        }

        let size = total.round() as u64;
        self.estimated_mesh_size = Some(size);

        // Periodic logging (reuse MMP default interval: 30s)
        let now = std::time::Instant::now();
        let should_log = match self.last_mesh_size_log {
            None => true,
            Some(last) => {
                now.duration_since(last)
                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
            }
        };
        if should_log {
            tracing::debug!(
                estimated_mesh_size = size,
                peers = self.peers.len(),
                children = child_count,
                "Mesh size estimate"
            );
            self.last_mesh_size_log = Some(now);
        }
    }
1586
1587    // === Coord Cache ===
1588
    /// Get the coordinate cache.
    ///
    /// Shared borrow of the destination-coordinate cache used by routing.
    pub fn coord_cache(&self) -> &CoordCache {
        &self.coord_cache
    }
1593
    /// Get mutable coordinate cache.
    ///
    /// Exclusive borrow for callers that insert or touch cache entries.
    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
        &mut self.coord_cache
    }
1598
1599    // === Node Statistics ===
1600
    /// Get the node statistics.
    ///
    /// Shared borrow of the aggregate counters (forwarding, tree, …).
    pub fn stats(&self) -> &stats::NodeStats {
        &self.stats
    }
1605
    /// Get mutable node statistics.
    ///
    /// Crate-internal: counters are only updated from within the node.
    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
        &mut self.stats
    }
1610
    /// Get the stats history collector.
    ///
    /// Populated once per tick by `record_stats_history`.
    pub fn stats_history(&self) -> &stats_history::StatsHistory {
        &self.stats_history
    }
1615
1616    /// Sample the current node state into the stats history ring.
1617    /// Called once per tick from the RX loop.
1618    pub(crate) fn record_stats_history(&mut self) {
1619        let fwd = &self.stats.forwarding;
1620        let peers_with_mmp: Vec<f64> = self
1621            .peers
1622            .values()
1623            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1624            .collect();
1625        let loss_rate = if peers_with_mmp.is_empty() {
1626            0.0
1627        } else {
1628            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1629        };
1630
1631        let snap = stats_history::Snapshot {
1632            mesh_size: self.estimated_mesh_size,
1633            tree_depth: self.tree_state.my_coords().depth() as u32,
1634            peer_count: self.peers.len() as u64,
1635            parent_switches_total: self.stats.tree.parent_switches,
1636            bytes_in_total: fwd.received_bytes,
1637            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1638            packets_in_total: fwd.received_packets,
1639            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1640            loss_rate,
1641            active_sessions: self.sessions.len() as u64,
1642        };
1643
1644        let now = std::time::Instant::now();
1645        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1646            .peers
1647            .values()
1648            .map(|p| {
1649                let stats = p.link_stats();
1650                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1651                    Some(m) => (
1652                        m.metrics.srtt_ms(),
1653                        Some(m.metrics.loss_rate()),
1654                        m.receiver.ecn_ce_count() as u64,
1655                    ),
1656                    None => (None, None, 0),
1657                };
1658                stats_history::PeerSnapshot {
1659                    node_addr: *p.node_addr(),
1660                    last_seen: now,
1661                    srtt_ms,
1662                    loss_rate,
1663                    bytes_in_total: stats.bytes_recv,
1664                    bytes_out_total: stats.bytes_sent,
1665                    packets_in_total: stats.packets_recv,
1666                    packets_out_total: stats.packets_sent,
1667                    ecn_ce_total: ecn_ce,
1668                }
1669            })
1670            .collect();
1671
1672        self.stats_history.tick(now, &snap, &peer_snaps);
1673    }
1674
1675    // === TUN Interface ===
1676
    /// Get the TUN state.
    ///
    /// Returned by value (`TunState` is copied out of the node).
    pub fn tun_state(&self) -> TunState {
        self.tun_state
    }
1681
    /// Get the TUN interface name, if active.
    ///
    /// `None` when no TUN interface name has been recorded.
    pub fn tun_name(&self) -> Option<&str> {
        self.tun_name.as_deref()
    }
1686
1687    // === Resource Limits ===
1688
    /// Set the maximum number of connections (handshake phase).
    ///
    /// A value of 0 disables the cap (see `add_connection`).
    pub fn set_max_connections(&mut self, max: usize) {
        self.max_connections = max;
    }
1693
    /// Set the maximum number of peers (authenticated).
    ///
    /// Enforcement happens where peers are admitted, not here.
    pub fn set_max_peers(&mut self, max: usize) {
        self.max_peers = max;
    }
1698
    /// Set the maximum number of links.
    ///
    /// A value of 0 disables the cap (see `add_link`).
    pub fn set_max_links(&mut self, max: usize) {
        self.max_links = max;
    }
1703
1704    // === Counts ===
1705
    /// Number of pending connections (handshake in progress).
    pub fn connection_count(&self) -> usize {
        // Connections are keyed by LinkId; one entry per in-flight handshake.
        self.connections.len()
    }
1710
    /// Number of authenticated peers.
    ///
    /// Counts all peers regardless of whether they can currently send;
    /// see `sendable_peer_count` for the sendable subset.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
1715
    /// Number of active links.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }
1720
    /// Number of active transports.
    pub fn transport_count(&self) -> usize {
        self.transports.len()
    }
1725
1726    // === Transport Management ===
1727
1728    /// Allocate a new transport ID.
1729    pub fn allocate_transport_id(&mut self) -> TransportId {
1730        let id = TransportId::new(self.next_transport_id);
1731        self.next_transport_id += 1;
1732        id
1733    }
1734
    /// Get a transport by ID.
    ///
    /// `None` when no transport is registered under `id`.
    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
        self.transports.get(id)
    }
1739
    /// Get mutable transport by ID.
    ///
    /// `None` when no transport is registered under `id`.
    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
        self.transports.get_mut(id)
    }
1744
    /// Iterate over transport IDs.
    ///
    /// Order is unspecified (map iteration order).
    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
        self.transports.keys()
    }
1749
    /// Get the packet receiver for the event loop.
    ///
    /// `None` when no receiver has been installed.
    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
        self.packet_rx.as_mut()
    }
1754
1755    // === Link Management ===
1756
1757    /// Allocate a new link ID.
1758    pub fn allocate_link_id(&mut self) -> LinkId {
1759        let id = LinkId::new(self.next_link_id);
1760        self.next_link_id += 1;
1761        id
1762    }
1763
1764    /// Add a link.
1765    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1766        if self.max_links > 0 && self.links.len() >= self.max_links {
1767            return Err(NodeError::MaxLinksExceeded {
1768                max: self.max_links,
1769            });
1770        }
1771        let link_id = link.link_id();
1772        let transport_id = link.transport_id();
1773        let remote_addr = link.remote_addr().clone();
1774
1775        self.links.insert(link_id, link);
1776        self.addr_to_link
1777            .insert((transport_id, remote_addr), link_id);
1778        Ok(())
1779    }
1780
    /// Get a link by ID.
    ///
    /// `None` when no link is registered under `link_id`.
    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
        self.links.get(link_id)
    }
1785
    /// Get a mutable link by ID.
    ///
    /// `None` when no link is registered under `link_id`.
    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
        self.links.get_mut(link_id)
    }
1790
1791    /// Find link ID by transport address.
1792    pub fn find_link_by_addr(
1793        &self,
1794        transport_id: TransportId,
1795        addr: &TransportAddr,
1796    ) -> Option<LinkId> {
1797        self.addr_to_link
1798            .get(&(transport_id, addr.clone()))
1799            .copied()
1800    }
1801
1802    /// Remove a link.
1803    ///
1804    /// Only removes the addr_to_link reverse lookup if it still points to this
1805    /// link. In cross-connection scenarios, a newer link may have replaced the
1806    /// entry for the same address.
1807    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1808        if let Some(link) = self.links.remove(link_id) {
1809            // Clean up reverse lookup only if it still maps to this link
1810            let key = (link.transport_id(), link.remote_addr().clone());
1811            if self.addr_to_link.get(&key) == Some(link_id) {
1812                self.addr_to_link.remove(&key);
1813            }
1814            Some(link)
1815        } else {
1816            None
1817        }
1818    }
1819
1820    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1821        if !self.bootstrap_transports.contains(&transport_id) {
1822            return;
1823        }
1824
1825        let transport_in_use = self
1826            .links
1827            .values()
1828            .any(|link| link.transport_id() == transport_id)
1829            || self
1830                .connections
1831                .values()
1832                .any(|conn| conn.transport_id() == Some(transport_id))
1833            || self
1834                .peers
1835                .values()
1836                .any(|peer| peer.transport_id() == Some(transport_id))
1837            || self
1838                .pending_connects
1839                .iter()
1840                .any(|pending| pending.transport_id == transport_id);
1841
1842        if transport_in_use {
1843            return;
1844        }
1845
1846        tracing::debug!(
1847            transport_id = %transport_id,
1848            "bootstrap transport has no remaining references; dropping"
1849        );
1850
1851        self.bootstrap_transports.remove(&transport_id);
1852        self.bootstrap_transport_npubs.remove(&transport_id);
1853        self.transport_drops.remove(&transport_id);
1854        self.transports.remove(&transport_id);
1855    }
1856
    /// Iterate over all links.
    ///
    /// Order is unspecified (map iteration order).
    pub fn links(&self) -> impl Iterator<Item = &Link> {
        self.links.values()
    }
1861
1862    // === Connection Management (Handshake Phase) ===
1863
1864    /// Add a pending connection.
1865    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
1866        let link_id = connection.link_id();
1867
1868        if self.connections.contains_key(&link_id) {
1869            return Err(NodeError::ConnectionAlreadyExists(link_id));
1870        }
1871
1872        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
1873            return Err(NodeError::MaxConnectionsExceeded {
1874                max: self.max_connections,
1875            });
1876        }
1877
1878        self.connections.insert(link_id, connection);
1879        Ok(())
1880    }
1881
    /// Get a connection by LinkId.
    ///
    /// `None` when no handshake is pending on `link_id`.
    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
        self.connections.get(link_id)
    }
1886
    /// Get a mutable connection by LinkId.
    ///
    /// `None` when no handshake is pending on `link_id`.
    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
        self.connections.get_mut(link_id)
    }
1891
    /// Remove a connection.
    ///
    /// Returns the removed connection, or `None` if `link_id` was unknown.
    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
        self.connections.remove(link_id)
    }
1896
    /// Iterate over all connections.
    ///
    /// Order is unspecified (map iteration order).
    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
        self.connections.values()
    }
1901
1902    // === Peer Management (Active Phase) ===
1903
    /// Get a peer by NodeAddr.
    ///
    /// `None` when `node_addr` is not an authenticated peer.
    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
        self.peers.get(node_addr)
    }
1908
    /// Get a mutable peer by NodeAddr.
    ///
    /// `None` when `node_addr` is not an authenticated peer.
    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
        self.peers.get_mut(node_addr)
    }
1913
    /// Remove a peer.
    ///
    /// Returns the removed peer, or `None` if `node_addr` was unknown.
    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
        self.peers.remove(node_addr)
    }
1918
    /// Iterate over all peers.
    ///
    /// Order is unspecified (map iteration order).
    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values()
    }
1923
    /// Reference to the Nostr discovery handle if discovery is enabled.
    /// Used by control queries (`show_peers` per-peer Nostr-traversal
    /// state) to read failure-state without taking shared ownership.
    ///
    /// `None` when Nostr discovery is not configured.
    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
        self.nostr_discovery.as_deref()
    }
1930
    /// Iterate over all peer node IDs.
    ///
    /// Order is unspecified (map iteration order).
    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
        self.peers.keys()
    }
1935
    /// Iterate over peers that can send traffic.
    ///
    /// Filters on each peer's `can_send()` state.
    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values().filter(|p| p.can_send())
    }
1940
1941    /// Number of peers that can send traffic.
1942    pub fn sendable_peer_count(&self) -> usize {
1943        self.peers.values().filter(|p| p.can_send()).count()
1944    }
1945
1946    // === End-to-End Sessions ===
1947
    /// Disable the discovery forward rate limiter (for tests).
    ///
    /// Sets the limiter interval to zero so every forward is permitted.
    #[cfg(test)]
    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
        self.discovery_forward_limiter
            .set_interval(std::time::Duration::ZERO);
    }
1955
    /// Get a session by remote NodeAddr (test helper).
    #[cfg(test)]
    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
        self.sessions.get(remote)
    }
1960
    /// Get a mutable session by remote NodeAddr (test helper).
    #[cfg(test)]
    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
        self.sessions.get_mut(remote)
    }
1966
    /// Remove a session (test helper).
    ///
    /// Returns the removed entry, or `None` if `remote` had no session.
    #[cfg(test)]
    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
        self.sessions.remove(remote)
    }
1972
    /// Read the path_mtu_lookup entry for a destination FipsAddress.
    ///
    /// Returns `None` both on a missing entry and when the lock is poisoned
    /// (`.ok()` discards the poison error).
    #[cfg(test)]
    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
        self.path_mtu_lookup
            .read()
            .ok()
            .and_then(|map| map.get(fips_addr).copied())
    }
1981
    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
    ///
    /// Silently does nothing when the lock is poisoned.
    #[cfg(test)]
    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
        if let Ok(mut map) = self.path_mtu_lookup.write() {
            map.insert(fips_addr, mtu);
        }
    }
1989
    /// Number of end-to-end sessions.
    pub fn session_count(&self) -> usize {
        self.sessions.len()
    }
1994
    /// Iterate over all session entries (for control queries).
    ///
    /// Yields `(remote NodeAddr, entry)` pairs in unspecified order.
    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
        self.sessions.iter()
    }
1999
2000    // === Identity Cache ===
2001
    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
    ///
    /// Returns `true` when the entry is cached (freshly inserted, refreshed,
    /// or already present), and `false` when `pubkey` does not derive the
    /// claimed `node_addr`.
    pub(crate) fn register_identity(
        &mut self,
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
    ) -> bool {
        // Cache key: the first 15 bytes of the NodeAddr.
        let mut prefix = [0u8; 15];
        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
        if let Some(entry) = self.identity_cache.get(&prefix)
            && entry.node_addr == node_addr
            && entry.pubkey == pubkey
        {
            // Endpoint sends pass the same PeerIdentity on every packet. Once
            // validated, avoid re-deriving NodeAddr from the public key in the
            // data path; that hash showed up in macOS sender profiles.
            return true;
        }

        // Verify the claimed address actually derives from the key before caching.
        let (xonly, _) = pubkey.x_only_public_key();
        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
        if derived_node_addr != node_addr {
            debug!(
                claimed_node_addr = %node_addr,
                derived_node_addr = %derived_node_addr,
                "Rejected identity cache entry with mismatched public key"
            );
            return false;
        }

        // An entry for this NodeAddr already exists (with a different stored
        // pubkey value): refresh key and timestamp in place.
        let now_ms = Self::now_ms();
        if let Some(entry) = self.identity_cache.get_mut(&prefix)
            && entry.node_addr == node_addr
        {
            entry.pubkey = pubkey;
            entry.last_seen_ms = now_ms;
            return true;
        }

        let npub = encode_npub(&xonly);
        self.identity_cache.insert(
            prefix,
            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
        );
        // LRU eviction: linear scan for the stalest entry; only runs when the
        // insertion pushed the cache past the configured cap.
        let max = self.config.node.cache.identity_size;
        if self.identity_cache.len() > max
            && let Some(oldest_key) = self
                .identity_cache
                .iter()
                .min_by_key(|(_, entry)| entry.last_seen_ms)
                .map(|(k, _)| *k)
        {
            self.identity_cache.remove(&oldest_key);
        }
        true
    }
2058
2059    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2060    pub(crate) fn lookup_by_fips_prefix(
2061        &mut self,
2062        prefix: &[u8; 15],
2063    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2064        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2065            entry.last_seen_ms = Self::now_ms(); // LRU touch
2066            Some((entry.node_addr, entry.pubkey))
2067        } else {
2068            None
2069        }
2070    }
2071
2072    /// Check if a node's identity is in the cache (without LRU touch).
2073    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2074        let mut prefix = [0u8; 15];
2075        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2076        self.identity_cache.contains_key(&prefix)
2077    }
2078
    /// Number of identity cache entries.
    pub fn identity_cache_len(&self) -> usize {
        self.identity_cache.len()
    }
2083
    /// Iterate over identity cache entries.
    ///
    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
    /// Used by the `show_identity_cache` control query. Order is unspecified.
    pub fn identity_cache_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
        self.identity_cache
            .values()
            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
    }
2095
    /// Configured maximum identity cache size.
    ///
    /// The cap enforced by the LRU eviction in `register_identity`.
    pub fn identity_cache_max(&self) -> usize {
        self.config.node.cache.identity_size
    }
2100
    /// Number of pending discovery lookups.
    pub fn pending_lookup_count(&self) -> usize {
        self.pending_lookups.len()
    }
2105
    /// Iterate over pending discovery lookups for diagnostics.
    ///
    /// Yields `(target NodeAddr, lookup state)` pairs in unspecified order.
    pub fn pending_lookups_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
        self.pending_lookups.iter()
    }
2112
    /// Number of recent discovery requests tracked.
    pub fn recent_request_count(&self) -> usize {
        self.recent_requests.len()
    }
2117
    /// Count of destinations with queued TUN packets awaiting session setup.
    pub fn pending_tun_destinations(&self) -> usize {
        self.pending_tun_packets.len()
    }
2122
2123    /// Total TUN packets queued across all destinations.
2124    pub fn pending_tun_total_packets(&self) -> usize {
2125        self.pending_tun_packets.values().map(|q| q.len()).sum()
2126    }
2127
    /// Iterate over retry state for diagnostics.
    ///
    /// Yields `(destination NodeAddr, retry state)` pairs in unspecified order.
    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
        self.retry_pending.iter()
    }
2132
2133    // === Routing ===
2134
2135    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2136    ///
2137    /// Returns true if the peer is our current tree parent, or if the peer
2138    /// has declared us as their parent (making them our child).
2139    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2140        // Peer is our parent
2141        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2142            return true;
2143        }
2144        // Peer is our child (their declaration names us as parent)
2145        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2146            && decl.parent_id() == self.node_addr()
2147        {
2148            return true;
2149        }
2150        false
2151    }
2152
    /// Find next hop for a destination node address.
    ///
    /// Routing priority:
    /// 1. Destination is self → `None` (local delivery)
    /// 2. Destination is a direct peer → that peer
    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
    ///    observed reverse paths, selected with weighted multipath plus
    ///    periodic coordinate/tree exploration.
    /// 4. Bloom filter candidates with cached dest coords → among peers whose
    ///    bloom filter contains the destination, pick the one that minimizes
    ///    tree distance to the destination, with
    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
    ///    The self-distance check ensures only peers strictly closer to the
    ///    destination than us are considered (prevents routing loops).
    /// 5. Greedy tree routing fallback (requires cached dest coords)
    /// 6. No route → `None`
    ///
    /// Both the bloom filter and tree routing paths require cached destination
    /// coordinates (checked in `coord_cache`). Without coordinates, the node
    /// cannot make loop-free forwarding decisions. The caller should signal
    /// `CoordsRequired` back to the source when `None` is returned for a
    /// non-local destination.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // 1. Local delivery
        if dest_node_addr == self.node_addr() {
            return None;
        }

        // 2. Direct peer
        if let Some(peer) = self.peers.get(dest_node_addr)
            && peer.can_send()
        {
            return Some(peer);
        }

        let now_ms = Self::now_ms();

        // Precompute the sendable-peer address set once (reply-learned mode
        // only); the selection closures below reference it instead of
        // borrowing `self.peers`.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(_, peer)| peer.can_send())
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // 3. Optional reply-learned routing. These entries are not peer
        // claims; they are local observations of which peer carried traffic
        // or a verified lookup response back from the destination. Most
        // packets use weighted multipath over learned routes, but periodic
        // fallback exploration lets coord/bloom/tree routes discover better
        // candidates.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Look up cached destination coordinates (required by both bloom and tree paths).
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: a learned route (if any) is the only option left.
            if let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            return None;
        };

        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
        //    If no candidate is strictly closer, fall through to tree routing.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr {
            return self.peers.get(&next_hop_addr);
        }

        // 5. Greedy tree routing fallback
        let tree_route_addr = self
            .tree_state
            .find_next_hop(&dest_coords)
            .filter(|next_hop_id| {
                self.peers
                    .get(next_hop_id)
                    .is_some_and(|peer| peer.can_send())
            });
        if let Some(next_hop_addr) = tree_route_addr {
            return self.peers.get(&next_hop_addr);
        }
        // Exploration tick found no coord/bloom/tree route after all; fall
        // back to the learned routes that were skipped at step 3.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // Last resort in reply-learned mode: retry learned routes.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        None
    }
2286
2287    pub(in crate::node) fn learn_reverse_route(
2288        &mut self,
2289        destination: NodeAddr,
2290        next_hop: NodeAddr,
2291    ) {
2292        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2293            || destination == *self.node_addr()
2294        {
2295            return;
2296        }
2297        let now_ms = Self::now_ms();
2298        self.learned_routes.learn(
2299            destination,
2300            next_hop,
2301            now_ms,
2302            self.config.node.routing.learned_ttl_secs,
2303            self.config.node.routing.max_learned_routes_per_dest,
2304        );
2305    }
2306
2307    pub(in crate::node) fn record_route_failure(
2308        &mut self,
2309        destination: NodeAddr,
2310        next_hop: NodeAddr,
2311    ) {
2312        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2313            return;
2314        }
2315        self.learned_routes.record_failure(&destination, &next_hop);
2316    }
2317
    /// Snapshot the learned-route table as of `now_ms` (for control queries).
    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
        self.learned_routes.snapshot(now_ms)
    }
2321
    /// Drop learned routes that have expired as of `now_ms`.
    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
        self.learned_routes.purge_expired(now_ms);
    }
2325
2326    /// Select the best peer from a set of bloom filter candidates.
2327    ///
2328    /// Uses distance from each candidate's tree coordinates to the destination
2329    /// as the primary metric (after link_cost). Only selects peers that are
2330    /// strictly closer to the destination than we are (self-distance check
2331    /// prevents routing loops).
2332    ///
2333    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2334    fn select_best_candidate<'a>(
2335        &'a self,
2336        candidates: &[&'a ActivePeer],
2337        dest_coords: &crate::tree::TreeCoordinate,
2338    ) -> Option<&'a ActivePeer> {
2339        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2340
2341        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2342
2343        for &candidate in candidates {
2344            if !candidate.can_send() {
2345                continue;
2346            }
2347
2348            let cost = candidate.link_cost();
2349
2350            let dist = self
2351                .tree_state
2352                .peer_coords(candidate.node_addr())
2353                .map(|pc| pc.distance_to(dest_coords))
2354                .unwrap_or(usize::MAX);
2355
2356            // Self-distance check: only consider peers strictly closer
2357            // to the destination than we are (prevents routing loops)
2358            if dist >= my_distance {
2359                continue;
2360            }
2361
2362            let dominated = match &best {
2363                None => true,
2364                Some((_, best_cost, best_dist)) => {
2365                    cost < *best_cost
2366                        || (cost == *best_cost && dist < *best_dist)
2367                        || (cost == *best_cost
2368                            && dist == *best_dist
2369                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2370                }
2371            };
2372
2373            if dominated {
2374                best = Some((candidate, cost, dist));
2375            }
2376        }
2377
2378        best.map(|(peer, _, _)| peer)
2379    }
2380
2381    /// Check if a destination is in any peer's bloom filter.
2382    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2383        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2384    }
2385
2386    /// Get the TUN packet sender channel.
2387    ///
2388    /// Returns None if TUN is not active or the node hasn't been started.
2389    pub fn tun_tx(&self) -> Option<&TunTx> {
2390        self.tun_tx.as_ref()
2391    }
2392
2393    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2394    ///
2395    /// This must be called before [`Node::start`] and requires `tun.enabled =
2396    /// false`. Outbound packets sent to the returned sender are processed by the
2397    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2398    /// sent to the returned receiver with source attribution.
2399    pub fn attach_external_packet_io(
2400        &mut self,
2401        capacity: usize,
2402    ) -> Result<ExternalPacketIo, NodeError> {
2403        if self.state != NodeState::Created {
2404            return Err(NodeError::Config(ConfigError::Validation(
2405                "external packet I/O must be attached before node start".to_string(),
2406            )));
2407        }
2408        if self.config.tun.enabled {
2409            return Err(NodeError::Config(ConfigError::Validation(
2410                "external packet I/O requires tun.enabled=false".to_string(),
2411            )));
2412        }
2413
2414        let capacity = capacity.max(1);
2415        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2416        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2417        self.tun_outbound_rx = Some(outbound_rx);
2418        self.external_packet_tx = Some(inbound_tx);
2419
2420        Ok(ExternalPacketIo {
2421            outbound_tx,
2422            inbound_rx,
2423        })
2424    }
2425
2426    /// Attach app-owned endpoint data I/O for embedded operation.
2427    ///
2428    /// Commands sent to the returned sender are processed by the node RX loop.
2429    /// Incoming endpoint data is emitted as source-attributed events.
2430    pub(crate) fn attach_endpoint_data_io(
2431        &mut self,
2432        capacity: usize,
2433    ) -> Result<EndpointDataIo, NodeError> {
2434        if self.state != NodeState::Created {
2435            return Err(NodeError::Config(ConfigError::Validation(
2436                "endpoint data I/O must be attached before node start".to_string(),
2437            )));
2438        }
2439
2440        let command_capacity = endpoint_data_command_capacity(capacity);
2441        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2442        // Inbound endpoint-data events use an unbounded channel — see
2443        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2444        // per-packet semaphore + the cross-task relay task that used to
2445        // sit on top of this channel).
2446        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2447        self.endpoint_command_rx = Some(command_rx);
2448        self.endpoint_event_tx = Some(event_tx.clone());
2449
2450        Ok(EndpointDataIo {
2451            command_tx,
2452            event_rx,
2453            event_tx,
2454        })
2455    }
2456
2457    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2458        let mut prefix = [0u8; 15];
2459        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2460        self.identity_cache
2461            .get(&prefix)
2462            .filter(|entry| &entry.node_addr == addr)
2463            .map(|entry| entry.pubkey)
2464    }
2465
2466    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2467        let mut prefix = [0u8; 15];
2468        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2469        self.identity_cache
2470            .get(&prefix)
2471            .filter(|entry| &entry.node_addr == addr)
2472            .map(|entry| entry.npub.clone())
2473    }
2474
2475    pub(in crate::node) fn deliver_external_ipv6_packet(
2476        &self,
2477        src_addr: &NodeAddr,
2478        packet: Vec<u8>,
2479    ) {
2480        let Some(external_packet_tx) = &self.external_packet_tx else {
2481            return;
2482        };
2483        if packet.len() < 40 {
2484            return;
2485        }
2486        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2487            return;
2488        };
2489        let delivered = NodeDeliveredPacket {
2490            source_node_addr: *src_addr,
2491            source_npub: self.npub_for_node_addr(src_addr),
2492            destination,
2493            packet,
2494        };
2495        if let Err(error) = external_packet_tx.try_send(delivered) {
2496            debug!(error = %error, "Failed to deliver packet to external app sink");
2497        }
2498    }
2499
2500    // === Sending ===
2501
2502    /// Encrypt and send a link-layer message to an authenticated peer.
2503    ///
2504    /// The plaintext should include the message type byte followed by the
2505    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
2506    ///
2507    /// The send path prepends a 4-byte session-relative timestamp (inner
2508    /// header) before encryption. The full 16-byte outer header is used
2509    /// as AAD for the AEAD construction.
2510    ///
2511    /// This is the standard path for sending any link-layer control message
2512    /// to a peer over their encrypted Noise session.
2513    pub(super) async fn send_encrypted_link_message(
2514        &mut self,
2515        node_addr: &NodeAddr,
2516        plaintext: &[u8],
2517    ) -> Result<(), NodeError> {
2518        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2519            .await
2520    }
2521
2522    /// Update the local-outbound-broken signal from a `transport.send`
2523    /// outcome. Sets `last_local_send_failure_at` on local-side io
2524    /// errors (NetworkUnreachable / HostUnreachable / AddrNotAvailable);
2525    /// clears it on success. The reaper consults this in
2526    /// `check_link_heartbeats` to switch to `fast_link_dead_timeout_secs`.
2527    pub(in crate::node) fn note_local_send_outcome(
2528        &mut self,
2529        result: &Result<usize, TransportError>,
2530    ) {
2531        match result {
2532            Ok(_) => {
2533                if self.last_local_send_failure_at.is_some() {
2534                    self.last_local_send_failure_at = None;
2535                }
2536            }
2537            Err(TransportError::Io(e))
2538                if matches!(
2539                    e.kind(),
2540                    std::io::ErrorKind::NetworkUnreachable
2541                        | std::io::ErrorKind::HostUnreachable
2542                        | std::io::ErrorKind::AddrNotAvailable
2543                ) =>
2544            {
2545                self.last_local_send_failure_at = Some(std::time::Instant::now());
2546            }
2547            Err(_) => {}
2548        }
2549    }
2550
    /// Returns the wall-clock instant of the most recent observed local
    /// outbound failure, if any. Used by the link-dead reaper.
    ///
    /// `None` means the most recent send succeeded (or no local-side
    /// failure has been recorded yet) — see `note_local_send_outcome`.
    pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
        self.last_local_send_failure_at
    }
2556
    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
    ///
    /// Used by the forwarding path to relay congestion signals hop-by-hop.
    pub(super) async fn send_encrypted_link_message_with_ce(
        &mut self,
        node_addr: &NodeAddr,
        plaintext: &[u8],
        ce_flag: bool,
    ) -> Result<(), NodeError> {
        let peer = self
            .peers
            .get_mut(node_addr)
            .ok_or(NodeError::PeerNotFound(*node_addr))?;

        // Everything below requires an established session: the remote's
        // receive index, the transport the session lives on, and a current
        // remote address. Each missing piece maps to a SendFailed error.
        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
            node_addr: *node_addr,
            reason: "no their_index".into(),
        })?;
        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
            node_addr: *node_addr,
            reason: "no transport_id".into(),
        })?;
        let remote_addr = peer
            .current_addr()
            .cloned()
            .ok_or_else(|| NodeError::SendFailed {
                node_addr: *node_addr,
                reason: "no current_addr".into(),
            })?;
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        let connected_socket = peer.connected_udp();

        // Prepend 4-byte session-relative timestamp (inner header)
        let timestamp_ms = peer.session_elapsed_ms();

        // MMP: read spin bit value before entering session borrow
        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
        let mut flags = if sp_flag { FLAG_SP } else { 0 };
        if ce_flag {
            flags |= FLAG_CE;
        }
        if peer.current_k_bit() {
            flags |= FLAG_KEY_EPOCH;
        }

        let session = peer
            .noise_session_mut()
            .ok_or_else(|| NodeError::SendFailed {
                node_addr: *node_addr,
                reason: "no noise session".into(),
            })?;

        // Build 16-byte outer header upfront. The inner-plaintext
        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
        // just to measure it. The worker path uses this length to size
        // the wire buffer directly; the legacy path below still
        // materialises a separate `inner_plaintext` Vec for the inline
        // encrypt-and-send call.
        const INNER_TS_LEN: usize = 4;
        let counter = session.current_send_counter();
        let inner_len = INNER_TS_LEN + plaintext.len();
        let payload_len = inner_len as u16;
        let header = build_established_header(their_index, counter, flags, payload_len);

        // **UDP send fast path.** The encrypt-worker pool is always
        // spawned at lifecycle start (workers = num_cpus) in
        // production, so this branch is taken for every authentic
        // send on every UDP-transported established session. The
        // AEAD work + sendmsg syscall run on a dedicated OS thread;
        // the rx_loop only builds the wire buffer + reserves the
        // counter inline.
        //
        // Other transport kinds (BLE, TCP, sim, ethernet) fall
        // through to the inline encrypt + transport.send path
        // below — those don't have raw-fd / sendmmsg / UDP_GSO
        // benefits to expose through the worker pool, so the simpler
        // synchronous send is the right shape for them.
        //
        // The `encrypt_workers.is_some()` check below is true in
        // production (lifecycle::start spawns the pool); it stays
        // checked rather than `expect()`-ed because unit tests
        // construct `Node` without calling `start()`.
        let transport_for_send = self
            .transports
            .get(&transport_id)
            .ok_or(NodeError::TransportNotFound(transport_id))?;
        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
            && is_udp
            && let Some(cipher_clone) = session.send_cipher_clone()
        {
            {
                // Reserve the counter on the session so subsequent
                // sends don't reuse it. `current_send_counter` only
                // peeks; we advance via `take_send_counter`.
                let reserved_counter =
                    session
                        .take_send_counter()
                        .map_err(|e| NodeError::SendFailed {
                            node_addr: *node_addr,
                            reason: format!("counter reservation failed: {}", e),
                        })?;
                debug_assert_eq!(reserved_counter, counter);
                // Re-derive the header with the now-locked-in counter
                // value (same value, but the call sequence is more
                // explicit).
                let header =
                    build_established_header(their_index, reserved_counter, flags, payload_len);
                let transport = transport_for_send;
                // Snapshot the per-peer connected UDP socket before
                // resolving the fallback address. On the established
                // steady-state path this socket already carries the
                // kernel peer address, so re-parsing the configured
                // transport address and touching the DNS cache on every
                // packet is pure overhead on the sender hot path.
                let send_target = {
                    if let TransportHandle::Udp(udp) = transport {
                        let socket_addr = {
                            #[cfg(any(target_os = "linux", target_os = "macos"))]
                            {
                                match connected_socket.as_ref() {
                                    Some(socket) => Some(socket.peer_addr()),
                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
                                }
                            }
                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
                            {
                                udp.resolve_for_off_task(&remote_addr).await.ok()
                            }
                        };
                        match (udp.async_socket(), socket_addr) {
                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
                            _ => None,
                        }
                    } else {
                        None
                    }
                };
                // If either the async socket or a resolved address is
                // missing, `send_target` is None and we fall through to
                // the inline path below (counter already reserved, which
                // is fine: the inline path encrypts with `counter`).
                if let Some((socket, socket_addr)) = send_target {
                    // Build the wire buffer **directly** from
                    // `plaintext` with a single allocation:
                    //   `[16 header][4 ts][plaintext...]` with
                    // +16 trailing capacity for the AEAD tag.
                    // The worker seals `wire_buf[16..]` in
                    // place and appends the tag — no second
                    // alloc, no second memcpy.
                    //
                    // Previous design built `inner_plaintext`
                    // via `prepend_inner_header` (1 alloc + 1
                    // copy) and then let the worker memcpy
                    // header + plaintext into a fresh Vec
                    // (another alloc + copy). At ~100 kpps the
                    // saved alloc/copy is ~150 MB/sec of memory
                    // bandwidth on the hot rx_loop + worker.
                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
                    let mut wire_buf = Vec::with_capacity(wire_capacity);
                    wire_buf.extend_from_slice(&header);
                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
                    wire_buf.extend_from_slice(plaintext);
                    let predicted_bytes = wire_capacity;
                    // Stats / MMP update inline — predicted size
                    // is exact for ChaCha20-Poly1305 (tag is
                    // constant 16 bytes). When `connected_socket` is
                    // `Some`, the worker sends on it without a
                    // destination sockaddr — the kernel skips the
                    // per-packet sockaddr + route + neighbor resolve.
                    if let Some(peer) = self.peers.get_mut(node_addr) {
                        peer.link_stats_mut().record_sent(predicted_bytes);
                        if let Some(mmp) = peer.mmp_mut() {
                            mmp.sender
                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
                        }
                    }
                    // EndpointData frames may be dropped under worker
                    // backpressure; control messages are never dropped.
                    workers.dispatch(self::encrypt_worker::FmpSendJob {
                        cipher: cipher_clone,
                        counter: reserved_counter,
                        wire_buf,
                        fsp_seal: None,
                        socket,
                        dest_addr: socket_addr,
                        #[cfg(any(target_os = "linux", target_os = "macos"))]
                        connected_socket,
                        drop_on_backpressure: plaintext
                            .first()
                            .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
                        queued_at: crate::perf_profile::stamp(),
                    });
                    return Ok(());
                }
            }
        }

        // Inline (legacy) path: encrypt + send on the rx_loop.
        // Build the inner plaintext lazily here — the worker path
        // above never reaches this point, so the prepend_inner_header
        // alloc is avoided in the fast path.
        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
        // Encrypt with AAD binding to the outer header
        let ciphertext = {
            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
            session
                .encrypt_with_aad(&inner_plaintext, &header)
                .map_err(|e| NodeError::SendFailed {
                    node_addr: *node_addr,
                    reason: format!("encryption failed: {}", e),
                })?
        };

        // Wire layout: `[16-byte outer header][ciphertext + tag]`.
        let wire_packet = build_encrypted(&header, &ciphertext);

        // Re-borrow peer for stats update after sending
        let send_result = {
            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
            let transport = self
                .transports
                .get(&transport_id)
                .ok_or(NodeError::TransportNotFound(transport_id))?;
            transport.send(&remote_addr, &wire_packet).await
        };
        // Feed the reaper's local-outbound-broken signal before mapping
        // the error for the caller.
        self.note_local_send_outcome(&send_result);
        let bytes_sent = send_result.map_err(|e| match e {
            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
                node_addr: *node_addr,
                packet_size,
                mtu,
            },
            other => NodeError::SendFailed {
                node_addr: *node_addr,
                reason: format!("transport send: {}", other),
            },
        })?;

        // Update send statistics
        if let Some(peer) = self.peers.get_mut(node_addr) {
            peer.link_stats_mut().record_sent(bytes_sent);
            // MMP: record sent frame for sender report generation
            if let Some(mmp) = peer.mmp_mut() {
                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
            }
        }

        Ok(())
    }
2801}
2802
2803impl fmt::Debug for Node {
2804    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2805        f.debug_struct("Node")
2806            .field("node_addr", self.node_addr())
2807            .field("state", &self.state)
2808            .field("is_leaf_only", &self.is_leaf_only)
2809            .field("connections", &self.connection_count())
2810            .field("peers", &self.peer_count())
2811            .field("links", &self.link_count())
2812            .field("transports", &self.transport_count())
2813            .finish()
2814    }
2815}