Skip to main content

fips_core/node/
mod.rs

//! FIPS Node Entity
//!
//! Top-level structure representing a running FIPS instance. The Node
//! holds all state required for mesh routing: identity, tree state,
//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49    ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50    TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58    Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59    PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
/// Window after a local-side send failure during which that failure is
/// presumably still treated as fresh evidence of a broken outbound path
/// (consumer not visible in this chunk — confirm at the use site).
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration =
    std::time::Duration::from_millis(3_000);
/// How long, in milliseconds, a destination's direct first-hop path stays
/// marked degraded (presumably feeds `session_direct_degraded_until_ms`).
const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
/// Minimum sample count before a session loss ratio is considered —
/// NOTE(review): inferred from the name; confirm the unit (packets?) at the use site.
const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
/// Loss ratio at or above which a direct path is presumably marked degraded.
const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
/// Loss ratio below which a degraded direct path is presumably considered
/// recovered. It is lower than the degrade threshold, which appears to give
/// hysteresis.
const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
/// Minimum relative cost advantage a fallback route presumably needs before
/// it is preferred (fraction; semantics not visible in this chunk).
const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
75
76fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
77    if !plaintext
78        .first()
79        .is_some_and(|ty| *ty == LinkMessageType::SessionDatagram.to_byte())
80    {
81        return false;
82    }
83    let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
84        return false;
85    };
86    FspCommonPrefix::parse(fsp_payload)
87        .is_some_and(|prefix| prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted())
88}
89
/// Half-width, in seconds, of the symmetric jitter window for per-session
/// rekey timers.
///
/// At construction, and again after every cutover, each FMP/FSP session
/// draws an offset uniformly from
/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` seconds. The window is centred
/// on zero, so the configured mean rekey interval is unchanged. Meanwhile,
/// peers that started together are less likely to initiate rekeys
/// simultaneously.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
97
98/// Errors related to node operations.
99#[derive(Debug, Error)]
100pub enum NodeError {
101    #[error("node not started")]
102    NotStarted,
103
104    #[error("node already started")]
105    AlreadyStarted,
106
107    #[error("node already stopped")]
108    AlreadyStopped,
109
110    #[error("transport not found: {0}")]
111    TransportNotFound(TransportId),
112
113    #[error("no transport available for type: {0}")]
114    NoTransportForType(String),
115
116    #[error("link not found: {0}")]
117    LinkNotFound(LinkId),
118
119    #[error("connection not found: {0}")]
120    ConnectionNotFound(LinkId),
121
122    #[error("peer not found: {0:?}")]
123    PeerNotFound(NodeAddr),
124
125    #[error("peer already exists: {0:?}")]
126    PeerAlreadyExists(NodeAddr),
127
128    #[error("connection already exists for link: {0}")]
129    ConnectionAlreadyExists(LinkId),
130
131    #[error("invalid peer npub '{npub}': {reason}")]
132    InvalidPeerNpub { npub: String, reason: String },
133
134    #[error("discovery error: {0}")]
135    Discovery(String),
136
137    #[error("access denied: {0}")]
138    AccessDenied(String),
139
140    #[error("max connections exceeded: {max}")]
141    MaxConnectionsExceeded { max: usize },
142
143    #[error("max peers exceeded: {max}")]
144    MaxPeersExceeded { max: usize },
145
146    #[error("max links exceeded: {max}")]
147    MaxLinksExceeded { max: usize },
148
149    #[error("handshake incomplete for link {0}")]
150    HandshakeIncomplete(LinkId),
151
152    #[error("no session available for link {0}")]
153    NoSession(LinkId),
154
155    #[error("promotion failed for link {link_id}: {reason}")]
156    PromotionFailed { link_id: LinkId, reason: String },
157
158    #[error("send failed to {node_addr}: {reason}")]
159    SendFailed { node_addr: NodeAddr, reason: String },
160
161    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
162    MtuExceeded {
163        node_addr: NodeAddr,
164        packet_size: usize,
165        mtu: u16,
166    },
167
168    #[error("config error: {0}")]
169    Config(#[from] ConfigError),
170
171    #[error("identity error: {0}")]
172    Identity(#[from] IdentityError),
173
174    #[error("TUN error: {0}")]
175    Tun(#[from] TunError),
176
177    #[error("index allocation failed: {0}")]
178    IndexAllocationFailed(String),
179
180    #[error("handshake failed: {0}")]
181    HandshakeFailed(String),
182
183    #[error("transport error: {0}")]
184    TransportError(String),
185
186    #[error("local route unavailable: {0}")]
187    LocalRouteUnavailable(String),
188
189    #[error("bootstrap handoff failed: {0}")]
190    BootstrapHandoff(String),
191}
192
193impl NodeError {
194    pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
195        if error.is_local_route_unavailable() {
196            Self::LocalRouteUnavailable(error.to_string())
197        } else {
198            Self::TransportError(error.to_string())
199        }
200    }
201
202    pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
203        matches!(self, Self::LocalRouteUnavailable(_))
204    }
205}
206
207/// Source-attributed packet delivered by a node running without a system TUN.
208#[derive(Debug, Clone, PartialEq, Eq)]
209pub struct NodeDeliveredPacket {
210    /// FIPS node address that originated the packet.
211    pub source_node_addr: NodeAddr,
212    /// Source Nostr public key when the node has learned it.
213    pub source_npub: Option<String>,
214    /// Destination FIPS address from the IPv6 packet.
215    pub destination: FipsAddress,
216    /// Full IPv6 packet after FIPS session decapsulation.
217    pub packet: Vec<u8>,
218}
219
220#[derive(Debug, Clone)]
221struct IdentityCacheEntry {
222    node_addr: NodeAddr,
223    pubkey: secp256k1::PublicKey,
224    npub: String,
225    last_seen_ms: u64,
226}
227
228impl IdentityCacheEntry {
229    fn new(
230        node_addr: NodeAddr,
231        pubkey: secp256k1::PublicKey,
232        npub: String,
233        last_seen_ms: u64,
234    ) -> Self {
235        Self {
236            node_addr,
237            pubkey,
238            npub,
239            last_seen_ms,
240        }
241    }
242}
243
244/// App-owned packet channels for embedding FIPS without a system TUN.
245#[derive(Debug)]
246pub struct ExternalPacketIo {
247    /// Send outbound IPv6 packets into the node.
248    pub outbound_tx: crate::upper::tun::TunOutboundTx,
249    /// Receive inbound IPv6 packets delivered by FIPS sessions.
250    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
251}
252
253/// App-owned endpoint data channels for embedding FIPS without a daemon.
254#[derive(Debug)]
255pub(crate) struct EndpointDataIo {
256    /// Send endpoint data commands into the node RX loop.
257    ///
258    /// Bounded with a generous default so normal sender bursts do not
259    /// stall on semaphore acquisition. macOS pacing happens at the UDP
260    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
261    /// constraining this app queue instead caused the inner TCP flow to
262    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
263    /// default for benches.
264    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
265    /// Receive endpoint data delivered by FIPS sessions.
266    ///
267    /// Unbounded so the rx_loop's send on inbound packet delivery is a
268    /// wait-free push (no semaphore acquire), and so we can drop the
269    /// per-packet cross-task relay that previously sat between the node
270    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
271    /// naturally bounded — the rx_loop both produces here and runs the
272    /// same runtime that schedules the consumer, so a stalled consumer
273    /// stalls production too.
274    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
275    /// Clone of the event_tx exposed for in-process loopback (e.g.
276    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
277    /// event into the same queue without going through the encrypt /
278    /// decrypt path, while keeping every consumer reading from a single
279    /// channel.
280    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
281}
282
/// Resolve the depth of the endpoint data command channel.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable holds a
/// positive integer (surrounding whitespace ignored), that value is used
/// verbatim. This includes values below the default floor, so benchmarks
/// can deliberately shrink the queue. A missing, non-numeric, or zero
/// override is ignored.
///
/// Otherwise the result is `requested`, raised to at least
/// `MIN_DEFAULT_CAPACITY`. The result is therefore always non-zero, which
/// `tokio::sync::mpsc::channel` requires.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    /// Floor applied to `requested` when no override is set. The queue is
    /// kept generous on purpose so normal send bursts never wait on permits.
    const MIN_DEFAULT_CAPACITY: usize = 32_768;

    std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&cap| cap > 0)
        // The old extra `.max(1)` was dead: the floor already dominates it.
        .unwrap_or_else(|| requested.max(MIN_DEFAULT_CAPACITY))
}
293
294/// Commands accepted by the node endpoint data service.
295#[derive(Debug)]
296pub(crate) enum NodeEndpointCommand {
297    /// Send with an explicit response channel — used by callers that
298    /// care whether the local-stack handoff succeeded (e.g.
299    /// `blocking_send` waits for the runtime to accept the send).
300    Send {
301        remote: PeerIdentity,
302        payload: Vec<u8>,
303        queued_at: Option<std::time::Instant>,
304        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
305    },
306    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
307    /// no per-packet result channel. Used by the data-plane fast path
308    /// (`FipsEndpoint::send`) where the caller already discards the
309    /// result. Saves one oneshot::channel() allocation per outbound
310    /// packet on the application's send hot path.
311    SendOneway {
312        remote: PeerIdentity,
313        payload: Vec<u8>,
314        queued_at: Option<std::time::Instant>,
315    },
316    PeerSnapshot {
317        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
318    },
319    RelaySnapshot {
320        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
321    },
322    UpdateRelays {
323        advert_relays: Vec<String>,
324        dm_relays: Vec<String>,
325        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
326    },
327    /// Replace the runtime peer list. Newly added auto-connect peers get
328    /// `initiate_peer_connection` immediately; removed peers are dropped
329    /// from the retry queue (the regular liveness timeout reaps any active
330    /// session). Existing entries are kept and their `addresses` field is
331    /// refreshed so the next retry sees the latest hints.
332    UpdatePeers {
333        peers: Vec<crate::config::PeerConfig>,
334        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
335    },
336}
337
/// Summary of an `UpdatePeers` request: how many peer entries fell into
/// each category.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Peers present in the new list but not the old one.
    pub(crate) added: usize,
    /// Peers present in the old list but not the new one.
    pub(crate) removed: usize,
    /// Peers in both lists whose configuration changed.
    pub(crate) updated: usize,
    /// Peers in both lists with identical configuration.
    pub(crate) unchanged: usize,
}
346
347/// Endpoint data events emitted by the node session receive path.
348#[derive(Debug)]
349pub(crate) enum NodeEndpointEvent {
350    Data {
351        source_node_addr: NodeAddr,
352        source_npub: Option<String>,
353        payload: Vec<u8>,
354        queued_at: Option<std::time::Instant>,
355    },
356}
357
/// Snapshot of one authenticated peer, as reported to embedded endpoint callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// Peer's npub.
    pub(crate) npub: String,
    /// Whether the peer is currently connected.
    pub(crate) connected: bool,
    /// Transport address in use, rendered as text, if any.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, if any.
    pub(crate) transport_type: Option<String>,
    /// Raw identifier of the link carrying this peer.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, presumably `None` until measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent to this peer.
    pub(crate) packets_sent: u64,
    /// Packets received from this peer.
    pub(crate) packets_recv: u64,
    /// Bytes sent to this peer.
    pub(crate) bytes_sent: u64,
    /// Bytes received from this peer.
    pub(crate) bytes_recv: u64,
    /// Whether a direct-path probe is pending.
    pub(crate) direct_probe_pending: bool,
    /// Delay until the next direct-path probe, in ms (semantics not visible here).
    pub(crate) direct_probe_after_ms: Option<u64>,
}
374
/// Current state of one Nostr relay, as reported to embedded endpoint callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Human-readable relay status.
    pub(crate) status: String,
}
381
/// Operational (lifecycle) state of a node.
///
/// A node starts out `Created`. It is `Starting` while transports are
/// initialized, `Running` while fully operational, then `Stopping` and
/// finally `Stopped` on shutdown. Both `Created` and `Stopped` nodes may be
/// started.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not started.
    Created,
    /// Starting up (initializing transports).
    Starting,
    /// Fully operational.
    Running,
    /// Shutting down.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// Lowercase name of the state; this is exactly what `Display` renders.
    pub fn as_str(&self) -> &'static str {
        match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        }
    }

    /// Returns `true` only while the node is `Running`.
    pub fn is_operational(&self) -> bool {
        matches!(self, NodeState::Running)
    }

    /// Returns `true` if the node may be started (`Created` or `Stopped`).
    pub fn can_start(&self) -> bool {
        matches!(self, NodeState::Created | NodeState::Stopped)
    }

    /// Returns `true` if the node may be stopped (only while `Running`).
    pub fn can_stop(&self) -> bool {
        matches!(self, NodeState::Running)
    }
}

impl fmt::Display for NodeState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad`, unlike `write!(f, "{}", ..)`, honours width/fill/alignment
        // flags, so `format!("{:<8}", state)` lines up in tabular output.
        f.pad(self.as_str())
    }
}
426
427/// Recent request tracking for dedup and reverse-path forwarding.
428///
429/// When a LookupRequest is forwarded through a node, the node stores the
430/// request_id and which peer sent it. When the corresponding LookupResponse
431/// arrives, it's forwarded back to that peer (reverse-path forwarding).
432/// The `response_forwarded` flag prevents response routing loops.
433#[derive(Clone, Debug)]
434pub(crate) struct RecentRequest {
435    /// The peer who sent this request to us.
436    pub(crate) from_peer: NodeAddr,
437    /// When we received this request (Unix milliseconds).
438    pub(crate) timestamp_ms: u64,
439    /// Whether we've already forwarded a response for this request.
440    /// Prevents response routing loops when convergent request paths
441    /// create bidirectional entries in recent_requests.
442    pub(crate) response_forwarded: bool,
443}
444
445impl RecentRequest {
446    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
447        Self {
448            from_peer,
449            timestamp_ms,
450            response_forwarded: false,
451        }
452    }
453
454    /// Check if this entry has expired (older than expiry_ms).
455    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
456        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
457    }
458}
459
460/// Key for addr_to_link reverse lookup.
461type AddrKey = (TransportId, TransportAddr);
462
/// Kernel receive-drop bookkeeping for one transport, used for congestion
/// detection.
///
/// Sampled once per tick (1s). `dropping` records whether the cumulative
/// drop counter advanced between the two most recent samples.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Cumulative `recv_drops` value seen at the previous sample.
    prev_drops: u64,
    /// Whether drops increased since the previous sample.
    dropping: bool,
}
474
475/// State for a link waiting for transport-level connection establishment.
476///
477/// For connection-oriented transports (TCP, Tor), the transport connect runs
478/// asynchronously. This struct holds the data needed to complete the handshake
479/// once the connection is ready.
480struct PendingConnect {
481    /// The link that was created for this connection.
482    link_id: LinkId,
483    /// Which transport is being used.
484    transport_id: TransportId,
485    /// The remote address being connected to.
486    remote_addr: TransportAddr,
487    /// The peer identity (for handshake initiation).
488    peer_identity: PeerIdentity,
489}
490
491/// A running FIPS node instance.
492///
493/// This is the top-level container holding all node state.
494///
495/// ## Peer Lifecycle
496///
497/// Peers go through two phases:
498/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
499/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
500///
501/// The `addr_to_link` map enables dispatching incoming packets to the right
502/// connection before authentication completes.
503// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
504pub struct Node {
505    // === Identity ===
506    /// This node's cryptographic identity.
507    identity: Identity,
508
509    /// Random epoch generated at startup for peer restart detection.
510    /// Exchanged inside Noise handshake messages so peers can detect restarts.
511    startup_epoch: [u8; 8],
512
513    /// Instant when the node was created, for uptime reporting.
514    started_at: std::time::Instant,
515
516    // === Configuration ===
517    /// Loaded configuration.
518    config: Config,
519
520    // === State ===
521    /// Node operational state.
522    state: NodeState,
523
524    /// Whether this is a leaf-only node.
525    is_leaf_only: bool,
526
527    // === Spanning Tree ===
528    /// Local spanning tree state.
529    tree_state: TreeState,
530
531    // === Bloom Filter ===
532    /// Local Bloom filter state.
533    bloom_state: BloomState,
534
535    // === Routing ===
536    /// Address -> coordinates cache (from session setup and discovery).
537    coord_cache: CoordCache,
538    /// Locally learned reverse-path next-hop hints.
539    learned_routes: LearnedRouteTable,
540    /// Destinations whose direct first-hop path is temporarily suspect because
541    /// session-layer MMP observed sustained loss while using that direct path.
542    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
543    /// Recent discovery requests (dedup + reverse-path forwarding).
544    /// Maps request_id → RecentRequest.
545    recent_requests: HashMap<u64, RecentRequest>,
546    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
547    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
548    /// the TUN reader/writer threads at TCP MSS clamp time so the
549    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
550    /// and the learned per-destination path MTU.
551    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
552
553    // === Transports & Links ===
554    /// Active transports (owned by Node).
555    transports: HashMap<TransportId, TransportHandle>,
556    /// Per-transport kernel drop tracking for congestion detection.
557    transport_drops: HashMap<TransportId, TransportDropState>,
558    /// Active links.
559    links: HashMap<LinkId, Link>,
560    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
561    addr_to_link: HashMap<AddrKey, LinkId>,
562
563    // === Packet Channel ===
564    /// Packet sender for transports.
565    packet_tx: Option<PacketTx>,
566    /// Packet receiver (for event loop).
567    packet_rx: Option<PacketRx>,
568
569    // === Connections (Handshake Phase) ===
570    /// Pending connections (handshake in progress).
571    /// Indexed by LinkId since we don't know the peer's identity yet.
572    connections: HashMap<LinkId, PeerConnection>,
573
574    // === Peers (Active Phase) ===
575    /// Authenticated peers.
576    /// Indexed by NodeAddr (verified identity).
577    peers: HashMap<NodeAddr, ActivePeer>,
578
579    // === End-to-End Sessions ===
580    /// Session table for end-to-end encrypted sessions.
581    /// Keyed by remote NodeAddr.
582    sessions: HashMap<NodeAddr, SessionEntry>,
583
584    // === Identity Cache ===
585    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
586    /// Enables reverse lookup from IPv6 destination to session/routing identity.
587    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
588
589    // === Pending TUN Packets ===
590    /// Packets queued while waiting for session establishment.
591    /// Keyed by destination NodeAddr, bounded per-dest and total.
592    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
593    /// Endpoint data payloads queued while waiting for session establishment.
594    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
595    // === Pending Discovery Lookups ===
596    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
597    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
598    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
599
600    // === Resource Limits ===
601    /// Maximum connections (0 = unlimited).
602    max_connections: usize,
603    /// Maximum peers (0 = unlimited).
604    max_peers: usize,
605    /// Maximum links (0 = unlimited).
606    max_links: usize,
607
608    // === Counters ===
609    /// Next link ID to allocate.
610    next_link_id: u64,
611    /// Next transport ID to allocate.
612    next_transport_id: u32,
613
614    // === Node Statistics ===
615    /// Routing, forwarding, discovery, and error signal counters.
616    stats: stats::NodeStats,
617
618    /// Time-series history of node-level metrics (1s/1m rings).
619    stats_history: stats_history::StatsHistory,
620
621    // === TUN Interface ===
622    /// TUN device state.
623    tun_state: TunState,
624    /// TUN interface name (for cleanup).
625    tun_name: Option<String>,
626    /// TUN packet sender channel.
627    tun_tx: Option<TunTx>,
628    /// Receiver for outbound packets from the TUN reader.
629    tun_outbound_rx: Option<TunOutboundRx>,
630    /// App-owned packet sink used by embedded/no-TUN integrations.
631    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
632    /// Endpoint data command receiver used by embedded/no-daemon integrations.
633    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
634    /// Endpoint data event sink used by embedded/no-daemon integrations.
635    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
636    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
637    /// spawned (set up in `start()` once transports are running).
638    /// `Some(pool)` once available; the pool internally holds
639    /// per-worker mpsc senders and round-robins jobs across them.
640    /// See `node::encrypt_worker` for the rationale and layout.
641    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
642    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
643    /// `encrypt_workers` for the receive side.
644    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
645    /// Set of sessions that have been registered with the decrypt
646    /// shard worker pool. Used by rx_loop to decide between fast-path
647    /// dispatch (worker owns the session) and legacy in-place decrypt
648    /// (worker doesn't have it yet). Per the data-plane restructure,
649    /// the worker owns its session state directly — there's no shared
650    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
651    /// this set tracks **whether** the worker has been told about a
652    /// given session.
653    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
654    /// Fallback channel: decrypt worker bounces non-fast-path packets
655    /// (anything that's not bulk EndpointData) back here for rx_loop
656    /// to handle via the legacy path. Drained by a new rx_loop arm.
657    decrypt_fallback_rx:
658        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
659    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
660    /// TUN reader thread handle.
661    tun_reader_handle: Option<JoinHandle<()>>,
662    /// TUN writer thread handle.
663    tun_writer_handle: Option<JoinHandle<()>>,
664    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
665    /// On Linux, deleting the interface via netlink serves the same purpose.
666    #[cfg(target_os = "macos")]
667    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
668
669    // === DNS Responder ===
670    /// Receiver for resolved identities from the DNS responder.
671    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
672    /// DNS responder task handle.
673    dns_task: Option<tokio::task::JoinHandle<()>>,
674
675    // === Index-Based Session Dispatch ===
676    /// Allocator for session indices.
677    index_allocator: IndexAllocator,
678    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
679    /// This maps our session index to the peer that uses it.
680    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
681    /// Pending outbound handshakes by our sender_idx.
682    /// Tracks which LinkId corresponds to which session index.
683    pending_outbound: HashMap<(TransportId, u32), LinkId>,
684
685    // === Rate Limiting ===
686    /// Rate limiter for msg1 processing (DoS protection).
687    msg1_rate_limiter: HandshakeRateLimiter,
688    /// Rate limiter for ICMP Packet Too Big messages.
689    icmp_rate_limiter: IcmpRateLimiter,
690    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
691    routing_error_rate_limiter: RoutingErrorRateLimiter,
692    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
693    coords_response_rate_limiter: RoutingErrorRateLimiter,
694    /// Backoff for failed discovery lookups (originator-side).
695    discovery_backoff: DiscoveryBackoff,
696    /// Rate limiter for forwarded discovery requests (transit-side).
697    discovery_forward_limiter: DiscoveryForwardRateLimiter,
698
699    // === Pending Transport Connects ===
700    /// Links waiting for transport-level connection establishment before
701    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
702    /// the transport connect runs in the background; the tick handler polls
703    /// connection_state() and initiates the handshake when connected.
704    pending_connects: Vec<PendingConnect>,
705
706    // === Connection Retry ===
707    /// Retry state for peers whose outbound connections have failed.
708    /// Keyed by NodeAddr. Entries are created when a handshake times out
709    /// or fails, and removed on successful promotion or when max retries
710    /// are exhausted.
711    retry_pending: HashMap<NodeAddr, retry::RetryState>,
712
713    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
714    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
715    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
716    /// Identity is unverified at this layer — the Noise XX handshake
717    /// initiated against an mDNS-observed endpoint is what proves the
718    /// peer holds the matching private key.
719    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
720    /// Same-host JSON registry under `~/.fips/instances`. Records are
721    /// loopback routing hints only; peer identity is still verified by the
722    /// Noise handshake.
723    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
724    local_instance_started_at_ms: Option<u64>,
725    last_local_instance_publish_ms: Option<u64>,
726    last_local_instance_scan_ms: Option<u64>,
727    /// Wall-clock ms when Nostr discovery successfully started, used to
728    /// schedule the one-shot startup advert sweep after a settle delay.
729    /// `None` until discovery comes up; remains `None` if discovery is
730    /// disabled or failed to start.
731    nostr_discovery_started_at_ms: Option<u64>,
732    /// Whether the one-shot startup advert sweep has run. Set to true
733    /// after the first sweep fires (under `policy: open`); thereafter
734    /// only the per-tick `queue_open_discovery_retries` continues.
735    startup_open_discovery_sweep_done: bool,
736    /// Per-peer UDP transports adopted from NAT traversal handoff.
737    bootstrap_transports: HashSet<TransportId>,
738    /// Originating peer npub (bech32) for each adopted bootstrap
739    /// transport, captured at `adopt_established_traversal` time.
740    /// Populated alongside `bootstrap_transports`; cleared in
741    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
742    /// route fatal-protocol-mismatch observations back to the
743    /// Nostr-discovery `failure_state` for long cooldown application.
744    bootstrap_transport_npubs: HashMap<TransportId, String>,
745    /// Peers that should not be used as reply-learned fallback transit for
746    /// other destinations. Direct lookups to the peer are still permitted.
747    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
748
749    // === Periodic Parent Re-evaluation ===
750    /// Timestamp of last periodic parent re-evaluation (for pacing).
751    last_parent_reeval: Option<crate::time::Instant>,
752
753    // === Congestion Logging ===
754    /// Timestamp of last congestion detection log (rate-limited to 5s).
755    last_congestion_log: Option<std::time::Instant>,
756
757    // === Mesh Size Estimate ===
758    /// Cached estimated mesh size (computed once per tick from bloom filters).
759    estimated_mesh_size: Option<u64>,
760    /// Timestamp of last mesh size log emission.
761    last_mesh_size_log: Option<std::time::Instant>,
762
763    // === Bloom Self-Plausibility ===
764    /// Rate-limit state for the self-plausibility WARN. Fires at most
765    /// once per 60s globally when our own outgoing FilterAnnounce has
766    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
767    /// aggregation drift or an ingress bypass.
768    last_self_warn: Option<std::time::Instant>,
769
770    // === Local Outbound Liveness ===
771    /// Set when a `transport.send` returned a local-side io error
772    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
773    /// cleared on the next successful send. Used by
774    /// `check_link_heartbeats` to compress the dead-timeout to
775    /// `fast_link_dead_timeout_secs` while our outbound is observed
776    /// broken — direct kernel evidence beats waiting on receive-silence.
777    last_local_send_failure_at: Option<std::time::Instant>,
778    /// Set when the rx loop could not complete its 1s maintenance work
779    /// inside the watchdog timeout. Link-dead detection may be valid during
780    /// overload, but traversal cooldown should not punish a path just because
781    /// our own scheduler/worker queue was late.
782    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
783
784    // === Display Names ===
785    /// Human-readable names for configured peers (alias or short npub).
786    /// Populated at startup from peer config.
787    peer_aliases: HashMap<NodeAddr, String>,
788    /// Scheduler weight for explicitly configured peers. Built when config
789    /// changes so the packet hot path only does a NodeAddr hash lookup.
790    configured_peer_send_weights: HashMap<NodeAddr, u8>,
791
792    /// Reloadable peer ACL state from standard allow/deny files.
793    peer_acl: acl::PeerAclReloader,
794
795    // === Host Map ===
796    /// Static hostname → npub mapping for DNS resolution.
797    /// Built at construction from peer aliases and /etc/fips/hosts.
798    host_map: Arc<HostMap>,
799}
800
801impl Node {
802    /// Create a new node from configuration.
803    pub fn new(config: Config) -> Result<Self, NodeError> {
804        config.validate()?;
805        let identity = config.create_identity()?;
806        let node_addr = *identity.node_addr();
807        let is_leaf_only = config.is_leaf_only();
808
809        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
810        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
811
812        let mut startup_epoch = [0u8; 8];
813        rand::rng().fill_bytes(&mut startup_epoch);
814
815        let mut bloom_state = if is_leaf_only {
816            BloomState::leaf_only(node_addr)
817        } else {
818            BloomState::new(node_addr)
819        };
820        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
821
822        let tun_state = if config.tun.enabled {
823            TunState::Configured
824        } else {
825            TunState::Disabled
826        };
827
828        // Initialize tree state with signed self-declaration
829        let mut tree_state = TreeState::new(node_addr);
830        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
831        tree_state.set_hold_down(config.node.tree.hold_down_secs);
832        tree_state.set_flap_dampening(
833            config.node.tree.flap_threshold,
834            config.node.tree.flap_window_secs,
835            config.node.tree.flap_dampening_secs,
836        );
837        tree_state
838            .sign_declaration(&identity)
839            .expect("signing own declaration should never fail");
840
841        let coord_cache = CoordCache::new(
842            config.node.cache.coord_size,
843            config.node.cache.coord_ttl_secs * 1000,
844        );
845        let rl = &config.node.rate_limit;
846        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
847            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
848            config.node.limits.max_pending_inbound,
849        );
850
851        let max_connections = config.node.limits.max_connections;
852        let max_peers = config.node.limits.max_peers;
853        let max_links = config.node.limits.max_links;
854        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
855        let backoff_base_secs = config.node.discovery.backoff_base_secs;
856        let backoff_max_secs = config.node.discovery.backoff_max_secs;
857        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
858
859        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
860        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
861
862        Ok(Self {
863            identity,
864            startup_epoch,
865            started_at: std::time::Instant::now(),
866            config,
867            state: NodeState::Created,
868            is_leaf_only,
869            tree_state,
870            bloom_state,
871            coord_cache,
872            learned_routes: LearnedRouteTable::default(),
873            session_direct_degraded_until_ms: HashMap::new(),
874            recent_requests: HashMap::new(),
875            transports: HashMap::new(),
876            transport_drops: HashMap::new(),
877            links: HashMap::new(),
878            addr_to_link: HashMap::new(),
879            packet_tx: None,
880            packet_rx: None,
881            connections: HashMap::new(),
882            peers: HashMap::new(),
883            sessions: HashMap::new(),
884            identity_cache: HashMap::new(),
885            pending_tun_packets: HashMap::new(),
886            pending_endpoint_data: HashMap::new(),
887            pending_lookups: HashMap::new(),
888            max_connections,
889            max_peers,
890            max_links,
891            next_link_id: 1,
892            next_transport_id: 1,
893            stats: stats::NodeStats::new(),
894            stats_history: stats_history::StatsHistory::new(),
895            tun_state,
896            tun_name: None,
897            tun_tx: None,
898            tun_outbound_rx: None,
899            external_packet_tx: None,
900            endpoint_command_rx: None,
901            endpoint_event_tx: None,
902            encrypt_workers: None,
903            decrypt_workers: None,
904            decrypt_registered_sessions: std::collections::HashSet::new(),
905            decrypt_fallback_tx,
906            decrypt_fallback_rx,
907            tun_reader_handle: None,
908            tun_writer_handle: None,
909            #[cfg(target_os = "macos")]
910            tun_shutdown_fd: None,
911            dns_identity_rx: None,
912            dns_task: None,
913            index_allocator: IndexAllocator::new(),
914            peers_by_index: HashMap::new(),
915            pending_outbound: HashMap::new(),
916            msg1_rate_limiter,
917            icmp_rate_limiter: IcmpRateLimiter::new(),
918            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
919            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
920                std::time::Duration::from_millis(coords_response_interval_ms),
921            ),
922            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
923            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
924                std::time::Duration::from_secs(forward_min_interval_secs),
925            ),
926            pending_connects: Vec::new(),
927            retry_pending: HashMap::new(),
928            nostr_discovery: None,
929            nostr_discovery_started_at_ms: None,
930            lan_discovery: None,
931            local_instance_registry: None,
932            local_instance_started_at_ms: None,
933            last_local_instance_publish_ms: None,
934            last_local_instance_scan_ms: None,
935            startup_open_discovery_sweep_done: false,
936            bootstrap_transports: HashSet::new(),
937            bootstrap_transport_npubs: HashMap::new(),
938            discovery_fallback_transit_blocked_peers: HashSet::new(),
939            last_parent_reeval: None,
940            last_congestion_log: None,
941            estimated_mesh_size: None,
942            last_mesh_size_log: None,
943            last_self_warn: None,
944            last_local_send_failure_at: None,
945            last_rx_loop_maintenance_timeout_at: None,
946            peer_aliases: HashMap::new(),
947            configured_peer_send_weights,
948            peer_acl,
949            host_map,
950            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
951        })
952    }
953
954    /// Create a node with a specific identity.
955    ///
956    /// This constructor validates cross-field config invariants before
957    /// constructing the node, same as [`Node::new`].
958    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
959        config.validate()?;
960        let node_addr = *identity.node_addr();
961
962        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
963        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
964
965        let mut startup_epoch = [0u8; 8];
966        rand::rng().fill_bytes(&mut startup_epoch);
967
968        let tun_state = if config.tun.enabled {
969            TunState::Configured
970        } else {
971            TunState::Disabled
972        };
973
974        // Initialize tree state with signed self-declaration
975        let mut tree_state = TreeState::new(node_addr);
976        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
977        tree_state.set_hold_down(config.node.tree.hold_down_secs);
978        tree_state.set_flap_dampening(
979            config.node.tree.flap_threshold,
980            config.node.tree.flap_window_secs,
981            config.node.tree.flap_dampening_secs,
982        );
983        tree_state
984            .sign_declaration(&identity)
985            .expect("signing own declaration should never fail");
986
987        let mut bloom_state = BloomState::new(node_addr);
988        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
989
990        let coord_cache = CoordCache::new(
991            config.node.cache.coord_size,
992            config.node.cache.coord_ttl_secs * 1000,
993        );
994        let rl = &config.node.rate_limit;
995        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
996            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
997            config.node.limits.max_pending_inbound,
998        );
999
1000        let max_connections = config.node.limits.max_connections;
1001        let max_peers = config.node.limits.max_peers;
1002        let max_links = config.node.limits.max_links;
1003        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1004
1005        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1006        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1007
1008        Ok(Self {
1009            identity,
1010            startup_epoch,
1011            started_at: std::time::Instant::now(),
1012            config,
1013            state: NodeState::Created,
1014            is_leaf_only: false,
1015            tree_state,
1016            bloom_state,
1017            coord_cache,
1018            learned_routes: LearnedRouteTable::default(),
1019            session_direct_degraded_until_ms: HashMap::new(),
1020            recent_requests: HashMap::new(),
1021            transports: HashMap::new(),
1022            transport_drops: HashMap::new(),
1023            links: HashMap::new(),
1024            addr_to_link: HashMap::new(),
1025            packet_tx: None,
1026            packet_rx: None,
1027            connections: HashMap::new(),
1028            peers: HashMap::new(),
1029            sessions: HashMap::new(),
1030            identity_cache: HashMap::new(),
1031            pending_tun_packets: HashMap::new(),
1032            pending_endpoint_data: HashMap::new(),
1033            pending_lookups: HashMap::new(),
1034            max_connections,
1035            max_peers,
1036            max_links,
1037            next_link_id: 1,
1038            next_transport_id: 1,
1039            stats: stats::NodeStats::new(),
1040            stats_history: stats_history::StatsHistory::new(),
1041            tun_state,
1042            tun_name: None,
1043            tun_tx: None,
1044            tun_outbound_rx: None,
1045            external_packet_tx: None,
1046            endpoint_command_rx: None,
1047            endpoint_event_tx: None,
1048            encrypt_workers: None,
1049            decrypt_workers: None,
1050            decrypt_registered_sessions: std::collections::HashSet::new(),
1051            decrypt_fallback_tx,
1052            decrypt_fallback_rx,
1053            tun_reader_handle: None,
1054            tun_writer_handle: None,
1055            #[cfg(target_os = "macos")]
1056            tun_shutdown_fd: None,
1057            dns_identity_rx: None,
1058            dns_task: None,
1059            index_allocator: IndexAllocator::new(),
1060            peers_by_index: HashMap::new(),
1061            pending_outbound: HashMap::new(),
1062            msg1_rate_limiter,
1063            icmp_rate_limiter: IcmpRateLimiter::new(),
1064            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1065            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1066                std::time::Duration::from_millis(coords_response_interval_ms),
1067            ),
1068            discovery_backoff: DiscoveryBackoff::new(),
1069            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1070            pending_connects: Vec::new(),
1071            retry_pending: HashMap::new(),
1072            nostr_discovery: None,
1073            nostr_discovery_started_at_ms: None,
1074            lan_discovery: None,
1075            local_instance_registry: None,
1076            local_instance_started_at_ms: None,
1077            last_local_instance_publish_ms: None,
1078            last_local_instance_scan_ms: None,
1079            startup_open_discovery_sweep_done: false,
1080            bootstrap_transports: HashSet::new(),
1081            bootstrap_transport_npubs: HashMap::new(),
1082            discovery_fallback_transit_blocked_peers: HashSet::new(),
1083            last_parent_reeval: None,
1084            last_congestion_log: None,
1085            estimated_mesh_size: None,
1086            last_mesh_size_log: None,
1087            last_self_warn: None,
1088            last_local_send_failure_at: None,
1089            last_rx_loop_maintenance_timeout_at: None,
1090            peer_aliases: HashMap::new(),
1091            configured_peer_send_weights,
1092            peer_acl,
1093            host_map,
1094            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1095        })
1096    }
1097
1098    /// Create a leaf-only node (simplified state).
1099    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1100        let mut node = Self::new(config)?;
1101        node.is_leaf_only = true;
1102        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1103        Ok(node)
1104    }
1105
1106    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1107        let base_host_map = HostMap::from_peer_configs(config.peers());
1108        if !config.node.system_files_enabled {
1109            return (
1110                Arc::new(base_host_map.clone()),
1111                acl::PeerAclReloader::memory_only(base_host_map),
1112            );
1113        }
1114
1115        let mut host_map = base_host_map.clone();
1116        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1117        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1118            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1119        ));
1120        host_map.merge(hosts_file);
1121        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1122            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1123            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1124            base_host_map,
1125            hosts_path,
1126        );
1127        (Arc::new(host_map), peer_acl)
1128    }
1129
1130    fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1131        config
1132            .peers()
1133            .iter()
1134            .filter_map(|peer| {
1135                PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1136                    (
1137                        *identity.node_addr(),
1138                        encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1139                    )
1140                })
1141            })
1142            .collect()
1143    }
1144
1145    fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1146        self.configured_peer_send_weights
1147            .get(peer_addr)
1148            .copied()
1149            .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1150    }
1151
1152    /// Create transport instances from configuration.
1153    ///
1154    /// Returns a vector of TransportHandles for all configured transports.
1155    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1156        let mut transports = Vec::new();
1157
1158        // Collect UDP configs with optional names to avoid borrow conflicts
1159        let udp_instances: Vec<_> = self
1160            .config
1161            .transports
1162            .udp
1163            .iter()
1164            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1165            .collect();
1166
1167        // Create UDP transport instances
1168        for (name, udp_config) in udp_instances {
1169            let transport_id = self.allocate_transport_id();
1170            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1171            transports.push(TransportHandle::Udp(udp));
1172        }
1173
1174        #[cfg(feature = "sim-transport")]
1175        {
1176            let sim_instances: Vec<_> = self
1177                .config
1178                .transports
1179                .sim
1180                .iter()
1181                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1182                .collect();
1183
1184            for (name, sim_config) in sim_instances {
1185                let transport_id = self.allocate_transport_id();
1186                let sim = crate::transport::sim::SimTransport::new(
1187                    transport_id,
1188                    name,
1189                    sim_config,
1190                    packet_tx.clone(),
1191                );
1192                transports.push(TransportHandle::Sim(sim));
1193            }
1194        }
1195
1196        // Create Ethernet transport instances where raw-socket support exists.
1197        #[cfg(any(target_os = "linux", target_os = "macos"))]
1198        {
1199            let eth_instances: Vec<_> = self
1200                .config
1201                .transports
1202                .ethernet
1203                .iter()
1204                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1205                .collect();
1206            let xonly = self.identity.pubkey();
1207            for (name, eth_config) in eth_instances {
1208                let mut eth_config = eth_config;
1209                if eth_config.discovery_scope.is_none() {
1210                    eth_config.discovery_scope = self.lan_discovery_scope();
1211                }
1212                let transport_id = self.allocate_transport_id();
1213                let mut eth =
1214                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1215                eth.set_local_pubkey(xonly);
1216                transports.push(TransportHandle::Ethernet(eth));
1217            }
1218        }
1219
1220        // Create TCP transport instances
1221        let tcp_instances: Vec<_> = self
1222            .config
1223            .transports
1224            .tcp
1225            .iter()
1226            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1227            .collect();
1228
1229        for (name, tcp_config) in tcp_instances {
1230            let transport_id = self.allocate_transport_id();
1231            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1232            transports.push(TransportHandle::Tcp(tcp));
1233        }
1234
1235        // Create Tor transport instances
1236        let tor_instances: Vec<_> = self
1237            .config
1238            .transports
1239            .tor
1240            .iter()
1241            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1242            .collect();
1243
1244        for (name, tor_config) in tor_instances {
1245            let transport_id = self.allocate_transport_id();
1246            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1247            transports.push(TransportHandle::Tor(tor));
1248        }
1249
1250        let webrtc_instances: Vec<_> = self
1251            .config
1252            .transports
1253            .webrtc
1254            .iter()
1255            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1256            .collect();
1257
1258        #[cfg(feature = "webrtc-transport")]
1259        {
1260            for (name, webrtc_config) in webrtc_instances {
1261                let transport_id = self.allocate_transport_id();
1262                match WebRtcTransport::new(
1263                    transport_id,
1264                    name,
1265                    webrtc_config,
1266                    packet_tx.clone(),
1267                    &self.identity,
1268                    &self.config.node.discovery.nostr,
1269                ) {
1270                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1271                    Err(err) => {
1272                        warn!(
1273                            transport_id = %transport_id,
1274                            error = %err,
1275                            "failed to initialize WebRTC transport"
1276                        );
1277                    }
1278                }
1279            }
1280        }
1281        #[cfg(not(feature = "webrtc-transport"))]
1282        if !webrtc_instances.is_empty() {
1283            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1284        }
1285
1286        // Create BLE transport instances
1287        #[cfg(bluer_available)]
1288        {
1289            let ble_instances: Vec<_> = self
1290                .config
1291                .transports
1292                .ble
1293                .iter()
1294                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1295                .collect();
1296
1297            #[cfg(all(bluer_available, not(test)))]
1298            for (name, ble_config) in ble_instances {
1299                let transport_id = self.allocate_transport_id();
1300                let adapter = ble_config.adapter().to_string();
1301                let mtu = ble_config.mtu();
1302                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1303                    Ok(io) => {
1304                        let mut ble = crate::transport::ble::BleTransport::new(
1305                            transport_id,
1306                            name,
1307                            ble_config,
1308                            io,
1309                            packet_tx.clone(),
1310                        );
1311                        ble.set_local_pubkey(self.identity.pubkey().serialize());
1312                        transports.push(TransportHandle::Ble(ble));
1313                    }
1314                    Err(e) => {
1315                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1316                    }
1317                }
1318            }
1319
1320            #[cfg(any(not(bluer_available), test))]
1321            if !ble_instances.is_empty() {
1322                #[cfg(not(test))]
1323                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1324            }
1325        }
1326
1327        transports
1328    }
1329
1330    /// Find an operational transport that matches the given transport type name.
1331    ///
1332    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1333    /// from Nostr/STUN traversal. They must not be reused for ordinary
1334    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1335    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1336    /// with `EINVAL`, and even on platforms that allow it the packet would use
1337    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1338    /// choice deterministic by lowest transport id instead of HashMap order.
1339    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1340        self.transports
1341            .iter()
1342            .filter(|(id, handle)| {
1343                handle.transport_type().name == transport_type
1344                    && handle.is_operational()
1345                    && !self.bootstrap_transports.contains(id)
1346            })
1347            .min_by_key(|(id, _)| id.as_u32())
1348            .map(|(id, _)| *id)
1349    }
1350
1351    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1352    /// and binary TransportAddr.
1353    ///
1354    /// Finds the Ethernet transport instance bound to the named interface
1355    /// and parses the MAC portion into a 6-byte TransportAddr.
1356    #[allow(unused_variables)]
1357    fn resolve_ethernet_addr(
1358        &self,
1359        addr_str: &str,
1360    ) -> Result<(TransportId, TransportAddr), NodeError> {
1361        #[cfg(any(target_os = "linux", target_os = "macos"))]
1362        {
1363            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1364                NodeError::NoTransportForType(format!(
1365                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1366                    addr_str
1367                ))
1368            })?;
1369
1370            // Find the Ethernet transport bound to this interface
1371            let transport_id = self
1372                .transports
1373                .iter()
1374                .find(|(_, handle)| {
1375                    handle.transport_type().name == "ethernet"
1376                        && handle.is_operational()
1377                        && handle.interface_name() == Some(iface)
1378                })
1379                .map(|(id, _)| *id)
1380                .ok_or_else(|| {
1381                    NodeError::NoTransportForType(format!(
1382                        "no operational Ethernet transport for interface '{}'",
1383                        iface
1384                    ))
1385                })?;
1386
1387            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1388                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1389            })?;
1390
1391            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1392        }
1393        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1394        {
1395            Err(NodeError::NoTransportForType(
1396                "Ethernet transport is not supported on this platform".to_string(),
1397            ))
1398        }
1399    }
1400
1401    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1402    /// (TransportId, TransportAddr) pair by finding the BLE transport
1403    /// instance matching the adapter name.
1404    #[cfg(bluer_available)]
1405    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1406        let ta = TransportAddr::from_string(addr_str);
1407        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1408            NodeError::NoTransportForType(format!(
1409                "invalid BLE address format '{}': expected 'adapter/mac'",
1410                addr_str
1411            ))
1412        })?;
1413
1414        // Find the BLE transport for this adapter
1415        let transport_id = self
1416            .transports
1417            .iter()
1418            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1419            .map(|(id, _)| *id)
1420            .ok_or_else(|| {
1421                NodeError::NoTransportForType(format!(
1422                    "no operational BLE transport for adapter '{}'",
1423                    adapter
1424                ))
1425            })?;
1426
1427        // Validate the address format
1428        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1429            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1430        })?;
1431
1432        Ok((transport_id, TransportAddr::from_string(addr_str)))
1433    }
1434
1435    // === Identity Accessors ===
1436
1437    /// Get this node's identity.
1438    pub fn identity(&self) -> &Identity {
1439        &self.identity
1440    }
1441
1442    /// Get this node's NodeAddr.
1443    pub fn node_addr(&self) -> &NodeAddr {
1444        self.identity.node_addr()
1445    }
1446
1447    /// Get this node's npub.
1448    pub fn npub(&self) -> String {
1449        self.identity.npub()
1450    }
1451
1452    /// Return a human-readable display name for a NodeAddr.
1453    ///
1454    /// Lookup order:
1455    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1456    /// 2. Configured peer alias or short npub (from startup map)
1457    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1458    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1459    /// 5. Truncated NodeAddr hex (unknown address)
1460    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1461        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1462            return hostname.to_string();
1463        }
1464        if let Some(name) = self.peer_aliases.get(addr) {
1465            return name.clone();
1466        }
1467        if let Some(peer) = self.peers.get(addr) {
1468            return peer.identity().short_npub();
1469        }
1470        if let Some(entry) = self.sessions.get(addr) {
1471            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1472            return PeerIdentity::from_pubkey(xonly).short_npub();
1473        }
1474        addr.short_hex()
1475    }
1476
1477    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1478    /// decrypt-worker state coherent: removes the same `cache_key`
1479    /// from the registered-sessions tracking set and tells the
1480    /// assigned shard worker to drop its `OwnedSessionState` entry.
1481    ///
1482    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1483    /// at every session-lifecycle teardown site (rekey cross-connection
1484    /// swap, peer disconnect, dispatch session-rotation) so the worker
1485    /// doesn't keep stale ciphers / replay windows around. The
1486    /// follow-up `RegisterSession` for the NEW key (if any) will then
1487    /// install the fresh state on the same shard.
1488    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1489        // Find the peer that owns this index BEFORE removing it from
1490        // the index map, so we can decide whether the deregistration
1491        // also tears down the peer's connected UDP socket.
1492        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1493        self.peers_by_index.remove(&cache_key);
1494        if self.decrypt_registered_sessions.remove(&cache_key)
1495            && let Some(workers) = self.decrypt_workers.as_ref()
1496        {
1497            workers.unregister_session(cache_key);
1498        }
1499        // Tear down the per-peer connected UDP socket *only* if no
1500        // other peers_by_index entry still resolves to this peer.
1501        // Rekey drain calls into this helper with the OLD session
1502        // index while the NEW index is already installed and points
1503        // at the same peer — there the connect()-ed 5-tuple is
1504        // still valid for the new session and we must not close it.
1505        // Peer-teardown sites (CrossConnection swap, stale-index
1506        // fall-through in encrypted.rs, disconnect handler) call
1507        // here when this is the peer's last index, so the connected
1508        // socket goes away with the peer.
1509        if let Some(peer_addr) = owning_peer {
1510            let peer_has_other_index = self
1511                .peers_by_index
1512                .values()
1513                .any(|other| *other == peer_addr);
1514            if !peer_has_other_index {
1515                self.clear_connected_udp_for_peer(&peer_addr);
1516            }
1517        }
1518    }
1519
1520    /// Ensure the current FMP receive index resolves to this peer.
1521    ///
1522    /// Rekey msg1/msg2 handlers pre-register the pending index before
1523    /// cutover, but losing that registration in a debug build used to
1524    /// panic in the cutover path. Repairing the map here is safe: the
1525    /// peer has already promoted the pending session, and the decrypt
1526    /// worker registration immediately after cutover depends on the
1527    /// same `(transport_id, our_index)` key.
1528    pub(in crate::node) fn ensure_current_session_index_registered(
1529        &mut self,
1530        node_addr: &NodeAddr,
1531        context: &'static str,
1532    ) -> bool {
1533        let Some(peer) = self.peers.get(node_addr) else {
1534            return false;
1535        };
1536        let Some(transport_id) = peer.transport_id() else {
1537            warn!(
1538                peer = %self.peer_display_name(node_addr),
1539                context,
1540                "Cannot register current session index without transport id"
1541            );
1542            return false;
1543        };
1544        let Some(our_index) = peer.our_index() else {
1545            warn!(
1546                peer = %self.peer_display_name(node_addr),
1547                context,
1548                "Cannot register current session index without local index"
1549            );
1550            return false;
1551        };
1552
1553        let cache_key = (transport_id, our_index.as_u32());
1554        match self.peers_by_index.get(&cache_key).copied() {
1555            Some(existing) if existing == *node_addr => true,
1556            Some(existing) => {
1557                warn!(
1558                    peer = %self.peer_display_name(node_addr),
1559                    previous_owner = %self.peer_display_name(&existing),
1560                    transport_id = %transport_id,
1561                    our_index = %our_index,
1562                    context,
1563                    "Repairing current session index with stale owner"
1564                );
1565                self.peers_by_index.insert(cache_key, *node_addr);
1566                true
1567            }
1568            None => {
1569                warn!(
1570                    peer = %self.peer_display_name(node_addr),
1571                    transport_id = %transport_id,
1572                    our_index = %our_index,
1573                    context,
1574                    "Repairing missing current session index"
1575                );
1576                self.peers_by_index.insert(cache_key, *node_addr);
1577                true
1578            }
1579        }
1580    }
1581
1582    // === Configuration ===
1583
1584    /// Get the configuration.
1585    pub fn config(&self) -> &Config {
1586        &self.config
1587    }
1588
1589    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1590    ///
1591    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1592    /// transport MTU. Returns the maximum IPv6 packet size (including
1593    /// IPv6 header) that can be transmitted through the FIPS mesh.
1594    pub fn effective_ipv6_mtu(&self) -> u16 {
1595        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1596    }
1597
1598    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1599    ///
1600    /// Returns the **minimum** MTU across all operational transports, or
1601    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1602    /// where a specific egress transport isn't yet known: the resulting
1603    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1604    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1605    /// configured-transport's egress, eliminating PMTU-D black holes that
1606    /// would otherwise occur when a flow's actual egress is smaller than
1607    /// the clamp ceiling assumed at TUN init.
1608    ///
1609    /// Returning the smallest (rather than the first-iterated, which used
1610    /// to vary across HashMap iteration order + async-startup race) makes
1611    /// the clamp deterministic across daemon restarts.
1612    ///
1613    /// See `ISSUE-2026-0011` for the empirical investigation.
1614    pub fn transport_mtu(&self) -> u16 {
1615        let min_operational = self
1616            .transports
1617            .values()
1618            .filter(|h| h.is_operational())
1619            .map(|h| h.mtu())
1620            .min();
1621        if let Some(mtu) = min_operational {
1622            return mtu;
1623        }
1624        // Fallback to config: try UDP first, then Ethernet
1625        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1626            return cfg.mtu();
1627        }
1628        1280
1629    }
1630
1631    // === State ===
1632
1633    /// Get the node state.
1634    pub fn state(&self) -> NodeState {
1635        self.state
1636    }
1637
1638    /// Get the node uptime.
1639    pub fn uptime(&self) -> std::time::Duration {
1640        self.started_at.elapsed()
1641    }
1642
1643    /// Check if node is operational.
1644    pub fn is_running(&self) -> bool {
1645        self.state.is_operational()
1646    }
1647
1648    /// Check if this is a leaf-only node.
1649    pub fn is_leaf_only(&self) -> bool {
1650        self.is_leaf_only
1651    }
1652
1653    // === Tree State ===
1654
1655    /// Get the tree state.
1656    pub fn tree_state(&self) -> &TreeState {
1657        &self.tree_state
1658    }
1659
1660    /// Get mutable tree state.
1661    pub fn tree_state_mut(&mut self) -> &mut TreeState {
1662        &mut self.tree_state
1663    }
1664
1665    // === Bloom State ===
1666
1667    /// Get the Bloom filter state.
1668    pub fn bloom_state(&self) -> &BloomState {
1669        &self.bloom_state
1670    }
1671
1672    /// Get mutable Bloom filter state.
1673    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1674        &mut self.bloom_state
1675    }
1676
1677    // === Mesh Size Estimate ===
1678
1679    /// Get the cached estimated mesh size.
1680    pub fn estimated_mesh_size(&self) -> Option<u64> {
1681        self.estimated_mesh_size
1682    }
1683
1684    /// Compute and cache the estimated mesh size from bloom filters.
1685    ///
1686    /// Uses the spanning tree partition: parent's filter covers nodes reachable
1687    /// upward, children's filters cover disjoint subtrees downward. The sum
1688    /// of estimated entry counts plus one (self) approximates total network size.
1689    pub(crate) fn compute_mesh_size(&mut self) {
1690        let my_addr = *self.tree_state.my_node_addr();
1691        let parent_id = *self.tree_state.my_declaration().parent_id();
1692        let is_root = self.tree_state.is_root();
1693
1694        let max_fpr = self.config.node.bloom.max_inbound_fpr;
1695        let mut total: f64 = 1.0; // count self
1696        let mut child_count: u32 = 0;
1697        let mut has_data = false;
1698
1699        // Parent's filter: nodes reachable upward through the tree.
1700        // If any contributing filter is above the FPR cap, we refuse to
1701        // estimate rather than substitute a partial/biased aggregate —
1702        // Node.estimated_mesh_size is already Option<u64> and consumers
1703        // (control socket, fipstop, periodic debug log) handle None.
1704        if !is_root
1705            && let Some(parent) = self.peers.get(&parent_id)
1706            && let Some(filter) = parent.inbound_filter()
1707        {
1708            match filter.estimated_count(max_fpr) {
1709                Some(n) => {
1710                    total += n;
1711                    has_data = true;
1712                }
1713                None => {
1714                    self.estimated_mesh_size = None;
1715                    return;
1716                }
1717            }
1718        }
1719
1720        // Children's filters: each child's subtree is disjoint
1721        for (peer_addr, peer) in &self.peers {
1722            if peer_addr == &parent_id {
1723                continue;
1724            }
1725            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1726                && *decl.parent_id() == my_addr
1727            {
1728                child_count += 1;
1729                if let Some(filter) = peer.inbound_filter() {
1730                    match filter.estimated_count(max_fpr) {
1731                        Some(n) => {
1732                            total += n;
1733                            has_data = true;
1734                        }
1735                        None => {
1736                            self.estimated_mesh_size = None;
1737                            return;
1738                        }
1739                    }
1740                }
1741            }
1742        }
1743
1744        if !has_data {
1745            self.estimated_mesh_size = None;
1746            return;
1747        }
1748
1749        let size = total.round() as u64;
1750        self.estimated_mesh_size = Some(size);
1751
1752        // Periodic logging (reuse MMP default interval: 30s)
1753        let now = std::time::Instant::now();
1754        let should_log = match self.last_mesh_size_log {
1755            None => true,
1756            Some(last) => {
1757                now.duration_since(last)
1758                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1759            }
1760        };
1761        if should_log {
1762            tracing::debug!(
1763                estimated_mesh_size = size,
1764                peers = self.peers.len(),
1765                children = child_count,
1766                "Mesh size estimate"
1767            );
1768            self.last_mesh_size_log = Some(now);
1769        }
1770    }
1771
1772    // === Coord Cache ===
1773
1774    /// Get the coordinate cache.
1775    pub fn coord_cache(&self) -> &CoordCache {
1776        &self.coord_cache
1777    }
1778
1779    /// Get mutable coordinate cache.
1780    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1781        &mut self.coord_cache
1782    }
1783
1784    // === Node Statistics ===
1785
1786    /// Get the node statistics.
1787    pub fn stats(&self) -> &stats::NodeStats {
1788        &self.stats
1789    }
1790
1791    /// Get mutable node statistics.
1792    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1793        &mut self.stats
1794    }
1795
1796    /// Get the stats history collector.
1797    pub fn stats_history(&self) -> &stats_history::StatsHistory {
1798        &self.stats_history
1799    }
1800
1801    /// Sample the current node state into the stats history ring.
1802    /// Called once per tick from the RX loop.
1803    pub(crate) fn record_stats_history(&mut self) {
1804        let fwd = &self.stats.forwarding;
1805        let peers_with_mmp: Vec<f64> = self
1806            .peers
1807            .values()
1808            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1809            .collect();
1810        let loss_rate = if peers_with_mmp.is_empty() {
1811            0.0
1812        } else {
1813            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1814        };
1815
1816        let snap = stats_history::Snapshot {
1817            mesh_size: self.estimated_mesh_size,
1818            tree_depth: self.tree_state.my_coords().depth() as u32,
1819            peer_count: self.peers.len() as u64,
1820            parent_switches_total: self.stats.tree.parent_switches,
1821            bytes_in_total: fwd.received_bytes,
1822            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1823            packets_in_total: fwd.received_packets,
1824            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1825            loss_rate,
1826            active_sessions: self.sessions.len() as u64,
1827        };
1828
1829        let now = std::time::Instant::now();
1830        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1831            .peers
1832            .values()
1833            .map(|p| {
1834                let stats = p.link_stats();
1835                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1836                    Some(m) => (
1837                        m.metrics.srtt_ms(),
1838                        Some(m.metrics.loss_rate()),
1839                        m.receiver.ecn_ce_count() as u64,
1840                    ),
1841                    None => (None, None, 0),
1842                };
1843                stats_history::PeerSnapshot {
1844                    node_addr: *p.node_addr(),
1845                    last_seen: now,
1846                    srtt_ms,
1847                    loss_rate,
1848                    bytes_in_total: stats.bytes_recv,
1849                    bytes_out_total: stats.bytes_sent,
1850                    packets_in_total: stats.packets_recv,
1851                    packets_out_total: stats.packets_sent,
1852                    ecn_ce_total: ecn_ce,
1853                }
1854            })
1855            .collect();
1856
1857        self.stats_history.tick(now, &snap, &peer_snaps);
1858    }
1859
1860    // === TUN Interface ===
1861
1862    /// Get the TUN state.
1863    pub fn tun_state(&self) -> TunState {
1864        self.tun_state
1865    }
1866
1867    /// Get the TUN interface name, if active.
1868    pub fn tun_name(&self) -> Option<&str> {
1869        self.tun_name.as_deref()
1870    }
1871
1872    // === Resource Limits ===
1873
1874    /// Set the maximum number of connections (handshake phase).
1875    pub fn set_max_connections(&mut self, max: usize) {
1876        self.max_connections = max;
1877    }
1878
1879    /// Set the maximum number of peers (authenticated).
1880    pub fn set_max_peers(&mut self, max: usize) {
1881        self.max_peers = max;
1882    }
1883
1884    /// Returns false when starting more outbound work would exceed a resource
1885    /// cap. A cap of `0` means uncapped.
1886    pub(crate) fn outbound_admission_check(&self) -> bool {
1887        let connection_used = self
1888            .connections
1889            .len()
1890            .saturating_add(self.pending_connects.len());
1891        let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1892        let connection_allowed =
1893            self.max_connections == 0 || connection_used < self.max_connections;
1894        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1895        peer_allowed && connection_allowed && link_allowed
1896    }
1897
1898    /// Admission for public/open-discovery outbound work. This includes the
1899    /// general connection/link caps and, when open Nostr discovery is enabled,
1900    /// the configured non-peer budget.
1901    pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1902        if !self.outbound_admission_check() {
1903            return false;
1904        }
1905
1906        let nostr = &self.config.node.discovery.nostr;
1907        if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1908            return true;
1909        }
1910
1911        let configured_npubs = self
1912            .config
1913            .peers()
1914            .iter()
1915            .map(|peer| peer.npub.clone())
1916            .collect::<HashSet<_>>();
1917        self.open_discovery_enqueue_budget(&configured_npubs) > 0
1918    }
1919
1920    /// Like `outbound_admission_check`, but for racing a better path to a
1921    /// peer that is already authenticated. This may temporarily add a
1922    /// connection/link, but it does not consume a new peer slot.
1923    pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1924        let connection_used = self
1925            .connections
1926            .len()
1927            .saturating_add(self.pending_connects.len());
1928        let connection_allowed =
1929            self.max_connections == 0 || connection_used < self.max_connections;
1930        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1931        connection_allowed && link_allowed
1932    }
1933
1934    /// Set the maximum number of links.
1935    pub fn set_max_links(&mut self, max: usize) {
1936        self.max_links = max;
1937    }
1938
1939    // === Counts ===
1940
1941    /// Number of pending connections (handshake in progress).
1942    pub fn connection_count(&self) -> usize {
1943        self.connections.len()
1944    }
1945
1946    /// Number of authenticated peers.
1947    pub fn peer_count(&self) -> usize {
1948        self.peers.len()
1949    }
1950
1951    /// Number of active links.
1952    pub fn link_count(&self) -> usize {
1953        self.links.len()
1954    }
1955
1956    /// Number of active transports.
1957    pub fn transport_count(&self) -> usize {
1958        self.transports.len()
1959    }
1960
1961    // === Transport Management ===
1962
1963    /// Allocate a new transport ID.
1964    pub fn allocate_transport_id(&mut self) -> TransportId {
1965        let id = TransportId::new(self.next_transport_id);
1966        self.next_transport_id += 1;
1967        id
1968    }
1969
1970    /// Get a transport by ID.
1971    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1972        self.transports.get(id)
1973    }
1974
1975    /// Get mutable transport by ID.
1976    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1977        self.transports.get_mut(id)
1978    }
1979
1980    /// Iterate over transport IDs.
1981    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1982        self.transports.keys()
1983    }
1984
1985    /// Get the packet receiver for the event loop.
1986    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1987        self.packet_rx.as_mut()
1988    }
1989
1990    // === Link Management ===
1991
1992    /// Allocate a new link ID.
1993    pub fn allocate_link_id(&mut self) -> LinkId {
1994        let id = LinkId::new(self.next_link_id);
1995        self.next_link_id += 1;
1996        id
1997    }
1998
1999    /// Add a link.
2000    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2001        if self.max_links > 0 && self.links.len() >= self.max_links {
2002            return Err(NodeError::MaxLinksExceeded {
2003                max: self.max_links,
2004            });
2005        }
2006        let link_id = link.link_id();
2007        let transport_id = link.transport_id();
2008        let remote_addr = link.remote_addr().clone();
2009
2010        self.links.insert(link_id, link);
2011        self.addr_to_link
2012            .insert((transport_id, remote_addr), link_id);
2013        Ok(())
2014    }
2015
2016    /// Get a link by ID.
2017    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2018        self.links.get(link_id)
2019    }
2020
2021    /// Get a mutable link by ID.
2022    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2023        self.links.get_mut(link_id)
2024    }
2025
2026    /// Find link ID by transport address.
2027    pub fn find_link_by_addr(
2028        &self,
2029        transport_id: TransportId,
2030        addr: &TransportAddr,
2031    ) -> Option<LinkId> {
2032        self.addr_to_link
2033            .get(&(transport_id, addr.clone()))
2034            .copied()
2035    }
2036
2037    /// Remove a link.
2038    ///
2039    /// Only removes the addr_to_link reverse lookup if it still points to this
2040    /// link. In cross-connection scenarios, a newer link may have replaced the
2041    /// entry for the same address.
2042    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2043        if let Some(link) = self.links.remove(link_id) {
2044            // Clean up reverse lookup only if it still maps to this link
2045            let key = (link.transport_id(), link.remote_addr().clone());
2046            if self.addr_to_link.get(&key) == Some(link_id) {
2047                self.addr_to_link.remove(&key);
2048            }
2049            Some(link)
2050        } else {
2051            None
2052        }
2053    }
2054
2055    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2056        if !self.bootstrap_transports.contains(&transport_id) {
2057            return;
2058        }
2059
2060        let transport_in_use = self
2061            .links
2062            .values()
2063            .any(|link| link.transport_id() == transport_id)
2064            || self
2065                .connections
2066                .values()
2067                .any(|conn| conn.transport_id() == Some(transport_id))
2068            || self
2069                .peers
2070                .values()
2071                .any(|peer| peer.transport_id() == Some(transport_id))
2072            || self
2073                .pending_connects
2074                .iter()
2075                .any(|pending| pending.transport_id == transport_id);
2076
2077        if transport_in_use {
2078            return;
2079        }
2080
2081        tracing::debug!(
2082            transport_id = %transport_id,
2083            "bootstrap transport has no remaining references; dropping"
2084        );
2085
2086        self.bootstrap_transports.remove(&transport_id);
2087        self.bootstrap_transport_npubs.remove(&transport_id);
2088        self.transport_drops.remove(&transport_id);
2089        self.transports.remove(&transport_id);
2090    }
2091
2092    /// Iterate over all links.
2093    pub fn links(&self) -> impl Iterator<Item = &Link> {
2094        self.links.values()
2095    }
2096
2097    // === Connection Management (Handshake Phase) ===
2098
2099    /// Add a pending connection.
2100    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2101        let link_id = connection.link_id();
2102
2103        if self.connections.contains_key(&link_id) {
2104            return Err(NodeError::ConnectionAlreadyExists(link_id));
2105        }
2106
2107        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2108            return Err(NodeError::MaxConnectionsExceeded {
2109                max: self.max_connections,
2110            });
2111        }
2112
2113        self.connections.insert(link_id, connection);
2114        Ok(())
2115    }
2116
2117    /// Get a connection by LinkId.
2118    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2119        self.connections.get(link_id)
2120    }
2121
2122    /// Get a mutable connection by LinkId.
2123    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2124        self.connections.get_mut(link_id)
2125    }
2126
2127    /// Remove a connection.
2128    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2129        self.connections.remove(link_id)
2130    }
2131
2132    /// Iterate over all connections.
2133    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2134        self.connections.values()
2135    }
2136
2137    // === Peer Management (Active Phase) ===
2138
2139    /// Get a peer by NodeAddr.
2140    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2141        self.peers.get(node_addr)
2142    }
2143
2144    /// Get a mutable peer by NodeAddr.
2145    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2146        self.peers.get_mut(node_addr)
2147    }
2148
2149    /// Remove a peer.
2150    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2151        self.peers.remove(node_addr)
2152    }
2153
2154    /// Iterate over all peers.
2155    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2156        self.peers.values()
2157    }
2158
2159    /// Reference to the Nostr discovery handle if discovery is enabled.
2160    /// Used by control queries (`show_peers` per-peer Nostr-traversal
2161    /// state) to read failure-state without taking shared ownership.
2162    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2163        self.nostr_discovery.as_deref()
2164    }
2165
2166    /// Iterate over all peer node IDs.
2167    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2168        self.peers.keys()
2169    }
2170
2171    /// Iterate over peers that can send traffic.
2172    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2173        self.peers.values().filter(|p| p.can_send())
2174    }
2175
2176    /// Number of peers that can send traffic.
2177    pub fn sendable_peer_count(&self) -> usize {
2178        self.peers.values().filter(|p| p.can_send()).count()
2179    }
2180
2181    pub(crate) fn set_discovery_fallback_transit_allowed(
2182        &mut self,
2183        peer_addr: NodeAddr,
2184        allowed: bool,
2185    ) {
2186        if allowed {
2187            self.discovery_fallback_transit_blocked_peers
2188                .remove(&peer_addr);
2189        } else {
2190            self.discovery_fallback_transit_blocked_peers
2191                .insert(peer_addr);
2192        }
2193    }
2194
2195    pub(crate) fn configured_discovery_fallback_transit(
2196        &self,
2197        peer_addr: &NodeAddr,
2198    ) -> Option<bool> {
2199        self.configured_peer(peer_addr)
2200            .map(|peer| peer.discovery_fallback_transit)
2201    }
2202
2203    pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2204        self.config.peers().iter().find(|peer| {
2205            PeerIdentity::from_npub(&peer.npub)
2206                .ok()
2207                .is_some_and(|identity| identity.node_addr() == peer_addr)
2208        })
2209    }
2210
2211    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2212        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2213            return retry_state.peer_config.discovery_fallback_transit;
2214        }
2215
2216        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2217            return allowed;
2218        }
2219
2220        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2221    }
2222
2223    // === End-to-End Sessions ===
2224
2225    /// Get a session by remote NodeAddr.
2226    /// Disable the discovery forward rate limiter (for tests).
2227    #[cfg(test)]
2228    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2229        self.discovery_forward_limiter
2230            .set_interval(std::time::Duration::ZERO);
2231    }
2232
2233    #[cfg(test)]
2234    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2235        self.sessions.get(remote)
2236    }
2237
2238    /// Get a mutable session by remote NodeAddr.
2239    #[cfg(test)]
2240    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2241        self.sessions.get_mut(remote)
2242    }
2243
2244    /// Remove a session.
2245    #[cfg(test)]
2246    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2247        self.sessions.remove(remote)
2248    }
2249
2250    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2251    #[cfg(test)]
2252    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2253        self.path_mtu_lookup
2254            .read()
2255            .ok()
2256            .and_then(|map| map.get(fips_addr).copied())
2257    }
2258
2259    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2260    #[cfg(test)]
2261    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2262        if let Ok(mut map) = self.path_mtu_lookup.write() {
2263            map.insert(fips_addr, mtu);
2264        }
2265    }
2266
2267    /// Number of end-to-end sessions.
2268    pub fn session_count(&self) -> usize {
2269        self.sessions.len()
2270    }
2271
2272    /// Iterate over all session entries (for control queries).
2273    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2274        self.sessions.iter()
2275    }
2276
2277    // === Identity Cache ===
2278
2279    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2280    pub(crate) fn register_identity(
2281        &mut self,
2282        node_addr: NodeAddr,
2283        pubkey: secp256k1::PublicKey,
2284    ) -> bool {
2285        let mut prefix = [0u8; 15];
2286        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2287        if let Some(entry) = self.identity_cache.get(&prefix)
2288            && entry.node_addr == node_addr
2289            && entry.pubkey == pubkey
2290        {
2291            // Endpoint sends pass the same PeerIdentity on every packet. Once
2292            // validated, avoid re-deriving NodeAddr from the public key in the
2293            // data path; that hash showed up in macOS sender profiles.
2294            return true;
2295        }
2296
2297        let (xonly, _) = pubkey.x_only_public_key();
2298        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2299        if derived_node_addr != node_addr {
2300            debug!(
2301                claimed_node_addr = %node_addr,
2302                derived_node_addr = %derived_node_addr,
2303                "Rejected identity cache entry with mismatched public key"
2304            );
2305            return false;
2306        }
2307
2308        let now_ms = Self::now_ms();
2309        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2310            && entry.node_addr == node_addr
2311        {
2312            entry.pubkey = pubkey;
2313            entry.last_seen_ms = now_ms;
2314            return true;
2315        }
2316
2317        let npub = encode_npub(&xonly);
2318        self.identity_cache.insert(
2319            prefix,
2320            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2321        );
2322        // LRU eviction
2323        let max = self.config.node.cache.identity_size;
2324        if self.identity_cache.len() > max
2325            && let Some(oldest_key) = self
2326                .identity_cache
2327                .iter()
2328                .min_by_key(|(_, entry)| entry.last_seen_ms)
2329                .map(|(k, _)| *k)
2330        {
2331            self.identity_cache.remove(&oldest_key);
2332        }
2333        true
2334    }
2335
2336    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2337    pub(crate) fn lookup_by_fips_prefix(
2338        &mut self,
2339        prefix: &[u8; 15],
2340    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2341        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2342            entry.last_seen_ms = Self::now_ms(); // LRU touch
2343            Some((entry.node_addr, entry.pubkey))
2344        } else {
2345            None
2346        }
2347    }
2348
2349    /// Check if a node's identity is in the cache (without LRU touch).
2350    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2351        let mut prefix = [0u8; 15];
2352        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2353        self.identity_cache.contains_key(&prefix)
2354    }
2355
2356    /// Number of identity cache entries.
2357    pub fn identity_cache_len(&self) -> usize {
2358        self.identity_cache.len()
2359    }
2360
2361    /// Iterate over identity cache entries.
2362    ///
2363    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2364    /// Used by the `show_identity_cache` control query.
2365    pub fn identity_cache_iter(
2366        &self,
2367    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2368        self.identity_cache
2369            .values()
2370            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2371    }
2372
2373    /// Configured maximum identity cache size.
2374    pub fn identity_cache_max(&self) -> usize {
2375        self.config.node.cache.identity_size
2376    }
2377
2378    /// Number of pending discovery lookups.
2379    pub fn pending_lookup_count(&self) -> usize {
2380        self.pending_lookups.len()
2381    }
2382
2383    /// Iterate over pending discovery lookups for diagnostics.
2384    pub fn pending_lookups_iter(
2385        &self,
2386    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2387        self.pending_lookups.iter()
2388    }
2389
2390    /// Number of recent discovery requests tracked.
2391    pub fn recent_request_count(&self) -> usize {
2392        self.recent_requests.len()
2393    }
2394
2395    /// Count of destinations with queued TUN packets awaiting session setup.
2396    pub fn pending_tun_destinations(&self) -> usize {
2397        self.pending_tun_packets.len()
2398    }
2399
2400    /// Total TUN packets queued across all destinations.
2401    pub fn pending_tun_total_packets(&self) -> usize {
2402        self.pending_tun_packets.values().map(|q| q.len()).sum()
2403    }
2404
2405    /// Iterate over retry state for diagnostics.
2406    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2407        self.retry_pending.iter()
2408    }
2409
2410    // === Routing ===
2411
2412    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2413    ///
2414    /// Returns true if the peer is our current tree parent, or if the peer
2415    /// has declared us as their parent (making them our child).
2416    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2417        // Peer is our parent
2418        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2419            return true;
2420        }
2421        // Peer is our child (their declaration names us as parent)
2422        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2423            && decl.parent_id() == self.node_addr()
2424        {
2425            return true;
2426        }
2427        false
2428    }
2429
2430    /// Find next hop for a destination node address.
2431    ///
2432    /// Routing priority:
2433    /// 1. Destination is self → `None` (local delivery)
2434    /// 2. Destination is a healthy direct peer → that peer, unless a known
2435    ///    fallback next-hop has a meaningful link-quality advantage.
2436    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
2437    ///    observed reverse paths, selected with weighted multipath plus
2438    ///    periodic coordinate/tree exploration.
2439    /// 4. Bloom filter candidates with cached dest coords → among peers whose
2440    ///    bloom filter contains the destination, pick the one that minimizes
2441    ///    tree distance to the destination, with
2442    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
2443    ///    The self-distance check ensures only peers strictly closer to the
2444    ///    destination than us are considered (prevents routing loops).
2445    /// 5. Greedy tree routing fallback (requires cached dest coords)
2446    /// 6. No route → `None`
2447    ///
2448    /// Both the bloom filter and tree routing paths require cached destination
2449    /// coordinates (checked in `coord_cache`). Without coordinates, the node
2450    /// cannot make loop-free forwarding decisions. The caller should signal
2451    /// `CoordsRequired` back to the source when `None` is returned for a
2452    /// non-local destination.
2453    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
2454        // 1. Local delivery
2455        if dest_node_addr == self.node_addr() {
2456            return None;
2457        }
2458        let now_ms = Self::now_ms();
2459        let direct_session_degraded = self.session_direct_path_is_degraded(dest_node_addr, now_ms);
2460
2461        let direct_peer_can_send = self
2462            .peers
2463            .get(dest_node_addr)
2464            .is_some_and(|peer| peer.can_send());
2465        let healthy_direct_route = self
2466            .peers
2467            .get(dest_node_addr)
2468            .filter(|peer| peer.is_healthy() && !direct_session_degraded)
2469            .map(|_| *dest_node_addr);
2470
2471        // A healthy direct path is not automatically the best path. A
2472        // hotspot/NAT hairpin can remain sendable with high RTT or mild loss;
2473        // in that case a lower-cost mesh next-hop should carry traffic while
2474        // direct probes continue in the background.
2475        let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
2476            node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
2477        };
2478
2479        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
2480            Some(
2481                self.peers
2482                    .iter()
2483                    .filter(|(_, peer)| peer.can_send())
2484                    .map(|(addr, _)| *addr)
2485                    .collect::<HashSet<_>>(),
2486            )
2487        } else {
2488            None
2489        };
2490
2491        // 3. Optional reply-learned routing. These entries are not peer
2492        // claims; they are local observations of which peer carried traffic
2493        // or a verified lookup response back from the destination. Most
2494        // packets use weighted multipath over learned routes, but periodic
2495        // fallback exploration lets coord/bloom/tree routes discover better
2496        // candidates.
2497        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
2498            self.learned_routes.should_explore_fallback(
2499                dest_node_addr,
2500                now_ms,
2501                self.config.node.routing.learned_fallback_explore_interval,
2502                |addr| sendable.contains(addr),
2503            )
2504        });
2505        if let Some(sendable) = &sendable_learned_peers
2506            && !explore_fallback
2507        {
2508            let eligible = sendable
2509                .iter()
2510                .copied()
2511                .filter(|addr| fallback_beats_direct(self, *addr))
2512                .collect::<HashSet<_>>();
2513            if !eligible.is_empty()
2514                && let Some(next_hop_addr) =
2515                    self.learned_routes
2516                        .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
2517            {
2518                return self.peers.get(&next_hop_addr);
2519            }
2520        }
2521
2522        // Look up cached destination coordinates (required by both bloom and tree paths).
2523        let Some(dest_coords) = self
2524            .coord_cache
2525            .get_and_touch(dest_node_addr, now_ms)
2526            .cloned()
2527        else {
2528            if (healthy_direct_route.is_none() || explore_fallback)
2529                && let Some(sendable) = &sendable_learned_peers
2530                && let Some(next_hop_addr) =
2531                    self.learned_routes
2532                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2533            {
2534                return self.peers.get(&next_hop_addr);
2535            }
2536            if let Some(direct_addr) = healthy_direct_route {
2537                return self.peers.get(&direct_addr);
2538            }
2539            if direct_peer_can_send {
2540                return self.peers.get(dest_node_addr);
2541            }
2542            return None;
2543        };
2544
2545        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
2546        //    If no candidate is strictly closer, fall through to tree routing.
2547        let coordinate_route_addr = {
2548            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
2549            if !candidates.is_empty() {
2550                self.select_best_candidate(&candidates, &dest_coords)
2551                    .map(|peer| *peer.node_addr())
2552            } else {
2553                None
2554            }
2555        };
2556        if let Some(next_hop_addr) = coordinate_route_addr
2557            && fallback_beats_direct(self, next_hop_addr)
2558        {
2559            return self.peers.get(&next_hop_addr);
2560        }
2561
2562        // 5. Greedy tree routing fallback
2563        let tree_route_addr = self
2564            .tree_state
2565            .find_next_hop(&dest_coords)
2566            .filter(|next_hop_id| {
2567                self.peers
2568                    .get(next_hop_id)
2569                    .is_some_and(|peer| peer.can_send())
2570            });
2571        if let Some(next_hop_addr) = tree_route_addr
2572            && fallback_beats_direct(self, next_hop_addr)
2573        {
2574            return self.peers.get(&next_hop_addr);
2575        }
2576
2577        if explore_fallback {
2578            return sendable_learned_peers.as_ref().and_then(|sendable| {
2579                self.learned_routes
2580                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2581                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
2582            });
2583        }
2584
2585        if let Some(direct_addr) = healthy_direct_route {
2586            return self.peers.get(&direct_addr);
2587        }
2588
2589        if let Some(sendable) = &sendable_learned_peers
2590            && let Some(next_hop_addr) =
2591                self.learned_routes
2592                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2593        {
2594            return self.peers.get(&next_hop_addr);
2595        }
2596
2597        if direct_peer_can_send {
2598            return self.peers.get(dest_node_addr);
2599        }
2600
2601        None
2602    }
2603
2604    fn route_candidate_beats_direct(
2605        &self,
2606        healthy_direct_route: Option<NodeAddr>,
2607        candidate_addr: NodeAddr,
2608    ) -> bool {
2609        let Some(direct_addr) = healthy_direct_route else {
2610            return true;
2611        };
2612        if candidate_addr == direct_addr {
2613            return false;
2614        }
2615
2616        let Some(direct) = self.peers.get(&direct_addr) else {
2617            return true;
2618        };
2619        let Some(candidate) = self.peers.get(&candidate_addr) else {
2620            return false;
2621        };
2622        if !candidate.can_send() {
2623            return false;
2624        }
2625
2626        let direct_cost = direct.link_cost();
2627        let candidate_cost = candidate.link_cost();
2628        candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2629    }
2630
2631    pub(in crate::node) fn session_direct_path_is_degraded(
2632        &mut self,
2633        dest: &NodeAddr,
2634        now_ms: u64,
2635    ) -> bool {
2636        match self.session_direct_degraded_until_ms.get(dest).copied() {
2637            Some(until_ms) if until_ms > now_ms => true,
2638            Some(_) => {
2639                self.session_direct_degraded_until_ms.remove(dest);
2640                false
2641            }
2642            None => false,
2643        }
2644    }
2645
2646    pub(in crate::node) fn mark_session_direct_path_degraded(
2647        &mut self,
2648        dest: NodeAddr,
2649        now_ms: u64,
2650    ) -> bool {
2651        let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2652        let entry = self
2653            .session_direct_degraded_until_ms
2654            .entry(dest)
2655            .or_insert(0);
2656        let was_degraded = *entry > now_ms;
2657        *entry = (*entry).max(until_ms);
2658        !was_degraded
2659    }
2660
2661    pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2662        self.session_direct_degraded_until_ms.remove(dest).is_some()
2663    }
2664
2665    pub(in crate::node) fn learn_reverse_route(
2666        &mut self,
2667        destination: NodeAddr,
2668        next_hop: NodeAddr,
2669    ) {
2670        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2671            || destination == *self.node_addr()
2672        {
2673            return;
2674        }
2675        let now_ms = Self::now_ms();
2676        self.learned_routes.learn(
2677            destination,
2678            next_hop,
2679            now_ms,
2680            self.config.node.routing.learned_ttl_secs,
2681            self.config.node.routing.max_learned_routes_per_dest,
2682        );
2683    }
2684
2685    pub(in crate::node) fn record_route_failure(
2686        &mut self,
2687        destination: NodeAddr,
2688        next_hop: NodeAddr,
2689    ) {
2690        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2691            return;
2692        }
2693        self.learned_routes.record_failure(&destination, &next_hop);
2694    }
2695
2696    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2697        self.learned_routes.snapshot(now_ms)
2698    }
2699
2700    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2701        self.learned_routes.purge_expired(now_ms);
2702    }
2703
2704    /// Select the best peer from a set of bloom filter candidates.
2705    ///
2706    /// Uses distance from each candidate's tree coordinates to the destination
2707    /// as the primary metric (after link_cost). Only selects peers that are
2708    /// strictly closer to the destination than we are (self-distance check
2709    /// prevents routing loops).
2710    ///
2711    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2712    fn select_best_candidate<'a>(
2713        &'a self,
2714        candidates: &[&'a ActivePeer],
2715        dest_coords: &crate::tree::TreeCoordinate,
2716    ) -> Option<&'a ActivePeer> {
2717        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2718
2719        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2720
2721        for &candidate in candidates {
2722            if !candidate.can_send() {
2723                continue;
2724            }
2725
2726            let cost = candidate.link_cost();
2727
2728            let dist = self
2729                .tree_state
2730                .peer_coords(candidate.node_addr())
2731                .map(|pc| pc.distance_to(dest_coords))
2732                .unwrap_or(usize::MAX);
2733
2734            // Self-distance check: only consider peers strictly closer
2735            // to the destination than we are (prevents routing loops)
2736            if dist >= my_distance {
2737                continue;
2738            }
2739
2740            let dominated = match &best {
2741                None => true,
2742                Some((_, best_cost, best_dist)) => {
2743                    cost < *best_cost
2744                        || (cost == *best_cost && dist < *best_dist)
2745                        || (cost == *best_cost
2746                            && dist == *best_dist
2747                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2748                }
2749            };
2750
2751            if dominated {
2752                best = Some((candidate, cost, dist));
2753            }
2754        }
2755
2756        best.map(|(peer, _, _)| peer)
2757    }
2758
2759    /// Check if a destination is in any peer's bloom filter.
2760    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2761        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2762    }
2763
2764    /// Get the TUN packet sender channel.
2765    ///
2766    /// Returns None if TUN is not active or the node hasn't been started.
2767    pub fn tun_tx(&self) -> Option<&TunTx> {
2768        self.tun_tx.as_ref()
2769    }
2770
2771    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2772    ///
2773    /// This must be called before [`Node::start`] and requires `tun.enabled =
2774    /// false`. Outbound packets sent to the returned sender are processed by the
2775    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2776    /// sent to the returned receiver with source attribution.
2777    pub fn attach_external_packet_io(
2778        &mut self,
2779        capacity: usize,
2780    ) -> Result<ExternalPacketIo, NodeError> {
2781        if self.state != NodeState::Created {
2782            return Err(NodeError::Config(ConfigError::Validation(
2783                "external packet I/O must be attached before node start".to_string(),
2784            )));
2785        }
2786        if self.config.tun.enabled {
2787            return Err(NodeError::Config(ConfigError::Validation(
2788                "external packet I/O requires tun.enabled=false".to_string(),
2789            )));
2790        }
2791
2792        let capacity = capacity.max(1);
2793        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2794        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2795        self.tun_outbound_rx = Some(outbound_rx);
2796        self.external_packet_tx = Some(inbound_tx);
2797
2798        Ok(ExternalPacketIo {
2799            outbound_tx,
2800            inbound_rx,
2801        })
2802    }
2803
2804    /// Attach app-owned endpoint data I/O for embedded operation.
2805    ///
2806    /// Commands sent to the returned sender are processed by the node RX loop.
2807    /// Incoming endpoint data is emitted as source-attributed events.
2808    pub(crate) fn attach_endpoint_data_io(
2809        &mut self,
2810        capacity: usize,
2811    ) -> Result<EndpointDataIo, NodeError> {
2812        if self.state != NodeState::Created {
2813            return Err(NodeError::Config(ConfigError::Validation(
2814                "endpoint data I/O must be attached before node start".to_string(),
2815            )));
2816        }
2817
2818        let command_capacity = endpoint_data_command_capacity(capacity);
2819        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2820        // Inbound endpoint-data events use an unbounded channel — see
2821        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2822        // per-packet semaphore + the cross-task relay task that used to
2823        // sit on top of this channel).
2824        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2825        self.endpoint_command_rx = Some(command_rx);
2826        self.endpoint_event_tx = Some(event_tx.clone());
2827
2828        Ok(EndpointDataIo {
2829            command_tx,
2830            event_rx,
2831            event_tx,
2832        })
2833    }
2834
2835    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2836        let mut prefix = [0u8; 15];
2837        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2838        self.identity_cache
2839            .get(&prefix)
2840            .filter(|entry| &entry.node_addr == addr)
2841            .map(|entry| entry.pubkey)
2842    }
2843
2844    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2845        let mut prefix = [0u8; 15];
2846        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2847        self.identity_cache
2848            .get(&prefix)
2849            .filter(|entry| &entry.node_addr == addr)
2850            .map(|entry| entry.npub.clone())
2851    }
2852
2853    pub(in crate::node) fn deliver_external_ipv6_packet(
2854        &self,
2855        src_addr: &NodeAddr,
2856        packet: Vec<u8>,
2857    ) {
2858        let Some(external_packet_tx) = &self.external_packet_tx else {
2859            return;
2860        };
2861        if packet.len() < 40 {
2862            return;
2863        }
2864        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2865            return;
2866        };
2867        let delivered = NodeDeliveredPacket {
2868            source_node_addr: *src_addr,
2869            source_npub: self.npub_for_node_addr(src_addr),
2870            destination,
2871            packet,
2872        };
2873        if let Err(error) = external_packet_tx.try_send(delivered) {
2874            debug!(error = %error, "Failed to deliver packet to external app sink");
2875        }
2876    }
2877
2878    // === Sending ===
2879
2880    /// Encrypt and send a link-layer message to an authenticated peer.
2881    ///
2882    /// The plaintext should include the message type byte followed by the
2883    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
2884    ///
2885    /// The send path prepends a 4-byte session-relative timestamp (inner
2886    /// header) before encryption. The full 16-byte outer header is used
2887    /// as AAD for the AEAD construction.
2888    ///
2889    /// This is the standard path for sending any link-layer control message
2890    /// to a peer over their encrypted Noise session.
2891    pub(super) async fn send_encrypted_link_message(
2892        &mut self,
2893        node_addr: &NodeAddr,
2894        plaintext: &[u8],
2895    ) -> Result<(), NodeError> {
2896        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2897            .await
2898    }
2899
2900    /// Update the local-outbound-broken signal from a `transport.send`
2901    /// outcome. Sets `last_local_send_failure_at` on local-side io
2902    /// errors (NetworkUnreachable / HostUnreachable / AddrNotAvailable);
2903    /// clears it on success. The reaper consults this in
2904    /// `check_link_heartbeats` to switch to `fast_link_dead_timeout_secs`.
2905    pub(in crate::node) fn note_local_send_outcome(
2906        &mut self,
2907        result: &Result<usize, TransportError>,
2908    ) {
2909        match result {
2910            Ok(_) => {
2911                if self.last_local_send_failure_at.is_some() {
2912                    self.last_local_send_failure_at = None;
2913                }
2914            }
2915            Err(error) if error.is_local_route_unavailable() => {
2916                self.last_local_send_failure_at = Some(std::time::Instant::now());
2917            }
2918            Err(_) => {}
2919        }
2920    }
2921
2922    /// Return the active dead-timeout after considering recent local route
2923    /// failures. The fast-dead signal is intentionally short-lived: on the
2924    /// UDP worker path a send call can return before the kernel result is
2925    /// observed, so a single stale route error must not compress liveness for
2926    /// the whole normal dead-timeout window.
2927    pub(in crate::node) fn local_send_failure_dead_timeout(
2928        &mut self,
2929        now: std::time::Instant,
2930        dead_timeout: std::time::Duration,
2931        fast_dead_timeout: std::time::Duration,
2932    ) -> std::time::Duration {
2933        match self.last_local_send_failure_at {
2934            Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
2935                fast_dead_timeout.min(dead_timeout)
2936            }
2937            Some(_) => {
2938                self.last_local_send_failure_at = None;
2939                dead_timeout
2940            }
2941            None => dead_timeout,
2942        }
2943    }
2944
2945    pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
2946        self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
2947    }
2948
2949    pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
2950        let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
2951            return false;
2952        };
2953        let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
2954        std::time::Instant::now().duration_since(t) <= grace
2955    }
2956
2957    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
2958    ///
2959    /// Used by the forwarding path to relay congestion signals hop-by-hop.
2960    pub(super) async fn send_encrypted_link_message_with_ce(
2961        &mut self,
2962        node_addr: &NodeAddr,
2963        plaintext: &[u8],
2964        ce_flag: bool,
2965    ) -> Result<(), NodeError> {
2966        let peer = self
2967            .peers
2968            .get_mut(node_addr)
2969            .ok_or(NodeError::PeerNotFound(*node_addr))?;
2970
2971        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2972            node_addr: *node_addr,
2973            reason: "no their_index".into(),
2974        })?;
2975        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2976            node_addr: *node_addr,
2977            reason: "no transport_id".into(),
2978        })?;
2979        let remote_addr = peer
2980            .current_addr()
2981            .cloned()
2982            .ok_or_else(|| NodeError::SendFailed {
2983                node_addr: *node_addr,
2984                reason: "no current_addr".into(),
2985            })?;
2986        #[cfg(any(target_os = "linux", target_os = "macos"))]
2987        let connected_socket = peer.connected_udp();
2988
2989        // Prepend 4-byte session-relative timestamp (inner header)
2990        let timestamp_ms = peer.session_elapsed_ms();
2991
2992        // MMP: read spin bit value before entering session borrow
2993        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2994        let mut flags = if sp_flag { FLAG_SP } else { 0 };
2995        if ce_flag {
2996            flags |= FLAG_CE;
2997        }
2998        if peer.current_k_bit() {
2999            flags |= FLAG_KEY_EPOCH;
3000        }
3001
3002        let session = peer
3003            .noise_session_mut()
3004            .ok_or_else(|| NodeError::SendFailed {
3005                node_addr: *node_addr,
3006                reason: "no noise session".into(),
3007            })?;
3008
3009        // Build 16-byte outer header upfront. The inner-plaintext
3010        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
3011        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
3012        // just to measure it. The worker path uses this length to size
3013        // the wire buffer directly; the legacy path below still
3014        // materialises a separate `inner_plaintext` Vec for the inline
3015        // encrypt-and-send call.
3016        const INNER_TS_LEN: usize = 4;
3017        let counter = session.current_send_counter();
3018        let inner_len = INNER_TS_LEN + plaintext.len();
3019        let payload_len = inner_len as u16;
3020        let header = build_established_header(their_index, counter, flags, payload_len);
3021
3022        // **UDP send fast path.** The encrypt-worker pool is always
3023        // spawned at lifecycle start (workers = num_cpus) in
3024        // production, so this branch is taken for every authentic
3025        // send on every UDP-transported established session. The
3026        // AEAD work + sendmsg syscall run on a dedicated OS thread;
3027        // the rx_loop only builds the wire buffer + reserves the
3028        // counter inline.
3029        //
3030        // Other transport kinds (BLE, TCP, sim, ethernet) fall
3031        // through to the inline encrypt + transport.send path
3032        // below — those don't have raw-fd / sendmmsg / UDP_GSO
3033        // benefits to expose through the worker pool, so the simpler
3034        // synchronous send is the right shape for them.
3035        //
3036        // The `encrypt_workers.is_some()` check below is true in
3037        // production (lifecycle::start spawns the pool); it stays
3038        // checked rather than `expect()`-ed because unit tests
3039        // construct `Node` without calling `start()`.
3040        let transport_for_send = self
3041            .transports
3042            .get(&transport_id)
3043            .ok_or(NodeError::TransportNotFound(transport_id))?;
3044        match transport_for_send.connection_state(&remote_addr) {
3045            ConnectionState::Connected => {}
3046            other => {
3047                if matches!(other, ConnectionState::None) {
3048                    let _ = transport_for_send.connect(&remote_addr).await;
3049                }
3050                return Err(NodeError::SendFailed {
3051                    node_addr: *node_addr,
3052                    reason: format!("transport connection not ready: {:?}", other),
3053                });
3054            }
3055        }
3056        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3057        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3058            && is_udp
3059            && let Some(cipher_clone) = session.send_cipher_clone()
3060        {
3061            {
3062                // Reserve the counter on the session so subsequent
3063                // sends don't reuse it. `current_send_counter` only
3064                // peeks; we advance via `take_send_counter`.
3065                let reserved_counter =
3066                    session
3067                        .take_send_counter()
3068                        .map_err(|e| NodeError::SendFailed {
3069                            node_addr: *node_addr,
3070                            reason: format!("counter reservation failed: {}", e),
3071                        })?;
3072                debug_assert_eq!(reserved_counter, counter);
3073                // Re-derive the header with the now-locked-in counter
3074                // value (same value, but the call sequence is more
3075                // explicit).
3076                let header =
3077                    build_established_header(their_index, reserved_counter, flags, payload_len);
3078                let transport = transport_for_send;
3079                // Snapshot the per-peer connected UDP socket before
3080                // resolving the fallback address. On the established
3081                // steady-state path this socket already carries the
3082                // kernel peer address, so re-parsing the configured
3083                // transport address and touching the DNS cache on every
3084                // packet is pure overhead on the sender hot path.
3085                let send_target = {
3086                    if let TransportHandle::Udp(udp) = transport {
3087                        let socket_addr = {
3088                            #[cfg(any(target_os = "linux", target_os = "macos"))]
3089                            {
3090                                match connected_socket.as_ref() {
3091                                    Some(socket) => Some(socket.peer_addr()),
3092                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3093                                }
3094                            }
3095                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3096                            {
3097                                udp.resolve_for_off_task(&remote_addr).await.ok()
3098                            }
3099                        };
3100                        match (udp.async_socket(), socket_addr) {
3101                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3102                            _ => None,
3103                        }
3104                    } else {
3105                        None
3106                    }
3107                };
3108                if let Some((socket, socket_addr)) = send_target {
3109                    // Build the wire buffer **directly** from
3110                    // `plaintext` with a single allocation:
3111                    //   `[16 header][4 ts][plaintext...]` with
3112                    // +16 trailing capacity for the AEAD tag.
3113                    // The worker seals `wire_buf[16..]` in
3114                    // place and appends the tag — no second
3115                    // alloc, no second memcpy.
3116                    //
3117                    // Previous design built `inner_plaintext`
3118                    // via `prepend_inner_header` (1 alloc + 1
3119                    // copy) and then let the worker memcpy
3120                    // header + plaintext into a fresh Vec
3121                    // (another alloc + copy). At ~100 kpps the
3122                    // saved alloc/copy is ~150 MB/sec of memory
3123                    // bandwidth on the hot rx_loop + worker.
3124                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3125                    let mut wire_buf = Vec::with_capacity(wire_capacity);
3126                    wire_buf.extend_from_slice(&header);
3127                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
3128                    wire_buf.extend_from_slice(plaintext);
3129                    let predicted_bytes = wire_capacity;
3130                    // Stats / MMP update inline — predicted size
3131                    // is exact for ChaCha20-Poly1305 (tag is
3132                    // constant 16 bytes). When `connected_socket` is
3133                    // `Some`, the worker sends on it without a
3134                    // destination sockaddr — the kernel skips the
3135                    // per-packet sockaddr + route + neighbor resolve.
3136                    if let Some(peer) = self.peers.get_mut(node_addr) {
3137                        peer.link_stats_mut().record_sent(predicted_bytes);
3138                        if let Some(mmp) = peer.mmp_mut() {
3139                            mmp.sender
3140                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3141                        }
3142                    }
3143                    let scheduling_weight = self.send_weight_for_peer(node_addr);
3144                    workers.dispatch(self::encrypt_worker::FmpSendJob {
3145                        cipher: cipher_clone,
3146                        counter: reserved_counter,
3147                        wire_buf,
3148                        fsp_seal: None,
3149                        socket,
3150                        dest_addr: socket_addr,
3151                        #[cfg(any(target_os = "linux", target_os = "macos"))]
3152                        connected_socket,
3153                        drop_on_backpressure: fmp_plaintext_is_bulk_session_datagram(plaintext),
3154                        scheduling_weight,
3155                        queued_at: crate::perf_profile::stamp(),
3156                    });
3157                    return Ok(());
3158                }
3159            }
3160        }
3161
3162        // Inline (legacy) path: encrypt + send on the rx_loop.
3163        // Build the inner plaintext lazily here — the worker path
3164        // above never reaches this point, so the prepend_inner_header
3165        // alloc is avoided in the fast path.
3166        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3167        // Encrypt with AAD binding to the outer header
3168        let ciphertext = {
3169            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3170            session
3171                .encrypt_with_aad(&inner_plaintext, &header)
3172                .map_err(|e| NodeError::SendFailed {
3173                    node_addr: *node_addr,
3174                    reason: format!("encryption failed: {}", e),
3175                })?
3176        };
3177
3178        let wire_packet = build_encrypted(&header, &ciphertext);
3179
3180        // Re-borrow peer for stats update after sending
3181        let send_result = {
3182            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3183            let transport = self
3184                .transports
3185                .get(&transport_id)
3186                .ok_or(NodeError::TransportNotFound(transport_id))?;
3187            transport.send(&remote_addr, &wire_packet).await
3188        };
3189        self.note_local_send_outcome(&send_result);
3190        let bytes_sent = send_result.map_err(|e| match e {
3191            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3192                node_addr: *node_addr,
3193                packet_size,
3194                mtu,
3195            },
3196            other => NodeError::SendFailed {
3197                node_addr: *node_addr,
3198                reason: format!("transport send: {}", other),
3199            },
3200        })?;
3201
3202        // Update send statistics
3203        if let Some(peer) = self.peers.get_mut(node_addr) {
3204            peer.link_stats_mut().record_sent(bytes_sent);
3205            // MMP: record sent frame for sender report generation
3206            if let Some(mmp) = peer.mmp_mut() {
3207                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3208            }
3209        }
3210
3211        Ok(())
3212    }
3213}
3214
3215impl fmt::Debug for Node {
3216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3217        f.debug_struct("Node")
3218            .field("node_addr", self.node_addr())
3219            .field("state", &self.state)
3220            .field("is_leaf_only", &self.is_leaf_only)
3221            .field("connections", &self.connection_count())
3222            .field("peers", &self.peer_count())
3223            .field("links", &self.link_count())
3224            .field("transports", &self.transport_count())
3225            .finish()
3226    }
3227}