// fips_core/node/mod.rs

//! FIPS Node Entity
//!
//! Top-level structure representing a running FIPS instance. The Node
//! holds all state required for mesh routing: identity, tree state,
//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49    ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50    TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58    Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59    PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
/// How long a local-side send failure keeps counting as evidence that our
/// outbound path is broken (presumably the window during which link-dead
/// detection is compressed — confirm against `check_link_heartbeats`).
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration =
    std::time::Duration::from_millis(3_000);

/// Hold time, in milliseconds, for marking a destination's direct path as
/// degraded after sustained session-layer loss (presumed meaning; verify).
const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20 * 1_000;

/// Minimum number of samples before the session loss ratio is trusted
/// (presumed meaning; verify at the use site).
const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;

/// Loss fraction at or above which the direct path is treated as degraded
/// (0.08 = 8%). Paired with the lower recovery threshold below, which
/// presumably forms a hysteresis band.
const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;

/// Loss fraction below which a degraded direct path may recover (0.02 = 2%).
const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;

/// Minimum relative cost advantage a fallback route must offer before it
/// is preferred (0.25 = 25%; presumed meaning — confirm in `routing`).
const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
75
76fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
77    if plaintext
78        .first()
79        .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
80    {
81        return false;
82    }
83    let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
84        return false;
85    };
86    FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
87        prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
88    })
89}
90
/// Symmetric jitter bound, in seconds, for per-session rekey timers.
///
/// At construction and again after every cutover, each FMP/FSP session
/// picks an offset uniformly from `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]`.
/// Because the range is centred on zero, the configured mean rekey interval
/// is unchanged, while meshes whose nodes start together are less likely to
/// initiate rekeys on both sides at the same moment.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
98
99/// Errors related to node operations.
100#[derive(Debug, Error)]
101pub enum NodeError {
102    #[error("node not started")]
103    NotStarted,
104
105    #[error("node already started")]
106    AlreadyStarted,
107
108    #[error("node already stopped")]
109    AlreadyStopped,
110
111    #[error("transport not found: {0}")]
112    TransportNotFound(TransportId),
113
114    #[error("no transport available for type: {0}")]
115    NoTransportForType(String),
116
117    #[error("link not found: {0}")]
118    LinkNotFound(LinkId),
119
120    #[error("connection not found: {0}")]
121    ConnectionNotFound(LinkId),
122
123    #[error("peer not found: {0:?}")]
124    PeerNotFound(NodeAddr),
125
126    #[error("peer already exists: {0:?}")]
127    PeerAlreadyExists(NodeAddr),
128
129    #[error("connection already exists for link: {0}")]
130    ConnectionAlreadyExists(LinkId),
131
132    #[error("invalid peer npub '{npub}': {reason}")]
133    InvalidPeerNpub { npub: String, reason: String },
134
135    #[error("discovery error: {0}")]
136    Discovery(String),
137
138    #[error("access denied: {0}")]
139    AccessDenied(String),
140
141    #[error("max connections exceeded: {max}")]
142    MaxConnectionsExceeded { max: usize },
143
144    #[error("max peers exceeded: {max}")]
145    MaxPeersExceeded { max: usize },
146
147    #[error("max links exceeded: {max}")]
148    MaxLinksExceeded { max: usize },
149
150    #[error("handshake incomplete for link {0}")]
151    HandshakeIncomplete(LinkId),
152
153    #[error("no session available for link {0}")]
154    NoSession(LinkId),
155
156    #[error("promotion failed for link {link_id}: {reason}")]
157    PromotionFailed { link_id: LinkId, reason: String },
158
159    #[error("send failed to {node_addr}: {reason}")]
160    SendFailed { node_addr: NodeAddr, reason: String },
161
162    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
163    MtuExceeded {
164        node_addr: NodeAddr,
165        packet_size: usize,
166        mtu: u16,
167    },
168
169    #[error("config error: {0}")]
170    Config(#[from] ConfigError),
171
172    #[error("identity error: {0}")]
173    Identity(#[from] IdentityError),
174
175    #[error("TUN error: {0}")]
176    Tun(#[from] TunError),
177
178    #[error("index allocation failed: {0}")]
179    IndexAllocationFailed(String),
180
181    #[error("handshake failed: {0}")]
182    HandshakeFailed(String),
183
184    #[error("transport error: {0}")]
185    TransportError(String),
186
187    #[error("local route unavailable: {0}")]
188    LocalRouteUnavailable(String),
189
190    #[error("bootstrap handoff failed: {0}")]
191    BootstrapHandoff(String),
192}
193
194impl NodeError {
195    pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
196        if error.is_local_route_unavailable() {
197            Self::LocalRouteUnavailable(error.to_string())
198        } else {
199            Self::TransportError(error.to_string())
200        }
201    }
202
203    pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
204        matches!(self, Self::LocalRouteUnavailable(_))
205    }
206}
207
208/// Source-attributed packet delivered by a node running without a system TUN.
209#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct NodeDeliveredPacket {
211    /// FIPS node address that originated the packet.
212    pub source_node_addr: NodeAddr,
213    /// Source Nostr public key when the node has learned it.
214    pub source_npub: Option<String>,
215    /// Destination FIPS address from the IPv6 packet.
216    pub destination: FipsAddress,
217    /// Full IPv6 packet after FIPS session decapsulation.
218    pub packet: Vec<u8>,
219}
220
221#[derive(Debug, Clone)]
222struct IdentityCacheEntry {
223    node_addr: NodeAddr,
224    pubkey: secp256k1::PublicKey,
225    npub: String,
226    last_seen_ms: u64,
227}
228
229impl IdentityCacheEntry {
230    fn new(
231        node_addr: NodeAddr,
232        pubkey: secp256k1::PublicKey,
233        npub: String,
234        last_seen_ms: u64,
235    ) -> Self {
236        Self {
237            node_addr,
238            pubkey,
239            npub,
240            last_seen_ms,
241        }
242    }
243}
244
245/// App-owned packet channels for embedding FIPS without a system TUN.
246#[derive(Debug)]
247pub struct ExternalPacketIo {
248    /// Send outbound IPv6 packets into the node.
249    pub outbound_tx: crate::upper::tun::TunOutboundTx,
250    /// Receive inbound IPv6 packets delivered by FIPS sessions.
251    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
252}
253
254/// App-owned endpoint data channels for embedding FIPS without a daemon.
255#[derive(Debug)]
256pub(crate) struct EndpointDataIo {
257    /// Send endpoint data commands into the node RX loop.
258    ///
259    /// Bounded with a generous default so normal sender bursts do not
260    /// stall on semaphore acquisition. macOS pacing happens at the UDP
261    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
262    /// constraining this app queue instead caused the inner TCP flow to
263    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
264    /// default for benches.
265    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
266    /// Receive endpoint data delivered by FIPS sessions.
267    ///
268    /// Unbounded so the rx_loop's send on inbound packet delivery is a
269    /// wait-free push (no semaphore acquire), and so we can drop the
270    /// per-packet cross-task relay that previously sat between the node
271    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
272    /// naturally bounded — the rx_loop both produces here and runs the
273    /// same runtime that schedules the consumer, so a stalled consumer
274    /// stalls production too.
275    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
276    /// Clone of the event_tx exposed for in-process loopback (e.g.
277    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
278    /// event into the same queue without going through the encrypt /
279    /// decrypt path, while keeping every consumer reading from a single
280    /// channel.
281    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
282}
283
/// Resolve the capacity of the bounded endpoint-data command channel.
///
/// If `FIPS_ENDPOINT_DATA_QUEUE_CAP` holds a positive integer (surrounding
/// whitespace ignored), that value is used as-is, which lets benchmarks
/// shrink or grow the queue freely. In every other case — variable
/// missing, non-UTF-8, non-numeric, or zero — the caller's `requested`
/// capacity is used, raised to at least `MIN_CAPACITY` slots.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    /// Floor for the non-overridden path. It keeps ordinary sender bursts
    /// from blocking on a full command queue. Being well above zero, it
    /// also guarantees a non-zero capacity.
    const MIN_CAPACITY: usize = 32_768;

    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        // A zero capacity is rejected by `tokio::sync::mpsc::channel` (it
        // panics), so a zero override is treated as unset.
        .filter(|&value| value > 0);

    env_override.unwrap_or_else(|| requested.max(MIN_CAPACITY))
}
294
295/// Commands accepted by the node endpoint data service.
296#[derive(Debug)]
297pub(crate) enum NodeEndpointCommand {
298    /// Send with an explicit response channel — used by callers that
299    /// care whether the local-stack handoff succeeded (e.g.
300    /// `blocking_send` waits for the runtime to accept the send).
301    Send {
302        remote: PeerIdentity,
303        payload: Vec<u8>,
304        queued_at: Option<std::time::Instant>,
305        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
306    },
307    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
308    /// no per-packet result channel. Used by the data-plane fast path
309    /// (`FipsEndpoint::send`) where the caller already discards the
310    /// result. Saves one oneshot::channel() allocation per outbound
311    /// packet on the application's send hot path.
312    SendOneway {
313        remote: PeerIdentity,
314        payload: Vec<u8>,
315        queued_at: Option<std::time::Instant>,
316    },
317    PeerSnapshot {
318        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
319    },
320    RelaySnapshot {
321        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
322    },
323    UpdateRelays {
324        advert_relays: Vec<String>,
325        dm_relays: Vec<String>,
326        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
327    },
328    /// Replace the runtime peer list. Newly added auto-connect peers get
329    /// `initiate_peer_connection` immediately; removed peers are dropped
330    /// from the retry queue (the regular liveness timeout reaps any active
331    /// session). Existing entries are kept and their `addresses` field is
332    /// refreshed so the next retry sees the latest hints.
333    UpdatePeers {
334        peers: Vec<crate::config::PeerConfig>,
335        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
336    },
337}
338
/// Per-category counts describing how an `UpdatePeers` command changed the
/// runtime peer list.
///
/// The category meanings below follow from the field names; confirm them
/// against the `UpdatePeers` handler.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub(crate) struct UpdatePeersOutcome {
    /// Peers present in the new list but not the old one.
    pub(crate) added: usize,
    /// Peers present in the old list but absent from the new one.
    pub(crate) removed: usize,
    /// Peers kept but with changed settings (e.g. refreshed addresses).
    pub(crate) updated: usize,
    /// Peers kept with no change.
    pub(crate) unchanged: usize,
}
347
348/// Endpoint data events emitted by the node session receive path.
349#[derive(Debug)]
350pub(crate) enum NodeEndpointEvent {
351    Data {
352        source_node_addr: NodeAddr,
353        source_npub: Option<String>,
354        payload: Vec<u8>,
355        queued_at: Option<std::time::Instant>,
356    },
357}
358
/// Snapshot of one authenticated peer, as reported to embedded endpoint
/// callers.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct NodeEndpointPeer {
    /// Peer's bech32 `npub`.
    pub(crate) npub: String,
    /// Whether the peer is currently connected.
    pub(crate) connected: bool,
    /// Remote transport address, when one is known.
    pub(crate) transport_addr: Option<String>,
    /// Transport kind name, when one is known.
    pub(crate) transport_type: Option<String>,
    /// Identifier of the link carrying this peer.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, if measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent to this peer.
    pub(crate) packets_sent: u64,
    /// Packets received from this peer.
    pub(crate) packets_recv: u64,
    /// Bytes sent to this peer.
    pub(crate) bytes_sent: u64,
    /// Bytes received from this peer.
    pub(crate) bytes_recv: u64,
    /// Whether a direct-path probe is pending.
    pub(crate) direct_probe_pending: bool,
    /// Delay in milliseconds before the pending direct probe (presumed
    /// meaning; confirm where the snapshot is built).
    pub(crate) direct_probe_after_ms: Option<u64>,
}
375
/// Live status of one Nostr relay, as reported to embedded endpoint callers.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Human-readable status text for the relay.
    pub(crate) status: String,
}
382
/// Lifecycle phase of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed; `start` has not run yet.
    Created,
    /// Start-up in progress (transports being initialized).
    Starting,
    /// Fully up and serving traffic.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Shut down.
    Stopped,
}

impl NodeState {
    /// Whether the node is fully operational. Only `Running` qualifies.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// Whether a start is allowed: from a fresh node or a stopped one.
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// Whether a stop is allowed. Only a `Running` node can be stopped.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Render the state as a lowercase word, e.g. `running`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        };
        f.write_str(label)
    }
}
427
428/// Recent request tracking for dedup and reverse-path forwarding.
429///
430/// When a LookupRequest is forwarded through a node, the node stores the
431/// request_id and which peer sent it. When the corresponding LookupResponse
432/// arrives, it's forwarded back to that peer (reverse-path forwarding).
433/// The `response_forwarded` flag prevents response routing loops.
434#[derive(Clone, Debug)]
435pub(crate) struct RecentRequest {
436    /// The peer who sent this request to us.
437    pub(crate) from_peer: NodeAddr,
438    /// When we received this request (Unix milliseconds).
439    pub(crate) timestamp_ms: u64,
440    /// Whether we've already forwarded a response for this request.
441    /// Prevents response routing loops when convergent request paths
442    /// create bidirectional entries in recent_requests.
443    pub(crate) response_forwarded: bool,
444}
445
446impl RecentRequest {
447    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
448        Self {
449            from_peer,
450            timestamp_ms,
451            response_forwarded: false,
452        }
453    }
454
455    /// Check if this entry has expired (older than expiry_ms).
456    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
457        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
458    }
459}
460
461/// Key for addr_to_link reverse lookup.
462type AddrKey = (TransportId, TransportAddr);
463
/// Kernel receive-drop tracker for one transport, used to detect congestion.
///
/// It is sampled once per 1s tick. `dropping` records whether the kernel
/// reported new drops since the previous sample.
#[derive(Default, Debug)]
struct TransportDropState {
    /// Cumulative `recv_drops` counter value at the previous sample.
    prev_drops: u64,
    /// `true` when the counter grew between the last two samples.
    dropping: bool,
}
475
476/// State for a link waiting for transport-level connection establishment.
477///
478/// For connection-oriented transports (TCP, Tor), the transport connect runs
479/// asynchronously. This struct holds the data needed to complete the handshake
480/// once the connection is ready.
481struct PendingConnect {
482    /// The link that was created for this connection.
483    link_id: LinkId,
484    /// Which transport is being used.
485    transport_id: TransportId,
486    /// The remote address being connected to.
487    remote_addr: TransportAddr,
488    /// The peer identity (for handshake initiation).
489    peer_identity: PeerIdentity,
490}
491
492/// A running FIPS node instance.
493///
494/// This is the top-level container holding all node state.
495///
496/// ## Peer Lifecycle
497///
498/// Peers go through two phases:
499/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
500/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
501///
502/// The `addr_to_link` map enables dispatching incoming packets to the right
503/// connection before authentication completes.
504// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
505pub struct Node {
506    // === Identity ===
507    /// This node's cryptographic identity.
508    identity: Identity,
509
510    /// Random epoch generated at startup for peer restart detection.
511    /// Exchanged inside Noise handshake messages so peers can detect restarts.
512    startup_epoch: [u8; 8],
513
514    /// Instant when the node was created, for uptime reporting.
515    started_at: std::time::Instant,
516
517    // === Configuration ===
518    /// Loaded configuration.
519    config: Config,
520
521    // === State ===
522    /// Node operational state.
523    state: NodeState,
524
525    /// Whether this is a leaf-only node.
526    is_leaf_only: bool,
527
528    // === Spanning Tree ===
529    /// Local spanning tree state.
530    tree_state: TreeState,
531
532    // === Bloom Filter ===
533    /// Local Bloom filter state.
534    bloom_state: BloomState,
535
536    // === Routing ===
537    /// Address -> coordinates cache (from session setup and discovery).
538    coord_cache: CoordCache,
539    /// Locally learned reverse-path next-hop hints.
540    learned_routes: LearnedRouteTable,
541    /// Destinations whose direct first-hop path is temporarily suspect because
542    /// session-layer MMP observed sustained loss while using that direct path.
543    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
544    /// Recent discovery requests (dedup + reverse-path forwarding).
545    /// Maps request_id → RecentRequest.
546    recent_requests: HashMap<u64, RecentRequest>,
547    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
548    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
549    /// the TUN reader/writer threads at TCP MSS clamp time so the
550    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
551    /// and the learned per-destination path MTU.
552    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
553
554    // === Transports & Links ===
555    /// Active transports (owned by Node).
556    transports: HashMap<TransportId, TransportHandle>,
557    /// Per-transport kernel drop tracking for congestion detection.
558    transport_drops: HashMap<TransportId, TransportDropState>,
559    /// Active links.
560    links: HashMap<LinkId, Link>,
561    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
562    addr_to_link: HashMap<AddrKey, LinkId>,
563
564    // === Packet Channel ===
565    /// Packet sender for transports.
566    packet_tx: Option<PacketTx>,
567    /// Packet receiver (for event loop).
568    packet_rx: Option<PacketRx>,
569
570    // === Connections (Handshake Phase) ===
571    /// Pending connections (handshake in progress).
572    /// Indexed by LinkId since we don't know the peer's identity yet.
573    connections: HashMap<LinkId, PeerConnection>,
574
575    // === Peers (Active Phase) ===
576    /// Authenticated peers.
577    /// Indexed by NodeAddr (verified identity).
578    peers: HashMap<NodeAddr, ActivePeer>,
579
580    // === End-to-End Sessions ===
581    /// Session table for end-to-end encrypted sessions.
582    /// Keyed by remote NodeAddr.
583    sessions: HashMap<NodeAddr, SessionEntry>,
584
585    // === Identity Cache ===
586    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
587    /// Enables reverse lookup from IPv6 destination to session/routing identity.
588    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
589
590    // === Pending TUN Packets ===
591    /// Packets queued while waiting for session establishment.
592    /// Keyed by destination NodeAddr, bounded per-dest and total.
593    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
594    /// Endpoint data payloads queued while waiting for session establishment.
595    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
596    // === Pending Discovery Lookups ===
597    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
598    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
599    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
600
601    // === Resource Limits ===
602    /// Maximum connections (0 = unlimited).
603    max_connections: usize,
604    /// Maximum peers (0 = unlimited).
605    max_peers: usize,
606    /// Maximum links (0 = unlimited).
607    max_links: usize,
608
609    // === Counters ===
610    /// Next link ID to allocate.
611    next_link_id: u64,
612    /// Next transport ID to allocate.
613    next_transport_id: u32,
614
615    // === Node Statistics ===
616    /// Routing, forwarding, discovery, and error signal counters.
617    stats: stats::NodeStats,
618
619    /// Time-series history of node-level metrics (1s/1m rings).
620    stats_history: stats_history::StatsHistory,
621
622    // === TUN Interface ===
623    /// TUN device state.
624    tun_state: TunState,
625    /// TUN interface name (for cleanup).
626    tun_name: Option<String>,
627    /// TUN packet sender channel.
628    tun_tx: Option<TunTx>,
629    /// Receiver for outbound packets from the TUN reader.
630    tun_outbound_rx: Option<TunOutboundRx>,
631    /// App-owned packet sink used by embedded/no-TUN integrations.
632    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
633    /// Endpoint data command receiver used by embedded/no-daemon integrations.
634    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
635    /// Endpoint data event sink used by embedded/no-daemon integrations.
636    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
637    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
638    /// spawned (set up in `start()` once transports are running).
639    /// `Some(pool)` once available; the pool internally holds
640    /// per-worker mpsc senders and round-robins jobs across them.
641    /// See `node::encrypt_worker` for the rationale and layout.
642    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
643    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
644    /// `encrypt_workers` for the receive side.
645    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
646    /// Set of sessions that have been registered with the decrypt
647    /// shard worker pool. Used by rx_loop to decide between fast-path
648    /// dispatch (worker owns the session) and legacy in-place decrypt
649    /// (worker doesn't have it yet). Per the data-plane restructure,
650    /// the worker owns its session state directly — there's no shared
651    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
652    /// this set tracks **whether** the worker has been told about a
653    /// given session.
654    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
655    /// Fallback channel: decrypt worker bounces non-fast-path packets
656    /// (anything that's not bulk EndpointData) back here for rx_loop
657    /// to handle via the legacy path. Drained by a new rx_loop arm.
658    decrypt_fallback_rx:
659        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
660    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
661    /// TUN reader thread handle.
662    tun_reader_handle: Option<JoinHandle<()>>,
663    /// TUN writer thread handle.
664    tun_writer_handle: Option<JoinHandle<()>>,
665    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
666    /// On Linux, deleting the interface via netlink serves the same purpose.
667    #[cfg(target_os = "macos")]
668    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
669
670    // === DNS Responder ===
671    /// Receiver for resolved identities from the DNS responder.
672    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
673    /// DNS responder task handle.
674    dns_task: Option<tokio::task::JoinHandle<()>>,
675
676    // === Index-Based Session Dispatch ===
677    /// Allocator for session indices.
678    index_allocator: IndexAllocator,
679    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
680    /// This maps our session index to the peer that uses it.
681    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
682    /// Pending outbound handshakes by our sender_idx.
683    /// Tracks which LinkId corresponds to which session index.
684    pending_outbound: HashMap<(TransportId, u32), LinkId>,
685
686    // === Rate Limiting ===
687    /// Rate limiter for msg1 processing (DoS protection).
688    msg1_rate_limiter: HandshakeRateLimiter,
689    /// Rate limiter for ICMP Packet Too Big messages.
690    icmp_rate_limiter: IcmpRateLimiter,
691    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
692    routing_error_rate_limiter: RoutingErrorRateLimiter,
693    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
694    coords_response_rate_limiter: RoutingErrorRateLimiter,
695    /// Backoff for failed discovery lookups (originator-side).
696    discovery_backoff: DiscoveryBackoff,
697    /// Rate limiter for forwarded discovery requests (transit-side).
698    discovery_forward_limiter: DiscoveryForwardRateLimiter,
699
700    // === Pending Transport Connects ===
701    /// Links waiting for transport-level connection establishment before
702    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
703    /// the transport connect runs in the background; the tick handler polls
704    /// connection_state() and initiates the handshake when connected.
705    pending_connects: Vec<PendingConnect>,
706
707    // === Connection Retry ===
708    /// Retry state for peers whose outbound connections have failed.
709    /// Keyed by NodeAddr. Entries are created when a handshake times out
710    /// or fails, and removed on successful promotion or when max retries
711    /// are exhausted.
712    retry_pending: HashMap<NodeAddr, retry::RetryState>,
713
714    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
715    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
716    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
717    /// Identity is unverified at this layer — the Noise XX handshake
718    /// initiated against an mDNS-observed endpoint is what proves the
719    /// peer holds the matching private key.
720    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
721    /// Same-host JSON registry under `~/.fips/instances`. Records are
722    /// loopback routing hints only; peer identity is still verified by the
723    /// Noise handshake.
724    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
725    local_instance_started_at_ms: Option<u64>,
726    last_local_instance_publish_ms: Option<u64>,
727    last_local_instance_scan_ms: Option<u64>,
728    /// Wall-clock ms when Nostr discovery successfully started, used to
729    /// schedule the one-shot startup advert sweep after a settle delay.
730    /// `None` until discovery comes up; remains `None` if discovery is
731    /// disabled or failed to start.
732    nostr_discovery_started_at_ms: Option<u64>,
733    /// Whether the one-shot startup advert sweep has run. Set to true
734    /// after the first sweep fires (under `policy: open`); thereafter
735    /// only the per-tick `queue_open_discovery_retries` continues.
736    startup_open_discovery_sweep_done: bool,
737    /// Per-peer UDP transports adopted from NAT traversal handoff.
738    bootstrap_transports: HashSet<TransportId>,
739    /// Originating peer npub (bech32) for each adopted bootstrap
740    /// transport, captured at `adopt_established_traversal` time.
741    /// Populated alongside `bootstrap_transports`; cleared in
742    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
743    /// route fatal-protocol-mismatch observations back to the
744    /// Nostr-discovery `failure_state` for long cooldown application.
745    bootstrap_transport_npubs: HashMap<TransportId, String>,
746    /// Peers that should not be used as reply-learned fallback transit for
747    /// other destinations. Direct lookups to the peer are still permitted.
748    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
749
750    // === Periodic Parent Re-evaluation ===
751    /// Timestamp of last periodic parent re-evaluation (for pacing).
752    last_parent_reeval: Option<crate::time::Instant>,
753
754    // === Congestion Logging ===
755    /// Timestamp of last congestion detection log (rate-limited to 5s).
756    last_congestion_log: Option<std::time::Instant>,
757
758    // === Mesh Size Estimate ===
759    /// Cached estimated mesh size (computed once per tick from bloom filters).
760    estimated_mesh_size: Option<u64>,
761    /// Timestamp of last mesh size log emission.
762    last_mesh_size_log: Option<std::time::Instant>,
763
764    // === Bloom Self-Plausibility ===
765    /// Rate-limit state for the self-plausibility WARN. Fires at most
766    /// once per 60s globally when our own outgoing FilterAnnounce has
767    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
768    /// aggregation drift or an ingress bypass.
769    last_self_warn: Option<std::time::Instant>,
770
771    // === Local Outbound Liveness ===
772    /// Set when a `transport.send` returned a local-side io error
773    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
774    /// cleared on the next successful send. Used by
775    /// `check_link_heartbeats` to compress the dead-timeout to
776    /// `fast_link_dead_timeout_secs` while our outbound is observed
777    /// broken — direct kernel evidence beats waiting on receive-silence.
778    last_local_send_failure_at: Option<std::time::Instant>,
779    /// Set when the rx loop could not complete its 1s maintenance work
780    /// inside the watchdog timeout. Link-dead detection may be valid during
781    /// overload, but traversal cooldown should not punish a path just because
782    /// our own scheduler/worker queue was late.
783    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
784
785    // === Display Names ===
786    /// Human-readable names for configured peers (alias or short npub).
787    /// Populated at startup from peer config.
788    peer_aliases: HashMap<NodeAddr, String>,
789    /// Scheduler weight for explicitly configured peers. Built when config
790    /// changes so the packet hot path only does a NodeAddr hash lookup.
791    configured_peer_send_weights: HashMap<NodeAddr, u8>,
792
793    /// Reloadable peer ACL state from standard allow/deny files.
794    peer_acl: acl::PeerAclReloader,
795
796    // === Host Map ===
797    /// Static hostname → npub mapping for DNS resolution.
798    /// Built at construction from peer aliases and /etc/fips/hosts.
799    host_map: Arc<HostMap>,
800}
801
802impl Node {
803    /// Create a new node from configuration.
804    pub fn new(config: Config) -> Result<Self, NodeError> {
805        config.validate()?;
806        let identity = config.create_identity()?;
807        let node_addr = *identity.node_addr();
808        let is_leaf_only = config.is_leaf_only();
809
810        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
811        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
812
813        let mut startup_epoch = [0u8; 8];
814        rand::rng().fill_bytes(&mut startup_epoch);
815
816        let mut bloom_state = if is_leaf_only {
817            BloomState::leaf_only(node_addr)
818        } else {
819            BloomState::new(node_addr)
820        };
821        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
822
823        let tun_state = if config.tun.enabled {
824            TunState::Configured
825        } else {
826            TunState::Disabled
827        };
828
829        // Initialize tree state with signed self-declaration
830        let mut tree_state = TreeState::new(node_addr);
831        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
832        tree_state.set_hold_down(config.node.tree.hold_down_secs);
833        tree_state.set_flap_dampening(
834            config.node.tree.flap_threshold,
835            config.node.tree.flap_window_secs,
836            config.node.tree.flap_dampening_secs,
837        );
838        tree_state
839            .sign_declaration(&identity)
840            .expect("signing own declaration should never fail");
841
842        let coord_cache = CoordCache::new(
843            config.node.cache.coord_size,
844            config.node.cache.coord_ttl_secs * 1000,
845        );
846        let rl = &config.node.rate_limit;
847        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
848            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
849            config.node.limits.max_pending_inbound,
850        );
851
852        let max_connections = config.node.limits.max_connections;
853        let max_peers = config.node.limits.max_peers;
854        let max_links = config.node.limits.max_links;
855        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
856        let backoff_base_secs = config.node.discovery.backoff_base_secs;
857        let backoff_max_secs = config.node.discovery.backoff_max_secs;
858        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
859
860        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
861        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
862
863        Ok(Self {
864            identity,
865            startup_epoch,
866            started_at: std::time::Instant::now(),
867            config,
868            state: NodeState::Created,
869            is_leaf_only,
870            tree_state,
871            bloom_state,
872            coord_cache,
873            learned_routes: LearnedRouteTable::default(),
874            session_direct_degraded_until_ms: HashMap::new(),
875            recent_requests: HashMap::new(),
876            transports: HashMap::new(),
877            transport_drops: HashMap::new(),
878            links: HashMap::new(),
879            addr_to_link: HashMap::new(),
880            packet_tx: None,
881            packet_rx: None,
882            connections: HashMap::new(),
883            peers: HashMap::new(),
884            sessions: HashMap::new(),
885            identity_cache: HashMap::new(),
886            pending_tun_packets: HashMap::new(),
887            pending_endpoint_data: HashMap::new(),
888            pending_lookups: HashMap::new(),
889            max_connections,
890            max_peers,
891            max_links,
892            next_link_id: 1,
893            next_transport_id: 1,
894            stats: stats::NodeStats::new(),
895            stats_history: stats_history::StatsHistory::new(),
896            tun_state,
897            tun_name: None,
898            tun_tx: None,
899            tun_outbound_rx: None,
900            external_packet_tx: None,
901            endpoint_command_rx: None,
902            endpoint_event_tx: None,
903            encrypt_workers: None,
904            decrypt_workers: None,
905            decrypt_registered_sessions: std::collections::HashSet::new(),
906            decrypt_fallback_tx,
907            decrypt_fallback_rx,
908            tun_reader_handle: None,
909            tun_writer_handle: None,
910            #[cfg(target_os = "macos")]
911            tun_shutdown_fd: None,
912            dns_identity_rx: None,
913            dns_task: None,
914            index_allocator: IndexAllocator::new(),
915            peers_by_index: HashMap::new(),
916            pending_outbound: HashMap::new(),
917            msg1_rate_limiter,
918            icmp_rate_limiter: IcmpRateLimiter::new(),
919            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
920            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
921                std::time::Duration::from_millis(coords_response_interval_ms),
922            ),
923            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
924            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
925                std::time::Duration::from_secs(forward_min_interval_secs),
926            ),
927            pending_connects: Vec::new(),
928            retry_pending: HashMap::new(),
929            nostr_discovery: None,
930            nostr_discovery_started_at_ms: None,
931            lan_discovery: None,
932            local_instance_registry: None,
933            local_instance_started_at_ms: None,
934            last_local_instance_publish_ms: None,
935            last_local_instance_scan_ms: None,
936            startup_open_discovery_sweep_done: false,
937            bootstrap_transports: HashSet::new(),
938            bootstrap_transport_npubs: HashMap::new(),
939            discovery_fallback_transit_blocked_peers: HashSet::new(),
940            last_parent_reeval: None,
941            last_congestion_log: None,
942            estimated_mesh_size: None,
943            last_mesh_size_log: None,
944            last_self_warn: None,
945            last_local_send_failure_at: None,
946            last_rx_loop_maintenance_timeout_at: None,
947            peer_aliases: HashMap::new(),
948            configured_peer_send_weights,
949            peer_acl,
950            host_map,
951            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
952        })
953    }
954
955    /// Create a node with a specific identity.
956    ///
957    /// This constructor validates cross-field config invariants before
958    /// constructing the node, same as [`Node::new`].
959    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
960        config.validate()?;
961        let node_addr = *identity.node_addr();
962
963        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
964        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
965
966        let mut startup_epoch = [0u8; 8];
967        rand::rng().fill_bytes(&mut startup_epoch);
968
969        let tun_state = if config.tun.enabled {
970            TunState::Configured
971        } else {
972            TunState::Disabled
973        };
974
975        // Initialize tree state with signed self-declaration
976        let mut tree_state = TreeState::new(node_addr);
977        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
978        tree_state.set_hold_down(config.node.tree.hold_down_secs);
979        tree_state.set_flap_dampening(
980            config.node.tree.flap_threshold,
981            config.node.tree.flap_window_secs,
982            config.node.tree.flap_dampening_secs,
983        );
984        tree_state
985            .sign_declaration(&identity)
986            .expect("signing own declaration should never fail");
987
988        let mut bloom_state = BloomState::new(node_addr);
989        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
990
991        let coord_cache = CoordCache::new(
992            config.node.cache.coord_size,
993            config.node.cache.coord_ttl_secs * 1000,
994        );
995        let rl = &config.node.rate_limit;
996        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
997            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
998            config.node.limits.max_pending_inbound,
999        );
1000
1001        let max_connections = config.node.limits.max_connections;
1002        let max_peers = config.node.limits.max_peers;
1003        let max_links = config.node.limits.max_links;
1004        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1005
1006        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1007        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1008
1009        Ok(Self {
1010            identity,
1011            startup_epoch,
1012            started_at: std::time::Instant::now(),
1013            config,
1014            state: NodeState::Created,
1015            is_leaf_only: false,
1016            tree_state,
1017            bloom_state,
1018            coord_cache,
1019            learned_routes: LearnedRouteTable::default(),
1020            session_direct_degraded_until_ms: HashMap::new(),
1021            recent_requests: HashMap::new(),
1022            transports: HashMap::new(),
1023            transport_drops: HashMap::new(),
1024            links: HashMap::new(),
1025            addr_to_link: HashMap::new(),
1026            packet_tx: None,
1027            packet_rx: None,
1028            connections: HashMap::new(),
1029            peers: HashMap::new(),
1030            sessions: HashMap::new(),
1031            identity_cache: HashMap::new(),
1032            pending_tun_packets: HashMap::new(),
1033            pending_endpoint_data: HashMap::new(),
1034            pending_lookups: HashMap::new(),
1035            max_connections,
1036            max_peers,
1037            max_links,
1038            next_link_id: 1,
1039            next_transport_id: 1,
1040            stats: stats::NodeStats::new(),
1041            stats_history: stats_history::StatsHistory::new(),
1042            tun_state,
1043            tun_name: None,
1044            tun_tx: None,
1045            tun_outbound_rx: None,
1046            external_packet_tx: None,
1047            endpoint_command_rx: None,
1048            endpoint_event_tx: None,
1049            encrypt_workers: None,
1050            decrypt_workers: None,
1051            decrypt_registered_sessions: std::collections::HashSet::new(),
1052            decrypt_fallback_tx,
1053            decrypt_fallback_rx,
1054            tun_reader_handle: None,
1055            tun_writer_handle: None,
1056            #[cfg(target_os = "macos")]
1057            tun_shutdown_fd: None,
1058            dns_identity_rx: None,
1059            dns_task: None,
1060            index_allocator: IndexAllocator::new(),
1061            peers_by_index: HashMap::new(),
1062            pending_outbound: HashMap::new(),
1063            msg1_rate_limiter,
1064            icmp_rate_limiter: IcmpRateLimiter::new(),
1065            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1066            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1067                std::time::Duration::from_millis(coords_response_interval_ms),
1068            ),
1069            discovery_backoff: DiscoveryBackoff::new(),
1070            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1071            pending_connects: Vec::new(),
1072            retry_pending: HashMap::new(),
1073            nostr_discovery: None,
1074            nostr_discovery_started_at_ms: None,
1075            lan_discovery: None,
1076            local_instance_registry: None,
1077            local_instance_started_at_ms: None,
1078            last_local_instance_publish_ms: None,
1079            last_local_instance_scan_ms: None,
1080            startup_open_discovery_sweep_done: false,
1081            bootstrap_transports: HashSet::new(),
1082            bootstrap_transport_npubs: HashMap::new(),
1083            discovery_fallback_transit_blocked_peers: HashSet::new(),
1084            last_parent_reeval: None,
1085            last_congestion_log: None,
1086            estimated_mesh_size: None,
1087            last_mesh_size_log: None,
1088            last_self_warn: None,
1089            last_local_send_failure_at: None,
1090            last_rx_loop_maintenance_timeout_at: None,
1091            peer_aliases: HashMap::new(),
1092            configured_peer_send_weights,
1093            peer_acl,
1094            host_map,
1095            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1096        })
1097    }
1098
1099    /// Create a leaf-only node (simplified state).
1100    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1101        let mut node = Self::new(config)?;
1102        node.is_leaf_only = true;
1103        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1104        Ok(node)
1105    }
1106
1107    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1108        let base_host_map = HostMap::from_peer_configs(config.peers());
1109        if !config.node.system_files_enabled {
1110            return (
1111                Arc::new(base_host_map.clone()),
1112                acl::PeerAclReloader::memory_only(base_host_map),
1113            );
1114        }
1115
1116        let mut host_map = base_host_map.clone();
1117        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1118        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1119            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1120        ));
1121        host_map.merge(hosts_file);
1122        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1123            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1124            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1125            base_host_map,
1126            hosts_path,
1127        );
1128        (Arc::new(host_map), peer_acl)
1129    }
1130
1131    fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1132        config
1133            .peers()
1134            .iter()
1135            .filter_map(|peer| {
1136                PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1137                    (
1138                        *identity.node_addr(),
1139                        encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1140                    )
1141                })
1142            })
1143            .collect()
1144    }
1145
1146    fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1147        self.configured_peer_send_weights
1148            .get(peer_addr)
1149            .copied()
1150            .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1151    }
1152
    /// Create transport instances from configuration.
    ///
    /// Returns a vector of TransportHandles for the configured transports,
    /// built in this order:
    /// - UDP;
    /// - sim (`sim-transport` feature);
    /// - Ethernet (Linux/macOS);
    /// - TCP;
    /// - Tor;
    /// - WebRTC (`webrtc-transport` feature);
    /// - BLE (`bluer_available` cfg, non-test builds only).
    ///
    /// Each instance gets a fresh id from `allocate_transport_id` and its own
    /// clone of `packet_tx`.
    ///
    /// WebRTC and BLE instances whose construction fails are logged with
    /// `warn!` and left out of the result. Their transport id has already been
    /// allocated by then, so the ids of the returned handles may have gaps.
    /// The handles are only constructed here; this function does not insert
    /// them into `self.transports`.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Collect UDP configs with optional names to avoid borrow conflicts
        // (`allocate_transport_id` below needs `&mut self` while iterating).
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        // Create UDP transport instances
        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Simulated transports, only compiled with the `sim-transport` feature.
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Create Ethernet transport instances where raw-socket support exists.
        // Instances without an explicit `discovery_scope` inherit
        // `self.lan_discovery_scope()`, and each one is given our pubkey.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        // Create TCP transport instances
        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        // Create Tor transport instances
        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // WebRTC configs are collected unconditionally. Builds without the
        // `webrtc-transport` feature use the list only to warn that WebRTC
        // is configured but unsupported.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                // The id is allocated before construction and is not reused
                // if `WebRtcTransport::new` fails.
                let transport_id = self.allocate_transport_id();
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        // Create BLE transport instances
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real BlueZ adapters are opened only outside of test builds.
            // Adapter init failures are logged and the instance is skipped;
            // its transport id stays consumed.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): this whole section is already gated on
            // `bluer_available`, so `any(not(bluer_available), test)` below
            // reduces to `test`. The inner `warn!` is gated on `not(test)`,
            // so it can never be compiled in. In practice the branch only
            // keeps `ble_instances` "used" in test builds.
            // Builds without `bluer_available` skip this entire section and
            // drop configured BLE transports without any warning (unlike
            // WebRTC above). TODO: confirm whether `config.transports.ble`
            // exists in such builds and, if so, warn there instead.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1330
1331    /// Find an operational transport that matches the given transport type name.
1332    ///
1333    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1334    /// from Nostr/STUN traversal. They must not be reused for ordinary
1335    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1336    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1337    /// with `EINVAL`, and even on platforms that allow it the packet would use
1338    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1339    /// choice deterministic by lowest transport id instead of HashMap order.
1340    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1341        self.transports
1342            .iter()
1343            .filter(|(id, handle)| {
1344                handle.transport_type().name == transport_type
1345                    && handle.is_operational()
1346                    && !self.bootstrap_transports.contains(id)
1347            })
1348            .min_by_key(|(id, _)| id.as_u32())
1349            .map(|(id, _)| *id)
1350    }
1351
1352    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1353    /// and binary TransportAddr.
1354    ///
1355    /// Finds the Ethernet transport instance bound to the named interface
1356    /// and parses the MAC portion into a 6-byte TransportAddr.
1357    #[allow(unused_variables)]
1358    fn resolve_ethernet_addr(
1359        &self,
1360        addr_str: &str,
1361    ) -> Result<(TransportId, TransportAddr), NodeError> {
1362        #[cfg(any(target_os = "linux", target_os = "macos"))]
1363        {
1364            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1365                NodeError::NoTransportForType(format!(
1366                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1367                    addr_str
1368                ))
1369            })?;
1370
1371            // Find the Ethernet transport bound to this interface
1372            let transport_id = self
1373                .transports
1374                .iter()
1375                .find(|(_, handle)| {
1376                    handle.transport_type().name == "ethernet"
1377                        && handle.is_operational()
1378                        && handle.interface_name() == Some(iface)
1379                })
1380                .map(|(id, _)| *id)
1381                .ok_or_else(|| {
1382                    NodeError::NoTransportForType(format!(
1383                        "no operational Ethernet transport for interface '{}'",
1384                        iface
1385                    ))
1386                })?;
1387
1388            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1389                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1390            })?;
1391
1392            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1393        }
1394        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1395        {
1396            Err(NodeError::NoTransportForType(
1397                "Ethernet transport is not supported on this platform".to_string(),
1398            ))
1399        }
1400    }
1401
1402    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1403    /// (TransportId, TransportAddr) pair by finding the BLE transport
1404    /// instance matching the adapter name.
1405    #[cfg(bluer_available)]
1406    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1407        let ta = TransportAddr::from_string(addr_str);
1408        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1409            NodeError::NoTransportForType(format!(
1410                "invalid BLE address format '{}': expected 'adapter/mac'",
1411                addr_str
1412            ))
1413        })?;
1414
1415        // Find the BLE transport for this adapter
1416        let transport_id = self
1417            .transports
1418            .iter()
1419            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1420            .map(|(id, _)| *id)
1421            .ok_or_else(|| {
1422                NodeError::NoTransportForType(format!(
1423                    "no operational BLE transport for adapter '{}'",
1424                    adapter
1425                ))
1426            })?;
1427
1428        // Validate the address format
1429        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1430            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1431        })?;
1432
1433        Ok((transport_id, TransportAddr::from_string(addr_str)))
1434    }
1435
    // === Identity Accessors ===

    /// Get this node's identity (the value passed to or created by the
    /// constructor).
    pub fn identity(&self) -> &Identity {
        &self.identity
    }

    /// Get this node's NodeAddr, as reported by its identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }

    /// Get this node's npub, as produced by `Identity::npub`.
    ///
    /// Returns an owned `String` on every call.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1452
1453    /// Return a human-readable display name for a NodeAddr.
1454    ///
1455    /// Lookup order:
1456    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1457    /// 2. Configured peer alias or short npub (from startup map)
1458    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1459    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1460    /// 5. Truncated NodeAddr hex (unknown address)
1461    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1462        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1463            return hostname.to_string();
1464        }
1465        if let Some(name) = self.peer_aliases.get(addr) {
1466            return name.clone();
1467        }
1468        if let Some(peer) = self.peers.get(addr) {
1469            return peer.identity().short_npub();
1470        }
1471        if let Some(entry) = self.sessions.get(addr) {
1472            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1473            return PeerIdentity::from_pubkey(xonly).short_npub();
1474        }
1475        addr.short_hex()
1476    }
1477
1478    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1479    /// decrypt-worker state coherent: removes the same `cache_key`
1480    /// from the registered-sessions tracking set and tells the
1481    /// assigned shard worker to drop its `OwnedSessionState` entry.
1482    ///
1483    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1484    /// at every session-lifecycle teardown site (rekey cross-connection
1485    /// swap, peer disconnect, dispatch session-rotation) so the worker
1486    /// doesn't keep stale ciphers / replay windows around. The
1487    /// follow-up `RegisterSession` for the NEW key (if any) will then
1488    /// install the fresh state on the same shard.
1489    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1490        // Find the peer that owns this index BEFORE removing it from
1491        // the index map, so we can decide whether the deregistration
1492        // also tears down the peer's connected UDP socket.
1493        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1494        self.peers_by_index.remove(&cache_key);
1495        if self.decrypt_registered_sessions.remove(&cache_key)
1496            && let Some(workers) = self.decrypt_workers.as_ref()
1497        {
1498            workers.unregister_session(cache_key);
1499        }
1500        // Tear down the per-peer connected UDP socket *only* if no
1501        // other peers_by_index entry still resolves to this peer.
1502        // Rekey drain calls into this helper with the OLD session
1503        // index while the NEW index is already installed and points
1504        // at the same peer — there the connect()-ed 5-tuple is
1505        // still valid for the new session and we must not close it.
1506        // Peer-teardown sites (CrossConnection swap, stale-index
1507        // fall-through in encrypted.rs, disconnect handler) call
1508        // here when this is the peer's last index, so the connected
1509        // socket goes away with the peer.
1510        if let Some(peer_addr) = owning_peer {
1511            let peer_has_other_index = self
1512                .peers_by_index
1513                .values()
1514                .any(|other| *other == peer_addr);
1515            if !peer_has_other_index {
1516                self.clear_connected_udp_for_peer(&peer_addr);
1517            }
1518        }
1519    }
1520
1521    /// Ensure the current FMP receive index resolves to this peer.
1522    ///
1523    /// Rekey msg1/msg2 handlers pre-register the pending index before
1524    /// cutover, but losing that registration in a debug build used to
1525    /// panic in the cutover path. Repairing the map here is safe: the
1526    /// peer has already promoted the pending session, and the decrypt
1527    /// worker registration immediately after cutover depends on the
1528    /// same `(transport_id, our_index)` key.
1529    pub(in crate::node) fn ensure_current_session_index_registered(
1530        &mut self,
1531        node_addr: &NodeAddr,
1532        context: &'static str,
1533    ) -> bool {
1534        let Some(peer) = self.peers.get(node_addr) else {
1535            return false;
1536        };
1537        let Some(transport_id) = peer.transport_id() else {
1538            warn!(
1539                peer = %self.peer_display_name(node_addr),
1540                context,
1541                "Cannot register current session index without transport id"
1542            );
1543            return false;
1544        };
1545        let Some(our_index) = peer.our_index() else {
1546            warn!(
1547                peer = %self.peer_display_name(node_addr),
1548                context,
1549                "Cannot register current session index without local index"
1550            );
1551            return false;
1552        };
1553
1554        let cache_key = (transport_id, our_index.as_u32());
1555        match self.peers_by_index.get(&cache_key).copied() {
1556            Some(existing) if existing == *node_addr => true,
1557            Some(existing) => {
1558                warn!(
1559                    peer = %self.peer_display_name(node_addr),
1560                    previous_owner = %self.peer_display_name(&existing),
1561                    transport_id = %transport_id,
1562                    our_index = %our_index,
1563                    context,
1564                    "Repairing current session index with stale owner"
1565                );
1566                self.peers_by_index.insert(cache_key, *node_addr);
1567                true
1568            }
1569            None => {
1570                warn!(
1571                    peer = %self.peer_display_name(node_addr),
1572                    transport_id = %transport_id,
1573                    our_index = %our_index,
1574                    context,
1575                    "Repairing missing current session index"
1576                );
1577                self.peers_by_index.insert(cache_key, *node_addr);
1578                true
1579            }
1580        }
1581    }
1582
1583    // === Configuration ===
1584
1585    /// Get the configuration.
1586    pub fn config(&self) -> &Config {
1587        &self.config
1588    }
1589
1590    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1591    ///
1592    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1593    /// transport MTU. Returns the maximum IPv6 packet size (including
1594    /// IPv6 header) that can be transmitted through the FIPS mesh.
1595    pub fn effective_ipv6_mtu(&self) -> u16 {
1596        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1597    }
1598
1599    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1600    ///
1601    /// Returns the **minimum** MTU across all operational transports, or
1602    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1603    /// where a specific egress transport isn't yet known: the resulting
1604    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1605    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1606    /// configured-transport's egress, eliminating PMTU-D black holes that
1607    /// would otherwise occur when a flow's actual egress is smaller than
1608    /// the clamp ceiling assumed at TUN init.
1609    ///
1610    /// Returning the smallest (rather than the first-iterated, which used
1611    /// to vary across HashMap iteration order + async-startup race) makes
1612    /// the clamp deterministic across daemon restarts.
1613    ///
1614    /// See `ISSUE-2026-0011` for the empirical investigation.
1615    pub fn transport_mtu(&self) -> u16 {
1616        let min_operational = self
1617            .transports
1618            .values()
1619            .filter(|h| h.is_operational())
1620            .map(|h| h.mtu())
1621            .min();
1622        if let Some(mtu) = min_operational {
1623            return mtu;
1624        }
1625        // Fallback to config: try UDP first, then Ethernet
1626        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1627            return cfg.mtu();
1628        }
1629        1280
1630    }
1631
1632    // === State ===
1633
1634    /// Get the node state.
1635    pub fn state(&self) -> NodeState {
1636        self.state
1637    }
1638
1639    /// Get the node uptime.
1640    pub fn uptime(&self) -> std::time::Duration {
1641        self.started_at.elapsed()
1642    }
1643
1644    /// Check if node is operational.
1645    pub fn is_running(&self) -> bool {
1646        self.state.is_operational()
1647    }
1648
1649    /// Check if this is a leaf-only node.
1650    pub fn is_leaf_only(&self) -> bool {
1651        self.is_leaf_only
1652    }
1653
1654    // === Tree State ===
1655
1656    /// Get the tree state.
1657    pub fn tree_state(&self) -> &TreeState {
1658        &self.tree_state
1659    }
1660
1661    /// Get mutable tree state.
1662    pub fn tree_state_mut(&mut self) -> &mut TreeState {
1663        &mut self.tree_state
1664    }
1665
1666    // === Bloom State ===
1667
1668    /// Get the Bloom filter state.
1669    pub fn bloom_state(&self) -> &BloomState {
1670        &self.bloom_state
1671    }
1672
1673    /// Get mutable Bloom filter state.
1674    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1675        &mut self.bloom_state
1676    }
1677
1678    // === Mesh Size Estimate ===
1679
1680    /// Get the cached estimated mesh size.
1681    pub fn estimated_mesh_size(&self) -> Option<u64> {
1682        self.estimated_mesh_size
1683    }
1684
1685    /// Compute and cache the estimated mesh size from bloom filters.
1686    ///
1687    /// Uses the spanning tree partition: parent's filter covers nodes reachable
1688    /// upward, children's filters cover disjoint subtrees downward. The sum
1689    /// of estimated entry counts plus one (self) approximates total network size.
1690    pub(crate) fn compute_mesh_size(&mut self) {
1691        let my_addr = *self.tree_state.my_node_addr();
1692        let parent_id = *self.tree_state.my_declaration().parent_id();
1693        let is_root = self.tree_state.is_root();
1694
1695        let max_fpr = self.config.node.bloom.max_inbound_fpr;
1696        let mut total: f64 = 1.0; // count self
1697        let mut child_count: u32 = 0;
1698        let mut has_data = false;
1699
1700        // Parent's filter: nodes reachable upward through the tree.
1701        // If any contributing filter is above the FPR cap, we refuse to
1702        // estimate rather than substitute a partial/biased aggregate —
1703        // Node.estimated_mesh_size is already Option<u64> and consumers
1704        // (control socket, fipstop, periodic debug log) handle None.
1705        if !is_root
1706            && let Some(parent) = self.peers.get(&parent_id)
1707            && let Some(filter) = parent.inbound_filter()
1708        {
1709            match filter.estimated_count(max_fpr) {
1710                Some(n) => {
1711                    total += n;
1712                    has_data = true;
1713                }
1714                None => {
1715                    self.estimated_mesh_size = None;
1716                    return;
1717                }
1718            }
1719        }
1720
1721        // Children's filters: each child's subtree is disjoint
1722        for (peer_addr, peer) in &self.peers {
1723            if peer_addr == &parent_id {
1724                continue;
1725            }
1726            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1727                && *decl.parent_id() == my_addr
1728            {
1729                child_count += 1;
1730                if let Some(filter) = peer.inbound_filter() {
1731                    match filter.estimated_count(max_fpr) {
1732                        Some(n) => {
1733                            total += n;
1734                            has_data = true;
1735                        }
1736                        None => {
1737                            self.estimated_mesh_size = None;
1738                            return;
1739                        }
1740                    }
1741                }
1742            }
1743        }
1744
1745        if !has_data {
1746            self.estimated_mesh_size = None;
1747            return;
1748        }
1749
1750        let size = total.round() as u64;
1751        self.estimated_mesh_size = Some(size);
1752
1753        // Periodic logging (reuse MMP default interval: 30s)
1754        let now = std::time::Instant::now();
1755        let should_log = match self.last_mesh_size_log {
1756            None => true,
1757            Some(last) => {
1758                now.duration_since(last)
1759                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1760            }
1761        };
1762        if should_log {
1763            tracing::debug!(
1764                estimated_mesh_size = size,
1765                peers = self.peers.len(),
1766                children = child_count,
1767                "Mesh size estimate"
1768            );
1769            self.last_mesh_size_log = Some(now);
1770        }
1771    }
1772
1773    // === Coord Cache ===
1774
1775    /// Get the coordinate cache.
1776    pub fn coord_cache(&self) -> &CoordCache {
1777        &self.coord_cache
1778    }
1779
1780    /// Get mutable coordinate cache.
1781    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1782        &mut self.coord_cache
1783    }
1784
1785    // === Node Statistics ===
1786
1787    /// Get the node statistics.
1788    pub fn stats(&self) -> &stats::NodeStats {
1789        &self.stats
1790    }
1791
1792    /// Get mutable node statistics.
1793    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1794        &mut self.stats
1795    }
1796
1797    /// Get the stats history collector.
1798    pub fn stats_history(&self) -> &stats_history::StatsHistory {
1799        &self.stats_history
1800    }
1801
1802    /// Sample the current node state into the stats history ring.
1803    /// Called once per tick from the RX loop.
1804    pub(crate) fn record_stats_history(&mut self) {
1805        let fwd = &self.stats.forwarding;
1806        let peers_with_mmp: Vec<f64> = self
1807            .peers
1808            .values()
1809            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1810            .collect();
1811        let loss_rate = if peers_with_mmp.is_empty() {
1812            0.0
1813        } else {
1814            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1815        };
1816
1817        let snap = stats_history::Snapshot {
1818            mesh_size: self.estimated_mesh_size,
1819            tree_depth: self.tree_state.my_coords().depth() as u32,
1820            peer_count: self.peers.len() as u64,
1821            parent_switches_total: self.stats.tree.parent_switches,
1822            bytes_in_total: fwd.received_bytes,
1823            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1824            packets_in_total: fwd.received_packets,
1825            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1826            loss_rate,
1827            active_sessions: self.sessions.len() as u64,
1828        };
1829
1830        let now = std::time::Instant::now();
1831        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1832            .peers
1833            .values()
1834            .map(|p| {
1835                let stats = p.link_stats();
1836                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1837                    Some(m) => (
1838                        m.metrics.srtt_ms(),
1839                        Some(m.metrics.loss_rate()),
1840                        m.receiver.ecn_ce_count() as u64,
1841                    ),
1842                    None => (None, None, 0),
1843                };
1844                stats_history::PeerSnapshot {
1845                    node_addr: *p.node_addr(),
1846                    last_seen: now,
1847                    srtt_ms,
1848                    loss_rate,
1849                    bytes_in_total: stats.bytes_recv,
1850                    bytes_out_total: stats.bytes_sent,
1851                    packets_in_total: stats.packets_recv,
1852                    packets_out_total: stats.packets_sent,
1853                    ecn_ce_total: ecn_ce,
1854                }
1855            })
1856            .collect();
1857
1858        self.stats_history.tick(now, &snap, &peer_snaps);
1859    }
1860
1861    // === TUN Interface ===
1862
1863    /// Get the TUN state.
1864    pub fn tun_state(&self) -> TunState {
1865        self.tun_state
1866    }
1867
1868    /// Get the TUN interface name, if active.
1869    pub fn tun_name(&self) -> Option<&str> {
1870        self.tun_name.as_deref()
1871    }
1872
1873    // === Resource Limits ===
1874
1875    /// Set the maximum number of connections (handshake phase).
1876    pub fn set_max_connections(&mut self, max: usize) {
1877        self.max_connections = max;
1878    }
1879
1880    /// Set the maximum number of peers (authenticated).
1881    pub fn set_max_peers(&mut self, max: usize) {
1882        self.max_peers = max;
1883    }
1884
1885    /// Returns false when starting more outbound work would exceed a resource
1886    /// cap. A cap of `0` means uncapped.
1887    pub(crate) fn outbound_admission_check(&self) -> bool {
1888        let connection_used = self
1889            .connections
1890            .len()
1891            .saturating_add(self.pending_connects.len());
1892        let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1893        let connection_allowed =
1894            self.max_connections == 0 || connection_used < self.max_connections;
1895        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1896        peer_allowed && connection_allowed && link_allowed
1897    }
1898
1899    /// Admission for public/open-discovery outbound work. This includes the
1900    /// general connection/link caps and, when open Nostr discovery is enabled,
1901    /// the configured non-peer budget.
1902    pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1903        if !self.outbound_admission_check() {
1904            return false;
1905        }
1906
1907        let nostr = &self.config.node.discovery.nostr;
1908        if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1909            return true;
1910        }
1911
1912        let configured_npubs = self
1913            .config
1914            .peers()
1915            .iter()
1916            .map(|peer| peer.npub.clone())
1917            .collect::<HashSet<_>>();
1918        self.open_discovery_enqueue_budget(&configured_npubs) > 0
1919    }
1920
1921    /// Like `outbound_admission_check`, but for racing a better path to a
1922    /// peer that is already authenticated. This may temporarily add a
1923    /// connection/link, but it does not consume a new peer slot.
1924    pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1925        let connection_used = self
1926            .connections
1927            .len()
1928            .saturating_add(self.pending_connects.len());
1929        let connection_allowed =
1930            self.max_connections == 0 || connection_used < self.max_connections;
1931        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1932        connection_allowed && link_allowed
1933    }
1934
1935    /// Set the maximum number of links.
1936    pub fn set_max_links(&mut self, max: usize) {
1937        self.max_links = max;
1938    }
1939
1940    // === Counts ===
1941
1942    /// Number of pending connections (handshake in progress).
1943    pub fn connection_count(&self) -> usize {
1944        self.connections.len()
1945    }
1946
1947    /// Number of authenticated peers.
1948    pub fn peer_count(&self) -> usize {
1949        self.peers.len()
1950    }
1951
1952    /// Number of active links.
1953    pub fn link_count(&self) -> usize {
1954        self.links.len()
1955    }
1956
1957    /// Number of active transports.
1958    pub fn transport_count(&self) -> usize {
1959        self.transports.len()
1960    }
1961
1962    // === Transport Management ===
1963
1964    /// Allocate a new transport ID.
1965    pub fn allocate_transport_id(&mut self) -> TransportId {
1966        let id = TransportId::new(self.next_transport_id);
1967        self.next_transport_id += 1;
1968        id
1969    }
1970
1971    /// Get a transport by ID.
1972    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1973        self.transports.get(id)
1974    }
1975
1976    /// Get mutable transport by ID.
1977    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1978        self.transports.get_mut(id)
1979    }
1980
1981    /// Iterate over transport IDs.
1982    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1983        self.transports.keys()
1984    }
1985
1986    /// Get the packet receiver for the event loop.
1987    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1988        self.packet_rx.as_mut()
1989    }
1990
1991    // === Link Management ===
1992
1993    /// Allocate a new link ID.
1994    pub fn allocate_link_id(&mut self) -> LinkId {
1995        let id = LinkId::new(self.next_link_id);
1996        self.next_link_id += 1;
1997        id
1998    }
1999
2000    /// Add a link.
2001    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2002        if self.max_links > 0 && self.links.len() >= self.max_links {
2003            return Err(NodeError::MaxLinksExceeded {
2004                max: self.max_links,
2005            });
2006        }
2007        let link_id = link.link_id();
2008        let transport_id = link.transport_id();
2009        let remote_addr = link.remote_addr().clone();
2010
2011        self.links.insert(link_id, link);
2012        self.addr_to_link
2013            .insert((transport_id, remote_addr), link_id);
2014        Ok(())
2015    }
2016
2017    /// Get a link by ID.
2018    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2019        self.links.get(link_id)
2020    }
2021
2022    /// Get a mutable link by ID.
2023    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2024        self.links.get_mut(link_id)
2025    }
2026
2027    /// Find link ID by transport address.
2028    pub fn find_link_by_addr(
2029        &self,
2030        transport_id: TransportId,
2031        addr: &TransportAddr,
2032    ) -> Option<LinkId> {
2033        self.addr_to_link
2034            .get(&(transport_id, addr.clone()))
2035            .copied()
2036    }
2037
2038    /// Remove a link.
2039    ///
2040    /// Only removes the addr_to_link reverse lookup if it still points to this
2041    /// link. In cross-connection scenarios, a newer link may have replaced the
2042    /// entry for the same address.
2043    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2044        if let Some(link) = self.links.remove(link_id) {
2045            // Clean up reverse lookup only if it still maps to this link
2046            let key = (link.transport_id(), link.remote_addr().clone());
2047            if self.addr_to_link.get(&key) == Some(link_id) {
2048                self.addr_to_link.remove(&key);
2049            }
2050            Some(link)
2051        } else {
2052            None
2053        }
2054    }
2055
2056    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2057        if !self.bootstrap_transports.contains(&transport_id) {
2058            return;
2059        }
2060
2061        let transport_in_use = self
2062            .links
2063            .values()
2064            .any(|link| link.transport_id() == transport_id)
2065            || self
2066                .connections
2067                .values()
2068                .any(|conn| conn.transport_id() == Some(transport_id))
2069            || self
2070                .peers
2071                .values()
2072                .any(|peer| peer.transport_id() == Some(transport_id))
2073            || self
2074                .pending_connects
2075                .iter()
2076                .any(|pending| pending.transport_id == transport_id);
2077
2078        if transport_in_use {
2079            return;
2080        }
2081
2082        tracing::debug!(
2083            transport_id = %transport_id,
2084            "bootstrap transport has no remaining references; dropping"
2085        );
2086
2087        self.bootstrap_transports.remove(&transport_id);
2088        self.bootstrap_transport_npubs.remove(&transport_id);
2089        self.transport_drops.remove(&transport_id);
2090        self.transports.remove(&transport_id);
2091    }
2092
2093    /// Iterate over all links.
2094    pub fn links(&self) -> impl Iterator<Item = &Link> {
2095        self.links.values()
2096    }
2097
2098    // === Connection Management (Handshake Phase) ===
2099
2100    /// Add a pending connection.
2101    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2102        let link_id = connection.link_id();
2103
2104        if self.connections.contains_key(&link_id) {
2105            return Err(NodeError::ConnectionAlreadyExists(link_id));
2106        }
2107
2108        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2109            return Err(NodeError::MaxConnectionsExceeded {
2110                max: self.max_connections,
2111            });
2112        }
2113
2114        self.connections.insert(link_id, connection);
2115        Ok(())
2116    }
2117
2118    /// Get a connection by LinkId.
2119    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2120        self.connections.get(link_id)
2121    }
2122
2123    /// Get a mutable connection by LinkId.
2124    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2125        self.connections.get_mut(link_id)
2126    }
2127
2128    /// Remove a connection.
2129    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2130        self.connections.remove(link_id)
2131    }
2132
2133    /// Iterate over all connections.
2134    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2135        self.connections.values()
2136    }
2137
2138    // === Peer Management (Active Phase) ===
2139
2140    /// Get a peer by NodeAddr.
2141    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2142        self.peers.get(node_addr)
2143    }
2144
2145    /// Get a mutable peer by NodeAddr.
2146    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2147        self.peers.get_mut(node_addr)
2148    }
2149
2150    /// Remove a peer.
2151    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2152        self.peers.remove(node_addr)
2153    }
2154
2155    /// Iterate over all peers.
2156    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2157        self.peers.values()
2158    }
2159
2160    /// Reference to the Nostr discovery handle if discovery is enabled.
2161    /// Used by control queries (`show_peers` per-peer Nostr-traversal
2162    /// state) to read failure-state without taking shared ownership.
2163    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2164        self.nostr_discovery.as_deref()
2165    }
2166
2167    /// Iterate over all peer node IDs.
2168    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2169        self.peers.keys()
2170    }
2171
2172    /// Iterate over peers that can send traffic.
2173    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2174        self.peers.values().filter(|p| p.can_send())
2175    }
2176
2177    /// Number of peers that can send traffic.
2178    pub fn sendable_peer_count(&self) -> usize {
2179        self.peers.values().filter(|p| p.can_send()).count()
2180    }
2181
2182    pub(crate) fn set_discovery_fallback_transit_allowed(
2183        &mut self,
2184        peer_addr: NodeAddr,
2185        allowed: bool,
2186    ) {
2187        if allowed {
2188            self.discovery_fallback_transit_blocked_peers
2189                .remove(&peer_addr);
2190        } else {
2191            self.discovery_fallback_transit_blocked_peers
2192                .insert(peer_addr);
2193        }
2194    }
2195
2196    pub(crate) fn configured_discovery_fallback_transit(
2197        &self,
2198        peer_addr: &NodeAddr,
2199    ) -> Option<bool> {
2200        self.configured_peer(peer_addr)
2201            .map(|peer| peer.discovery_fallback_transit)
2202    }
2203
2204    pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2205        self.config.peers().iter().find(|peer| {
2206            PeerIdentity::from_npub(&peer.npub)
2207                .ok()
2208                .is_some_and(|identity| identity.node_addr() == peer_addr)
2209        })
2210    }
2211
2212    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2213        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2214            return retry_state.peer_config.discovery_fallback_transit;
2215        }
2216
2217        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2218            return allowed;
2219        }
2220
2221        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2222    }
2223
2224    // === End-to-End Sessions ===
2225
2226    /// Get a session by remote NodeAddr.
2227    /// Disable the discovery forward rate limiter (for tests).
2228    #[cfg(test)]
2229    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2230        self.discovery_forward_limiter
2231            .set_interval(std::time::Duration::ZERO);
2232    }
2233
2234    #[cfg(test)]
2235    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2236        self.sessions.get(remote)
2237    }
2238
2239    /// Get a mutable session by remote NodeAddr.
2240    #[cfg(test)]
2241    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2242        self.sessions.get_mut(remote)
2243    }
2244
2245    /// Remove a session.
2246    #[cfg(test)]
2247    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2248        self.sessions.remove(remote)
2249    }
2250
2251    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2252    #[cfg(test)]
2253    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2254        self.path_mtu_lookup
2255            .read()
2256            .ok()
2257            .and_then(|map| map.get(fips_addr).copied())
2258    }
2259
2260    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2261    #[cfg(test)]
2262    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2263        if let Ok(mut map) = self.path_mtu_lookup.write() {
2264            map.insert(fips_addr, mtu);
2265        }
2266    }
2267
2268    /// Number of end-to-end sessions.
2269    pub fn session_count(&self) -> usize {
2270        self.sessions.len()
2271    }
2272
2273    /// Iterate over all session entries (for control queries).
2274    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2275        self.sessions.iter()
2276    }
2277
2278    // === Identity Cache ===
2279
2280    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2281    pub(crate) fn register_identity(
2282        &mut self,
2283        node_addr: NodeAddr,
2284        pubkey: secp256k1::PublicKey,
2285    ) -> bool {
2286        let mut prefix = [0u8; 15];
2287        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2288        if let Some(entry) = self.identity_cache.get(&prefix)
2289            && entry.node_addr == node_addr
2290            && entry.pubkey == pubkey
2291        {
2292            // Endpoint sends pass the same PeerIdentity on every packet. Once
2293            // validated, avoid re-deriving NodeAddr from the public key in the
2294            // data path; that hash showed up in macOS sender profiles.
2295            return true;
2296        }
2297
2298        let (xonly, _) = pubkey.x_only_public_key();
2299        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2300        if derived_node_addr != node_addr {
2301            debug!(
2302                claimed_node_addr = %node_addr,
2303                derived_node_addr = %derived_node_addr,
2304                "Rejected identity cache entry with mismatched public key"
2305            );
2306            return false;
2307        }
2308
2309        let now_ms = Self::now_ms();
2310        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2311            && entry.node_addr == node_addr
2312        {
2313            entry.pubkey = pubkey;
2314            entry.last_seen_ms = now_ms;
2315            return true;
2316        }
2317
2318        let npub = encode_npub(&xonly);
2319        self.identity_cache.insert(
2320            prefix,
2321            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2322        );
2323        // LRU eviction
2324        let max = self.config.node.cache.identity_size;
2325        if self.identity_cache.len() > max
2326            && let Some(oldest_key) = self
2327                .identity_cache
2328                .iter()
2329                .min_by_key(|(_, entry)| entry.last_seen_ms)
2330                .map(|(k, _)| *k)
2331        {
2332            self.identity_cache.remove(&oldest_key);
2333        }
2334        true
2335    }
2336
2337    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2338    pub(crate) fn lookup_by_fips_prefix(
2339        &mut self,
2340        prefix: &[u8; 15],
2341    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2342        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2343            entry.last_seen_ms = Self::now_ms(); // LRU touch
2344            Some((entry.node_addr, entry.pubkey))
2345        } else {
2346            None
2347        }
2348    }
2349
2350    /// Check if a node's identity is in the cache (without LRU touch).
2351    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2352        let mut prefix = [0u8; 15];
2353        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2354        self.identity_cache.contains_key(&prefix)
2355    }
2356
    /// Number of identity cache entries.
    ///
    /// Read-only: unlike `lookup_by_fips_prefix`, this does not refresh any
    /// entry's `last_seen_ms`.
    pub fn identity_cache_len(&self) -> usize {
        self.identity_cache.len()
    }
2361
2362    /// Iterate over identity cache entries.
2363    ///
2364    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2365    /// Used by the `show_identity_cache` control query.
2366    pub fn identity_cache_iter(
2367        &self,
2368    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2369        self.identity_cache
2370            .values()
2371            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2372    }
2373
    /// Configured maximum identity cache size.
    ///
    /// Returns `config.node.cache.identity_size`, the bound the insert path
    /// compares the cache length against before evicting the entry with the
    /// oldest `last_seen_ms`.
    pub fn identity_cache_max(&self) -> usize {
        self.config.node.cache.identity_size
    }
2378
    /// Number of pending discovery lookups.
    ///
    /// This is the entry count of `pending_lookups`, one entry per NodeAddr
    /// key (see `pending_lookups_iter`).
    pub fn pending_lookup_count(&self) -> usize {
        self.pending_lookups.len()
    }
2383
    /// Iterate over pending discovery lookups for diagnostics.
    ///
    /// Yields `(target NodeAddr, PendingLookup)` borrows straight from the
    /// `pending_lookups` map; iteration order follows that map.
    pub fn pending_lookups_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
        self.pending_lookups.iter()
    }
2390
    /// Number of recent discovery requests tracked.
    ///
    /// Diagnostic counter: the current length of `recent_requests`.
    pub fn recent_request_count(&self) -> usize {
        self.recent_requests.len()
    }
2395
    /// Count of destinations with queued TUN packets awaiting session setup.
    ///
    /// Counts destinations (map keys), not packets; see
    /// `pending_tun_total_packets` for the packet total.
    pub fn pending_tun_destinations(&self) -> usize {
        self.pending_tun_packets.len()
    }
2400
2401    /// Total TUN packets queued across all destinations.
2402    pub fn pending_tun_total_packets(&self) -> usize {
2403        self.pending_tun_packets.values().map(|q| q.len()).sum()
2404    }
2405
    /// Iterate over retry state for diagnostics.
    ///
    /// Yields `(NodeAddr, RetryState)` borrows directly from `retry_pending`.
    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
        self.retry_pending.iter()
    }
2410
2411    // === Routing ===
2412
2413    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2414    ///
2415    /// Returns true if the peer is our current tree parent, or if the peer
2416    /// has declared us as their parent (making them our child).
2417    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2418        // Peer is our parent
2419        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2420            return true;
2421        }
2422        // Peer is our child (their declaration names us as parent)
2423        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2424            && decl.parent_id() == self.node_addr()
2425        {
2426            return true;
2427        }
2428        false
2429    }
2430
2431    /// Find next hop for a destination node address.
2432    ///
2433    /// Routing priority:
2434    /// 1. Destination is self → `None` (local delivery)
2435    /// 2. Destination is a healthy direct peer → that peer, unless a known
2436    ///    fallback next-hop has a meaningful link-quality advantage.
2437    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
2438    ///    observed reverse paths, selected with weighted multipath plus
2439    ///    periodic coordinate/tree exploration.
2440    /// 4. Bloom filter candidates with cached dest coords → among peers whose
2441    ///    bloom filter contains the destination, pick the one that minimizes
2442    ///    tree distance to the destination, with
2443    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
2444    ///    The self-distance check ensures only peers strictly closer to the
2445    ///    destination than us are considered (prevents routing loops).
2446    /// 5. Greedy tree routing fallback (requires cached dest coords)
2447    /// 6. No route → `None`
2448    ///
2449    /// Both the bloom filter and tree routing paths require cached destination
2450    /// coordinates (checked in `coord_cache`). Without coordinates, the node
2451    /// cannot make loop-free forwarding decisions. The caller should signal
2452    /// `CoordsRequired` back to the source when `None` is returned for a
2453    /// non-local destination.
2454    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
2455        // 1. Local delivery
2456        if dest_node_addr == self.node_addr() {
2457            return None;
2458        }
2459        let now_ms = Self::now_ms();
2460        let direct_session_degraded = self.session_direct_path_is_degraded(dest_node_addr, now_ms);
2461
2462        let healthy_direct_route = self
2463            .peers
2464            .get(dest_node_addr)
2465            .filter(|peer| peer.is_healthy() && !direct_session_degraded)
2466            .map(|_| *dest_node_addr);
2467        let direct_payload_eligible = healthy_direct_route.is_some();
2468        let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
2469            if addr == dest_node_addr {
2470                direct_payload_eligible
2471            } else {
2472                peer.is_healthy()
2473            }
2474        };
2475
2476        // A healthy direct path is not automatically the best path. A
2477        // hotspot/NAT hairpin can remain sendable with high RTT or mild loss;
2478        // in that case a lower-cost mesh next-hop should carry traffic while
2479        // direct probes continue in the background.
2480        let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
2481            node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
2482        };
2483
2484        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
2485            Some(
2486                self.peers
2487                    .iter()
2488                    .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
2489                    .map(|(addr, _)| *addr)
2490                    .collect::<HashSet<_>>(),
2491            )
2492        } else {
2493            None
2494        };
2495
2496        // 3. Optional reply-learned routing. These entries are not peer
2497        // claims; they are local observations of which peer carried traffic
2498        // or a verified lookup response back from the destination. Most
2499        // packets use weighted multipath over learned routes, but periodic
2500        // fallback exploration lets coord/bloom/tree routes discover better
2501        // candidates.
2502        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
2503            self.learned_routes.should_explore_fallback(
2504                dest_node_addr,
2505                now_ms,
2506                self.config.node.routing.learned_fallback_explore_interval,
2507                |addr| sendable.contains(addr),
2508            )
2509        });
2510        if let Some(sendable) = &sendable_learned_peers
2511            && !explore_fallback
2512        {
2513            let eligible = sendable
2514                .iter()
2515                .copied()
2516                .filter(|addr| fallback_beats_direct(self, *addr))
2517                .collect::<HashSet<_>>();
2518            if !eligible.is_empty()
2519                && let Some(next_hop_addr) =
2520                    self.learned_routes
2521                        .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
2522            {
2523                return self.peers.get(&next_hop_addr);
2524            }
2525        }
2526
2527        // Look up cached destination coordinates (required by both bloom and tree paths).
2528        let Some(dest_coords) = self
2529            .coord_cache
2530            .get_and_touch(dest_node_addr, now_ms)
2531            .cloned()
2532        else {
2533            if (healthy_direct_route.is_none() || explore_fallback)
2534                && let Some(sendable) = &sendable_learned_peers
2535                && let Some(next_hop_addr) =
2536                    self.learned_routes
2537                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2538            {
2539                return self.peers.get(&next_hop_addr);
2540            }
2541            if let Some(direct_addr) = healthy_direct_route {
2542                return self.peers.get(&direct_addr);
2543            }
2544            return None;
2545        };
2546
2547        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
2548        //    If no candidate is strictly closer, fall through to tree routing.
2549        let coordinate_route_addr = {
2550            let candidates: Vec<&ActivePeer> = self
2551                .peers
2552                .iter()
2553                .filter(|(addr, peer)| {
2554                    payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
2555                })
2556                .map(|(_, peer)| peer)
2557                .collect();
2558            if !candidates.is_empty() {
2559                self.select_best_candidate(&candidates, &dest_coords)
2560                    .map(|peer| *peer.node_addr())
2561            } else {
2562                None
2563            }
2564        };
2565        if let Some(next_hop_addr) = coordinate_route_addr
2566            && fallback_beats_direct(self, next_hop_addr)
2567        {
2568            return self.peers.get(&next_hop_addr);
2569        }
2570
2571        // 5. Greedy tree routing fallback
2572        let tree_route_addr = self.select_tree_payload_candidate(
2573            &dest_coords,
2574            dest_node_addr,
2575            direct_payload_eligible,
2576        );
2577        if let Some(next_hop_addr) = tree_route_addr
2578            && fallback_beats_direct(self, next_hop_addr)
2579        {
2580            return self.peers.get(&next_hop_addr);
2581        }
2582
2583        if explore_fallback {
2584            return sendable_learned_peers.as_ref().and_then(|sendable| {
2585                self.learned_routes
2586                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2587                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
2588            });
2589        }
2590
2591        if let Some(direct_addr) = healthy_direct_route {
2592            return self.peers.get(&direct_addr);
2593        }
2594
2595        if let Some(sendable) = &sendable_learned_peers
2596            && let Some(next_hop_addr) =
2597                self.learned_routes
2598                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
2599        {
2600            return self.peers.get(&next_hop_addr);
2601        }
2602
2603        None
2604    }
2605
2606    pub(in crate::node) fn find_transit_next_hop(
2607        &mut self,
2608        dest_node_addr: &NodeAddr,
2609        previous_hop: &NodeAddr,
2610    ) -> Option<NodeAddr> {
2611        if dest_node_addr == self.node_addr() {
2612            return None;
2613        }
2614
2615        if dest_node_addr != previous_hop
2616            && self
2617                .peers
2618                .get(dest_node_addr)
2619                .is_some_and(|peer| peer.is_healthy())
2620        {
2621            return Some(*dest_node_addr);
2622        }
2623
2624        let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2625        if &next_hop_addr == previous_hop {
2626            self.record_route_failure(*dest_node_addr, next_hop_addr);
2627            return None;
2628        }
2629        Some(next_hop_addr)
2630    }
2631
2632    fn route_candidate_beats_direct(
2633        &self,
2634        healthy_direct_route: Option<NodeAddr>,
2635        candidate_addr: NodeAddr,
2636    ) -> bool {
2637        let Some(direct_addr) = healthy_direct_route else {
2638            return true;
2639        };
2640        if candidate_addr == direct_addr {
2641            return false;
2642        }
2643
2644        let Some(direct) = self.peers.get(&direct_addr) else {
2645            return true;
2646        };
2647        let Some(candidate) = self.peers.get(&candidate_addr) else {
2648            return false;
2649        };
2650        if !candidate.is_healthy() {
2651            return false;
2652        }
2653
2654        let direct_cost = direct.link_cost();
2655        let candidate_cost = candidate.link_cost();
2656        candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2657    }
2658
2659    fn select_tree_payload_candidate(
2660        &self,
2661        dest_coords: &crate::tree::TreeCoordinate,
2662        direct_dest: &NodeAddr,
2663        direct_payload_eligible: bool,
2664    ) -> Option<NodeAddr> {
2665        if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2666            return None;
2667        }
2668
2669        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2670        let mut best: Option<(NodeAddr, usize)> = None;
2671
2672        for (peer_addr, peer) in &self.peers {
2673            if peer_addr == direct_dest {
2674                if !direct_payload_eligible {
2675                    continue;
2676                }
2677            } else if !peer.is_healthy() {
2678                continue;
2679            }
2680
2681            let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2682                continue;
2683            };
2684            let distance = peer_coords.distance_to(dest_coords);
2685            if distance >= my_distance {
2686                continue;
2687            }
2688
2689            let dominated = match &best {
2690                None => true,
2691                Some((best_id, best_dist)) => {
2692                    distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2693                }
2694            };
2695            if dominated {
2696                best = Some((*peer_addr, distance));
2697            }
2698        }
2699
2700        best.map(|(peer_addr, _)| peer_addr)
2701    }
2702
2703    pub(in crate::node) fn session_direct_path_is_degraded(
2704        &mut self,
2705        dest: &NodeAddr,
2706        now_ms: u64,
2707    ) -> bool {
2708        match self.session_direct_degraded_until_ms.get(dest).copied() {
2709            Some(until_ms) if until_ms > now_ms => true,
2710            Some(_) => {
2711                self.session_direct_degraded_until_ms.remove(dest);
2712                false
2713            }
2714            None => false,
2715        }
2716    }
2717
2718    pub(in crate::node) fn mark_session_direct_path_degraded(
2719        &mut self,
2720        dest: NodeAddr,
2721        now_ms: u64,
2722    ) -> bool {
2723        let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2724        let entry = self
2725            .session_direct_degraded_until_ms
2726            .entry(dest)
2727            .or_insert(0);
2728        let was_degraded = *entry > now_ms;
2729        *entry = (*entry).max(until_ms);
2730        !was_degraded
2731    }
2732
    /// Drop any degraded hold on the direct session path to `dest`.
    ///
    /// Returns `true` if an entry was present and removed, whether or not
    /// its deadline had already passed.
    pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
        self.session_direct_degraded_until_ms.remove(dest).is_some()
    }
2736
2737    pub(in crate::node) fn learn_reverse_route(
2738        &mut self,
2739        destination: NodeAddr,
2740        next_hop: NodeAddr,
2741    ) {
2742        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2743            || destination == *self.node_addr()
2744        {
2745            return;
2746        }
2747        let now_ms = Self::now_ms();
2748        self.learned_routes.learn(
2749            destination,
2750            next_hop,
2751            now_ms,
2752            self.config.node.routing.learned_ttl_secs,
2753            self.config.node.routing.max_learned_routes_per_dest,
2754        );
2755    }
2756
2757    pub(in crate::node) fn record_route_failure(
2758        &mut self,
2759        destination: NodeAddr,
2760        next_hop: NodeAddr,
2761    ) {
2762        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2763            return;
2764        }
2765        self.learned_routes.record_failure(&destination, &next_hop);
2766    }
2767
    /// Snapshot of the learned-route table as of `now_ms`, for diagnostics.
    ///
    /// Delegates to `LearnedRouteTable::snapshot`; it is called regardless
    /// of the configured routing mode.
    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
        self.learned_routes.snapshot(now_ms)
    }
2771
    /// Remove learned routes that have expired as of `now_ms`.
    ///
    /// Thin delegate to `LearnedRouteTable::purge_expired`.
    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
        self.learned_routes.purge_expired(now_ms);
    }
2775
2776    /// Select the best peer from a set of bloom filter candidates.
2777    ///
2778    /// Uses distance from each candidate's tree coordinates to the destination
2779    /// as the primary metric (after link_cost). Only selects peers that are
2780    /// strictly closer to the destination than we are (self-distance check
2781    /// prevents routing loops).
2782    ///
2783    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2784    fn select_best_candidate<'a>(
2785        &'a self,
2786        candidates: &[&'a ActivePeer],
2787        dest_coords: &crate::tree::TreeCoordinate,
2788    ) -> Option<&'a ActivePeer> {
2789        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2790
2791        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2792
2793        for &candidate in candidates {
2794            if !candidate.can_send() {
2795                continue;
2796            }
2797
2798            let cost = candidate.link_cost();
2799
2800            let dist = self
2801                .tree_state
2802                .peer_coords(candidate.node_addr())
2803                .map(|pc| pc.distance_to(dest_coords))
2804                .unwrap_or(usize::MAX);
2805
2806            // Self-distance check: only consider peers strictly closer
2807            // to the destination than we are (prevents routing loops)
2808            if dist >= my_distance {
2809                continue;
2810            }
2811
2812            let dominated = match &best {
2813                None => true,
2814                Some((_, best_cost, best_dist)) => {
2815                    cost < *best_cost
2816                        || (cost == *best_cost && dist < *best_dist)
2817                        || (cost == *best_cost
2818                            && dist == *best_dist
2819                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2820                }
2821            };
2822
2823            if dominated {
2824                best = Some((candidate, cost, dist));
2825            }
2826        }
2827
2828        best.map(|(peer, _, _)| peer)
2829    }
2830
2831    /// Check if a destination is in any peer's bloom filter.
2832    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2833        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2834    }
2835
    /// Get the TUN packet sender channel.
    ///
    /// Returns None if TUN is not active or the node hasn't been started.
    /// Borrows the stored `tun_tx` field; callers clone it if they need an
    /// owned sender.
    pub fn tun_tx(&self) -> Option<&TunTx> {
        self.tun_tx.as_ref()
    }
2842
2843    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2844    ///
2845    /// This must be called before [`Node::start`] and requires `tun.enabled =
2846    /// false`. Outbound packets sent to the returned sender are processed by the
2847    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2848    /// sent to the returned receiver with source attribution.
2849    pub fn attach_external_packet_io(
2850        &mut self,
2851        capacity: usize,
2852    ) -> Result<ExternalPacketIo, NodeError> {
2853        if self.state != NodeState::Created {
2854            return Err(NodeError::Config(ConfigError::Validation(
2855                "external packet I/O must be attached before node start".to_string(),
2856            )));
2857        }
2858        if self.config.tun.enabled {
2859            return Err(NodeError::Config(ConfigError::Validation(
2860                "external packet I/O requires tun.enabled=false".to_string(),
2861            )));
2862        }
2863
2864        let capacity = capacity.max(1);
2865        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2866        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2867        self.tun_outbound_rx = Some(outbound_rx);
2868        self.external_packet_tx = Some(inbound_tx);
2869
2870        Ok(ExternalPacketIo {
2871            outbound_tx,
2872            inbound_rx,
2873        })
2874    }
2875
2876    /// Attach app-owned endpoint data I/O for embedded operation.
2877    ///
2878    /// Commands sent to the returned sender are processed by the node RX loop.
2879    /// Incoming endpoint data is emitted as source-attributed events.
2880    pub(crate) fn attach_endpoint_data_io(
2881        &mut self,
2882        capacity: usize,
2883    ) -> Result<EndpointDataIo, NodeError> {
2884        if self.state != NodeState::Created {
2885            return Err(NodeError::Config(ConfigError::Validation(
2886                "endpoint data I/O must be attached before node start".to_string(),
2887            )));
2888        }
2889
2890        let command_capacity = endpoint_data_command_capacity(capacity);
2891        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2892        // Inbound endpoint-data events use an unbounded channel — see
2893        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2894        // per-packet semaphore + the cross-task relay task that used to
2895        // sit on top of this channel).
2896        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2897        self.endpoint_command_rx = Some(command_rx);
2898        self.endpoint_event_tx = Some(event_tx.clone());
2899
2900        Ok(EndpointDataIo {
2901            command_tx,
2902            event_rx,
2903            event_tx,
2904        })
2905    }
2906
2907    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2908        let mut prefix = [0u8; 15];
2909        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2910        self.identity_cache
2911            .get(&prefix)
2912            .filter(|entry| &entry.node_addr == addr)
2913            .map(|entry| entry.pubkey)
2914    }
2915
2916    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2917        let mut prefix = [0u8; 15];
2918        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2919        self.identity_cache
2920            .get(&prefix)
2921            .filter(|entry| &entry.node_addr == addr)
2922            .map(|entry| entry.npub.clone())
2923    }
2924
2925    pub(in crate::node) fn deliver_external_ipv6_packet(
2926        &self,
2927        src_addr: &NodeAddr,
2928        packet: Vec<u8>,
2929    ) {
2930        let Some(external_packet_tx) = &self.external_packet_tx else {
2931            return;
2932        };
2933        if packet.len() < 40 {
2934            return;
2935        }
2936        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2937            return;
2938        };
2939        let delivered = NodeDeliveredPacket {
2940            source_node_addr: *src_addr,
2941            source_npub: self.npub_for_node_addr(src_addr),
2942            destination,
2943            packet,
2944        };
2945        if let Err(error) = external_packet_tx.try_send(delivered) {
2946            debug!(error = %error, "Failed to deliver packet to external app sink");
2947        }
2948    }
2949
2950    // === Sending ===
2951
    /// Encrypt and send a link-layer message to an authenticated peer.
    ///
    /// The plaintext should include the message type byte followed by the
    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
    ///
    /// The send path prepends a 4-byte session-relative timestamp (inner
    /// header) before encryption. The full 16-byte outer header is used
    /// as AAD for the AEAD construction.
    ///
    /// This is the standard path for sending any link-layer control message
    /// to a peer over their encrypted Noise session.
    ///
    /// Thin wrapper around `send_encrypted_link_message_with_ce` with the CE
    /// flag cleared. Errors are whatever that function returns (for example
    /// `NodeError::PeerNotFound` or `NodeError::SendFailed`).
    pub(super) async fn send_encrypted_link_message(
        &mut self,
        node_addr: &NodeAddr,
        plaintext: &[u8],
    ) -> Result<(), NodeError> {
        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
            .await
    }
2971
2972    /// Update the local-outbound-broken signal from a `transport.send`
2973    /// outcome. Sets `last_local_send_failure_at` on local-side io
2974    /// errors (NetworkUnreachable / HostUnreachable / AddrNotAvailable);
2975    /// clears it on success. The reaper consults this in
2976    /// `check_link_heartbeats` to switch to `fast_link_dead_timeout_secs`.
2977    pub(in crate::node) fn note_local_send_outcome(
2978        &mut self,
2979        result: &Result<usize, TransportError>,
2980    ) {
2981        match result {
2982            Ok(_) => {
2983                if self.last_local_send_failure_at.is_some() {
2984                    self.last_local_send_failure_at = None;
2985                }
2986            }
2987            Err(error) if error.is_local_route_unavailable() => {
2988                self.last_local_send_failure_at = Some(std::time::Instant::now());
2989            }
2990            Err(_) => {}
2991        }
2992    }
2993
2994    /// Return the active dead-timeout after considering recent local route
2995    /// failures. The fast-dead signal is intentionally short-lived: on the
2996    /// UDP worker path a send call can return before the kernel result is
2997    /// observed, so a single stale route error must not compress liveness for
2998    /// the whole normal dead-timeout window.
2999    pub(in crate::node) fn local_send_failure_dead_timeout(
3000        &mut self,
3001        now: std::time::Instant,
3002        dead_timeout: std::time::Duration,
3003        fast_dead_timeout: std::time::Duration,
3004    ) -> std::time::Duration {
3005        match self.last_local_send_failure_at {
3006            Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3007                fast_dead_timeout.min(dead_timeout)
3008            }
3009            Some(_) => {
3010                self.last_local_send_failure_at = None;
3011                dead_timeout
3012            }
3013            None => dead_timeout,
3014        }
3015    }
3016
    /// Record that an RX-loop maintenance pass timed out just now.
    ///
    /// Overwrites any earlier stamp; `rx_loop_maintenance_timed_out_recently`
    /// reads it back.
    pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
        self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
    }
3020
3021    pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3022        let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3023            return false;
3024        };
3025        let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3026        std::time::Instant::now().duration_since(t) <= grace
3027    }
3028
3029    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
3030    ///
3031    /// Used by the forwarding path to relay congestion signals hop-by-hop.
3032    pub(super) async fn send_encrypted_link_message_with_ce(
3033        &mut self,
3034        node_addr: &NodeAddr,
3035        plaintext: &[u8],
3036        ce_flag: bool,
3037    ) -> Result<(), NodeError> {
3038        let peer = self
3039            .peers
3040            .get_mut(node_addr)
3041            .ok_or(NodeError::PeerNotFound(*node_addr))?;
3042
3043        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3044            node_addr: *node_addr,
3045            reason: "no their_index".into(),
3046        })?;
3047        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3048            node_addr: *node_addr,
3049            reason: "no transport_id".into(),
3050        })?;
3051        let remote_addr = peer
3052            .current_addr()
3053            .cloned()
3054            .ok_or_else(|| NodeError::SendFailed {
3055                node_addr: *node_addr,
3056                reason: "no current_addr".into(),
3057            })?;
3058        #[cfg(any(target_os = "linux", target_os = "macos"))]
3059        let connected_socket = peer.connected_udp();
3060
3061        // Prepend 4-byte session-relative timestamp (inner header)
3062        let timestamp_ms = peer.session_elapsed_ms();
3063
3064        // MMP: read spin bit value before entering session borrow
3065        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3066        let mut flags = if sp_flag { FLAG_SP } else { 0 };
3067        if ce_flag {
3068            flags |= FLAG_CE;
3069        }
3070        if peer.current_k_bit() {
3071            flags |= FLAG_KEY_EPOCH;
3072        }
3073
3074        let session = peer
3075            .noise_session_mut()
3076            .ok_or_else(|| NodeError::SendFailed {
3077                node_addr: *node_addr,
3078                reason: "no noise session".into(),
3079            })?;
3080
3081        // Build 16-byte outer header upfront. The inner-plaintext
3082        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
3083        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
3084        // just to measure it. The worker path uses this length to size
3085        // the wire buffer directly; the legacy path below still
3086        // materialises a separate `inner_plaintext` Vec for the inline
3087        // encrypt-and-send call.
3088        const INNER_TS_LEN: usize = 4;
3089        let counter = session.current_send_counter();
3090        let inner_len = INNER_TS_LEN + plaintext.len();
3091        let payload_len = inner_len as u16;
3092        let header = build_established_header(their_index, counter, flags, payload_len);
3093
3094        // **UDP send fast path.** The encrypt-worker pool is always
3095        // spawned at lifecycle start (workers = num_cpus) in
3096        // production, so this branch is taken for every authentic
3097        // send on every UDP-transported established session. The
3098        // AEAD work + sendmsg syscall run on a dedicated OS thread;
3099        // the rx_loop only builds the wire buffer + reserves the
3100        // counter inline.
3101        //
3102        // Other transport kinds (BLE, TCP, sim, ethernet) fall
3103        // through to the inline encrypt + transport.send path
3104        // below — those don't have raw-fd / sendmmsg / UDP_GSO
3105        // benefits to expose through the worker pool, so the simpler
3106        // synchronous send is the right shape for them.
3107        //
3108        // The `encrypt_workers.is_some()` check below is true in
3109        // production (lifecycle::start spawns the pool); it stays
3110        // checked rather than `expect()`-ed because unit tests
3111        // construct `Node` without calling `start()`.
3112        let transport_for_send = self
3113            .transports
3114            .get(&transport_id)
3115            .ok_or(NodeError::TransportNotFound(transport_id))?;
3116        match transport_for_send.connection_state(&remote_addr) {
3117            ConnectionState::Connected => {}
3118            other => {
3119                if matches!(other, ConnectionState::None) {
3120                    let _ = transport_for_send.connect(&remote_addr).await;
3121                }
3122                return Err(NodeError::SendFailed {
3123                    node_addr: *node_addr,
3124                    reason: format!("transport connection not ready: {:?}", other),
3125                });
3126            }
3127        }
3128        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3129        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3130            && is_udp
3131            && let Some(cipher_clone) = session.send_cipher_clone()
3132        {
3133            {
3134                // Reserve the counter on the session so subsequent
3135                // sends don't reuse it. `current_send_counter` only
3136                // peeks; we advance via `take_send_counter`.
3137                let reserved_counter =
3138                    session
3139                        .take_send_counter()
3140                        .map_err(|e| NodeError::SendFailed {
3141                            node_addr: *node_addr,
3142                            reason: format!("counter reservation failed: {}", e),
3143                        })?;
3144                debug_assert_eq!(reserved_counter, counter);
3145                // Re-derive the header with the now-locked-in counter
3146                // value (same value, but the call sequence is more
3147                // explicit).
3148                let header =
3149                    build_established_header(their_index, reserved_counter, flags, payload_len);
3150                let transport = transport_for_send;
3151                // Snapshot the per-peer connected UDP socket before
3152                // resolving the fallback address. On the established
3153                // steady-state path this socket already carries the
3154                // kernel peer address, so re-parsing the configured
3155                // transport address and touching the DNS cache on every
3156                // packet is pure overhead on the sender hot path.
3157                let send_target = {
3158                    if let TransportHandle::Udp(udp) = transport {
3159                        let socket_addr = {
3160                            #[cfg(any(target_os = "linux", target_os = "macos"))]
3161                            {
3162                                match connected_socket.as_ref() {
3163                                    Some(socket) => Some(socket.peer_addr()),
3164                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3165                                }
3166                            }
3167                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3168                            {
3169                                udp.resolve_for_off_task(&remote_addr).await.ok()
3170                            }
3171                        };
3172                        match (udp.async_socket(), socket_addr) {
3173                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3174                            _ => None,
3175                        }
3176                    } else {
3177                        None
3178                    }
3179                };
3180                if let Some((socket, socket_addr)) = send_target {
3181                    // Build the wire buffer **directly** from
3182                    // `plaintext` with a single allocation:
3183                    //   `[16 header][4 ts][plaintext...]` with
3184                    // +16 trailing capacity for the AEAD tag.
3185                    // The worker seals `wire_buf[16..]` in
3186                    // place and appends the tag — no second
3187                    // alloc, no second memcpy.
3188                    //
3189                    // Previous design built `inner_plaintext`
3190                    // via `prepend_inner_header` (1 alloc + 1
3191                    // copy) and then let the worker memcpy
3192                    // header + plaintext into a fresh Vec
3193                    // (another alloc + copy). At ~100 kpps the
3194                    // saved alloc/copy is ~150 MB/sec of memory
3195                    // bandwidth on the hot rx_loop + worker.
3196                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3197                    let mut wire_buf = Vec::with_capacity(wire_capacity);
3198                    wire_buf.extend_from_slice(&header);
3199                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
3200                    wire_buf.extend_from_slice(plaintext);
3201                    let predicted_bytes = wire_capacity;
3202                    // Stats / MMP update inline — predicted size
3203                    // is exact for ChaCha20-Poly1305 (tag is
3204                    // constant 16 bytes). When `connected_socket` is
3205                    // `Some`, the worker sends on it without a
3206                    // destination sockaddr — the kernel skips the
3207                    // per-packet sockaddr + route + neighbor resolve.
3208                    if let Some(peer) = self.peers.get_mut(node_addr) {
3209                        peer.link_stats_mut().record_sent(predicted_bytes);
3210                        if let Some(mmp) = peer.mmp_mut() {
3211                            mmp.sender
3212                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3213                        }
3214                    }
3215                    let scheduling_weight = self.send_weight_for_peer(node_addr);
3216                    workers.dispatch(self::encrypt_worker::FmpSendJob {
3217                        cipher: cipher_clone,
3218                        counter: reserved_counter,
3219                        wire_buf,
3220                        fsp_seal: None,
3221                        socket,
3222                        dest_addr: socket_addr,
3223                        #[cfg(any(target_os = "linux", target_os = "macos"))]
3224                        connected_socket,
3225                        drop_on_backpressure: fmp_plaintext_is_bulk_session_datagram(plaintext),
3226                        scheduling_weight,
3227                        queued_at: crate::perf_profile::stamp(),
3228                    });
3229                    return Ok(());
3230                }
3231            }
3232        }
3233
3234        // Inline (legacy) path: encrypt + send on the rx_loop.
3235        // Build the inner plaintext lazily here — the worker path
3236        // above never reaches this point, so the prepend_inner_header
3237        // alloc is avoided in the fast path.
3238        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3239        // Encrypt with AAD binding to the outer header
3240        let ciphertext = {
3241            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3242            session
3243                .encrypt_with_aad(&inner_plaintext, &header)
3244                .map_err(|e| NodeError::SendFailed {
3245                    node_addr: *node_addr,
3246                    reason: format!("encryption failed: {}", e),
3247                })?
3248        };
3249
3250        let wire_packet = build_encrypted(&header, &ciphertext);
3251
3252        // Re-borrow peer for stats update after sending
3253        let send_result = {
3254            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3255            let transport = self
3256                .transports
3257                .get(&transport_id)
3258                .ok_or(NodeError::TransportNotFound(transport_id))?;
3259            transport.send(&remote_addr, &wire_packet).await
3260        };
3261        self.note_local_send_outcome(&send_result);
3262        let bytes_sent = send_result.map_err(|e| match e {
3263            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3264                node_addr: *node_addr,
3265                packet_size,
3266                mtu,
3267            },
3268            other => NodeError::SendFailed {
3269                node_addr: *node_addr,
3270                reason: format!("transport send: {}", other),
3271            },
3272        })?;
3273
3274        // Update send statistics
3275        if let Some(peer) = self.peers.get_mut(node_addr) {
3276            peer.link_stats_mut().record_sent(bytes_sent);
3277            // MMP: record sent frame for sender report generation
3278            if let Some(mmp) = peer.mmp_mut() {
3279                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3280            }
3281        }
3282
3283        Ok(())
3284    }
3285}
3286
3287impl fmt::Debug for Node {
3288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3289        f.debug_struct("Node")
3290            .field("node_addr", self.node_addr())
3291            .field("state", &self.state)
3292            .field("is_leaf_only", &self.is_leaf_only)
3293            .field("connections", &self.connection_count())
3294            .field("peers", &self.peer_count())
3295            .field("links", &self.link_count())
3296            .field("transports", &self.transport_count())
3297            .finish()
3298    }
3299}