Skip to main content

fips_core/node/
mod.rs

1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49    ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50    TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58    Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59    PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
69const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
70const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
71const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 8;
72const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.20;
73const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.05;
74
75fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
76    if !plaintext
77        .first()
78        .is_some_and(|ty| *ty == LinkMessageType::SessionDatagram.to_byte())
79    {
80        return false;
81    }
82    let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
83        return false;
84    };
85    FspCommonPrefix::parse(fsp_payload)
86        .is_some_and(|prefix| prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted())
87}
88
89/// Half-range of the symmetric jitter applied to per-session rekey timers.
90///
91/// Each FMP/FSP session draws an offset uniformly from
92/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` seconds at construction and
93/// after each cutover. This preserves the configured mean interval while
94/// reducing dual-initiation bursts in symmetric-start meshes.
95pub(crate) const REKEY_JITTER_SECS: i64 = 15;
96
97/// Errors related to node operations.
98#[derive(Debug, Error)]
99pub enum NodeError {
100    #[error("node not started")]
101    NotStarted,
102
103    #[error("node already started")]
104    AlreadyStarted,
105
106    #[error("node already stopped")]
107    AlreadyStopped,
108
109    #[error("transport not found: {0}")]
110    TransportNotFound(TransportId),
111
112    #[error("no transport available for type: {0}")]
113    NoTransportForType(String),
114
115    #[error("link not found: {0}")]
116    LinkNotFound(LinkId),
117
118    #[error("connection not found: {0}")]
119    ConnectionNotFound(LinkId),
120
121    #[error("peer not found: {0:?}")]
122    PeerNotFound(NodeAddr),
123
124    #[error("peer already exists: {0:?}")]
125    PeerAlreadyExists(NodeAddr),
126
127    #[error("connection already exists for link: {0}")]
128    ConnectionAlreadyExists(LinkId),
129
130    #[error("invalid peer npub '{npub}': {reason}")]
131    InvalidPeerNpub { npub: String, reason: String },
132
133    #[error("discovery error: {0}")]
134    Discovery(String),
135
136    #[error("access denied: {0}")]
137    AccessDenied(String),
138
139    #[error("max connections exceeded: {max}")]
140    MaxConnectionsExceeded { max: usize },
141
142    #[error("max peers exceeded: {max}")]
143    MaxPeersExceeded { max: usize },
144
145    #[error("max links exceeded: {max}")]
146    MaxLinksExceeded { max: usize },
147
148    #[error("handshake incomplete for link {0}")]
149    HandshakeIncomplete(LinkId),
150
151    #[error("no session available for link {0}")]
152    NoSession(LinkId),
153
154    #[error("promotion failed for link {link_id}: {reason}")]
155    PromotionFailed { link_id: LinkId, reason: String },
156
157    #[error("send failed to {node_addr}: {reason}")]
158    SendFailed { node_addr: NodeAddr, reason: String },
159
160    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
161    MtuExceeded {
162        node_addr: NodeAddr,
163        packet_size: usize,
164        mtu: u16,
165    },
166
167    #[error("config error: {0}")]
168    Config(#[from] ConfigError),
169
170    #[error("identity error: {0}")]
171    Identity(#[from] IdentityError),
172
173    #[error("TUN error: {0}")]
174    Tun(#[from] TunError),
175
176    #[error("index allocation failed: {0}")]
177    IndexAllocationFailed(String),
178
179    #[error("handshake failed: {0}")]
180    HandshakeFailed(String),
181
182    #[error("transport error: {0}")]
183    TransportError(String),
184
185    #[error("local route unavailable: {0}")]
186    LocalRouteUnavailable(String),
187
188    #[error("bootstrap handoff failed: {0}")]
189    BootstrapHandoff(String),
190}
191
192impl NodeError {
193    pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
194        if error.is_local_route_unavailable() {
195            Self::LocalRouteUnavailable(error.to_string())
196        } else {
197            Self::TransportError(error.to_string())
198        }
199    }
200
201    pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
202        matches!(self, Self::LocalRouteUnavailable(_))
203    }
204}
205
206/// Source-attributed packet delivered by a node running without a system TUN.
207#[derive(Debug, Clone, PartialEq, Eq)]
208pub struct NodeDeliveredPacket {
209    /// FIPS node address that originated the packet.
210    pub source_node_addr: NodeAddr,
211    /// Source Nostr public key when the node has learned it.
212    pub source_npub: Option<String>,
213    /// Destination FIPS address from the IPv6 packet.
214    pub destination: FipsAddress,
215    /// Full IPv6 packet after FIPS session decapsulation.
216    pub packet: Vec<u8>,
217}
218
219#[derive(Debug, Clone)]
220struct IdentityCacheEntry {
221    node_addr: NodeAddr,
222    pubkey: secp256k1::PublicKey,
223    npub: String,
224    last_seen_ms: u64,
225}
226
227impl IdentityCacheEntry {
228    fn new(
229        node_addr: NodeAddr,
230        pubkey: secp256k1::PublicKey,
231        npub: String,
232        last_seen_ms: u64,
233    ) -> Self {
234        Self {
235            node_addr,
236            pubkey,
237            npub,
238            last_seen_ms,
239        }
240    }
241}
242
243/// App-owned packet channels for embedding FIPS without a system TUN.
244#[derive(Debug)]
245pub struct ExternalPacketIo {
246    /// Send outbound IPv6 packets into the node.
247    pub outbound_tx: crate::upper::tun::TunOutboundTx,
248    /// Receive inbound IPv6 packets delivered by FIPS sessions.
249    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
250}
251
252/// App-owned endpoint data channels for embedding FIPS without a daemon.
253#[derive(Debug)]
254pub(crate) struct EndpointDataIo {
255    /// Send endpoint data commands into the node RX loop.
256    ///
257    /// Bounded with a generous default so normal sender bursts do not
258    /// stall on semaphore acquisition. macOS pacing happens at the UDP
259    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
260    /// constraining this app queue instead caused the inner TCP flow to
261    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
262    /// default for benches.
263    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
264    /// Receive endpoint data delivered by FIPS sessions.
265    ///
266    /// Unbounded so the rx_loop's send on inbound packet delivery is a
267    /// wait-free push (no semaphore acquire), and so we can drop the
268    /// per-packet cross-task relay that previously sat between the node
269    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
270    /// naturally bounded — the rx_loop both produces here and runs the
271    /// same runtime that schedules the consumer, so a stalled consumer
272    /// stalls production too.
273    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
274    /// Clone of the event_tx exposed for in-process loopback (e.g.
275    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
276    /// event into the same queue without going through the encrypt /
277    /// decrypt path, while keeping every consumer reading from a single
278    /// channel.
279    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
280}
281
282fn endpoint_data_command_capacity(requested: usize) -> usize {
283    if let Ok(raw) = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
284        && let Ok(value) = raw.trim().parse::<usize>()
285        && value > 0
286    {
287        return value;
288    }
289
290    requested.max(1).max(32_768)
291}
292
293/// Commands accepted by the node endpoint data service.
294#[derive(Debug)]
295pub(crate) enum NodeEndpointCommand {
296    /// Send with an explicit response channel — used by callers that
297    /// care whether the local-stack handoff succeeded (e.g.
298    /// `blocking_send` waits for the runtime to accept the send).
299    Send {
300        remote: PeerIdentity,
301        payload: Vec<u8>,
302        queued_at: Option<std::time::Instant>,
303        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
304    },
305    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
306    /// no per-packet result channel. Used by the data-plane fast path
307    /// (`FipsEndpoint::send`) where the caller already discards the
308    /// result. Saves one oneshot::channel() allocation per outbound
309    /// packet on the application's send hot path.
310    SendOneway {
311        remote: PeerIdentity,
312        payload: Vec<u8>,
313        queued_at: Option<std::time::Instant>,
314    },
315    PeerSnapshot {
316        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
317    },
318    RelaySnapshot {
319        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
320    },
321    UpdateRelays {
322        advert_relays: Vec<String>,
323        dm_relays: Vec<String>,
324        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
325    },
326    /// Replace the runtime peer list. Newly added auto-connect peers get
327    /// `initiate_peer_connection` immediately; removed peers are dropped
328    /// from the retry queue (the regular liveness timeout reaps any active
329    /// session). Existing entries are kept and their `addresses` field is
330    /// refreshed so the next retry sees the latest hints.
331    UpdatePeers {
332        peers: Vec<crate::config::PeerConfig>,
333        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
334    },
335}
336
337/// Reports what changed in response to `UpdatePeers`.
338#[derive(Debug, Clone, Default, PartialEq, Eq)]
339pub(crate) struct UpdatePeersOutcome {
340    pub(crate) added: usize,
341    pub(crate) removed: usize,
342    pub(crate) updated: usize,
343    pub(crate) unchanged: usize,
344}
345
346/// Endpoint data events emitted by the node session receive path.
347#[derive(Debug)]
348pub(crate) enum NodeEndpointEvent {
349    Data {
350        source_node_addr: NodeAddr,
351        source_npub: Option<String>,
352        payload: Vec<u8>,
353        queued_at: Option<std::time::Instant>,
354    },
355}
356
357/// Authenticated peer state exposed to embedded endpoint callers.
358#[derive(Debug, Clone, PartialEq, Eq)]
359pub(crate) struct NodeEndpointPeer {
360    pub(crate) npub: String,
361    pub(crate) connected: bool,
362    pub(crate) transport_addr: Option<String>,
363    pub(crate) transport_type: Option<String>,
364    pub(crate) link_id: u64,
365    pub(crate) srtt_ms: Option<u64>,
366    pub(crate) packets_sent: u64,
367    pub(crate) packets_recv: u64,
368    pub(crate) bytes_sent: u64,
369    pub(crate) bytes_recv: u64,
370    pub(crate) direct_probe_pending: bool,
371    pub(crate) direct_probe_after_ms: Option<u64>,
372}
373
374/// Live Nostr relay state exposed to embedded endpoint callers.
375#[derive(Debug, Clone, PartialEq, Eq)]
376pub(crate) struct NodeEndpointRelayStatus {
377    pub(crate) url: String,
378    pub(crate) status: String,
379}
380
381/// Node operational state.
382#[derive(Clone, Copy, Debug, PartialEq, Eq)]
383pub enum NodeState {
384    /// Created but not started.
385    Created,
386    /// Starting up (initializing transports).
387    Starting,
388    /// Fully operational.
389    Running,
390    /// Shutting down.
391    Stopping,
392    /// Stopped.
393    Stopped,
394}
395
396impl NodeState {
397    /// Check if node is operational.
398    pub fn is_operational(&self) -> bool {
399        matches!(self, NodeState::Running)
400    }
401
402    /// Check if node can be started.
403    pub fn can_start(&self) -> bool {
404        matches!(self, NodeState::Created | NodeState::Stopped)
405    }
406
407    /// Check if node can be stopped.
408    pub fn can_stop(&self) -> bool {
409        matches!(self, NodeState::Running)
410    }
411}
412
413impl fmt::Display for NodeState {
414    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
415        let s = match self {
416            NodeState::Created => "created",
417            NodeState::Starting => "starting",
418            NodeState::Running => "running",
419            NodeState::Stopping => "stopping",
420            NodeState::Stopped => "stopped",
421        };
422        write!(f, "{}", s)
423    }
424}
425
426/// Recent request tracking for dedup and reverse-path forwarding.
427///
428/// When a LookupRequest is forwarded through a node, the node stores the
429/// request_id and which peer sent it. When the corresponding LookupResponse
430/// arrives, it's forwarded back to that peer (reverse-path forwarding).
431/// The `response_forwarded` flag prevents response routing loops.
432#[derive(Clone, Debug)]
433pub(crate) struct RecentRequest {
434    /// The peer who sent this request to us.
435    pub(crate) from_peer: NodeAddr,
436    /// When we received this request (Unix milliseconds).
437    pub(crate) timestamp_ms: u64,
438    /// Whether we've already forwarded a response for this request.
439    /// Prevents response routing loops when convergent request paths
440    /// create bidirectional entries in recent_requests.
441    pub(crate) response_forwarded: bool,
442}
443
444impl RecentRequest {
445    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
446        Self {
447            from_peer,
448            timestamp_ms,
449            response_forwarded: false,
450        }
451    }
452
453    /// Check if this entry has expired (older than expiry_ms).
454    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
455        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
456    }
457}
458
459/// Key for addr_to_link reverse lookup.
460type AddrKey = (TransportId, TransportAddr);
461
462/// Per-transport kernel drop tracking for congestion detection.
463///
464/// Sampled every tick (1s). The `dropping` flag indicates whether new
465/// kernel drops were observed since the previous sample.
466#[derive(Debug, Default)]
467struct TransportDropState {
468    /// Previous `recv_drops` sample (cumulative counter).
469    prev_drops: u64,
470    /// True if drops increased since the last sample.
471    dropping: bool,
472}
473
474/// State for a link waiting for transport-level connection establishment.
475///
476/// For connection-oriented transports (TCP, Tor), the transport connect runs
477/// asynchronously. This struct holds the data needed to complete the handshake
478/// once the connection is ready.
479struct PendingConnect {
480    /// The link that was created for this connection.
481    link_id: LinkId,
482    /// Which transport is being used.
483    transport_id: TransportId,
484    /// The remote address being connected to.
485    remote_addr: TransportAddr,
486    /// The peer identity (for handshake initiation).
487    peer_identity: PeerIdentity,
488}
489
490/// A running FIPS node instance.
491///
492/// This is the top-level container holding all node state.
493///
494/// ## Peer Lifecycle
495///
496/// Peers go through two phases:
497/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
498/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
499///
500/// The `addr_to_link` map enables dispatching incoming packets to the right
501/// connection before authentication completes.
502// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
503pub struct Node {
504    // === Identity ===
505    /// This node's cryptographic identity.
506    identity: Identity,
507
508    /// Random epoch generated at startup for peer restart detection.
509    /// Exchanged inside Noise handshake messages so peers can detect restarts.
510    startup_epoch: [u8; 8],
511
512    /// Instant when the node was created, for uptime reporting.
513    started_at: std::time::Instant,
514
515    // === Configuration ===
516    /// Loaded configuration.
517    config: Config,
518
519    // === State ===
520    /// Node operational state.
521    state: NodeState,
522
523    /// Whether this is a leaf-only node.
524    is_leaf_only: bool,
525
526    // === Spanning Tree ===
527    /// Local spanning tree state.
528    tree_state: TreeState,
529
530    // === Bloom Filter ===
531    /// Local Bloom filter state.
532    bloom_state: BloomState,
533
534    // === Routing ===
535    /// Address -> coordinates cache (from session setup and discovery).
536    coord_cache: CoordCache,
537    /// Locally learned reverse-path next-hop hints.
538    learned_routes: LearnedRouteTable,
539    /// Destinations whose direct first-hop path is temporarily suspect because
540    /// session-layer MMP observed sustained loss while using that direct path.
541    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
542    /// Recent discovery requests (dedup + reverse-path forwarding).
543    /// Maps request_id → RecentRequest.
544    recent_requests: HashMap<u64, RecentRequest>,
545    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
546    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
547    /// the TUN reader/writer threads at TCP MSS clamp time so the
548    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
549    /// and the learned per-destination path MTU.
550    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
551
552    // === Transports & Links ===
553    /// Active transports (owned by Node).
554    transports: HashMap<TransportId, TransportHandle>,
555    /// Per-transport kernel drop tracking for congestion detection.
556    transport_drops: HashMap<TransportId, TransportDropState>,
557    /// Active links.
558    links: HashMap<LinkId, Link>,
559    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
560    addr_to_link: HashMap<AddrKey, LinkId>,
561
562    // === Packet Channel ===
563    /// Packet sender for transports.
564    packet_tx: Option<PacketTx>,
565    /// Packet receiver (for event loop).
566    packet_rx: Option<PacketRx>,
567
568    // === Connections (Handshake Phase) ===
569    /// Pending connections (handshake in progress).
570    /// Indexed by LinkId since we don't know the peer's identity yet.
571    connections: HashMap<LinkId, PeerConnection>,
572
573    // === Peers (Active Phase) ===
574    /// Authenticated peers.
575    /// Indexed by NodeAddr (verified identity).
576    peers: HashMap<NodeAddr, ActivePeer>,
577
578    // === End-to-End Sessions ===
579    /// Session table for end-to-end encrypted sessions.
580    /// Keyed by remote NodeAddr.
581    sessions: HashMap<NodeAddr, SessionEntry>,
582
583    // === Identity Cache ===
584    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
585    /// Enables reverse lookup from IPv6 destination to session/routing identity.
586    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
587
588    // === Pending TUN Packets ===
589    /// Packets queued while waiting for session establishment.
590    /// Keyed by destination NodeAddr, bounded per-dest and total.
591    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
592    /// Endpoint data payloads queued while waiting for session establishment.
593    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
594    // === Pending Discovery Lookups ===
595    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
596    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
597    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
598
599    // === Resource Limits ===
600    /// Maximum connections (0 = unlimited).
601    max_connections: usize,
602    /// Maximum peers (0 = unlimited).
603    max_peers: usize,
604    /// Maximum links (0 = unlimited).
605    max_links: usize,
606
607    // === Counters ===
608    /// Next link ID to allocate.
609    next_link_id: u64,
610    /// Next transport ID to allocate.
611    next_transport_id: u32,
612
613    // === Node Statistics ===
614    /// Routing, forwarding, discovery, and error signal counters.
615    stats: stats::NodeStats,
616
617    /// Time-series history of node-level metrics (1s/1m rings).
618    stats_history: stats_history::StatsHistory,
619
620    // === TUN Interface ===
621    /// TUN device state.
622    tun_state: TunState,
623    /// TUN interface name (for cleanup).
624    tun_name: Option<String>,
625    /// TUN packet sender channel.
626    tun_tx: Option<TunTx>,
627    /// Receiver for outbound packets from the TUN reader.
628    tun_outbound_rx: Option<TunOutboundRx>,
629    /// App-owned packet sink used by embedded/no-TUN integrations.
630    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
631    /// Endpoint data command receiver used by embedded/no-daemon integrations.
632    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
633    /// Endpoint data event sink used by embedded/no-daemon integrations.
634    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
635    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
636    /// spawned (set up in `start()` once transports are running).
637    /// `Some(pool)` once available; the pool internally holds
638    /// per-worker mpsc senders and round-robins jobs across them.
639    /// See `node::encrypt_worker` for the rationale and layout.
640    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
641    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
642    /// `encrypt_workers` for the receive side.
643    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
644    /// Set of sessions that have been registered with the decrypt
645    /// shard worker pool. Used by rx_loop to decide between fast-path
646    /// dispatch (worker owns the session) and legacy in-place decrypt
647    /// (worker doesn't have it yet). Per the data-plane restructure,
648    /// the worker owns its session state directly — there's no shared
649    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
650    /// this set tracks **whether** the worker has been told about a
651    /// given session.
652    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
653    /// Fallback channel: decrypt worker bounces non-fast-path packets
654    /// (anything that's not bulk EndpointData) back here for rx_loop
655    /// to handle via the legacy path. Drained by a new rx_loop arm.
656    decrypt_fallback_rx:
657        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
658    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
659    /// TUN reader thread handle.
660    tun_reader_handle: Option<JoinHandle<()>>,
661    /// TUN writer thread handle.
662    tun_writer_handle: Option<JoinHandle<()>>,
663    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
664    /// On Linux, deleting the interface via netlink serves the same purpose.
665    #[cfg(target_os = "macos")]
666    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
667
668    // === DNS Responder ===
669    /// Receiver for resolved identities from the DNS responder.
670    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
671    /// DNS responder task handle.
672    dns_task: Option<tokio::task::JoinHandle<()>>,
673
674    // === Index-Based Session Dispatch ===
675    /// Allocator for session indices.
676    index_allocator: IndexAllocator,
677    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
678    /// This maps our session index to the peer that uses it.
679    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
680    /// Pending outbound handshakes by our sender_idx.
681    /// Tracks which LinkId corresponds to which session index.
682    pending_outbound: HashMap<(TransportId, u32), LinkId>,
683
684    // === Rate Limiting ===
685    /// Rate limiter for msg1 processing (DoS protection).
686    msg1_rate_limiter: HandshakeRateLimiter,
687    /// Rate limiter for ICMP Packet Too Big messages.
688    icmp_rate_limiter: IcmpRateLimiter,
689    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
690    routing_error_rate_limiter: RoutingErrorRateLimiter,
691    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
692    coords_response_rate_limiter: RoutingErrorRateLimiter,
693    /// Backoff for failed discovery lookups (originator-side).
694    discovery_backoff: DiscoveryBackoff,
695    /// Rate limiter for forwarded discovery requests (transit-side).
696    discovery_forward_limiter: DiscoveryForwardRateLimiter,
697
698    // === Pending Transport Connects ===
699    /// Links waiting for transport-level connection establishment before
700    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
701    /// the transport connect runs in the background; the tick handler polls
702    /// connection_state() and initiates the handshake when connected.
703    pending_connects: Vec<PendingConnect>,
704
705    // === Connection Retry ===
706    /// Retry state for peers whose outbound connections have failed.
707    /// Keyed by NodeAddr. Entries are created when a handshake times out
708    /// or fails, and removed on successful promotion or when max retries
709    /// are exhausted.
710    retry_pending: HashMap<NodeAddr, retry::RetryState>,
711
712    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
713    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
714    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
715    /// Identity is unverified at this layer — the Noise XX handshake
716    /// initiated against an mDNS-observed endpoint is what proves the
717    /// peer holds the matching private key.
718    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
719    /// Same-host JSON registry under `~/.fips/instances`. Records are
720    /// loopback routing hints only; peer identity is still verified by the
721    /// Noise handshake.
722    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
723    local_instance_started_at_ms: Option<u64>,
724    last_local_instance_publish_ms: Option<u64>,
725    last_local_instance_scan_ms: Option<u64>,
726    /// Wall-clock ms when Nostr discovery successfully started, used to
727    /// schedule the one-shot startup advert sweep after a settle delay.
728    /// `None` until discovery comes up; remains `None` if discovery is
729    /// disabled or failed to start.
730    nostr_discovery_started_at_ms: Option<u64>,
731    /// Whether the one-shot startup advert sweep has run. Set to true
732    /// after the first sweep fires (under `policy: open`); thereafter
733    /// only the per-tick `queue_open_discovery_retries` continues.
734    startup_open_discovery_sweep_done: bool,
735    /// Per-peer UDP transports adopted from NAT traversal handoff.
736    bootstrap_transports: HashSet<TransportId>,
737    /// Originating peer npub (bech32) for each adopted bootstrap
738    /// transport, captured at `adopt_established_traversal` time.
739    /// Populated alongside `bootstrap_transports`; cleared in
740    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
741    /// route fatal-protocol-mismatch observations back to the
742    /// Nostr-discovery `failure_state` for long cooldown application.
743    bootstrap_transport_npubs: HashMap<TransportId, String>,
744    /// Peers that should not be used as reply-learned fallback transit for
745    /// other destinations. Direct lookups to the peer are still permitted.
746    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
747
748    // === Periodic Parent Re-evaluation ===
749    /// Timestamp of last periodic parent re-evaluation (for pacing).
750    last_parent_reeval: Option<crate::time::Instant>,
751
752    // === Congestion Logging ===
753    /// Timestamp of last congestion detection log (rate-limited to 5s).
754    last_congestion_log: Option<std::time::Instant>,
755
756    // === Mesh Size Estimate ===
757    /// Cached estimated mesh size (computed once per tick from bloom filters).
758    estimated_mesh_size: Option<u64>,
759    /// Timestamp of last mesh size log emission.
760    last_mesh_size_log: Option<std::time::Instant>,
761
762    // === Bloom Self-Plausibility ===
763    /// Rate-limit state for the self-plausibility WARN. Fires at most
764    /// once per 60s globally when our own outgoing FilterAnnounce has
765    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
766    /// aggregation drift or an ingress bypass.
767    last_self_warn: Option<std::time::Instant>,
768
769    // === Local Outbound Liveness ===
770    /// Set when a `transport.send` returned a local-side io error
771    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
772    /// cleared on the next successful send. Used by
773    /// `check_link_heartbeats` to compress the dead-timeout to
774    /// `fast_link_dead_timeout_secs` while our outbound is observed
775    /// broken — direct kernel evidence beats waiting on receive-silence.
776    last_local_send_failure_at: Option<std::time::Instant>,
777    /// Set when the rx loop could not complete its 1s maintenance work
778    /// inside the watchdog timeout. Link-dead detection may be valid during
779    /// overload, but traversal cooldown should not punish a path just because
780    /// our own scheduler/worker queue was late.
781    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
782
783    // === Display Names ===
784    /// Human-readable names for configured peers (alias or short npub).
785    /// Populated at startup from peer config.
786    peer_aliases: HashMap<NodeAddr, String>,
787    /// Scheduler weight for explicitly configured peers. Built when config
788    /// changes so the packet hot path only does a NodeAddr hash lookup.
789    configured_peer_send_weights: HashMap<NodeAddr, u8>,
790
791    /// Reloadable peer ACL state from standard allow/deny files.
792    peer_acl: acl::PeerAclReloader,
793
794    // === Host Map ===
795    /// Static hostname → npub mapping for DNS resolution.
796    /// Built at construction from peer aliases and /etc/fips/hosts.
797    host_map: Arc<HostMap>,
798}
799
800impl Node {
801    /// Create a new node from configuration.
802    pub fn new(config: Config) -> Result<Self, NodeError> {
803        config.validate()?;
804        let identity = config.create_identity()?;
805        let node_addr = *identity.node_addr();
806        let is_leaf_only = config.is_leaf_only();
807
808        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
809        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
810
811        let mut startup_epoch = [0u8; 8];
812        rand::rng().fill_bytes(&mut startup_epoch);
813
814        let mut bloom_state = if is_leaf_only {
815            BloomState::leaf_only(node_addr)
816        } else {
817            BloomState::new(node_addr)
818        };
819        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
820
821        let tun_state = if config.tun.enabled {
822            TunState::Configured
823        } else {
824            TunState::Disabled
825        };
826
827        // Initialize tree state with signed self-declaration
828        let mut tree_state = TreeState::new(node_addr);
829        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
830        tree_state.set_hold_down(config.node.tree.hold_down_secs);
831        tree_state.set_flap_dampening(
832            config.node.tree.flap_threshold,
833            config.node.tree.flap_window_secs,
834            config.node.tree.flap_dampening_secs,
835        );
836        tree_state
837            .sign_declaration(&identity)
838            .expect("signing own declaration should never fail");
839
840        let coord_cache = CoordCache::new(
841            config.node.cache.coord_size,
842            config.node.cache.coord_ttl_secs * 1000,
843        );
844        let rl = &config.node.rate_limit;
845        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
846            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
847            config.node.limits.max_pending_inbound,
848        );
849
850        let max_connections = config.node.limits.max_connections;
851        let max_peers = config.node.limits.max_peers;
852        let max_links = config.node.limits.max_links;
853        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
854        let backoff_base_secs = config.node.discovery.backoff_base_secs;
855        let backoff_max_secs = config.node.discovery.backoff_max_secs;
856        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
857
858        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
859        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
860
861        Ok(Self {
862            identity,
863            startup_epoch,
864            started_at: std::time::Instant::now(),
865            config,
866            state: NodeState::Created,
867            is_leaf_only,
868            tree_state,
869            bloom_state,
870            coord_cache,
871            learned_routes: LearnedRouteTable::default(),
872            session_direct_degraded_until_ms: HashMap::new(),
873            recent_requests: HashMap::new(),
874            transports: HashMap::new(),
875            transport_drops: HashMap::new(),
876            links: HashMap::new(),
877            addr_to_link: HashMap::new(),
878            packet_tx: None,
879            packet_rx: None,
880            connections: HashMap::new(),
881            peers: HashMap::new(),
882            sessions: HashMap::new(),
883            identity_cache: HashMap::new(),
884            pending_tun_packets: HashMap::new(),
885            pending_endpoint_data: HashMap::new(),
886            pending_lookups: HashMap::new(),
887            max_connections,
888            max_peers,
889            max_links,
890            next_link_id: 1,
891            next_transport_id: 1,
892            stats: stats::NodeStats::new(),
893            stats_history: stats_history::StatsHistory::new(),
894            tun_state,
895            tun_name: None,
896            tun_tx: None,
897            tun_outbound_rx: None,
898            external_packet_tx: None,
899            endpoint_command_rx: None,
900            endpoint_event_tx: None,
901            encrypt_workers: None,
902            decrypt_workers: None,
903            decrypt_registered_sessions: std::collections::HashSet::new(),
904            decrypt_fallback_tx,
905            decrypt_fallback_rx,
906            tun_reader_handle: None,
907            tun_writer_handle: None,
908            #[cfg(target_os = "macos")]
909            tun_shutdown_fd: None,
910            dns_identity_rx: None,
911            dns_task: None,
912            index_allocator: IndexAllocator::new(),
913            peers_by_index: HashMap::new(),
914            pending_outbound: HashMap::new(),
915            msg1_rate_limiter,
916            icmp_rate_limiter: IcmpRateLimiter::new(),
917            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
918            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
919                std::time::Duration::from_millis(coords_response_interval_ms),
920            ),
921            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
922            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
923                std::time::Duration::from_secs(forward_min_interval_secs),
924            ),
925            pending_connects: Vec::new(),
926            retry_pending: HashMap::new(),
927            nostr_discovery: None,
928            nostr_discovery_started_at_ms: None,
929            lan_discovery: None,
930            local_instance_registry: None,
931            local_instance_started_at_ms: None,
932            last_local_instance_publish_ms: None,
933            last_local_instance_scan_ms: None,
934            startup_open_discovery_sweep_done: false,
935            bootstrap_transports: HashSet::new(),
936            bootstrap_transport_npubs: HashMap::new(),
937            discovery_fallback_transit_blocked_peers: HashSet::new(),
938            last_parent_reeval: None,
939            last_congestion_log: None,
940            estimated_mesh_size: None,
941            last_mesh_size_log: None,
942            last_self_warn: None,
943            last_local_send_failure_at: None,
944            last_rx_loop_maintenance_timeout_at: None,
945            peer_aliases: HashMap::new(),
946            configured_peer_send_weights,
947            peer_acl,
948            host_map,
949            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
950        })
951    }
952
953    /// Create a node with a specific identity.
954    ///
955    /// This constructor validates cross-field config invariants before
956    /// constructing the node, same as [`Node::new`].
957    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
958        config.validate()?;
959        let node_addr = *identity.node_addr();
960
961        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
962        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
963
964        let mut startup_epoch = [0u8; 8];
965        rand::rng().fill_bytes(&mut startup_epoch);
966
967        let tun_state = if config.tun.enabled {
968            TunState::Configured
969        } else {
970            TunState::Disabled
971        };
972
973        // Initialize tree state with signed self-declaration
974        let mut tree_state = TreeState::new(node_addr);
975        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
976        tree_state.set_hold_down(config.node.tree.hold_down_secs);
977        tree_state.set_flap_dampening(
978            config.node.tree.flap_threshold,
979            config.node.tree.flap_window_secs,
980            config.node.tree.flap_dampening_secs,
981        );
982        tree_state
983            .sign_declaration(&identity)
984            .expect("signing own declaration should never fail");
985
986        let mut bloom_state = BloomState::new(node_addr);
987        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
988
989        let coord_cache = CoordCache::new(
990            config.node.cache.coord_size,
991            config.node.cache.coord_ttl_secs * 1000,
992        );
993        let rl = &config.node.rate_limit;
994        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
995            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
996            config.node.limits.max_pending_inbound,
997        );
998
999        let max_connections = config.node.limits.max_connections;
1000        let max_peers = config.node.limits.max_peers;
1001        let max_links = config.node.limits.max_links;
1002        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1003
1004        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1005        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1006
1007        Ok(Self {
1008            identity,
1009            startup_epoch,
1010            started_at: std::time::Instant::now(),
1011            config,
1012            state: NodeState::Created,
1013            is_leaf_only: false,
1014            tree_state,
1015            bloom_state,
1016            coord_cache,
1017            learned_routes: LearnedRouteTable::default(),
1018            session_direct_degraded_until_ms: HashMap::new(),
1019            recent_requests: HashMap::new(),
1020            transports: HashMap::new(),
1021            transport_drops: HashMap::new(),
1022            links: HashMap::new(),
1023            addr_to_link: HashMap::new(),
1024            packet_tx: None,
1025            packet_rx: None,
1026            connections: HashMap::new(),
1027            peers: HashMap::new(),
1028            sessions: HashMap::new(),
1029            identity_cache: HashMap::new(),
1030            pending_tun_packets: HashMap::new(),
1031            pending_endpoint_data: HashMap::new(),
1032            pending_lookups: HashMap::new(),
1033            max_connections,
1034            max_peers,
1035            max_links,
1036            next_link_id: 1,
1037            next_transport_id: 1,
1038            stats: stats::NodeStats::new(),
1039            stats_history: stats_history::StatsHistory::new(),
1040            tun_state,
1041            tun_name: None,
1042            tun_tx: None,
1043            tun_outbound_rx: None,
1044            external_packet_tx: None,
1045            endpoint_command_rx: None,
1046            endpoint_event_tx: None,
1047            encrypt_workers: None,
1048            decrypt_workers: None,
1049            decrypt_registered_sessions: std::collections::HashSet::new(),
1050            decrypt_fallback_tx,
1051            decrypt_fallback_rx,
1052            tun_reader_handle: None,
1053            tun_writer_handle: None,
1054            #[cfg(target_os = "macos")]
1055            tun_shutdown_fd: None,
1056            dns_identity_rx: None,
1057            dns_task: None,
1058            index_allocator: IndexAllocator::new(),
1059            peers_by_index: HashMap::new(),
1060            pending_outbound: HashMap::new(),
1061            msg1_rate_limiter,
1062            icmp_rate_limiter: IcmpRateLimiter::new(),
1063            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1064            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1065                std::time::Duration::from_millis(coords_response_interval_ms),
1066            ),
1067            discovery_backoff: DiscoveryBackoff::new(),
1068            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1069            pending_connects: Vec::new(),
1070            retry_pending: HashMap::new(),
1071            nostr_discovery: None,
1072            nostr_discovery_started_at_ms: None,
1073            lan_discovery: None,
1074            local_instance_registry: None,
1075            local_instance_started_at_ms: None,
1076            last_local_instance_publish_ms: None,
1077            last_local_instance_scan_ms: None,
1078            startup_open_discovery_sweep_done: false,
1079            bootstrap_transports: HashSet::new(),
1080            bootstrap_transport_npubs: HashMap::new(),
1081            discovery_fallback_transit_blocked_peers: HashSet::new(),
1082            last_parent_reeval: None,
1083            last_congestion_log: None,
1084            estimated_mesh_size: None,
1085            last_mesh_size_log: None,
1086            last_self_warn: None,
1087            last_local_send_failure_at: None,
1088            last_rx_loop_maintenance_timeout_at: None,
1089            peer_aliases: HashMap::new(),
1090            configured_peer_send_weights,
1091            peer_acl,
1092            host_map,
1093            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1094        })
1095    }
1096
1097    /// Create a leaf-only node (simplified state).
1098    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1099        let mut node = Self::new(config)?;
1100        node.is_leaf_only = true;
1101        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1102        Ok(node)
1103    }
1104
1105    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1106        let base_host_map = HostMap::from_peer_configs(config.peers());
1107        if !config.node.system_files_enabled {
1108            return (
1109                Arc::new(base_host_map.clone()),
1110                acl::PeerAclReloader::memory_only(base_host_map),
1111            );
1112        }
1113
1114        let mut host_map = base_host_map.clone();
1115        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1116        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1117            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1118        ));
1119        host_map.merge(hosts_file);
1120        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1121            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1122            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1123            base_host_map,
1124            hosts_path,
1125        );
1126        (Arc::new(host_map), peer_acl)
1127    }
1128
1129    fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1130        config
1131            .peers()
1132            .iter()
1133            .filter_map(|peer| {
1134                PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1135                    (
1136                        *identity.node_addr(),
1137                        encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1138                    )
1139                })
1140            })
1141            .collect()
1142    }
1143
1144    fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1145        self.configured_peer_send_weights
1146            .get(peer_addr)
1147            .copied()
1148            .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1149    }
1150
    /// Create transport instances from configuration.
    ///
    /// Returns a vector of TransportHandles for all configured transports.
    ///
    /// Every configured instance gets a fresh id from
    /// `allocate_transport_id` and a clone of `packet_tx`. Instances are
    /// constructed in this order:
    /// 1. UDP.
    /// 2. Sim (`sim-transport` feature).
    /// 3. Ethernet (Linux/macOS only).
    /// 4. TCP.
    /// 5. Tor.
    /// 6. WebRTC (`webrtc-transport` feature).
    /// 7. BLE (`bluer_available` cfg).
    ///
    /// This only constructs handles. It does not call any start/bind method
    /// on them.
    ///
    /// WebRTC and BLE instances that fail to initialize are logged and
    /// skipped rather than failing the whole call. The transport id
    /// allocated for such a failed instance is not reused.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Collect UDP configs with optional names to avoid borrow conflicts
        // (presumably because `allocate_transport_id` needs `&mut self`
        // while the iterator over `self.config` would still be borrowed).
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        // Create UDP transport instances
        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Create Ethernet transport instances where raw-socket support exists.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                // An instance without its own discovery scope inherits the
                // node-wide LAN discovery scope.
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        // Create TCP transport instances
        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        // Create Tor transport instances
        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // Collected outside the feature gate so builds without WebRTC
        // support can still warn when it is configured.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                let transport_id = self.allocate_transport_id();
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        // Create BLE transport instances
        // NOTE(review): this whole block is gated on `bluer_available`, so the
        // inner `any(not(bluer_available), test)` branch only compiles under
        // `test`, where its `not(test)` warning is compiled out again. The
        // "lacks BlueZ support" warning can therefore never fire; in test
        // builds the branch only consumes `ble_instances`. Builds without
        // `bluer_available` ignore BLE config here without any log.
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1328
1329    /// Find an operational transport that matches the given transport type name.
1330    ///
1331    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1332    /// from Nostr/STUN traversal. They must not be reused for ordinary
1333    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1334    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1335    /// with `EINVAL`, and even on platforms that allow it the packet would use
1336    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1337    /// choice deterministic by lowest transport id instead of HashMap order.
1338    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1339        self.transports
1340            .iter()
1341            .filter(|(id, handle)| {
1342                handle.transport_type().name == transport_type
1343                    && handle.is_operational()
1344                    && !self.bootstrap_transports.contains(id)
1345            })
1346            .min_by_key(|(id, _)| id.as_u32())
1347            .map(|(id, _)| *id)
1348    }
1349
1350    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1351    /// and binary TransportAddr.
1352    ///
1353    /// Finds the Ethernet transport instance bound to the named interface
1354    /// and parses the MAC portion into a 6-byte TransportAddr.
1355    #[allow(unused_variables)]
1356    fn resolve_ethernet_addr(
1357        &self,
1358        addr_str: &str,
1359    ) -> Result<(TransportId, TransportAddr), NodeError> {
1360        #[cfg(any(target_os = "linux", target_os = "macos"))]
1361        {
1362            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1363                NodeError::NoTransportForType(format!(
1364                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1365                    addr_str
1366                ))
1367            })?;
1368
1369            // Find the Ethernet transport bound to this interface
1370            let transport_id = self
1371                .transports
1372                .iter()
1373                .find(|(_, handle)| {
1374                    handle.transport_type().name == "ethernet"
1375                        && handle.is_operational()
1376                        && handle.interface_name() == Some(iface)
1377                })
1378                .map(|(id, _)| *id)
1379                .ok_or_else(|| {
1380                    NodeError::NoTransportForType(format!(
1381                        "no operational Ethernet transport for interface '{}'",
1382                        iface
1383                    ))
1384                })?;
1385
1386            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1387                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1388            })?;
1389
1390            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1391        }
1392        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1393        {
1394            Err(NodeError::NoTransportForType(
1395                "Ethernet transport is not supported on this platform".to_string(),
1396            ))
1397        }
1398    }
1399
1400    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1401    /// (TransportId, TransportAddr) pair by finding the BLE transport
1402    /// instance matching the adapter name.
1403    #[cfg(bluer_available)]
1404    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1405        let ta = TransportAddr::from_string(addr_str);
1406        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1407            NodeError::NoTransportForType(format!(
1408                "invalid BLE address format '{}': expected 'adapter/mac'",
1409                addr_str
1410            ))
1411        })?;
1412
1413        // Find the BLE transport for this adapter
1414        let transport_id = self
1415            .transports
1416            .iter()
1417            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1418            .map(|(id, _)| *id)
1419            .ok_or_else(|| {
1420                NodeError::NoTransportForType(format!(
1421                    "no operational BLE transport for adapter '{}'",
1422                    adapter
1423                ))
1424            })?;
1425
1426        // Validate the address format
1427        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1428            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1429        })?;
1430
1431        Ok((transport_id, TransportAddr::from_string(addr_str)))
1432    }
1433
1434    // === Identity Accessors ===
1435
    /// Get this node's identity.
    ///
    /// Returns a borrow of the identity stored at construction time.
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1440
1441    /// Get this node's NodeAddr.
1442    pub fn node_addr(&self) -> &NodeAddr {
1443        self.identity.node_addr()
1444    }
1445
1446    /// Get this node's npub.
1447    pub fn npub(&self) -> String {
1448        self.identity.npub()
1449    }
1450
1451    /// Return a human-readable display name for a NodeAddr.
1452    ///
1453    /// Lookup order:
1454    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1455    /// 2. Configured peer alias or short npub (from startup map)
1456    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1457    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1458    /// 5. Truncated NodeAddr hex (unknown address)
1459    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1460        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1461            return hostname.to_string();
1462        }
1463        if let Some(name) = self.peer_aliases.get(addr) {
1464            return name.clone();
1465        }
1466        if let Some(peer) = self.peers.get(addr) {
1467            return peer.identity().short_npub();
1468        }
1469        if let Some(entry) = self.sessions.get(addr) {
1470            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1471            return PeerIdentity::from_pubkey(xonly).short_npub();
1472        }
1473        addr.short_hex()
1474    }
1475
1476    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1477    /// decrypt-worker state coherent: removes the same `cache_key`
1478    /// from the registered-sessions tracking set and tells the
1479    /// assigned shard worker to drop its `OwnedSessionState` entry.
1480    ///
1481    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1482    /// at every session-lifecycle teardown site (rekey cross-connection
1483    /// swap, peer disconnect, dispatch session-rotation) so the worker
1484    /// doesn't keep stale ciphers / replay windows around. The
1485    /// follow-up `RegisterSession` for the NEW key (if any) will then
1486    /// install the fresh state on the same shard.
1487    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1488        // Find the peer that owns this index BEFORE removing it from
1489        // the index map, so we can decide whether the deregistration
1490        // also tears down the peer's connected UDP socket.
1491        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1492        self.peers_by_index.remove(&cache_key);
1493        if self.decrypt_registered_sessions.remove(&cache_key)
1494            && let Some(workers) = self.decrypt_workers.as_ref()
1495        {
1496            workers.unregister_session(cache_key);
1497        }
1498        // Tear down the per-peer connected UDP socket *only* if no
1499        // other peers_by_index entry still resolves to this peer.
1500        // Rekey drain calls into this helper with the OLD session
1501        // index while the NEW index is already installed and points
1502        // at the same peer — there the connect()-ed 5-tuple is
1503        // still valid for the new session and we must not close it.
1504        // Peer-teardown sites (CrossConnection swap, stale-index
1505        // fall-through in encrypted.rs, disconnect handler) call
1506        // here when this is the peer's last index, so the connected
1507        // socket goes away with the peer.
1508        if let Some(peer_addr) = owning_peer {
1509            let peer_has_other_index = self
1510                .peers_by_index
1511                .values()
1512                .any(|other| *other == peer_addr);
1513            if !peer_has_other_index {
1514                self.clear_connected_udp_for_peer(&peer_addr);
1515            }
1516        }
1517    }
1518
1519    /// Ensure the current FMP receive index resolves to this peer.
1520    ///
1521    /// Rekey msg1/msg2 handlers pre-register the pending index before
1522    /// cutover, but losing that registration in a debug build used to
1523    /// panic in the cutover path. Repairing the map here is safe: the
1524    /// peer has already promoted the pending session, and the decrypt
1525    /// worker registration immediately after cutover depends on the
1526    /// same `(transport_id, our_index)` key.
1527    pub(in crate::node) fn ensure_current_session_index_registered(
1528        &mut self,
1529        node_addr: &NodeAddr,
1530        context: &'static str,
1531    ) -> bool {
1532        let Some(peer) = self.peers.get(node_addr) else {
1533            return false;
1534        };
1535        let Some(transport_id) = peer.transport_id() else {
1536            warn!(
1537                peer = %self.peer_display_name(node_addr),
1538                context,
1539                "Cannot register current session index without transport id"
1540            );
1541            return false;
1542        };
1543        let Some(our_index) = peer.our_index() else {
1544            warn!(
1545                peer = %self.peer_display_name(node_addr),
1546                context,
1547                "Cannot register current session index without local index"
1548            );
1549            return false;
1550        };
1551
1552        let cache_key = (transport_id, our_index.as_u32());
1553        match self.peers_by_index.get(&cache_key).copied() {
1554            Some(existing) if existing == *node_addr => true,
1555            Some(existing) => {
1556                warn!(
1557                    peer = %self.peer_display_name(node_addr),
1558                    previous_owner = %self.peer_display_name(&existing),
1559                    transport_id = %transport_id,
1560                    our_index = %our_index,
1561                    context,
1562                    "Repairing current session index with stale owner"
1563                );
1564                self.peers_by_index.insert(cache_key, *node_addr);
1565                true
1566            }
1567            None => {
1568                warn!(
1569                    peer = %self.peer_display_name(node_addr),
1570                    transport_id = %transport_id,
1571                    our_index = %our_index,
1572                    context,
1573                    "Repairing missing current session index"
1574                );
1575                self.peers_by_index.insert(cache_key, *node_addr);
1576                true
1577            }
1578        }
1579    }
1580
1581    // === Configuration ===
1582
1583    /// Get the configuration.
1584    pub fn config(&self) -> &Config {
1585        &self.config
1586    }
1587
1588    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1589    ///
1590    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1591    /// transport MTU. Returns the maximum IPv6 packet size (including
1592    /// IPv6 header) that can be transmitted through the FIPS mesh.
1593    pub fn effective_ipv6_mtu(&self) -> u16 {
1594        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1595    }
1596
1597    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1598    ///
1599    /// Returns the **minimum** MTU across all operational transports, or
1600    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1601    /// where a specific egress transport isn't yet known: the resulting
1602    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1603    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1604    /// configured-transport's egress, eliminating PMTU-D black holes that
1605    /// would otherwise occur when a flow's actual egress is smaller than
1606    /// the clamp ceiling assumed at TUN init.
1607    ///
1608    /// Returning the smallest (rather than the first-iterated, which used
1609    /// to vary across HashMap iteration order + async-startup race) makes
1610    /// the clamp deterministic across daemon restarts.
1611    ///
1612    /// See `ISSUE-2026-0011` for the empirical investigation.
1613    pub fn transport_mtu(&self) -> u16 {
1614        let min_operational = self
1615            .transports
1616            .values()
1617            .filter(|h| h.is_operational())
1618            .map(|h| h.mtu())
1619            .min();
1620        if let Some(mtu) = min_operational {
1621            return mtu;
1622        }
1623        // Fallback to config: try UDP first, then Ethernet
1624        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1625            return cfg.mtu();
1626        }
1627        1280
1628    }
1629
1630    // === State ===
1631
1632    /// Get the node state.
1633    pub fn state(&self) -> NodeState {
1634        self.state
1635    }
1636
1637    /// Get the node uptime.
1638    pub fn uptime(&self) -> std::time::Duration {
1639        self.started_at.elapsed()
1640    }
1641
1642    /// Check if node is operational.
1643    pub fn is_running(&self) -> bool {
1644        self.state.is_operational()
1645    }
1646
1647    /// Check if this is a leaf-only node.
1648    pub fn is_leaf_only(&self) -> bool {
1649        self.is_leaf_only
1650    }
1651
1652    // === Tree State ===
1653
1654    /// Get the tree state.
1655    pub fn tree_state(&self) -> &TreeState {
1656        &self.tree_state
1657    }
1658
1659    /// Get mutable tree state.
1660    pub fn tree_state_mut(&mut self) -> &mut TreeState {
1661        &mut self.tree_state
1662    }
1663
1664    // === Bloom State ===
1665
1666    /// Get the Bloom filter state.
1667    pub fn bloom_state(&self) -> &BloomState {
1668        &self.bloom_state
1669    }
1670
1671    /// Get mutable Bloom filter state.
1672    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1673        &mut self.bloom_state
1674    }
1675
1676    // === Mesh Size Estimate ===
1677
1678    /// Get the cached estimated mesh size.
1679    pub fn estimated_mesh_size(&self) -> Option<u64> {
1680        self.estimated_mesh_size
1681    }
1682
1683    /// Compute and cache the estimated mesh size from bloom filters.
1684    ///
1685    /// Uses the spanning tree partition: parent's filter covers nodes reachable
1686    /// upward, children's filters cover disjoint subtrees downward. The sum
1687    /// of estimated entry counts plus one (self) approximates total network size.
1688    pub(crate) fn compute_mesh_size(&mut self) {
1689        let my_addr = *self.tree_state.my_node_addr();
1690        let parent_id = *self.tree_state.my_declaration().parent_id();
1691        let is_root = self.tree_state.is_root();
1692
1693        let max_fpr = self.config.node.bloom.max_inbound_fpr;
1694        let mut total: f64 = 1.0; // count self
1695        let mut child_count: u32 = 0;
1696        let mut has_data = false;
1697
1698        // Parent's filter: nodes reachable upward through the tree.
1699        // If any contributing filter is above the FPR cap, we refuse to
1700        // estimate rather than substitute a partial/biased aggregate —
1701        // Node.estimated_mesh_size is already Option<u64> and consumers
1702        // (control socket, fipstop, periodic debug log) handle None.
1703        if !is_root
1704            && let Some(parent) = self.peers.get(&parent_id)
1705            && let Some(filter) = parent.inbound_filter()
1706        {
1707            match filter.estimated_count(max_fpr) {
1708                Some(n) => {
1709                    total += n;
1710                    has_data = true;
1711                }
1712                None => {
1713                    self.estimated_mesh_size = None;
1714                    return;
1715                }
1716            }
1717        }
1718
1719        // Children's filters: each child's subtree is disjoint
1720        for (peer_addr, peer) in &self.peers {
1721            if peer_addr == &parent_id {
1722                continue;
1723            }
1724            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1725                && *decl.parent_id() == my_addr
1726            {
1727                child_count += 1;
1728                if let Some(filter) = peer.inbound_filter() {
1729                    match filter.estimated_count(max_fpr) {
1730                        Some(n) => {
1731                            total += n;
1732                            has_data = true;
1733                        }
1734                        None => {
1735                            self.estimated_mesh_size = None;
1736                            return;
1737                        }
1738                    }
1739                }
1740            }
1741        }
1742
1743        if !has_data {
1744            self.estimated_mesh_size = None;
1745            return;
1746        }
1747
1748        let size = total.round() as u64;
1749        self.estimated_mesh_size = Some(size);
1750
1751        // Periodic logging (reuse MMP default interval: 30s)
1752        let now = std::time::Instant::now();
1753        let should_log = match self.last_mesh_size_log {
1754            None => true,
1755            Some(last) => {
1756                now.duration_since(last)
1757                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1758            }
1759        };
1760        if should_log {
1761            tracing::debug!(
1762                estimated_mesh_size = size,
1763                peers = self.peers.len(),
1764                children = child_count,
1765                "Mesh size estimate"
1766            );
1767            self.last_mesh_size_log = Some(now);
1768        }
1769    }
1770
1771    // === Coord Cache ===
1772
1773    /// Get the coordinate cache.
1774    pub fn coord_cache(&self) -> &CoordCache {
1775        &self.coord_cache
1776    }
1777
1778    /// Get mutable coordinate cache.
1779    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1780        &mut self.coord_cache
1781    }
1782
1783    // === Node Statistics ===
1784
1785    /// Get the node statistics.
1786    pub fn stats(&self) -> &stats::NodeStats {
1787        &self.stats
1788    }
1789
1790    /// Get mutable node statistics.
1791    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1792        &mut self.stats
1793    }
1794
1795    /// Get the stats history collector.
1796    pub fn stats_history(&self) -> &stats_history::StatsHistory {
1797        &self.stats_history
1798    }
1799
1800    /// Sample the current node state into the stats history ring.
1801    /// Called once per tick from the RX loop.
1802    pub(crate) fn record_stats_history(&mut self) {
1803        let fwd = &self.stats.forwarding;
1804        let peers_with_mmp: Vec<f64> = self
1805            .peers
1806            .values()
1807            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1808            .collect();
1809        let loss_rate = if peers_with_mmp.is_empty() {
1810            0.0
1811        } else {
1812            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1813        };
1814
1815        let snap = stats_history::Snapshot {
1816            mesh_size: self.estimated_mesh_size,
1817            tree_depth: self.tree_state.my_coords().depth() as u32,
1818            peer_count: self.peers.len() as u64,
1819            parent_switches_total: self.stats.tree.parent_switches,
1820            bytes_in_total: fwd.received_bytes,
1821            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1822            packets_in_total: fwd.received_packets,
1823            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1824            loss_rate,
1825            active_sessions: self.sessions.len() as u64,
1826        };
1827
1828        let now = std::time::Instant::now();
1829        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1830            .peers
1831            .values()
1832            .map(|p| {
1833                let stats = p.link_stats();
1834                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1835                    Some(m) => (
1836                        m.metrics.srtt_ms(),
1837                        Some(m.metrics.loss_rate()),
1838                        m.receiver.ecn_ce_count() as u64,
1839                    ),
1840                    None => (None, None, 0),
1841                };
1842                stats_history::PeerSnapshot {
1843                    node_addr: *p.node_addr(),
1844                    last_seen: now,
1845                    srtt_ms,
1846                    loss_rate,
1847                    bytes_in_total: stats.bytes_recv,
1848                    bytes_out_total: stats.bytes_sent,
1849                    packets_in_total: stats.packets_recv,
1850                    packets_out_total: stats.packets_sent,
1851                    ecn_ce_total: ecn_ce,
1852                }
1853            })
1854            .collect();
1855
1856        self.stats_history.tick(now, &snap, &peer_snaps);
1857    }
1858
1859    // === TUN Interface ===
1860
1861    /// Get the TUN state.
1862    pub fn tun_state(&self) -> TunState {
1863        self.tun_state
1864    }
1865
1866    /// Get the TUN interface name, if active.
1867    pub fn tun_name(&self) -> Option<&str> {
1868        self.tun_name.as_deref()
1869    }
1870
1871    // === Resource Limits ===
1872
1873    /// Set the maximum number of connections (handshake phase).
1874    pub fn set_max_connections(&mut self, max: usize) {
1875        self.max_connections = max;
1876    }
1877
1878    /// Set the maximum number of peers (authenticated).
1879    pub fn set_max_peers(&mut self, max: usize) {
1880        self.max_peers = max;
1881    }
1882
1883    /// Returns false when starting more outbound work would exceed a resource
1884    /// cap. A cap of `0` means uncapped.
1885    pub(crate) fn outbound_admission_check(&self) -> bool {
1886        let connection_used = self
1887            .connections
1888            .len()
1889            .saturating_add(self.pending_connects.len());
1890        let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1891        let connection_allowed =
1892            self.max_connections == 0 || connection_used < self.max_connections;
1893        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1894        peer_allowed && connection_allowed && link_allowed
1895    }
1896
1897    /// Admission for public/open-discovery outbound work. This includes the
1898    /// general connection/link caps and, when open Nostr discovery is enabled,
1899    /// the configured non-peer budget.
1900    pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1901        if !self.outbound_admission_check() {
1902            return false;
1903        }
1904
1905        let nostr = &self.config.node.discovery.nostr;
1906        if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1907            return true;
1908        }
1909
1910        let configured_npubs = self
1911            .config
1912            .peers()
1913            .iter()
1914            .map(|peer| peer.npub.clone())
1915            .collect::<HashSet<_>>();
1916        self.open_discovery_enqueue_budget(&configured_npubs) > 0
1917    }
1918
1919    /// Like `outbound_admission_check`, but for racing a better path to a
1920    /// peer that is already authenticated. This may temporarily add a
1921    /// connection/link, but it does not consume a new peer slot.
1922    pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1923        let connection_used = self
1924            .connections
1925            .len()
1926            .saturating_add(self.pending_connects.len());
1927        let connection_allowed =
1928            self.max_connections == 0 || connection_used < self.max_connections;
1929        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1930        connection_allowed && link_allowed
1931    }
1932
1933    /// Set the maximum number of links.
1934    pub fn set_max_links(&mut self, max: usize) {
1935        self.max_links = max;
1936    }
1937
1938    // === Counts ===
1939
1940    /// Number of pending connections (handshake in progress).
1941    pub fn connection_count(&self) -> usize {
1942        self.connections.len()
1943    }
1944
1945    /// Number of authenticated peers.
1946    pub fn peer_count(&self) -> usize {
1947        self.peers.len()
1948    }
1949
1950    /// Number of active links.
1951    pub fn link_count(&self) -> usize {
1952        self.links.len()
1953    }
1954
1955    /// Number of active transports.
1956    pub fn transport_count(&self) -> usize {
1957        self.transports.len()
1958    }
1959
1960    // === Transport Management ===
1961
1962    /// Allocate a new transport ID.
1963    pub fn allocate_transport_id(&mut self) -> TransportId {
1964        let id = TransportId::new(self.next_transport_id);
1965        self.next_transport_id += 1;
1966        id
1967    }
1968
1969    /// Get a transport by ID.
1970    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1971        self.transports.get(id)
1972    }
1973
1974    /// Get mutable transport by ID.
1975    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1976        self.transports.get_mut(id)
1977    }
1978
1979    /// Iterate over transport IDs.
1980    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1981        self.transports.keys()
1982    }
1983
1984    /// Get the packet receiver for the event loop.
1985    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1986        self.packet_rx.as_mut()
1987    }
1988
1989    // === Link Management ===
1990
1991    /// Allocate a new link ID.
1992    pub fn allocate_link_id(&mut self) -> LinkId {
1993        let id = LinkId::new(self.next_link_id);
1994        self.next_link_id += 1;
1995        id
1996    }
1997
1998    /// Add a link.
1999    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2000        if self.max_links > 0 && self.links.len() >= self.max_links {
2001            return Err(NodeError::MaxLinksExceeded {
2002                max: self.max_links,
2003            });
2004        }
2005        let link_id = link.link_id();
2006        let transport_id = link.transport_id();
2007        let remote_addr = link.remote_addr().clone();
2008
2009        self.links.insert(link_id, link);
2010        self.addr_to_link
2011            .insert((transport_id, remote_addr), link_id);
2012        Ok(())
2013    }
2014
2015    /// Get a link by ID.
2016    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2017        self.links.get(link_id)
2018    }
2019
2020    /// Get a mutable link by ID.
2021    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2022        self.links.get_mut(link_id)
2023    }
2024
2025    /// Find link ID by transport address.
2026    pub fn find_link_by_addr(
2027        &self,
2028        transport_id: TransportId,
2029        addr: &TransportAddr,
2030    ) -> Option<LinkId> {
2031        self.addr_to_link
2032            .get(&(transport_id, addr.clone()))
2033            .copied()
2034    }
2035
2036    /// Remove a link.
2037    ///
2038    /// Only removes the addr_to_link reverse lookup if it still points to this
2039    /// link. In cross-connection scenarios, a newer link may have replaced the
2040    /// entry for the same address.
2041    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2042        if let Some(link) = self.links.remove(link_id) {
2043            // Clean up reverse lookup only if it still maps to this link
2044            let key = (link.transport_id(), link.remote_addr().clone());
2045            if self.addr_to_link.get(&key) == Some(link_id) {
2046                self.addr_to_link.remove(&key);
2047            }
2048            Some(link)
2049        } else {
2050            None
2051        }
2052    }
2053
2054    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2055        if !self.bootstrap_transports.contains(&transport_id) {
2056            return;
2057        }
2058
2059        let transport_in_use = self
2060            .links
2061            .values()
2062            .any(|link| link.transport_id() == transport_id)
2063            || self
2064                .connections
2065                .values()
2066                .any(|conn| conn.transport_id() == Some(transport_id))
2067            || self
2068                .peers
2069                .values()
2070                .any(|peer| peer.transport_id() == Some(transport_id))
2071            || self
2072                .pending_connects
2073                .iter()
2074                .any(|pending| pending.transport_id == transport_id);
2075
2076        if transport_in_use {
2077            return;
2078        }
2079
2080        tracing::debug!(
2081            transport_id = %transport_id,
2082            "bootstrap transport has no remaining references; dropping"
2083        );
2084
2085        self.bootstrap_transports.remove(&transport_id);
2086        self.bootstrap_transport_npubs.remove(&transport_id);
2087        self.transport_drops.remove(&transport_id);
2088        self.transports.remove(&transport_id);
2089    }
2090
2091    /// Iterate over all links.
2092    pub fn links(&self) -> impl Iterator<Item = &Link> {
2093        self.links.values()
2094    }
2095
2096    // === Connection Management (Handshake Phase) ===
2097
2098    /// Add a pending connection.
2099    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2100        let link_id = connection.link_id();
2101
2102        if self.connections.contains_key(&link_id) {
2103            return Err(NodeError::ConnectionAlreadyExists(link_id));
2104        }
2105
2106        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2107            return Err(NodeError::MaxConnectionsExceeded {
2108                max: self.max_connections,
2109            });
2110        }
2111
2112        self.connections.insert(link_id, connection);
2113        Ok(())
2114    }
2115
2116    /// Get a connection by LinkId.
2117    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2118        self.connections.get(link_id)
2119    }
2120
2121    /// Get a mutable connection by LinkId.
2122    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2123        self.connections.get_mut(link_id)
2124    }
2125
2126    /// Remove a connection.
2127    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2128        self.connections.remove(link_id)
2129    }
2130
2131    /// Iterate over all connections.
2132    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2133        self.connections.values()
2134    }
2135
2136    // === Peer Management (Active Phase) ===
2137
2138    /// Get a peer by NodeAddr.
2139    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2140        self.peers.get(node_addr)
2141    }
2142
2143    /// Get a mutable peer by NodeAddr.
2144    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2145        self.peers.get_mut(node_addr)
2146    }
2147
2148    /// Remove a peer.
2149    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2150        self.peers.remove(node_addr)
2151    }
2152
2153    /// Iterate over all peers.
2154    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2155        self.peers.values()
2156    }
2157
2158    /// Reference to the Nostr discovery handle if discovery is enabled.
2159    /// Used by control queries (`show_peers` per-peer Nostr-traversal
2160    /// state) to read failure-state without taking shared ownership.
2161    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2162        self.nostr_discovery.as_deref()
2163    }
2164
2165    /// Iterate over all peer node IDs.
2166    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2167        self.peers.keys()
2168    }
2169
2170    /// Iterate over peers that can send traffic.
2171    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2172        self.peers.values().filter(|p| p.can_send())
2173    }
2174
2175    /// Number of peers that can send traffic.
2176    pub fn sendable_peer_count(&self) -> usize {
2177        self.peers.values().filter(|p| p.can_send()).count()
2178    }
2179
2180    pub(crate) fn set_discovery_fallback_transit_allowed(
2181        &mut self,
2182        peer_addr: NodeAddr,
2183        allowed: bool,
2184    ) {
2185        if allowed {
2186            self.discovery_fallback_transit_blocked_peers
2187                .remove(&peer_addr);
2188        } else {
2189            self.discovery_fallback_transit_blocked_peers
2190                .insert(peer_addr);
2191        }
2192    }
2193
2194    pub(crate) fn configured_discovery_fallback_transit(
2195        &self,
2196        peer_addr: &NodeAddr,
2197    ) -> Option<bool> {
2198        self.configured_peer(peer_addr)
2199            .map(|peer| peer.discovery_fallback_transit)
2200    }
2201
2202    pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2203        self.config.peers().iter().find(|peer| {
2204            PeerIdentity::from_npub(&peer.npub)
2205                .ok()
2206                .is_some_and(|identity| identity.node_addr() == peer_addr)
2207        })
2208    }
2209
2210    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2211        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2212            return retry_state.peer_config.discovery_fallback_transit;
2213        }
2214
2215        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2216            return allowed;
2217        }
2218
2219        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2220    }
2221
2222    // === End-to-End Sessions ===
2223
2224    /// Get a session by remote NodeAddr.
2225    /// Disable the discovery forward rate limiter (for tests).
2226    #[cfg(test)]
2227    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2228        self.discovery_forward_limiter
2229            .set_interval(std::time::Duration::ZERO);
2230    }
2231
2232    #[cfg(test)]
2233    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2234        self.sessions.get(remote)
2235    }
2236
2237    /// Get a mutable session by remote NodeAddr.
2238    #[cfg(test)]
2239    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2240        self.sessions.get_mut(remote)
2241    }
2242
2243    /// Remove a session.
2244    #[cfg(test)]
2245    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2246        self.sessions.remove(remote)
2247    }
2248
2249    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2250    #[cfg(test)]
2251    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2252        self.path_mtu_lookup
2253            .read()
2254            .ok()
2255            .and_then(|map| map.get(fips_addr).copied())
2256    }
2257
2258    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2259    #[cfg(test)]
2260    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2261        if let Ok(mut map) = self.path_mtu_lookup.write() {
2262            map.insert(fips_addr, mtu);
2263        }
2264    }
2265
2266    /// Number of end-to-end sessions.
2267    pub fn session_count(&self) -> usize {
2268        self.sessions.len()
2269    }
2270
2271    /// Iterate over all session entries (for control queries).
2272    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2273        self.sessions.iter()
2274    }
2275
2276    // === Identity Cache ===
2277
2278    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2279    pub(crate) fn register_identity(
2280        &mut self,
2281        node_addr: NodeAddr,
2282        pubkey: secp256k1::PublicKey,
2283    ) -> bool {
2284        let mut prefix = [0u8; 15];
2285        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2286        if let Some(entry) = self.identity_cache.get(&prefix)
2287            && entry.node_addr == node_addr
2288            && entry.pubkey == pubkey
2289        {
2290            // Endpoint sends pass the same PeerIdentity on every packet. Once
2291            // validated, avoid re-deriving NodeAddr from the public key in the
2292            // data path; that hash showed up in macOS sender profiles.
2293            return true;
2294        }
2295
2296        let (xonly, _) = pubkey.x_only_public_key();
2297        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2298        if derived_node_addr != node_addr {
2299            debug!(
2300                claimed_node_addr = %node_addr,
2301                derived_node_addr = %derived_node_addr,
2302                "Rejected identity cache entry with mismatched public key"
2303            );
2304            return false;
2305        }
2306
2307        let now_ms = Self::now_ms();
2308        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2309            && entry.node_addr == node_addr
2310        {
2311            entry.pubkey = pubkey;
2312            entry.last_seen_ms = now_ms;
2313            return true;
2314        }
2315
2316        let npub = encode_npub(&xonly);
2317        self.identity_cache.insert(
2318            prefix,
2319            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2320        );
2321        // LRU eviction
2322        let max = self.config.node.cache.identity_size;
2323        if self.identity_cache.len() > max
2324            && let Some(oldest_key) = self
2325                .identity_cache
2326                .iter()
2327                .min_by_key(|(_, entry)| entry.last_seen_ms)
2328                .map(|(k, _)| *k)
2329        {
2330            self.identity_cache.remove(&oldest_key);
2331        }
2332        true
2333    }
2334
2335    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2336    pub(crate) fn lookup_by_fips_prefix(
2337        &mut self,
2338        prefix: &[u8; 15],
2339    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2340        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2341            entry.last_seen_ms = Self::now_ms(); // LRU touch
2342            Some((entry.node_addr, entry.pubkey))
2343        } else {
2344            None
2345        }
2346    }
2347
2348    /// Check if a node's identity is in the cache (without LRU touch).
2349    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2350        let mut prefix = [0u8; 15];
2351        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2352        self.identity_cache.contains_key(&prefix)
2353    }
2354
2355    /// Number of identity cache entries.
2356    pub fn identity_cache_len(&self) -> usize {
2357        self.identity_cache.len()
2358    }
2359
2360    /// Iterate over identity cache entries.
2361    ///
2362    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2363    /// Used by the `show_identity_cache` control query.
2364    pub fn identity_cache_iter(
2365        &self,
2366    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2367        self.identity_cache
2368            .values()
2369            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2370    }
2371
2372    /// Configured maximum identity cache size.
2373    pub fn identity_cache_max(&self) -> usize {
2374        self.config.node.cache.identity_size
2375    }
2376
2377    /// Number of pending discovery lookups.
2378    pub fn pending_lookup_count(&self) -> usize {
2379        self.pending_lookups.len()
2380    }
2381
2382    /// Iterate over pending discovery lookups for diagnostics.
2383    pub fn pending_lookups_iter(
2384        &self,
2385    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2386        self.pending_lookups.iter()
2387    }
2388
2389    /// Number of recent discovery requests tracked.
2390    pub fn recent_request_count(&self) -> usize {
2391        self.recent_requests.len()
2392    }
2393
2394    /// Count of destinations with queued TUN packets awaiting session setup.
2395    pub fn pending_tun_destinations(&self) -> usize {
2396        self.pending_tun_packets.len()
2397    }
2398
2399    /// Total TUN packets queued across all destinations.
2400    pub fn pending_tun_total_packets(&self) -> usize {
2401        self.pending_tun_packets.values().map(|q| q.len()).sum()
2402    }
2403
2404    /// Iterate over retry state for diagnostics.
2405    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2406        self.retry_pending.iter()
2407    }
2408
2409    // === Routing ===
2410
2411    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2412    ///
2413    /// Returns true if the peer is our current tree parent, or if the peer
2414    /// has declared us as their parent (making them our child).
2415    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2416        // Peer is our parent
2417        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2418            return true;
2419        }
2420        // Peer is our child (their declaration names us as parent)
2421        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2422            && decl.parent_id() == self.node_addr()
2423        {
2424            return true;
2425        }
2426        false
2427    }
2428
    /// Find next hop for a destination node address.
    ///
    /// Routing priority:
    /// 1. Destination is self → `None` (local delivery)
    /// 2. Destination is a direct peer that is healthy and whose session
    ///    direct path is not currently marked degraded → that peer
    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
    ///    observed reverse paths, selected with weighted multipath plus
    ///    periodic coordinate/tree exploration. When exploration is due, this
    ///    step is skipped so steps 4-5 get a chance first.
    /// 4. Bloom filter candidates with cached dest coords → among peers whose
    ///    bloom filter contains the destination, pick the one with the lowest
    ///    `(link_cost, tree_distance_to_dest, node_addr)`.
    ///    The self-distance check ensures only peers strictly closer to the
    ///    destination than us are considered (prevents routing loops).
    /// 5. Greedy tree routing fallback (requires cached dest coords; the
    ///    chosen next hop must be a peer that can currently send)
    /// 6. A reply-learned route, if one is selectable (reply_learned mode only)
    /// 7. The direct peer, if it can send even though step 2 rejected it
    ///    (stale or degraded direct link)
    /// 8. No route → `None`
    ///
    /// Both the bloom filter and tree routing paths require cached destination
    /// coordinates (checked in `coord_cache`). Without coordinates, only a
    /// reply-learned route or a sendable direct peer can be returned;
    /// otherwise the result is `None`. The caller should signal
    /// `CoordsRequired` back to the source when `None` is returned for a
    /// non-local destination.
    ///
    /// Takes `&mut self` because lookups have side effects: the coord cache
    /// entry is touched, and an expired direct-path degradation mark is
    /// removed.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // 1. Local delivery
        if dest_node_addr == self.node_addr() {
            return None;
        }
        let now_ms = Self::now_ms();
        // Also purges the degradation mark for this destination if it expired.
        let direct_session_degraded = self.session_direct_path_is_degraded(dest_node_addr, now_ms);

        // 2. Healthy direct peer. Stale direct links remain usable, but in
        // reply-learned mode they must not hide a live mesh route learned from
        // recent traffic or discovery.
        // `direct_peer_can_send` is remembered for the last-resort fallbacks
        // below (step 7 and the no-coordinates path).
        let direct_peer_can_send = self
            .peers
            .get(dest_node_addr)
            .is_some_and(|peer| peer.can_send());
        if let Some(peer) = self.peers.get(dest_node_addr)
            && peer.is_healthy()
            && !direct_session_degraded
        {
            return Some(peer);
        }

        // Snapshot of peers that can currently send, used as the eligibility
        // filter for every learned-route query. `None` outside reply_learned
        // mode, which disables all learned-route steps.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(_, peer)| peer.can_send())
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // 3. Optional reply-learned routing. These entries are not peer
        // claims; they are local observations of which peer carried traffic
        // or a verified lookup response back from the destination. Most
        // packets use weighted multipath over learned routes, but periodic
        // fallback exploration lets coord/bloom/tree routes discover better
        // candidates.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Look up cached destination coordinates (required by both bloom and tree paths).
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: bloom/tree routing is impossible, so fall back
            // to a learned route (even if exploration was due) and then to a
            // sendable direct peer.
            if let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            if direct_peer_can_send {
                return self.peers.get(dest_node_addr);
            }
            return None;
        };

        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
        //    If no candidate is strictly closer, fall through to tree routing.
        //    The peer reference is reduced to its address so the borrow of
        //    `self` ends before the final `self.peers.get`.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr {
            return self.peers.get(&next_hop_addr);
        }

        // 5. Greedy tree routing fallback
        // The tree's next hop is only accepted if it is a peer that can send.
        let tree_route_addr = self
            .tree_state
            .find_next_hop(&dest_coords)
            .filter(|next_hop_id| {
                self.peers
                    .get(next_hop_id)
                    .is_some_and(|peer| peer.can_send())
            });
        if let Some(next_hop_addr) = tree_route_addr {
            return self.peers.get(&next_hop_addr);
        }
        // Exploration found no coordinate/tree route: revert to the learned
        // route skipped in step 3.
        // NOTE(review): this returns `None` when no learned route is
        // selectable, without trying the sendable direct peer fallback below.
        // Confirm that skipping the direct peer here is intended.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // 6. Learned route as a late fallback.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // 7. Stale or degraded direct peer that can still send.
        if direct_peer_can_send {
            return self.peers.get(dest_node_addr);
        }

        // 8. No route.
        None
    }
2576
2577    pub(in crate::node) fn session_direct_path_is_degraded(
2578        &mut self,
2579        dest: &NodeAddr,
2580        now_ms: u64,
2581    ) -> bool {
2582        match self.session_direct_degraded_until_ms.get(dest).copied() {
2583            Some(until_ms) if until_ms > now_ms => true,
2584            Some(_) => {
2585                self.session_direct_degraded_until_ms.remove(dest);
2586                false
2587            }
2588            None => false,
2589        }
2590    }
2591
2592    pub(in crate::node) fn mark_session_direct_path_degraded(
2593        &mut self,
2594        dest: NodeAddr,
2595        now_ms: u64,
2596    ) -> bool {
2597        let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2598        let entry = self
2599            .session_direct_degraded_until_ms
2600            .entry(dest)
2601            .or_insert(0);
2602        let was_degraded = *entry > now_ms;
2603        *entry = (*entry).max(until_ms);
2604        !was_degraded
2605    }
2606
2607    pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2608        self.session_direct_degraded_until_ms.remove(dest).is_some()
2609    }
2610
2611    pub(in crate::node) fn learn_reverse_route(
2612        &mut self,
2613        destination: NodeAddr,
2614        next_hop: NodeAddr,
2615    ) {
2616        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2617            || destination == *self.node_addr()
2618        {
2619            return;
2620        }
2621        let now_ms = Self::now_ms();
2622        self.learned_routes.learn(
2623            destination,
2624            next_hop,
2625            now_ms,
2626            self.config.node.routing.learned_ttl_secs,
2627            self.config.node.routing.max_learned_routes_per_dest,
2628        );
2629    }
2630
2631    pub(in crate::node) fn record_route_failure(
2632        &mut self,
2633        destination: NodeAddr,
2634        next_hop: NodeAddr,
2635    ) {
2636        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2637            return;
2638        }
2639        self.learned_routes.record_failure(&destination, &next_hop);
2640    }
2641
2642    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2643        self.learned_routes.snapshot(now_ms)
2644    }
2645
2646    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2647        self.learned_routes.purge_expired(now_ms);
2648    }
2649
2650    /// Select the best peer from a set of bloom filter candidates.
2651    ///
2652    /// Uses distance from each candidate's tree coordinates to the destination
2653    /// as the primary metric (after link_cost). Only selects peers that are
2654    /// strictly closer to the destination than we are (self-distance check
2655    /// prevents routing loops).
2656    ///
2657    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2658    fn select_best_candidate<'a>(
2659        &'a self,
2660        candidates: &[&'a ActivePeer],
2661        dest_coords: &crate::tree::TreeCoordinate,
2662    ) -> Option<&'a ActivePeer> {
2663        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2664
2665        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2666
2667        for &candidate in candidates {
2668            if !candidate.can_send() {
2669                continue;
2670            }
2671
2672            let cost = candidate.link_cost();
2673
2674            let dist = self
2675                .tree_state
2676                .peer_coords(candidate.node_addr())
2677                .map(|pc| pc.distance_to(dest_coords))
2678                .unwrap_or(usize::MAX);
2679
2680            // Self-distance check: only consider peers strictly closer
2681            // to the destination than we are (prevents routing loops)
2682            if dist >= my_distance {
2683                continue;
2684            }
2685
2686            let dominated = match &best {
2687                None => true,
2688                Some((_, best_cost, best_dist)) => {
2689                    cost < *best_cost
2690                        || (cost == *best_cost && dist < *best_dist)
2691                        || (cost == *best_cost
2692                            && dist == *best_dist
2693                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2694                }
2695            };
2696
2697            if dominated {
2698                best = Some((candidate, cost, dist));
2699            }
2700        }
2701
2702        best.map(|(peer, _, _)| peer)
2703    }
2704
2705    /// Check if a destination is in any peer's bloom filter.
2706    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2707        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2708    }
2709
2710    /// Get the TUN packet sender channel.
2711    ///
2712    /// Returns None if TUN is not active or the node hasn't been started.
2713    pub fn tun_tx(&self) -> Option<&TunTx> {
2714        self.tun_tx.as_ref()
2715    }
2716
2717    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2718    ///
2719    /// This must be called before [`Node::start`] and requires `tun.enabled =
2720    /// false`. Outbound packets sent to the returned sender are processed by the
2721    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2722    /// sent to the returned receiver with source attribution.
2723    pub fn attach_external_packet_io(
2724        &mut self,
2725        capacity: usize,
2726    ) -> Result<ExternalPacketIo, NodeError> {
2727        if self.state != NodeState::Created {
2728            return Err(NodeError::Config(ConfigError::Validation(
2729                "external packet I/O must be attached before node start".to_string(),
2730            )));
2731        }
2732        if self.config.tun.enabled {
2733            return Err(NodeError::Config(ConfigError::Validation(
2734                "external packet I/O requires tun.enabled=false".to_string(),
2735            )));
2736        }
2737
2738        let capacity = capacity.max(1);
2739        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2740        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2741        self.tun_outbound_rx = Some(outbound_rx);
2742        self.external_packet_tx = Some(inbound_tx);
2743
2744        Ok(ExternalPacketIo {
2745            outbound_tx,
2746            inbound_rx,
2747        })
2748    }
2749
2750    /// Attach app-owned endpoint data I/O for embedded operation.
2751    ///
2752    /// Commands sent to the returned sender are processed by the node RX loop.
2753    /// Incoming endpoint data is emitted as source-attributed events.
2754    pub(crate) fn attach_endpoint_data_io(
2755        &mut self,
2756        capacity: usize,
2757    ) -> Result<EndpointDataIo, NodeError> {
2758        if self.state != NodeState::Created {
2759            return Err(NodeError::Config(ConfigError::Validation(
2760                "endpoint data I/O must be attached before node start".to_string(),
2761            )));
2762        }
2763
2764        let command_capacity = endpoint_data_command_capacity(capacity);
2765        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2766        // Inbound endpoint-data events use an unbounded channel — see
2767        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2768        // per-packet semaphore + the cross-task relay task that used to
2769        // sit on top of this channel).
2770        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2771        self.endpoint_command_rx = Some(command_rx);
2772        self.endpoint_event_tx = Some(event_tx.clone());
2773
2774        Ok(EndpointDataIo {
2775            command_tx,
2776            event_rx,
2777            event_tx,
2778        })
2779    }
2780
2781    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2782        let mut prefix = [0u8; 15];
2783        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2784        self.identity_cache
2785            .get(&prefix)
2786            .filter(|entry| &entry.node_addr == addr)
2787            .map(|entry| entry.pubkey)
2788    }
2789
2790    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2791        let mut prefix = [0u8; 15];
2792        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2793        self.identity_cache
2794            .get(&prefix)
2795            .filter(|entry| &entry.node_addr == addr)
2796            .map(|entry| entry.npub.clone())
2797    }
2798
2799    pub(in crate::node) fn deliver_external_ipv6_packet(
2800        &self,
2801        src_addr: &NodeAddr,
2802        packet: Vec<u8>,
2803    ) {
2804        let Some(external_packet_tx) = &self.external_packet_tx else {
2805            return;
2806        };
2807        if packet.len() < 40 {
2808            return;
2809        }
2810        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2811            return;
2812        };
2813        let delivered = NodeDeliveredPacket {
2814            source_node_addr: *src_addr,
2815            source_npub: self.npub_for_node_addr(src_addr),
2816            destination,
2817            packet,
2818        };
2819        if let Err(error) = external_packet_tx.try_send(delivered) {
2820            debug!(error = %error, "Failed to deliver packet to external app sink");
2821        }
2822    }
2823
2824    // === Sending ===
2825
2826    /// Encrypt and send a link-layer message to an authenticated peer.
2827    ///
2828    /// The plaintext should include the message type byte followed by the
2829    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
2830    ///
2831    /// The send path prepends a 4-byte session-relative timestamp (inner
2832    /// header) before encryption. The full 16-byte outer header is used
2833    /// as AAD for the AEAD construction.
2834    ///
2835    /// This is the standard path for sending any link-layer control message
2836    /// to a peer over their encrypted Noise session.
2837    pub(super) async fn send_encrypted_link_message(
2838        &mut self,
2839        node_addr: &NodeAddr,
2840        plaintext: &[u8],
2841    ) -> Result<(), NodeError> {
2842        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2843            .await
2844    }
2845
2846    /// Update the local-outbound-broken signal from a `transport.send`
2847    /// outcome. Sets `last_local_send_failure_at` on local-side io
2848    /// errors (NetworkUnreachable / HostUnreachable / AddrNotAvailable);
2849    /// clears it on success. The reaper consults this in
2850    /// `check_link_heartbeats` to switch to `fast_link_dead_timeout_secs`.
2851    pub(in crate::node) fn note_local_send_outcome(
2852        &mut self,
2853        result: &Result<usize, TransportError>,
2854    ) {
2855        match result {
2856            Ok(_) => {
2857                if self.last_local_send_failure_at.is_some() {
2858                    self.last_local_send_failure_at = None;
2859                }
2860            }
2861            Err(error) if error.is_local_route_unavailable() => {
2862                self.last_local_send_failure_at = Some(std::time::Instant::now());
2863            }
2864            Err(_) => {}
2865        }
2866    }
2867
2868    /// Return the active dead-timeout after considering recent local route
2869    /// failures. The fast-dead signal is intentionally short-lived: on the
2870    /// UDP worker path a send call can return before the kernel result is
2871    /// observed, so a single stale route error must not compress liveness for
2872    /// the whole normal dead-timeout window.
2873    pub(in crate::node) fn local_send_failure_dead_timeout(
2874        &mut self,
2875        now: std::time::Instant,
2876        dead_timeout: std::time::Duration,
2877        fast_dead_timeout: std::time::Duration,
2878    ) -> std::time::Duration {
2879        match self.last_local_send_failure_at {
2880            Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
2881                fast_dead_timeout.min(dead_timeout)
2882            }
2883            Some(_) => {
2884                self.last_local_send_failure_at = None;
2885                dead_timeout
2886            }
2887            None => dead_timeout,
2888        }
2889    }
2890
2891    pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
2892        self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
2893    }
2894
2895    pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
2896        let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
2897            return false;
2898        };
2899        let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
2900        std::time::Instant::now().duration_since(t) <= grace
2901    }
2902
2903    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
2904    ///
2905    /// Used by the forwarding path to relay congestion signals hop-by-hop.
2906    pub(super) async fn send_encrypted_link_message_with_ce(
2907        &mut self,
2908        node_addr: &NodeAddr,
2909        plaintext: &[u8],
2910        ce_flag: bool,
2911    ) -> Result<(), NodeError> {
2912        let peer = self
2913            .peers
2914            .get_mut(node_addr)
2915            .ok_or(NodeError::PeerNotFound(*node_addr))?;
2916
2917        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2918            node_addr: *node_addr,
2919            reason: "no their_index".into(),
2920        })?;
2921        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2922            node_addr: *node_addr,
2923            reason: "no transport_id".into(),
2924        })?;
2925        let remote_addr = peer
2926            .current_addr()
2927            .cloned()
2928            .ok_or_else(|| NodeError::SendFailed {
2929                node_addr: *node_addr,
2930                reason: "no current_addr".into(),
2931            })?;
2932        #[cfg(any(target_os = "linux", target_os = "macos"))]
2933        let connected_socket = peer.connected_udp();
2934
2935        // Prepend 4-byte session-relative timestamp (inner header)
2936        let timestamp_ms = peer.session_elapsed_ms();
2937
2938        // MMP: read spin bit value before entering session borrow
2939        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2940        let mut flags = if sp_flag { FLAG_SP } else { 0 };
2941        if ce_flag {
2942            flags |= FLAG_CE;
2943        }
2944        if peer.current_k_bit() {
2945            flags |= FLAG_KEY_EPOCH;
2946        }
2947
2948        let session = peer
2949            .noise_session_mut()
2950            .ok_or_else(|| NodeError::SendFailed {
2951                node_addr: *node_addr,
2952                reason: "no noise session".into(),
2953            })?;
2954
2955        // Build 16-byte outer header upfront. The inner-plaintext
2956        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
2957        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
2958        // just to measure it. The worker path uses this length to size
2959        // the wire buffer directly; the legacy path below still
2960        // materialises a separate `inner_plaintext` Vec for the inline
2961        // encrypt-and-send call.
2962        const INNER_TS_LEN: usize = 4;
2963        let counter = session.current_send_counter();
2964        let inner_len = INNER_TS_LEN + plaintext.len();
2965        let payload_len = inner_len as u16;
2966        let header = build_established_header(their_index, counter, flags, payload_len);
2967
2968        // **UDP send fast path.** The encrypt-worker pool is always
2969        // spawned at lifecycle start (workers = num_cpus) in
2970        // production, so this branch is taken for every authentic
2971        // send on every UDP-transported established session. The
2972        // AEAD work + sendmsg syscall run on a dedicated OS thread;
2973        // the rx_loop only builds the wire buffer + reserves the
2974        // counter inline.
2975        //
2976        // Other transport kinds (BLE, TCP, sim, ethernet) fall
2977        // through to the inline encrypt + transport.send path
2978        // below — those don't have raw-fd / sendmmsg / UDP_GSO
2979        // benefits to expose through the worker pool, so the simpler
2980        // synchronous send is the right shape for them.
2981        //
2982        // The `encrypt_workers.is_some()` check below is true in
2983        // production (lifecycle::start spawns the pool); it stays
2984        // checked rather than `expect()`-ed because unit tests
2985        // construct `Node` without calling `start()`.
2986        let transport_for_send = self
2987            .transports
2988            .get(&transport_id)
2989            .ok_or(NodeError::TransportNotFound(transport_id))?;
2990        match transport_for_send.connection_state(&remote_addr) {
2991            ConnectionState::Connected => {}
2992            other => {
2993                if matches!(other, ConnectionState::None) {
2994                    let _ = transport_for_send.connect(&remote_addr).await;
2995                }
2996                return Err(NodeError::SendFailed {
2997                    node_addr: *node_addr,
2998                    reason: format!("transport connection not ready: {:?}", other),
2999                });
3000            }
3001        }
3002        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3003        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3004            && is_udp
3005            && let Some(cipher_clone) = session.send_cipher_clone()
3006        {
3007            {
3008                // Reserve the counter on the session so subsequent
3009                // sends don't reuse it. `current_send_counter` only
3010                // peeks; we advance via `take_send_counter`.
3011                let reserved_counter =
3012                    session
3013                        .take_send_counter()
3014                        .map_err(|e| NodeError::SendFailed {
3015                            node_addr: *node_addr,
3016                            reason: format!("counter reservation failed: {}", e),
3017                        })?;
3018                debug_assert_eq!(reserved_counter, counter);
3019                // Re-derive the header with the now-locked-in counter
3020                // value (same value, but the call sequence is more
3021                // explicit).
3022                let header =
3023                    build_established_header(their_index, reserved_counter, flags, payload_len);
3024                let transport = transport_for_send;
3025                // Snapshot the per-peer connected UDP socket before
3026                // resolving the fallback address. On the established
3027                // steady-state path this socket already carries the
3028                // kernel peer address, so re-parsing the configured
3029                // transport address and touching the DNS cache on every
3030                // packet is pure overhead on the sender hot path.
3031                let send_target = {
3032                    if let TransportHandle::Udp(udp) = transport {
3033                        let socket_addr = {
3034                            #[cfg(any(target_os = "linux", target_os = "macos"))]
3035                            {
3036                                match connected_socket.as_ref() {
3037                                    Some(socket) => Some(socket.peer_addr()),
3038                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3039                                }
3040                            }
3041                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3042                            {
3043                                udp.resolve_for_off_task(&remote_addr).await.ok()
3044                            }
3045                        };
3046                        match (udp.async_socket(), socket_addr) {
3047                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3048                            _ => None,
3049                        }
3050                    } else {
3051                        None
3052                    }
3053                };
3054                if let Some((socket, socket_addr)) = send_target {
3055                    // Build the wire buffer **directly** from
3056                    // `plaintext` with a single allocation:
3057                    //   `[16 header][4 ts][plaintext...]` with
3058                    // +16 trailing capacity for the AEAD tag.
3059                    // The worker seals `wire_buf[16..]` in
3060                    // place and appends the tag — no second
3061                    // alloc, no second memcpy.
3062                    //
3063                    // Previous design built `inner_plaintext`
3064                    // via `prepend_inner_header` (1 alloc + 1
3065                    // copy) and then let the worker memcpy
3066                    // header + plaintext into a fresh Vec
3067                    // (another alloc + copy). At ~100 kpps the
3068                    // saved alloc/copy is ~150 MB/sec of memory
3069                    // bandwidth on the hot rx_loop + worker.
3070                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3071                    let mut wire_buf = Vec::with_capacity(wire_capacity);
3072                    wire_buf.extend_from_slice(&header);
3073                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
3074                    wire_buf.extend_from_slice(plaintext);
3075                    let predicted_bytes = wire_capacity;
3076                    // Stats / MMP update inline — predicted size
3077                    // is exact for ChaCha20-Poly1305 (tag is
3078                    // constant 16 bytes). When `connected_socket` is
3079                    // `Some`, the worker sends on it without a
3080                    // destination sockaddr — the kernel skips the
3081                    // per-packet sockaddr + route + neighbor resolve.
3082                    if let Some(peer) = self.peers.get_mut(node_addr) {
3083                        peer.link_stats_mut().record_sent(predicted_bytes);
3084                        if let Some(mmp) = peer.mmp_mut() {
3085                            mmp.sender
3086                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3087                        }
3088                    }
3089                    let scheduling_weight = self.send_weight_for_peer(node_addr);
3090                    workers.dispatch(self::encrypt_worker::FmpSendJob {
3091                        cipher: cipher_clone,
3092                        counter: reserved_counter,
3093                        wire_buf,
3094                        fsp_seal: None,
3095                        socket,
3096                        dest_addr: socket_addr,
3097                        #[cfg(any(target_os = "linux", target_os = "macos"))]
3098                        connected_socket,
3099                        drop_on_backpressure: fmp_plaintext_is_bulk_session_datagram(plaintext),
3100                        scheduling_weight,
3101                        queued_at: crate::perf_profile::stamp(),
3102                    });
3103                    return Ok(());
3104                }
3105            }
3106        }
3107
3108        // Inline (legacy) path: encrypt + send on the rx_loop.
3109        // Build the inner plaintext lazily here — the worker path
3110        // above never reaches this point, so the prepend_inner_header
3111        // alloc is avoided in the fast path.
3112        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3113        // Encrypt with AAD binding to the outer header
3114        let ciphertext = {
3115            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3116            session
3117                .encrypt_with_aad(&inner_plaintext, &header)
3118                .map_err(|e| NodeError::SendFailed {
3119                    node_addr: *node_addr,
3120                    reason: format!("encryption failed: {}", e),
3121                })?
3122        };
3123
3124        let wire_packet = build_encrypted(&header, &ciphertext);
3125
3126        // Re-borrow peer for stats update after sending
3127        let send_result = {
3128            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3129            let transport = self
3130                .transports
3131                .get(&transport_id)
3132                .ok_or(NodeError::TransportNotFound(transport_id))?;
3133            transport.send(&remote_addr, &wire_packet).await
3134        };
3135        self.note_local_send_outcome(&send_result);
3136        let bytes_sent = send_result.map_err(|e| match e {
3137            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3138                node_addr: *node_addr,
3139                packet_size,
3140                mtu,
3141            },
3142            other => NodeError::SendFailed {
3143                node_addr: *node_addr,
3144                reason: format!("transport send: {}", other),
3145            },
3146        })?;
3147
3148        // Update send statistics
3149        if let Some(peer) = self.peers.get_mut(node_addr) {
3150            peer.link_stats_mut().record_sent(bytes_sent);
3151            // MMP: record sent frame for sender report generation
3152            if let Some(mmp) = peer.mmp_mut() {
3153                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3154            }
3155        }
3156
3157        Ok(())
3158    }
3159}
3160
3161impl fmt::Debug for Node {
3162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3163        f.debug_struct("Node")
3164            .field("node_addr", self.node_addr())
3165            .field("state", &self.state)
3166            .field("is_leaf_only", &self.is_leaf_only)
3167            .field("connections", &self.connection_count())
3168            .field("peers", &self.peer_count())
3169            .field("links", &self.link_count())
3170            .field("transports", &self.transport_count())
3171            .finish()
3172    }
3173}