Skip to main content

fips_core/node/
mod.rs

1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31#[cfg(unix)]
32use self::wire::ESTABLISHED_HEADER_SIZE;
33use self::wire::{
34    FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted, build_established_header,
35    prepend_inner_header,
36};
37use crate::bloom::BloomState;
38use crate::cache::CoordCache;
39use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
40use crate::node::session::SessionEntry;
41use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
42use crate::peer::{ActivePeer, PeerConnection};
43#[cfg(any(target_os = "linux", target_os = "macos"))]
44use crate::transport::ethernet::EthernetTransport;
45use crate::transport::tcp::TcpTransport;
46use crate::transport::tor::TorTransport;
47use crate::transport::udp::UdpTransport;
48#[cfg(feature = "webrtc-transport")]
49use crate::transport::webrtc::WebRtcTransport;
50use crate::transport::{
51    ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
52    TransportHandle, TransportId,
53};
54use crate::tree::TreeState;
55use crate::upper::hosts::HostMap;
56use crate::upper::icmp_rate_limit::IcmpRateLimiter;
57use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
58use crate::utils::index::IndexAllocator;
59use crate::{
60    Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
61    PeerIdentity, encode_npub,
62};
63use rand::Rng;
64use std::collections::{HashMap, HashSet, VecDeque};
65use std::fmt;
66use std::sync::Arc;
67use std::thread::JoinHandle;
68use thiserror::Error;
69use tracing::{debug, warn};
70
71const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
72const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
73const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
74const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
75const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
76const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
77
78fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
79    if plaintext
80        .first()
81        .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
82    {
83        return false;
84    }
85    let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
86        return false;
87    };
88    FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
89        prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
90    })
91}
92
93fn endpoint_payload_is_tcp(payload: &[u8]) -> bool {
94    const IPPROTO_TCP: u8 = 6;
95    const IPV4_MIN_HEADER_LEN: usize = 20;
96
97    let Some(version_ihl) = payload.first().copied() else {
98        return false;
99    };
100
101    match version_ihl >> 4 {
102        4 => {
103            if payload.len() < IPV4_MIN_HEADER_LEN {
104                return false;
105            }
106            let header_len = usize::from(version_ihl & 0x0f) * 4;
107            header_len >= IPV4_MIN_HEADER_LEN
108                && payload.len() >= header_len
109                && payload[9] == IPPROTO_TCP
110        }
111        6 => ipv6_payload_next_header(payload).is_some_and(|proto| proto == IPPROTO_TCP),
112        _ => false,
113    }
114}
115
116fn ipv6_payload_next_header(payload: &[u8]) -> Option<u8> {
117    const IPV6_HEADER_LEN: usize = 40;
118    const IPV6_FRAGMENT_HEADER_LEN: usize = 8;
119
120    if payload.len() < IPV6_HEADER_LEN || payload[0] >> 4 != 6 {
121        return None;
122    }
123
124    let mut next_header = payload[6];
125    let mut offset = IPV6_HEADER_LEN;
126    let mut extension_count = 0usize;
127    while ipv6_extension_header_is_skippable(next_header) {
128        if next_header == 44 {
129            if payload.len() < offset + IPV6_FRAGMENT_HEADER_LEN {
130                return None;
131            }
132            next_header = payload[offset];
133            offset += IPV6_FRAGMENT_HEADER_LEN;
134        } else if next_header == 51 {
135            if payload.len() < offset + 2 {
136                return None;
137            }
138            let header_len = (usize::from(payload[offset + 1]) + 2) * 4;
139            if payload.len() < offset + header_len {
140                return None;
141            }
142            next_header = payload[offset];
143            offset += header_len;
144        } else {
145            if payload.len() < offset + 2 {
146                return None;
147            }
148            let header_len = (usize::from(payload[offset + 1]) + 1) * 8;
149            if payload.len() < offset + header_len {
150                return None;
151            }
152            next_header = payload[offset];
153            offset += header_len;
154        }
155        extension_count += 1;
156        if extension_count > 8 {
157            return None;
158        }
159    }
160
161    Some(next_header)
162}
163
164fn ipv6_extension_header_is_skippable(next_header: u8) -> bool {
165    matches!(next_header, 0 | 43 | 44 | 51 | 60 | 135)
166}
167
168/// Half-range of the symmetric jitter applied to per-session rekey timers.
169///
170/// Each FMP/FSP session draws an offset uniformly from
171/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` seconds at construction and
172/// after each cutover. This preserves the configured mean interval while
173/// reducing dual-initiation bursts in symmetric-start meshes.
174pub(crate) const REKEY_JITTER_SECS: i64 = 15;
175
176/// Errors related to node operations.
177#[derive(Debug, Error)]
178pub enum NodeError {
179    #[error("node not started")]
180    NotStarted,
181
182    #[error("node already started")]
183    AlreadyStarted,
184
185    #[error("node already stopped")]
186    AlreadyStopped,
187
188    #[error("transport not found: {0}")]
189    TransportNotFound(TransportId),
190
191    #[error("no transport available for type: {0}")]
192    NoTransportForType(String),
193
194    #[error("link not found: {0}")]
195    LinkNotFound(LinkId),
196
197    #[error("connection not found: {0}")]
198    ConnectionNotFound(LinkId),
199
200    #[error("peer not found: {0:?}")]
201    PeerNotFound(NodeAddr),
202
203    #[error("peer already exists: {0:?}")]
204    PeerAlreadyExists(NodeAddr),
205
206    #[error("connection already exists for link: {0}")]
207    ConnectionAlreadyExists(LinkId),
208
209    #[error("invalid peer npub '{npub}': {reason}")]
210    InvalidPeerNpub { npub: String, reason: String },
211
212    #[error("discovery error: {0}")]
213    Discovery(String),
214
215    #[error("access denied: {0}")]
216    AccessDenied(String),
217
218    #[error("max connections exceeded: {max}")]
219    MaxConnectionsExceeded { max: usize },
220
221    #[error("max peers exceeded: {max}")]
222    MaxPeersExceeded { max: usize },
223
224    #[error("max links exceeded: {max}")]
225    MaxLinksExceeded { max: usize },
226
227    #[error("handshake incomplete for link {0}")]
228    HandshakeIncomplete(LinkId),
229
230    #[error("no session available for link {0}")]
231    NoSession(LinkId),
232
233    #[error("promotion failed for link {link_id}: {reason}")]
234    PromotionFailed { link_id: LinkId, reason: String },
235
236    #[error("send failed to {node_addr}: {reason}")]
237    SendFailed { node_addr: NodeAddr, reason: String },
238
239    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
240    MtuExceeded {
241        node_addr: NodeAddr,
242        packet_size: usize,
243        mtu: u16,
244    },
245
246    #[error("config error: {0}")]
247    Config(#[from] ConfigError),
248
249    #[error("identity error: {0}")]
250    Identity(#[from] IdentityError),
251
252    #[error("TUN error: {0}")]
253    Tun(#[from] TunError),
254
255    #[error("index allocation failed: {0}")]
256    IndexAllocationFailed(String),
257
258    #[error("handshake failed: {0}")]
259    HandshakeFailed(String),
260
261    #[error("transport error: {0}")]
262    TransportError(String),
263
264    #[error("local route unavailable: {0}")]
265    LocalRouteUnavailable(String),
266
267    #[error("bootstrap handoff failed: {0}")]
268    BootstrapHandoff(String),
269}
270
271impl NodeError {
272    pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
273        if error.is_local_route_unavailable() {
274            Self::LocalRouteUnavailable(error.to_string())
275        } else {
276            Self::TransportError(error.to_string())
277        }
278    }
279
280    pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
281        matches!(self, Self::LocalRouteUnavailable(_))
282    }
283}
284
285/// Source-attributed packet delivered by a node running without a system TUN.
286#[derive(Debug, Clone, PartialEq, Eq)]
287pub struct NodeDeliveredPacket {
288    /// FIPS node address that originated the packet.
289    pub source_node_addr: NodeAddr,
290    /// Source Nostr public key when the node has learned it.
291    pub source_npub: Option<String>,
292    /// Destination FIPS address from the IPv6 packet.
293    pub destination: FipsAddress,
294    /// Full IPv6 packet after FIPS session decapsulation.
295    pub packet: Vec<u8>,
296}
297
298#[derive(Debug, Clone)]
299struct IdentityCacheEntry {
300    node_addr: NodeAddr,
301    pubkey: secp256k1::PublicKey,
302    npub: String,
303    last_seen_ms: u64,
304}
305
306impl IdentityCacheEntry {
307    fn new(
308        node_addr: NodeAddr,
309        pubkey: secp256k1::PublicKey,
310        npub: String,
311        last_seen_ms: u64,
312    ) -> Self {
313        Self {
314            node_addr,
315            pubkey,
316            npub,
317            last_seen_ms,
318        }
319    }
320}
321
322/// App-owned packet channels for embedding FIPS without a system TUN.
323#[derive(Debug)]
324pub struct ExternalPacketIo {
325    /// Send outbound IPv6 packets into the node.
326    pub outbound_tx: crate::upper::tun::TunOutboundTx,
327    /// Receive inbound IPv6 packets delivered by FIPS sessions.
328    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
329}
330
331/// App-owned endpoint data channels for embedding FIPS without a daemon.
332#[derive(Debug)]
333pub(crate) struct EndpointDataIo {
334    /// Send endpoint data commands into the node RX loop.
335    ///
336    /// Bounded with a generous default so normal sender bursts do not
337    /// stall on semaphore acquisition. macOS pacing happens at the UDP
338    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
339    /// constraining this app queue instead caused the inner TCP flow to
340    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
341    /// default for benches.
342    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
343    /// Receive endpoint data delivered by FIPS sessions.
344    ///
345    /// Unbounded so the rx_loop's send on inbound packet delivery is a
346    /// wait-free push (no semaphore acquire), and so we can drop the
347    /// per-packet cross-task relay that previously sat between the node
348    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
349    /// naturally bounded — the rx_loop both produces here and runs the
350    /// same runtime that schedules the consumer, so a stalled consumer
351    /// stalls production too.
352    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
353    /// Clone of the event_tx exposed for in-process loopback (e.g.
354    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
355    /// event into the same queue without going through the encrypt /
356    /// decrypt path, while keeping every consumer reading from a single
357    /// channel.
358    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
359}
360
361fn endpoint_data_command_capacity(requested: usize) -> usize {
362    if let Ok(raw) = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
363        && let Ok(value) = raw.trim().parse::<usize>()
364        && value > 0
365    {
366        return value;
367    }
368
369    requested.max(1).max(32_768)
370}
371
372/// Commands accepted by the node endpoint data service.
373#[derive(Debug)]
374pub(crate) enum NodeEndpointCommand {
375    /// Send with an explicit response channel — used by callers that
376    /// care whether the local-stack handoff succeeded (e.g.
377    /// `blocking_send` waits for the runtime to accept the send).
378    Send {
379        remote: PeerIdentity,
380        payload: Vec<u8>,
381        queued_at: Option<std::time::Instant>,
382        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
383    },
384    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
385    /// no per-packet result channel. Used by the data-plane fast path
386    /// (`FipsEndpoint::send`) where the caller already discards the
387    /// result. Saves one oneshot::channel() allocation per outbound
388    /// packet on the application's send hot path.
389    SendOneway {
390        remote: PeerIdentity,
391        payload: Vec<u8>,
392        queued_at: Option<std::time::Instant>,
393    },
394    PeerSnapshot {
395        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
396    },
397    RelaySnapshot {
398        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
399    },
400    UpdateRelays {
401        advert_relays: Vec<String>,
402        dm_relays: Vec<String>,
403        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
404    },
405    /// Replace the runtime peer list. Newly added auto-connect peers get
406    /// `initiate_peer_connection` immediately; removed peers are dropped
407    /// from the retry queue (the regular liveness timeout reaps any active
408    /// session). Existing entries are kept and their `addresses` field is
409    /// refreshed so the next retry sees the latest hints.
410    UpdatePeers {
411        peers: Vec<crate::config::PeerConfig>,
412        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
413    },
414}
415
416/// Reports what changed in response to `UpdatePeers`.
417#[derive(Debug, Clone, Default, PartialEq, Eq)]
418pub(crate) struct UpdatePeersOutcome {
419    pub(crate) added: usize,
420    pub(crate) removed: usize,
421    pub(crate) updated: usize,
422    pub(crate) unchanged: usize,
423}
424
425/// Endpoint data events emitted by the node session receive path.
426#[derive(Debug)]
427pub(crate) enum NodeEndpointEvent {
428    Data {
429        source_node_addr: NodeAddr,
430        source_npub: Option<String>,
431        payload: Vec<u8>,
432        queued_at: Option<std::time::Instant>,
433    },
434}
435
436/// Authenticated peer state exposed to embedded endpoint callers.
437#[derive(Debug, Clone, PartialEq, Eq)]
438pub(crate) struct NodeEndpointPeer {
439    pub(crate) npub: String,
440    pub(crate) connected: bool,
441    pub(crate) transport_addr: Option<String>,
442    pub(crate) transport_type: Option<String>,
443    pub(crate) link_id: u64,
444    pub(crate) srtt_ms: Option<u64>,
445    pub(crate) packets_sent: u64,
446    pub(crate) packets_recv: u64,
447    pub(crate) bytes_sent: u64,
448    pub(crate) bytes_recv: u64,
449    pub(crate) direct_probe_pending: bool,
450    pub(crate) direct_probe_after_ms: Option<u64>,
451}
452
453/// Live Nostr relay state exposed to embedded endpoint callers.
454#[derive(Debug, Clone, PartialEq, Eq)]
455pub(crate) struct NodeEndpointRelayStatus {
456    pub(crate) url: String,
457    pub(crate) status: String,
458}
459
460/// Node operational state.
461#[derive(Clone, Copy, Debug, PartialEq, Eq)]
462pub enum NodeState {
463    /// Created but not started.
464    Created,
465    /// Starting up (initializing transports).
466    Starting,
467    /// Fully operational.
468    Running,
469    /// Shutting down.
470    Stopping,
471    /// Stopped.
472    Stopped,
473}
474
475impl NodeState {
476    /// Check if node is operational.
477    pub fn is_operational(&self) -> bool {
478        matches!(self, NodeState::Running)
479    }
480
481    /// Check if node can be started.
482    pub fn can_start(&self) -> bool {
483        matches!(self, NodeState::Created | NodeState::Stopped)
484    }
485
486    /// Check if node can be stopped.
487    pub fn can_stop(&self) -> bool {
488        matches!(self, NodeState::Running)
489    }
490}
491
492impl fmt::Display for NodeState {
493    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
494        let s = match self {
495            NodeState::Created => "created",
496            NodeState::Starting => "starting",
497            NodeState::Running => "running",
498            NodeState::Stopping => "stopping",
499            NodeState::Stopped => "stopped",
500        };
501        write!(f, "{}", s)
502    }
503}
504
505/// Recent request tracking for dedup and reverse-path forwarding.
506///
507/// When a LookupRequest is forwarded through a node, the node stores the
508/// request_id and which peer sent it. When the corresponding LookupResponse
509/// arrives, it's forwarded back to that peer (reverse-path forwarding).
510/// The `response_forwarded` flag prevents response routing loops.
511#[derive(Clone, Debug)]
512pub(crate) struct RecentRequest {
513    /// The peer who sent this request to us.
514    pub(crate) from_peer: NodeAddr,
515    /// When we received this request (Unix milliseconds).
516    pub(crate) timestamp_ms: u64,
517    /// Whether we've already forwarded a response for this request.
518    /// Prevents response routing loops when convergent request paths
519    /// create bidirectional entries in recent_requests.
520    pub(crate) response_forwarded: bool,
521}
522
523impl RecentRequest {
524    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
525        Self {
526            from_peer,
527            timestamp_ms,
528            response_forwarded: false,
529        }
530    }
531
532    /// Check if this entry has expired (older than expiry_ms).
533    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
534        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
535    }
536}
537
538/// Key for addr_to_link reverse lookup.
539type AddrKey = (TransportId, TransportAddr);
540
541/// Per-transport kernel drop tracking for congestion detection.
542///
543/// Sampled every tick (1s). The `dropping` flag indicates whether new
544/// kernel drops were observed since the previous sample.
545#[derive(Debug, Default)]
546struct TransportDropState {
547    /// Previous `recv_drops` sample (cumulative counter).
548    prev_drops: u64,
549    /// True if drops increased since the last sample.
550    dropping: bool,
551}
552
553/// State for a link waiting for transport-level connection establishment.
554///
555/// For connection-oriented transports (TCP, Tor), the transport connect runs
556/// asynchronously. This struct holds the data needed to complete the handshake
557/// once the connection is ready.
558struct PendingConnect {
559    /// The link that was created for this connection.
560    link_id: LinkId,
561    /// Which transport is being used.
562    transport_id: TransportId,
563    /// The remote address being connected to.
564    remote_addr: TransportAddr,
565    /// The peer identity (for handshake initiation).
566    peer_identity: PeerIdentity,
567}
568
569/// A running FIPS node instance.
570///
571/// This is the top-level container holding all node state.
572///
573/// ## Peer Lifecycle
574///
575/// Peers go through two phases:
576/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
577/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
578///
579/// The `addr_to_link` map enables dispatching incoming packets to the right
580/// connection before authentication completes.
581// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
582pub struct Node {
583    // === Identity ===
584    /// This node's cryptographic identity.
585    identity: Identity,
586
587    /// Random epoch generated at startup for peer restart detection.
588    /// Exchanged inside Noise handshake messages so peers can detect restarts.
589    startup_epoch: [u8; 8],
590
591    /// Instant when the node was created, for uptime reporting.
592    started_at: std::time::Instant,
593
594    // === Configuration ===
595    /// Loaded configuration.
596    config: Config,
597
598    // === State ===
599    /// Node operational state.
600    state: NodeState,
601
602    /// Whether this is a leaf-only node.
603    is_leaf_only: bool,
604
605    // === Spanning Tree ===
606    /// Local spanning tree state.
607    tree_state: TreeState,
608
609    // === Bloom Filter ===
610    /// Local Bloom filter state.
611    bloom_state: BloomState,
612
613    // === Routing ===
614    /// Address -> coordinates cache (from session setup and discovery).
615    coord_cache: CoordCache,
616    /// Locally learned reverse-path next-hop hints.
617    learned_routes: LearnedRouteTable,
618    /// Destinations whose direct first-hop path is temporarily suspect because
619    /// session-layer MMP observed sustained loss while using that direct path.
620    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
621    /// Recent discovery requests (dedup + reverse-path forwarding).
622    /// Maps request_id → RecentRequest.
623    recent_requests: HashMap<u64, RecentRequest>,
624    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
625    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
626    /// the TUN reader/writer threads at TCP MSS clamp time so the
627    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
628    /// and the learned per-destination path MTU.
629    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
630
631    // === Transports & Links ===
632    /// Active transports (owned by Node).
633    transports: HashMap<TransportId, TransportHandle>,
634    /// Per-transport kernel drop tracking for congestion detection.
635    transport_drops: HashMap<TransportId, TransportDropState>,
636    /// Active links.
637    links: HashMap<LinkId, Link>,
638    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
639    addr_to_link: HashMap<AddrKey, LinkId>,
640
641    // === Packet Channel ===
642    /// Packet sender for transports.
643    packet_tx: Option<PacketTx>,
644    /// Packet receiver (for event loop).
645    packet_rx: Option<PacketRx>,
646
647    // === Connections (Handshake Phase) ===
648    /// Pending connections (handshake in progress).
649    /// Indexed by LinkId since we don't know the peer's identity yet.
650    connections: HashMap<LinkId, PeerConnection>,
651
652    // === Peers (Active Phase) ===
653    /// Authenticated peers.
654    /// Indexed by NodeAddr (verified identity).
655    peers: HashMap<NodeAddr, ActivePeer>,
656
657    // === End-to-End Sessions ===
658    /// Session table for end-to-end encrypted sessions.
659    /// Keyed by remote NodeAddr.
660    sessions: HashMap<NodeAddr, SessionEntry>,
661
662    // === Identity Cache ===
663    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
664    /// Enables reverse lookup from IPv6 destination to session/routing identity.
665    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
666
667    // === Pending TUN Packets ===
668    /// Packets queued while waiting for session establishment.
669    /// Keyed by destination NodeAddr, bounded per-dest and total.
670    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
671    /// Endpoint data payloads queued while waiting for session establishment.
672    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
673    // === Pending Discovery Lookups ===
674    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
675    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
676    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
677
678    // === Resource Limits ===
679    /// Maximum connections (0 = unlimited).
680    max_connections: usize,
681    /// Maximum peers (0 = unlimited).
682    max_peers: usize,
683    /// Maximum links (0 = unlimited).
684    max_links: usize,
685
686    // === Counters ===
687    /// Next link ID to allocate.
688    next_link_id: u64,
689    /// Next transport ID to allocate.
690    next_transport_id: u32,
691
692    // === Node Statistics ===
693    /// Routing, forwarding, discovery, and error signal counters.
694    stats: stats::NodeStats,
695
696    /// Time-series history of node-level metrics (1s/1m rings).
697    stats_history: stats_history::StatsHistory,
698
699    // === TUN Interface ===
700    /// TUN device state.
701    tun_state: TunState,
702    /// TUN interface name (for cleanup).
703    tun_name: Option<String>,
704    /// TUN packet sender channel.
705    tun_tx: Option<TunTx>,
706    /// Receiver for outbound packets from the TUN reader.
707    tun_outbound_rx: Option<TunOutboundRx>,
708    /// App-owned packet sink used by embedded/no-TUN integrations.
709    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
710    /// Endpoint data command receiver used by embedded/no-daemon integrations.
711    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
712    /// Endpoint data event sink used by embedded/no-daemon integrations.
713    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
714    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
715    /// spawned (set up in `start()` once transports are running).
716    /// `Some(pool)` once available; the pool internally holds
717    /// per-worker mpsc senders and round-robins jobs across them.
718    /// See `node::encrypt_worker` for the rationale and layout.
719    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
720    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
721    /// `encrypt_workers` for the receive side.
722    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
723    /// Set of sessions that have been registered with the decrypt
724    /// shard worker pool. Used by rx_loop to decide between fast-path
725    /// dispatch (worker owns the session) and legacy in-place decrypt
726    /// (worker doesn't have it yet). Per the data-plane restructure,
727    /// the worker owns its session state directly — there's no shared
728    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
729    /// this set tracks **whether** the worker has been told about a
730    /// given session.
731    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
732    /// Fallback channel: decrypt worker bounces non-fast-path packets
733    /// (anything that's not bulk EndpointData) back here for rx_loop
734    /// to handle via the legacy path. Drained by a new rx_loop arm.
735    decrypt_fallback_rx:
736        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
737    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
738    /// TUN reader thread handle.
739    tun_reader_handle: Option<JoinHandle<()>>,
740    /// TUN writer thread handle.
741    tun_writer_handle: Option<JoinHandle<()>>,
742    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
743    /// On Linux, deleting the interface via netlink serves the same purpose.
744    #[cfg(target_os = "macos")]
745    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
746
747    // === DNS Responder ===
748    /// Receiver for resolved identities from the DNS responder.
749    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
750    /// DNS responder task handle.
751    dns_task: Option<tokio::task::JoinHandle<()>>,
752
753    // === Index-Based Session Dispatch ===
754    /// Allocator for session indices.
755    index_allocator: IndexAllocator,
756    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
757    /// This maps our session index to the peer that uses it.
758    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
759    /// Pending outbound handshakes by our sender_idx.
760    /// Tracks which LinkId corresponds to which session index.
761    pending_outbound: HashMap<(TransportId, u32), LinkId>,
762
763    // === Rate Limiting ===
764    /// Rate limiter for msg1 processing (DoS protection).
765    msg1_rate_limiter: HandshakeRateLimiter,
766    /// Rate limiter for ICMP Packet Too Big messages.
767    icmp_rate_limiter: IcmpRateLimiter,
768    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
769    routing_error_rate_limiter: RoutingErrorRateLimiter,
770    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
771    coords_response_rate_limiter: RoutingErrorRateLimiter,
772    /// Backoff for failed discovery lookups (originator-side).
773    discovery_backoff: DiscoveryBackoff,
774    /// Rate limiter for forwarded discovery requests (transit-side).
775    discovery_forward_limiter: DiscoveryForwardRateLimiter,
776
777    // === Pending Transport Connects ===
778    /// Links waiting for transport-level connection establishment before
779    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
780    /// the transport connect runs in the background; the tick handler polls
781    /// connection_state() and initiates the handshake when connected.
782    pending_connects: Vec<PendingConnect>,
783
784    // === Connection Retry ===
785    /// Retry state for peers whose outbound connections have failed.
786    /// Keyed by NodeAddr. Entries are created when a handshake times out
787    /// or fails, and removed on successful promotion or when max retries
788    /// are exhausted.
789    retry_pending: HashMap<NodeAddr, retry::RetryState>,
790
791    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
792    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
793    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
794    /// Identity is unverified at this layer — the Noise XX handshake
795    /// initiated against an mDNS-observed endpoint is what proves the
796    /// peer holds the matching private key.
797    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
798    /// Same-host JSON registry under `~/.fips/instances`. Records are
799    /// loopback routing hints only; peer identity is still verified by the
800    /// Noise handshake.
801    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
802    local_instance_started_at_ms: Option<u64>,
803    last_local_instance_publish_ms: Option<u64>,
804    last_local_instance_scan_ms: Option<u64>,
805    /// Wall-clock ms when Nostr discovery successfully started, used to
806    /// schedule the one-shot startup advert sweep after a settle delay.
807    /// `None` until discovery comes up; remains `None` if discovery is
808    /// disabled or failed to start.
809    nostr_discovery_started_at_ms: Option<u64>,
810    /// Whether the one-shot startup advert sweep has run. Set to true
811    /// after the first sweep fires (under `policy: open`); thereafter
812    /// only the per-tick `queue_open_discovery_retries` continues.
813    startup_open_discovery_sweep_done: bool,
814    /// Per-peer UDP transports adopted from NAT traversal handoff.
815    bootstrap_transports: HashSet<TransportId>,
816    /// Originating peer npub (bech32) for each adopted bootstrap
817    /// transport, captured at `adopt_established_traversal` time.
818    /// Populated alongside `bootstrap_transports`; cleared in
819    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
820    /// route fatal-protocol-mismatch observations back to the
821    /// Nostr-discovery `failure_state` for long cooldown application.
822    bootstrap_transport_npubs: HashMap<TransportId, String>,
823    /// Peers that should not be used as reply-learned fallback transit for
824    /// other destinations. Direct lookups to the peer are still permitted.
825    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
826
827    // === Periodic Parent Re-evaluation ===
828    /// Timestamp of last periodic parent re-evaluation (for pacing).
829    last_parent_reeval: Option<crate::time::Instant>,
830
831    // === Congestion Logging ===
832    /// Timestamp of last congestion detection log (rate-limited to 5s).
833    last_congestion_log: Option<std::time::Instant>,
834
835    // === Mesh Size Estimate ===
836    /// Cached estimated mesh size (computed once per tick from bloom filters).
837    estimated_mesh_size: Option<u64>,
838    /// Timestamp of last mesh size log emission.
839    last_mesh_size_log: Option<std::time::Instant>,
840
841    // === Bloom Self-Plausibility ===
842    /// Rate-limit state for the self-plausibility WARN. Fires at most
843    /// once per 60s globally when our own outgoing FilterAnnounce has
844    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
845    /// aggregation drift or an ingress bypass.
846    last_self_warn: Option<std::time::Instant>,
847
848    // === Local Outbound Liveness ===
849    /// Set per peer when a `transport.send` returned a local-side io error
850    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
851    /// cleared on the next successful send to that peer. Used by
852    /// `check_link_heartbeats` to compress only that peer's dead-timeout to
853    /// `fast_link_dead_timeout_secs` while its outbound is observed broken.
854    local_send_failure_at_by_peer: HashMap<NodeAddr, std::time::Instant>,
855    /// Set when the rx loop could not complete its 1s maintenance work
856    /// inside the watchdog timeout. Link-dead detection may be valid during
857    /// overload, but traversal cooldown should not punish a path just because
858    /// our own scheduler/worker queue was late.
859    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
860
861    // === Display Names ===
862    /// Human-readable names for configured peers (alias or short npub).
863    /// Populated at startup from peer config.
864    peer_aliases: HashMap<NodeAddr, String>,
865    /// Scheduler weight for explicitly configured peers. Built when config
866    /// changes so the packet hot path only does a NodeAddr hash lookup.
867    configured_peer_send_weights: HashMap<NodeAddr, u8>,
868
869    /// Reloadable peer ACL state from standard allow/deny files.
870    peer_acl: acl::PeerAclReloader,
871
872    // === Host Map ===
873    /// Static hostname → npub mapping for DNS resolution.
874    /// Built at construction from peer aliases and /etc/fips/hosts.
875    host_map: Arc<HostMap>,
876}
877
878impl Node {
879    /// Create a new node from configuration.
880    pub fn new(config: Config) -> Result<Self, NodeError> {
881        config.validate()?;
882        let identity = config.create_identity()?;
883        let node_addr = *identity.node_addr();
884        let is_leaf_only = config.is_leaf_only();
885
886        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
887        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
888
889        let mut startup_epoch = [0u8; 8];
890        rand::rng().fill_bytes(&mut startup_epoch);
891
892        let mut bloom_state = if is_leaf_only {
893            BloomState::leaf_only(node_addr)
894        } else {
895            BloomState::new(node_addr)
896        };
897        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
898
899        let tun_state = if config.tun.enabled {
900            TunState::Configured
901        } else {
902            TunState::Disabled
903        };
904
905        // Initialize tree state with signed self-declaration
906        let mut tree_state = TreeState::new(node_addr);
907        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
908        tree_state.set_hold_down(config.node.tree.hold_down_secs);
909        tree_state.set_flap_dampening(
910            config.node.tree.flap_threshold,
911            config.node.tree.flap_window_secs,
912            config.node.tree.flap_dampening_secs,
913        );
914        tree_state
915            .sign_declaration(&identity)
916            .expect("signing own declaration should never fail");
917
918        let coord_cache = CoordCache::new(
919            config.node.cache.coord_size,
920            config.node.cache.coord_ttl_secs * 1000,
921        );
922        let rl = &config.node.rate_limit;
923        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
924            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
925            config.node.limits.max_pending_inbound,
926        );
927
928        let max_connections = config.node.limits.max_connections;
929        let max_peers = config.node.limits.max_peers;
930        let max_links = config.node.limits.max_links;
931        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
932        let backoff_base_secs = config.node.discovery.backoff_base_secs;
933        let backoff_max_secs = config.node.discovery.backoff_max_secs;
934        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
935
936        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
937        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
938
939        Ok(Self {
940            identity,
941            startup_epoch,
942            started_at: std::time::Instant::now(),
943            config,
944            state: NodeState::Created,
945            is_leaf_only,
946            tree_state,
947            bloom_state,
948            coord_cache,
949            learned_routes: LearnedRouteTable::default(),
950            session_direct_degraded_until_ms: HashMap::new(),
951            recent_requests: HashMap::new(),
952            transports: HashMap::new(),
953            transport_drops: HashMap::new(),
954            links: HashMap::new(),
955            addr_to_link: HashMap::new(),
956            packet_tx: None,
957            packet_rx: None,
958            connections: HashMap::new(),
959            peers: HashMap::new(),
960            sessions: HashMap::new(),
961            identity_cache: HashMap::new(),
962            pending_tun_packets: HashMap::new(),
963            pending_endpoint_data: HashMap::new(),
964            pending_lookups: HashMap::new(),
965            max_connections,
966            max_peers,
967            max_links,
968            next_link_id: 1,
969            next_transport_id: 1,
970            stats: stats::NodeStats::new(),
971            stats_history: stats_history::StatsHistory::new(),
972            tun_state,
973            tun_name: None,
974            tun_tx: None,
975            tun_outbound_rx: None,
976            external_packet_tx: None,
977            endpoint_command_rx: None,
978            endpoint_event_tx: None,
979            encrypt_workers: None,
980            decrypt_workers: None,
981            decrypt_registered_sessions: std::collections::HashSet::new(),
982            decrypt_fallback_tx,
983            decrypt_fallback_rx,
984            tun_reader_handle: None,
985            tun_writer_handle: None,
986            #[cfg(target_os = "macos")]
987            tun_shutdown_fd: None,
988            dns_identity_rx: None,
989            dns_task: None,
990            index_allocator: IndexAllocator::new(),
991            peers_by_index: HashMap::new(),
992            pending_outbound: HashMap::new(),
993            msg1_rate_limiter,
994            icmp_rate_limiter: IcmpRateLimiter::new(),
995            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
996            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
997                std::time::Duration::from_millis(coords_response_interval_ms),
998            ),
999            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
1000            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
1001                std::time::Duration::from_secs(forward_min_interval_secs),
1002            ),
1003            pending_connects: Vec::new(),
1004            retry_pending: HashMap::new(),
1005            nostr_discovery: None,
1006            nostr_discovery_started_at_ms: None,
1007            lan_discovery: None,
1008            local_instance_registry: None,
1009            local_instance_started_at_ms: None,
1010            last_local_instance_publish_ms: None,
1011            last_local_instance_scan_ms: None,
1012            startup_open_discovery_sweep_done: false,
1013            bootstrap_transports: HashSet::new(),
1014            bootstrap_transport_npubs: HashMap::new(),
1015            discovery_fallback_transit_blocked_peers: HashSet::new(),
1016            last_parent_reeval: None,
1017            last_congestion_log: None,
1018            estimated_mesh_size: None,
1019            last_mesh_size_log: None,
1020            last_self_warn: None,
1021            local_send_failure_at_by_peer: HashMap::new(),
1022            last_rx_loop_maintenance_timeout_at: None,
1023            peer_aliases: HashMap::new(),
1024            configured_peer_send_weights,
1025            peer_acl,
1026            host_map,
1027            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1028        })
1029    }
1030
1031    /// Create a node with a specific identity.
1032    ///
1033    /// This constructor validates cross-field config invariants before
1034    /// constructing the node, same as [`Node::new`].
1035    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
1036        config.validate()?;
1037        let node_addr = *identity.node_addr();
1038
1039        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1040        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1041
1042        let mut startup_epoch = [0u8; 8];
1043        rand::rng().fill_bytes(&mut startup_epoch);
1044
1045        let tun_state = if config.tun.enabled {
1046            TunState::Configured
1047        } else {
1048            TunState::Disabled
1049        };
1050
1051        // Initialize tree state with signed self-declaration
1052        let mut tree_state = TreeState::new(node_addr);
1053        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1054        tree_state.set_hold_down(config.node.tree.hold_down_secs);
1055        tree_state.set_flap_dampening(
1056            config.node.tree.flap_threshold,
1057            config.node.tree.flap_window_secs,
1058            config.node.tree.flap_dampening_secs,
1059        );
1060        tree_state
1061            .sign_declaration(&identity)
1062            .expect("signing own declaration should never fail");
1063
1064        let mut bloom_state = BloomState::new(node_addr);
1065        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1066
1067        let coord_cache = CoordCache::new(
1068            config.node.cache.coord_size,
1069            config.node.cache.coord_ttl_secs * 1000,
1070        );
1071        let rl = &config.node.rate_limit;
1072        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1073            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1074            config.node.limits.max_pending_inbound,
1075        );
1076
1077        let max_connections = config.node.limits.max_connections;
1078        let max_peers = config.node.limits.max_peers;
1079        let max_links = config.node.limits.max_links;
1080        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1081
1082        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1083        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1084
1085        Ok(Self {
1086            identity,
1087            startup_epoch,
1088            started_at: std::time::Instant::now(),
1089            config,
1090            state: NodeState::Created,
1091            is_leaf_only: false,
1092            tree_state,
1093            bloom_state,
1094            coord_cache,
1095            learned_routes: LearnedRouteTable::default(),
1096            session_direct_degraded_until_ms: HashMap::new(),
1097            recent_requests: HashMap::new(),
1098            transports: HashMap::new(),
1099            transport_drops: HashMap::new(),
1100            links: HashMap::new(),
1101            addr_to_link: HashMap::new(),
1102            packet_tx: None,
1103            packet_rx: None,
1104            connections: HashMap::new(),
1105            peers: HashMap::new(),
1106            sessions: HashMap::new(),
1107            identity_cache: HashMap::new(),
1108            pending_tun_packets: HashMap::new(),
1109            pending_endpoint_data: HashMap::new(),
1110            pending_lookups: HashMap::new(),
1111            max_connections,
1112            max_peers,
1113            max_links,
1114            next_link_id: 1,
1115            next_transport_id: 1,
1116            stats: stats::NodeStats::new(),
1117            stats_history: stats_history::StatsHistory::new(),
1118            tun_state,
1119            tun_name: None,
1120            tun_tx: None,
1121            tun_outbound_rx: None,
1122            external_packet_tx: None,
1123            endpoint_command_rx: None,
1124            endpoint_event_tx: None,
1125            encrypt_workers: None,
1126            decrypt_workers: None,
1127            decrypt_registered_sessions: std::collections::HashSet::new(),
1128            decrypt_fallback_tx,
1129            decrypt_fallback_rx,
1130            tun_reader_handle: None,
1131            tun_writer_handle: None,
1132            #[cfg(target_os = "macos")]
1133            tun_shutdown_fd: None,
1134            dns_identity_rx: None,
1135            dns_task: None,
1136            index_allocator: IndexAllocator::new(),
1137            peers_by_index: HashMap::new(),
1138            pending_outbound: HashMap::new(),
1139            msg1_rate_limiter,
1140            icmp_rate_limiter: IcmpRateLimiter::new(),
1141            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1142            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1143                std::time::Duration::from_millis(coords_response_interval_ms),
1144            ),
1145            discovery_backoff: DiscoveryBackoff::new(),
1146            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1147            pending_connects: Vec::new(),
1148            retry_pending: HashMap::new(),
1149            nostr_discovery: None,
1150            nostr_discovery_started_at_ms: None,
1151            lan_discovery: None,
1152            local_instance_registry: None,
1153            local_instance_started_at_ms: None,
1154            last_local_instance_publish_ms: None,
1155            last_local_instance_scan_ms: None,
1156            startup_open_discovery_sweep_done: false,
1157            bootstrap_transports: HashSet::new(),
1158            bootstrap_transport_npubs: HashMap::new(),
1159            discovery_fallback_transit_blocked_peers: HashSet::new(),
1160            last_parent_reeval: None,
1161            last_congestion_log: None,
1162            estimated_mesh_size: None,
1163            last_mesh_size_log: None,
1164            last_self_warn: None,
1165            local_send_failure_at_by_peer: HashMap::new(),
1166            last_rx_loop_maintenance_timeout_at: None,
1167            peer_aliases: HashMap::new(),
1168            configured_peer_send_weights,
1169            peer_acl,
1170            host_map,
1171            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1172        })
1173    }
1174
1175    /// Create a leaf-only node (simplified state).
1176    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1177        let mut node = Self::new(config)?;
1178        node.is_leaf_only = true;
1179        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1180        Ok(node)
1181    }
1182
1183    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1184        let base_host_map = HostMap::from_peer_configs(config.peers());
1185        if !config.node.system_files_enabled {
1186            return (
1187                Arc::new(base_host_map.clone()),
1188                acl::PeerAclReloader::memory_only(base_host_map),
1189            );
1190        }
1191
1192        let mut host_map = base_host_map.clone();
1193        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1194        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1195            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1196        ));
1197        host_map.merge(hosts_file);
1198        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1199            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1200            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1201            base_host_map,
1202            hosts_path,
1203        );
1204        (Arc::new(host_map), peer_acl)
1205    }
1206
1207    fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1208        config
1209            .peers()
1210            .iter()
1211            .filter_map(|peer| {
1212                PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1213                    (
1214                        *identity.node_addr(),
1215                        encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1216                    )
1217                })
1218            })
1219            .collect()
1220    }
1221
1222    #[cfg(unix)]
1223    fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1224        self.configured_peer_send_weights
1225            .get(peer_addr)
1226            .copied()
1227            .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1228    }
1229
    /// Create transport instances from configuration.
    ///
    /// Returns a vector of TransportHandles built in a fixed order:
    /// 1. UDP
    /// 2. simulated (`sim-transport` feature)
    /// 3. Ethernet (Linux/macOS)
    /// 4. TCP
    /// 5. Tor
    /// 6. WebRTC (`webrtc-transport` feature)
    /// 7. BLE (`bluer_available`, non-test builds)
    ///
    /// Each instance receives an id from `allocate_transport_id` and a clone
    /// of `packet_tx`.
    ///
    /// WebRTC and BLE construction can fail. Such instances are logged with a
    /// warning and omitted from the result, and the transport id already
    /// allocated for them is not reused. In a build without the
    /// `webrtc-transport` feature, configured WebRTC instances produce a
    /// single warning and are ignored.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Collect UDP configs with optional names to avoid borrow conflicts
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        // Create UDP transport instances
        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Create Ethernet transport instances where raw-socket support exists.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                // An instance with no explicit discovery scope takes the value
                // returned by `self.lan_discovery_scope()`.
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                // Every Ethernet transport is told this node's public key.
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        // Create TCP transport instances
        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        // Create Tor transport instances
        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // Collected outside the feature gate so that a build without WebRTC
        // support can still warn when WebRTC instances are configured.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                let transport_id = self.allocate_transport_id();
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    // Skip this instance; the allocated transport_id is left unused.
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        // Create BLE transport instances
        //
        // NOTE(review): this whole block is gated on `bluer_available`, so the
        // inner `cfg(any(not(bluer_available), test))` branch is only compiled
        // in test builds, where its `warn!` is itself compiled out. A build
        // without BlueZ support therefore never warns about configured BLE
        // instances. TODO: confirm whether `transports.ble` exists in
        // non-bluer builds before hoisting the warning out of this block.
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // In test builds this loop is compiled out, so configured BLE
            // instances are ignored there.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    // Adapter failed to open: skip this instance; the
                    // allocated transport_id is left unused.
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // Inside this block this branch is only compiled under `cfg(test)`,
            // where it serves to keep `ble_instances` used.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1407
1408    /// Find an operational transport that matches the given transport type name.
1409    ///
1410    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1411    /// from Nostr/STUN traversal. They must not be reused for ordinary
1412    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1413    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1414    /// with `EINVAL`, and even on platforms that allow it the packet would use
1415    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1416    /// choice deterministic by lowest transport id instead of HashMap order.
1417    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1418        self.transports
1419            .iter()
1420            .filter(|(id, handle)| {
1421                handle.transport_type().name == transport_type
1422                    && handle.is_operational()
1423                    && !self.bootstrap_transports.contains(id)
1424            })
1425            .min_by_key(|(id, _)| id.as_u32())
1426            .map(|(id, _)| *id)
1427    }
1428
1429    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1430    /// and binary TransportAddr.
1431    ///
1432    /// Finds the Ethernet transport instance bound to the named interface
1433    /// and parses the MAC portion into a 6-byte TransportAddr.
1434    #[allow(unused_variables)]
1435    fn resolve_ethernet_addr(
1436        &self,
1437        addr_str: &str,
1438    ) -> Result<(TransportId, TransportAddr), NodeError> {
1439        #[cfg(any(target_os = "linux", target_os = "macos"))]
1440        {
1441            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1442                NodeError::NoTransportForType(format!(
1443                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1444                    addr_str
1445                ))
1446            })?;
1447
1448            // Find the Ethernet transport bound to this interface
1449            let transport_id = self
1450                .transports
1451                .iter()
1452                .find(|(_, handle)| {
1453                    handle.transport_type().name == "ethernet"
1454                        && handle.is_operational()
1455                        && handle.interface_name() == Some(iface)
1456                })
1457                .map(|(id, _)| *id)
1458                .ok_or_else(|| {
1459                    NodeError::NoTransportForType(format!(
1460                        "no operational Ethernet transport for interface '{}'",
1461                        iface
1462                    ))
1463                })?;
1464
1465            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1466                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1467            })?;
1468
1469            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1470        }
1471        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1472        {
1473            Err(NodeError::NoTransportForType(
1474                "Ethernet transport is not supported on this platform".to_string(),
1475            ))
1476        }
1477    }
1478
1479    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1480    /// (TransportId, TransportAddr) pair by finding the BLE transport
1481    /// instance matching the adapter name.
1482    #[cfg(bluer_available)]
1483    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1484        let ta = TransportAddr::from_string(addr_str);
1485        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1486            NodeError::NoTransportForType(format!(
1487                "invalid BLE address format '{}': expected 'adapter/mac'",
1488                addr_str
1489            ))
1490        })?;
1491
1492        // Find the BLE transport for this adapter
1493        let transport_id = self
1494            .transports
1495            .iter()
1496            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1497            .map(|(id, _)| *id)
1498            .ok_or_else(|| {
1499                NodeError::NoTransportForType(format!(
1500                    "no operational BLE transport for adapter '{}'",
1501                    adapter
1502                ))
1503            })?;
1504
1505        // Validate the address format
1506        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1507            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1508        })?;
1509
1510        Ok((transport_id, TransportAddr::from_string(addr_str)))
1511    }
1512
1513    // === Identity Accessors ===
1514
1515    /// Get this node's identity.
1516    pub fn identity(&self) -> &Identity {
1517        &self.identity
1518    }
1519
1520    /// Get this node's NodeAddr.
1521    pub fn node_addr(&self) -> &NodeAddr {
1522        self.identity.node_addr()
1523    }
1524
1525    /// Get this node's npub.
1526    pub fn npub(&self) -> String {
1527        self.identity.npub()
1528    }
1529
1530    /// Return a human-readable display name for a NodeAddr.
1531    ///
1532    /// Lookup order:
1533    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1534    /// 2. Configured peer alias or short npub (from startup map)
1535    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1536    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1537    /// 5. Truncated NodeAddr hex (unknown address)
1538    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1539        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1540            return hostname.to_string();
1541        }
1542        if let Some(name) = self.peer_aliases.get(addr) {
1543            return name.clone();
1544        }
1545        if let Some(peer) = self.peers.get(addr) {
1546            return peer.identity().short_npub();
1547        }
1548        if let Some(entry) = self.sessions.get(addr) {
1549            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1550            return PeerIdentity::from_pubkey(xonly).short_npub();
1551        }
1552        addr.short_hex()
1553    }
1554
1555    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1556    /// decrypt-worker state coherent: removes the same `cache_key`
1557    /// from the registered-sessions tracking set and tells the
1558    /// assigned shard worker to drop its `OwnedSessionState` entry.
1559    ///
1560    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1561    /// at every session-lifecycle teardown site (rekey cross-connection
1562    /// swap, peer disconnect, dispatch session-rotation) so the worker
1563    /// doesn't keep stale ciphers / replay windows around. The
1564    /// follow-up `RegisterSession` for the NEW key (if any) will then
1565    /// install the fresh state on the same shard.
1566    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1567        // Find the peer that owns this index BEFORE removing it from
1568        // the index map, so we can decide whether the deregistration
1569        // also tears down the peer's connected UDP socket.
1570        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1571        self.peers_by_index.remove(&cache_key);
1572        if self.decrypt_registered_sessions.remove(&cache_key)
1573            && let Some(workers) = self.decrypt_workers.as_ref()
1574        {
1575            workers.unregister_session(cache_key);
1576        }
1577        // Tear down the per-peer connected UDP socket *only* if no
1578        // other peers_by_index entry still resolves to this peer.
1579        // Rekey drain calls into this helper with the OLD session
1580        // index while the NEW index is already installed and points
1581        // at the same peer — there the connect()-ed 5-tuple is
1582        // still valid for the new session and we must not close it.
1583        // Peer-teardown sites (CrossConnection swap, stale-index
1584        // fall-through in encrypted.rs, disconnect handler) call
1585        // here when this is the peer's last index, so the connected
1586        // socket goes away with the peer.
1587        if let Some(peer_addr) = owning_peer {
1588            let peer_has_other_index = self
1589                .peers_by_index
1590                .values()
1591                .any(|other| *other == peer_addr);
1592            if !peer_has_other_index {
1593                self.clear_connected_udp_for_peer(&peer_addr);
1594            }
1595        }
1596    }
1597
1598    /// Ensure the current FMP receive index resolves to this peer.
1599    ///
1600    /// Rekey msg1/msg2 handlers pre-register the pending index before
1601    /// cutover, but losing that registration in a debug build used to
1602    /// panic in the cutover path. Repairing the map here is safe: the
1603    /// peer has already promoted the pending session, and the decrypt
1604    /// worker registration immediately after cutover depends on the
1605    /// same `(transport_id, our_index)` key.
1606    pub(in crate::node) fn ensure_current_session_index_registered(
1607        &mut self,
1608        node_addr: &NodeAddr,
1609        context: &'static str,
1610    ) -> bool {
1611        let Some(peer) = self.peers.get(node_addr) else {
1612            return false;
1613        };
1614        let Some(transport_id) = peer.transport_id() else {
1615            warn!(
1616                peer = %self.peer_display_name(node_addr),
1617                context,
1618                "Cannot register current session index without transport id"
1619            );
1620            return false;
1621        };
1622        let Some(our_index) = peer.our_index() else {
1623            warn!(
1624                peer = %self.peer_display_name(node_addr),
1625                context,
1626                "Cannot register current session index without local index"
1627            );
1628            return false;
1629        };
1630
1631        let cache_key = (transport_id, our_index.as_u32());
1632        match self.peers_by_index.get(&cache_key).copied() {
1633            Some(existing) if existing == *node_addr => true,
1634            Some(existing) => {
1635                warn!(
1636                    peer = %self.peer_display_name(node_addr),
1637                    previous_owner = %self.peer_display_name(&existing),
1638                    transport_id = %transport_id,
1639                    our_index = %our_index,
1640                    context,
1641                    "Repairing current session index with stale owner"
1642                );
1643                self.peers_by_index.insert(cache_key, *node_addr);
1644                true
1645            }
1646            None => {
1647                warn!(
1648                    peer = %self.peer_display_name(node_addr),
1649                    transport_id = %transport_id,
1650                    our_index = %our_index,
1651                    context,
1652                    "Repairing missing current session index"
1653                );
1654                self.peers_by_index.insert(cache_key, *node_addr);
1655                true
1656            }
1657        }
1658    }
1659
1660    // === Configuration ===
1661
1662    /// Get the configuration.
1663    pub fn config(&self) -> &Config {
1664        &self.config
1665    }
1666
1667    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1668    ///
1669    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1670    /// transport MTU. Returns the maximum IPv6 packet size (including
1671    /// IPv6 header) that can be transmitted through the FIPS mesh.
1672    pub fn effective_ipv6_mtu(&self) -> u16 {
1673        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1674    }
1675
1676    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1677    ///
1678    /// Returns the **minimum** MTU across all operational transports, or
1679    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1680    /// where a specific egress transport isn't yet known: the resulting
1681    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1682    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1683    /// configured-transport's egress, eliminating PMTU-D black holes that
1684    /// would otherwise occur when a flow's actual egress is smaller than
1685    /// the clamp ceiling assumed at TUN init.
1686    ///
1687    /// Returning the smallest (rather than the first-iterated, which used
1688    /// to vary across HashMap iteration order + async-startup race) makes
1689    /// the clamp deterministic across daemon restarts.
1690    ///
1691    /// See `ISSUE-2026-0011` for the empirical investigation.
1692    pub fn transport_mtu(&self) -> u16 {
1693        let min_operational = self
1694            .transports
1695            .values()
1696            .filter(|h| h.is_operational())
1697            .map(|h| h.mtu())
1698            .min();
1699        if let Some(mtu) = min_operational {
1700            return mtu;
1701        }
1702        // Fallback to config: try UDP first, then Ethernet
1703        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1704            return cfg.mtu();
1705        }
1706        1280
1707    }
1708
1709    // === State ===
1710
1711    /// Get the node state.
1712    pub fn state(&self) -> NodeState {
1713        self.state
1714    }
1715
1716    /// Get the node uptime.
1717    pub fn uptime(&self) -> std::time::Duration {
1718        self.started_at.elapsed()
1719    }
1720
1721    /// Check if node is operational.
1722    pub fn is_running(&self) -> bool {
1723        self.state.is_operational()
1724    }
1725
1726    /// Check if this is a leaf-only node.
1727    pub fn is_leaf_only(&self) -> bool {
1728        self.is_leaf_only
1729    }
1730
1731    // === Tree State ===
1732
1733    /// Get the tree state.
1734    pub fn tree_state(&self) -> &TreeState {
1735        &self.tree_state
1736    }
1737
1738    /// Get mutable tree state.
1739    pub fn tree_state_mut(&mut self) -> &mut TreeState {
1740        &mut self.tree_state
1741    }
1742
1743    // === Bloom State ===
1744
1745    /// Get the Bloom filter state.
1746    pub fn bloom_state(&self) -> &BloomState {
1747        &self.bloom_state
1748    }
1749
1750    /// Get mutable Bloom filter state.
1751    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1752        &mut self.bloom_state
1753    }
1754
1755    // === Mesh Size Estimate ===
1756
1757    /// Get the cached estimated mesh size.
1758    pub fn estimated_mesh_size(&self) -> Option<u64> {
1759        self.estimated_mesh_size
1760    }
1761
    /// Compute and cache the estimated mesh size from bloom filters.
    ///
    /// Uses the spanning tree partition: parent's filter covers nodes reachable
    /// upward, children's filters cover disjoint subtrees downward. The sum
    /// of estimated entry counts plus one (self) approximates total network size.
    ///
    /// Outcome written to `self.estimated_mesh_size`:
    /// - `None` as soon as any contributing filter (parent or child) returns
    ///   `None` from `estimated_count(max_inbound_fpr)`. The estimate is
    ///   withheld rather than computed from a partial aggregate.
    /// - `None` when no parent or child filter contributed at all.
    /// - Otherwise `Some(round(1.0 + sum of filter estimates))`.
    ///
    /// When an estimate is stored, a debug line is logged at most once per
    /// `node.mmp.log_interval_secs` (tracked via `last_mesh_size_log`).
    pub(crate) fn compute_mesh_size(&mut self) {
        let my_addr = *self.tree_state.my_node_addr();
        let parent_id = *self.tree_state.my_declaration().parent_id();
        let is_root = self.tree_state.is_root();

        let max_fpr = self.config.node.bloom.max_inbound_fpr;
        let mut total: f64 = 1.0; // count self
        let mut child_count: u32 = 0;
        // Set once at least one filter contributed an estimate.
        let mut has_data = false;

        // Parent's filter: nodes reachable upward through the tree.
        // If any contributing filter is above the FPR cap, we refuse to
        // estimate rather than substitute a partial/biased aggregate —
        // Node.estimated_mesh_size is already Option<u64> and consumers
        // (control socket, fipstop, periodic debug log) handle None.
        if !is_root
            && let Some(parent) = self.peers.get(&parent_id)
            && let Some(filter) = parent.inbound_filter()
        {
            match filter.estimated_count(max_fpr) {
                Some(n) => {
                    total += n;
                    has_data = true;
                }
                None => {
                    self.estimated_mesh_size = None;
                    return;
                }
            }
        }

        // Children's filters: each child's subtree is disjoint
        for (peer_addr, peer) in &self.peers {
            // The parent was handled above; never count it as a child.
            if peer_addr == &parent_id {
                continue;
            }
            // A peer is a child when its tree declaration names us as parent.
            // Children without an inbound filter are still counted in
            // `child_count` but contribute nothing to `total`.
            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
                && *decl.parent_id() == my_addr
            {
                child_count += 1;
                if let Some(filter) = peer.inbound_filter() {
                    match filter.estimated_count(max_fpr) {
                        Some(n) => {
                            total += n;
                            has_data = true;
                        }
                        None => {
                            self.estimated_mesh_size = None;
                            return;
                        }
                    }
                }
            }
        }

        // Self alone (total == 1.0) is not a meaningful estimate.
        if !has_data {
            self.estimated_mesh_size = None;
            return;
        }

        // Float-to-int `as` casts saturate (NaN maps to 0) rather than panic.
        let size = total.round() as u64;
        self.estimated_mesh_size = Some(size);

        // Periodic logging (reuse MMP default interval: 30s)
        let now = std::time::Instant::now();
        let should_log = match self.last_mesh_size_log {
            None => true,
            Some(last) => {
                now.duration_since(last)
                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
            }
        };
        if should_log {
            tracing::debug!(
                estimated_mesh_size = size,
                peers = self.peers.len(),
                children = child_count,
                "Mesh size estimate"
            );
            self.last_mesh_size_log = Some(now);
        }
    }
1849
1850    // === Coord Cache ===
1851
1852    /// Get the coordinate cache.
1853    pub fn coord_cache(&self) -> &CoordCache {
1854        &self.coord_cache
1855    }
1856
1857    /// Get mutable coordinate cache.
1858    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1859        &mut self.coord_cache
1860    }
1861
1862    // === Node Statistics ===
1863
1864    /// Get the node statistics.
1865    pub fn stats(&self) -> &stats::NodeStats {
1866        &self.stats
1867    }
1868
1869    /// Get mutable node statistics.
1870    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1871        &mut self.stats
1872    }
1873
1874    /// Get the stats history collector.
1875    pub fn stats_history(&self) -> &stats_history::StatsHistory {
1876        &self.stats_history
1877    }
1878
1879    /// Sample the current node state into the stats history ring.
1880    /// Called once per tick from the RX loop.
1881    pub(crate) fn record_stats_history(&mut self) {
1882        let fwd = &self.stats.forwarding;
1883        let peers_with_mmp: Vec<f64> = self
1884            .peers
1885            .values()
1886            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1887            .collect();
1888        let loss_rate = if peers_with_mmp.is_empty() {
1889            0.0
1890        } else {
1891            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1892        };
1893
1894        let snap = stats_history::Snapshot {
1895            mesh_size: self.estimated_mesh_size,
1896            tree_depth: self.tree_state.my_coords().depth() as u32,
1897            peer_count: self.peers.len() as u64,
1898            parent_switches_total: self.stats.tree.parent_switches,
1899            bytes_in_total: fwd.received_bytes,
1900            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1901            packets_in_total: fwd.received_packets,
1902            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1903            loss_rate,
1904            active_sessions: self.sessions.len() as u64,
1905        };
1906
1907        let now = std::time::Instant::now();
1908        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1909            .peers
1910            .values()
1911            .map(|p| {
1912                let stats = p.link_stats();
1913                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1914                    Some(m) => (
1915                        m.metrics.srtt_ms(),
1916                        Some(m.metrics.loss_rate()),
1917                        m.receiver.ecn_ce_count() as u64,
1918                    ),
1919                    None => (None, None, 0),
1920                };
1921                stats_history::PeerSnapshot {
1922                    node_addr: *p.node_addr(),
1923                    last_seen: now,
1924                    srtt_ms,
1925                    loss_rate,
1926                    bytes_in_total: stats.bytes_recv,
1927                    bytes_out_total: stats.bytes_sent,
1928                    packets_in_total: stats.packets_recv,
1929                    packets_out_total: stats.packets_sent,
1930                    ecn_ce_total: ecn_ce,
1931                }
1932            })
1933            .collect();
1934
1935        self.stats_history.tick(now, &snap, &peer_snaps);
1936    }
1937
1938    // === TUN Interface ===
1939
1940    /// Get the TUN state.
1941    pub fn tun_state(&self) -> TunState {
1942        self.tun_state
1943    }
1944
1945    /// Get the TUN interface name, if active.
1946    pub fn tun_name(&self) -> Option<&str> {
1947        self.tun_name.as_deref()
1948    }
1949
1950    // === Resource Limits ===
1951
1952    /// Set the maximum number of connections (handshake phase).
1953    pub fn set_max_connections(&mut self, max: usize) {
1954        self.max_connections = max;
1955    }
1956
1957    /// Set the maximum number of peers (authenticated).
1958    pub fn set_max_peers(&mut self, max: usize) {
1959        self.max_peers = max;
1960    }
1961
1962    /// Returns false when starting more outbound work would exceed a resource
1963    /// cap. A cap of `0` means uncapped.
1964    pub(crate) fn outbound_admission_check(&self) -> bool {
1965        let connection_used = self
1966            .connections
1967            .len()
1968            .saturating_add(self.pending_connects.len());
1969        let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1970        let connection_allowed =
1971            self.max_connections == 0 || connection_used < self.max_connections;
1972        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1973        peer_allowed && connection_allowed && link_allowed
1974    }
1975
1976    /// Admission for public/open-discovery outbound work. This includes the
1977    /// general connection/link caps and, when open Nostr discovery is enabled,
1978    /// the configured non-peer budget.
1979    pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1980        if !self.outbound_admission_check() {
1981            return false;
1982        }
1983
1984        let nostr = &self.config.node.discovery.nostr;
1985        if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1986            return true;
1987        }
1988
1989        let configured_npubs = self
1990            .config
1991            .peers()
1992            .iter()
1993            .map(|peer| peer.npub.clone())
1994            .collect::<HashSet<_>>();
1995        self.open_discovery_enqueue_budget(&configured_npubs) > 0
1996    }
1997
1998    /// Like `outbound_admission_check`, but for racing a better path to a
1999    /// peer that is already authenticated. This may temporarily add a
2000    /// connection/link, but it does not consume a new peer slot.
2001    pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
2002        let connection_used = self
2003            .connections
2004            .len()
2005            .saturating_add(self.pending_connects.len());
2006        let connection_allowed =
2007            self.max_connections == 0 || connection_used < self.max_connections;
2008        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2009        connection_allowed && link_allowed
2010    }
2011
2012    /// Set the maximum number of links.
2013    pub fn set_max_links(&mut self, max: usize) {
2014        self.max_links = max;
2015    }
2016
2017    // === Counts ===
2018
2019    /// Number of pending connections (handshake in progress).
2020    pub fn connection_count(&self) -> usize {
2021        self.connections.len()
2022    }
2023
2024    /// Number of authenticated peers.
2025    pub fn peer_count(&self) -> usize {
2026        self.peers.len()
2027    }
2028
2029    /// Number of active links.
2030    pub fn link_count(&self) -> usize {
2031        self.links.len()
2032    }
2033
2034    /// Number of active transports.
2035    pub fn transport_count(&self) -> usize {
2036        self.transports.len()
2037    }
2038
2039    // === Transport Management ===
2040
2041    /// Allocate a new transport ID.
2042    pub fn allocate_transport_id(&mut self) -> TransportId {
2043        let id = TransportId::new(self.next_transport_id);
2044        self.next_transport_id += 1;
2045        id
2046    }
2047
2048    /// Get a transport by ID.
2049    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
2050        self.transports.get(id)
2051    }
2052
2053    /// Get mutable transport by ID.
2054    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
2055        self.transports.get_mut(id)
2056    }
2057
2058    /// Iterate over transport IDs.
2059    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
2060        self.transports.keys()
2061    }
2062
2063    /// Get the packet receiver for the event loop.
2064    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
2065        self.packet_rx.as_mut()
2066    }
2067
2068    // === Link Management ===
2069
2070    /// Allocate a new link ID.
2071    pub fn allocate_link_id(&mut self) -> LinkId {
2072        let id = LinkId::new(self.next_link_id);
2073        self.next_link_id += 1;
2074        id
2075    }
2076
2077    /// Add a link.
2078    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2079        if self.max_links > 0 && self.links.len() >= self.max_links {
2080            return Err(NodeError::MaxLinksExceeded {
2081                max: self.max_links,
2082            });
2083        }
2084        let link_id = link.link_id();
2085        let transport_id = link.transport_id();
2086        let remote_addr = link.remote_addr().clone();
2087
2088        self.links.insert(link_id, link);
2089        self.addr_to_link
2090            .insert((transport_id, remote_addr), link_id);
2091        Ok(())
2092    }
2093
2094    /// Get a link by ID.
2095    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2096        self.links.get(link_id)
2097    }
2098
2099    /// Get a mutable link by ID.
2100    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2101        self.links.get_mut(link_id)
2102    }
2103
2104    /// Find link ID by transport address.
2105    pub fn find_link_by_addr(
2106        &self,
2107        transport_id: TransportId,
2108        addr: &TransportAddr,
2109    ) -> Option<LinkId> {
2110        self.addr_to_link
2111            .get(&(transport_id, addr.clone()))
2112            .copied()
2113    }
2114
2115    /// Remove a link.
2116    ///
2117    /// Only removes the addr_to_link reverse lookup if it still points to this
2118    /// link. In cross-connection scenarios, a newer link may have replaced the
2119    /// entry for the same address.
2120    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2121        if let Some(link) = self.links.remove(link_id) {
2122            // Clean up reverse lookup only if it still maps to this link
2123            let key = (link.transport_id(), link.remote_addr().clone());
2124            if self.addr_to_link.get(&key) == Some(link_id) {
2125                self.addr_to_link.remove(&key);
2126            }
2127            Some(link)
2128        } else {
2129            None
2130        }
2131    }
2132
2133    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2134        if !self.bootstrap_transports.contains(&transport_id) {
2135            return;
2136        }
2137
2138        let transport_in_use = self
2139            .links
2140            .values()
2141            .any(|link| link.transport_id() == transport_id)
2142            || self
2143                .connections
2144                .values()
2145                .any(|conn| conn.transport_id() == Some(transport_id))
2146            || self
2147                .peers
2148                .values()
2149                .any(|peer| peer.transport_id() == Some(transport_id))
2150            || self
2151                .pending_connects
2152                .iter()
2153                .any(|pending| pending.transport_id == transport_id);
2154
2155        if transport_in_use {
2156            return;
2157        }
2158
2159        tracing::debug!(
2160            transport_id = %transport_id,
2161            "bootstrap transport has no remaining references; dropping"
2162        );
2163
2164        self.bootstrap_transports.remove(&transport_id);
2165        self.bootstrap_transport_npubs.remove(&transport_id);
2166        self.transport_drops.remove(&transport_id);
2167        self.transports.remove(&transport_id);
2168    }
2169
2170    /// Iterate over all links.
2171    pub fn links(&self) -> impl Iterator<Item = &Link> {
2172        self.links.values()
2173    }
2174
2175    // === Connection Management (Handshake Phase) ===
2176
2177    /// Add a pending connection.
2178    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2179        let link_id = connection.link_id();
2180
2181        if self.connections.contains_key(&link_id) {
2182            return Err(NodeError::ConnectionAlreadyExists(link_id));
2183        }
2184
2185        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2186            return Err(NodeError::MaxConnectionsExceeded {
2187                max: self.max_connections,
2188            });
2189        }
2190
2191        self.connections.insert(link_id, connection);
2192        Ok(())
2193    }
2194
2195    /// Get a connection by LinkId.
2196    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2197        self.connections.get(link_id)
2198    }
2199
2200    /// Get a mutable connection by LinkId.
2201    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2202        self.connections.get_mut(link_id)
2203    }
2204
2205    /// Remove a connection.
2206    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2207        self.connections.remove(link_id)
2208    }
2209
2210    /// Iterate over all connections.
2211    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2212        self.connections.values()
2213    }
2214
2215    // === Peer Management (Active Phase) ===
2216
2217    /// Get a peer by NodeAddr.
2218    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2219        self.peers.get(node_addr)
2220    }
2221
2222    /// Get a mutable peer by NodeAddr.
2223    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2224        self.peers.get_mut(node_addr)
2225    }
2226
2227    /// Remove a peer.
2228    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2229        self.peers.remove(node_addr)
2230    }
2231
2232    /// Iterate over all peers.
2233    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2234        self.peers.values()
2235    }
2236
2237    /// Reference to the Nostr discovery handle if discovery is enabled.
2238    /// Used by control queries (`show_peers` per-peer Nostr-traversal
2239    /// state) to read failure-state without taking shared ownership.
2240    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2241        self.nostr_discovery.as_deref()
2242    }
2243
2244    /// Iterate over all peer node IDs.
2245    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2246        self.peers.keys()
2247    }
2248
2249    /// Iterate over peers that can send traffic.
2250    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2251        self.peers.values().filter(|p| p.can_send())
2252    }
2253
2254    /// Number of peers that can send traffic.
2255    pub fn sendable_peer_count(&self) -> usize {
2256        self.peers.values().filter(|p| p.can_send()).count()
2257    }
2258
2259    pub(crate) fn set_discovery_fallback_transit_allowed(
2260        &mut self,
2261        peer_addr: NodeAddr,
2262        allowed: bool,
2263    ) {
2264        if allowed {
2265            self.discovery_fallback_transit_blocked_peers
2266                .remove(&peer_addr);
2267        } else {
2268            self.discovery_fallback_transit_blocked_peers
2269                .insert(peer_addr);
2270        }
2271    }
2272
2273    pub(crate) fn configured_discovery_fallback_transit(
2274        &self,
2275        peer_addr: &NodeAddr,
2276    ) -> Option<bool> {
2277        self.configured_peer(peer_addr)
2278            .map(|peer| peer.discovery_fallback_transit)
2279    }
2280
2281    pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2282        self.config.peers().iter().find(|peer| {
2283            PeerIdentity::from_npub(&peer.npub)
2284                .ok()
2285                .is_some_and(|identity| identity.node_addr() == peer_addr)
2286        })
2287    }
2288
2289    pub(in crate::node) fn active_peer_uses_configured_static_udp_path(
2290        &self,
2291        peer_addr: &NodeAddr,
2292    ) -> bool {
2293        let Some(peer_config) = self.configured_peer(peer_addr) else {
2294            return false;
2295        };
2296
2297        peer_config.addresses.iter().any(|candidate| {
2298            candidate.seen_at_ms.is_none()
2299                && candidate.transport.eq_ignore_ascii_case("udp")
2300                && self.active_peer_matches_candidate(peer_addr, candidate)
2301        })
2302    }
2303
2304    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2305        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2306            return retry_state.peer_config.discovery_fallback_transit;
2307        }
2308
2309        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2310            return allowed;
2311        }
2312
2313        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2314    }
2315
2316    // === End-to-End Sessions ===
2317
2318    /// Get a session by remote NodeAddr.
2319    /// Disable the discovery forward rate limiter (for tests).
2320    #[cfg(test)]
2321    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2322        self.discovery_forward_limiter
2323            .set_interval(std::time::Duration::ZERO);
2324    }
2325
2326    #[cfg(test)]
2327    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2328        self.sessions.get(remote)
2329    }
2330
2331    /// Get a mutable session by remote NodeAddr.
2332    #[cfg(test)]
2333    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2334        self.sessions.get_mut(remote)
2335    }
2336
2337    /// Remove a session.
2338    #[cfg(test)]
2339    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2340        self.sessions.remove(remote)
2341    }
2342
2343    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2344    #[cfg(test)]
2345    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2346        self.path_mtu_lookup
2347            .read()
2348            .ok()
2349            .and_then(|map| map.get(fips_addr).copied())
2350    }
2351
2352    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2353    #[cfg(test)]
2354    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2355        if let Ok(mut map) = self.path_mtu_lookup.write() {
2356            map.insert(fips_addr, mtu);
2357        }
2358    }
2359
2360    /// Number of end-to-end sessions.
2361    pub fn session_count(&self) -> usize {
2362        self.sessions.len()
2363    }
2364
2365    /// Iterate over all session entries (for control queries).
2366    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2367        self.sessions.iter()
2368    }
2369
2370    // === Identity Cache ===
2371
2372    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2373    pub(crate) fn register_identity(
2374        &mut self,
2375        node_addr: NodeAddr,
2376        pubkey: secp256k1::PublicKey,
2377    ) -> bool {
2378        let mut prefix = [0u8; 15];
2379        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2380        if let Some(entry) = self.identity_cache.get(&prefix)
2381            && entry.node_addr == node_addr
2382            && entry.pubkey == pubkey
2383        {
2384            // Endpoint sends pass the same PeerIdentity on every packet. Once
2385            // validated, avoid re-deriving NodeAddr from the public key in the
2386            // data path; that hash showed up in macOS sender profiles.
2387            return true;
2388        }
2389
2390        let (xonly, _) = pubkey.x_only_public_key();
2391        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2392        if derived_node_addr != node_addr {
2393            debug!(
2394                claimed_node_addr = %node_addr,
2395                derived_node_addr = %derived_node_addr,
2396                "Rejected identity cache entry with mismatched public key"
2397            );
2398            return false;
2399        }
2400
2401        let now_ms = Self::now_ms();
2402        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2403            && entry.node_addr == node_addr
2404        {
2405            entry.pubkey = pubkey;
2406            entry.last_seen_ms = now_ms;
2407            return true;
2408        }
2409
2410        let npub = encode_npub(&xonly);
2411        self.identity_cache.insert(
2412            prefix,
2413            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2414        );
2415        // LRU eviction
2416        let max = self.config.node.cache.identity_size;
2417        if self.identity_cache.len() > max
2418            && let Some(oldest_key) = self
2419                .identity_cache
2420                .iter()
2421                .min_by_key(|(_, entry)| entry.last_seen_ms)
2422                .map(|(k, _)| *k)
2423        {
2424            self.identity_cache.remove(&oldest_key);
2425        }
2426        true
2427    }
2428
2429    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2430    pub(crate) fn lookup_by_fips_prefix(
2431        &mut self,
2432        prefix: &[u8; 15],
2433    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2434        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2435            entry.last_seen_ms = Self::now_ms(); // LRU touch
2436            Some((entry.node_addr, entry.pubkey))
2437        } else {
2438            None
2439        }
2440    }
2441
2442    /// Check if a node's identity is in the cache (without LRU touch).
2443    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2444        let mut prefix = [0u8; 15];
2445        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2446        self.identity_cache.contains_key(&prefix)
2447    }
2448
2449    /// Number of identity cache entries.
2450    pub fn identity_cache_len(&self) -> usize {
2451        self.identity_cache.len()
2452    }
2453
2454    /// Iterate over identity cache entries.
2455    ///
2456    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2457    /// Used by the `show_identity_cache` control query.
2458    pub fn identity_cache_iter(
2459        &self,
2460    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2461        self.identity_cache
2462            .values()
2463            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2464    }
2465
2466    /// Configured maximum identity cache size.
2467    pub fn identity_cache_max(&self) -> usize {
2468        self.config.node.cache.identity_size
2469    }
2470
2471    /// Number of pending discovery lookups.
2472    pub fn pending_lookup_count(&self) -> usize {
2473        self.pending_lookups.len()
2474    }
2475
2476    /// Iterate over pending discovery lookups for diagnostics.
2477    pub fn pending_lookups_iter(
2478        &self,
2479    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2480        self.pending_lookups.iter()
2481    }
2482
2483    /// Number of recent discovery requests tracked.
2484    pub fn recent_request_count(&self) -> usize {
2485        self.recent_requests.len()
2486    }
2487
2488    /// Count of destinations with queued TUN packets awaiting session setup.
2489    pub fn pending_tun_destinations(&self) -> usize {
2490        self.pending_tun_packets.len()
2491    }
2492
2493    /// Total TUN packets queued across all destinations.
2494    pub fn pending_tun_total_packets(&self) -> usize {
2495        self.pending_tun_packets.values().map(|q| q.len()).sum()
2496    }
2497
2498    /// Iterate over retry state for diagnostics.
2499    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2500        self.retry_pending.iter()
2501    }
2502
2503    // === Routing ===
2504
2505    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2506    ///
2507    /// Returns true if the peer is our current tree parent, or if the peer
2508    /// has declared us as their parent (making them our child).
2509    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2510        // Peer is our parent
2511        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2512            return true;
2513        }
2514        // Peer is our child (their declaration names us as parent)
2515        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2516            && decl.parent_id() == self.node_addr()
2517        {
2518            return true;
2519        }
2520        false
2521    }
2522
    /// Find next hop for a destination node address.
    ///
    /// Routing priority:
    /// 1. Destination is self → `None` (local delivery).
    /// 2. Destination is a healthy direct peer whose session direct path is not
    ///    blocked (`session_direct_path_blocks_direct_payload`).
    ///    - If its `link_cost()` is at most
    ///      `1.0 + ROUTING_FALLBACK_MIN_COST_ADVANTAGE`, it is returned at once.
    ///    - Otherwise it stays the default. Steps 3-5 may still choose another
    ///      next hop whose link cost undercuts it by more than that margin
    ///      (see `route_candidate_beats_direct`).
    /// 3. Reply-learned routes (only in `reply_learned` mode). These are locally
    ///    observed reverse paths, selected with weighted multipath plus
    ///    periodic coordinate/tree exploration.
    /// 4. Bloom filter candidates with cached dest coords.
    ///    - The pool is the sendable peers whose bloom filter may contain the
    ///      destination.
    ///    - The candidate ranked lowest by
    ///      `(link_cost, tree_distance_to_dest, node_addr)` is chosen; link cost
    ///      is the primary key.
    ///    - The self-distance check admits only peers strictly closer to the
    ///      destination than us, which prevents routing loops.
    /// 5. Greedy tree routing fallback (requires cached dest coords).
    /// 6. If nothing above beat the direct path, the result depends on whether
    ///    this call is a fallback-exploration turn.
    ///    - Exploration turn: the learned-route selection is returned.
    ///    - Otherwise: the healthy direct peer is returned, then any learned
    ///      route.
    /// 7. No route → `None`.
    ///
    /// Both the bloom filter and tree routing paths require cached destination
    /// coordinates (checked in `coord_cache`). Without coordinates, the node
    /// cannot make loop-free forwarding decisions. Only learned routes or the
    /// direct peer are considered in that case. The caller should signal
    /// `CoordsRequired` back to the source when `None` is returned for a
    /// non-local destination.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // 1. Local delivery
        if dest_node_addr == self.node_addr() {
            return None;
        }
        let now_ms = Self::now_ms();
        // Note: this call may also drop an expired degraded-hold marker for the destination.
        let direct_session_degraded =
            self.session_direct_path_blocks_direct_payload(dest_node_addr, now_ms);

        // `Some(dest)` only when the destination is a healthy direct peer whose
        // direct path is not currently blocked.
        let healthy_direct_route = self
            .peers
            .get(dest_node_addr)
            .filter(|peer| peer.is_healthy() && !direct_session_degraded)
            .map(|_| *dest_node_addr);
        // 2. A direct link whose cost is within the fallback margin is used
        //    immediately; a costlier one remains the default below.
        if let Some(direct_addr) = healthy_direct_route
            && self
                .peers
                .get(&direct_addr)
                .is_some_and(|peer| peer.link_cost() <= 1.0 + ROUTING_FALLBACK_MIN_COST_ADVANTAGE)
        {
            return self.peers.get(&direct_addr);
        }
        let direct_payload_eligible = healthy_direct_route.is_some();
        // The destination peer counts as sendable only if it passed the direct
        // checks above; every other peer just needs to be healthy.
        let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
            if addr == dest_node_addr {
                direct_payload_eligible
            } else {
                peer.is_healthy()
            }
        };

        // A healthy direct path is not automatically the best path. A
        // hotspot/NAT hairpin can remain sendable with high RTT or mild loss;
        // in that case a lower-cost mesh next-hop should carry traffic while
        // direct probes continue in the background.
        let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
            node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
        };

        // `None` unless routing mode is ReplyLearned; otherwise the set of peers
        // currently allowed to carry payload.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // 3. Optional reply-learned routing. These entries are not peer
        // claims; they are local observations of which peer carried traffic
        // or a verified lookup response back from the destination. Most
        // packets use weighted multipath over learned routes, but periodic
        // fallback exploration lets coord/bloom/tree routes discover better
        // candidates.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
        {
            // With a healthy direct route, only learned next hops that beat it
            // by the cost margin are eligible (the direct peer itself is
            // excluded here and handled later).
            let eligible = sendable
                .iter()
                .copied()
                .filter(|addr| fallback_beats_direct(self, *addr))
                .collect::<HashSet<_>>();
            if !eligible.is_empty()
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
        }

        // Look up cached destination coordinates (required by both bloom and tree paths).
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: prefer a learned route when there is no usable
            // direct route or this is an exploration turn, else go direct.
            if (healthy_direct_route.is_none() || explore_fallback)
                && let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            if let Some(direct_addr) = healthy_direct_route {
                return self.peers.get(&direct_addr);
            }
            return None;
        };

        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
        //    If no candidate is strictly closer, fall through to tree routing.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self
                .peers
                .iter()
                .filter(|(addr, peer)| {
                    payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
                })
                .map(|(_, peer)| peer)
                .collect();
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr
            && fallback_beats_direct(self, next_hop_addr)
        {
            return self.peers.get(&next_hop_addr);
        }

        // 5. Greedy tree routing fallback
        let tree_route_addr = self.select_tree_payload_candidate(
            &dest_coords,
            dest_node_addr,
            direct_payload_eligible,
        );
        if let Some(next_hop_addr) = tree_route_addr
            && fallback_beats_direct(self, next_hop_addr)
        {
            return self.peers.get(&next_hop_addr);
        }

        // 6. Exploration turn: coord/tree routes did not win, so return to the
        // learned routes.
        // NOTE(review): a `None` from the learned selection is returned as-is
        // here, without falling back to `healthy_direct_route` — confirm this
        // is intended.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        if let Some(direct_addr) = healthy_direct_route {
            return self.peers.get(&direct_addr);
        }

        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // 7. No route.
        None
    }
2706
2707    pub(in crate::node) fn find_transit_next_hop(
2708        &mut self,
2709        dest_node_addr: &NodeAddr,
2710        previous_hop: &NodeAddr,
2711    ) -> Option<NodeAddr> {
2712        if dest_node_addr == self.node_addr() {
2713            return None;
2714        }
2715
2716        if dest_node_addr != previous_hop
2717            && self
2718                .peers
2719                .get(dest_node_addr)
2720                .is_some_and(|peer| peer.is_healthy())
2721        {
2722            return Some(*dest_node_addr);
2723        }
2724
2725        let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2726        if &next_hop_addr == previous_hop {
2727            self.record_route_failure(*dest_node_addr, next_hop_addr);
2728            return None;
2729        }
2730        Some(next_hop_addr)
2731    }
2732
2733    fn route_candidate_beats_direct(
2734        &self,
2735        healthy_direct_route: Option<NodeAddr>,
2736        candidate_addr: NodeAddr,
2737    ) -> bool {
2738        let Some(direct_addr) = healthy_direct_route else {
2739            return true;
2740        };
2741        if candidate_addr == direct_addr {
2742            return false;
2743        }
2744
2745        let Some(direct) = self.peers.get(&direct_addr) else {
2746            return true;
2747        };
2748        let Some(candidate) = self.peers.get(&candidate_addr) else {
2749            return false;
2750        };
2751        if !candidate.is_healthy() {
2752            return false;
2753        }
2754
2755        let direct_cost = direct.link_cost();
2756        let candidate_cost = candidate.link_cost();
2757        candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2758    }
2759
2760    fn select_tree_payload_candidate(
2761        &self,
2762        dest_coords: &crate::tree::TreeCoordinate,
2763        direct_dest: &NodeAddr,
2764        direct_payload_eligible: bool,
2765    ) -> Option<NodeAddr> {
2766        if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2767            return None;
2768        }
2769
2770        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2771        let mut best: Option<(NodeAddr, usize)> = None;
2772
2773        for (peer_addr, peer) in &self.peers {
2774            if peer_addr == direct_dest {
2775                if !direct_payload_eligible {
2776                    continue;
2777                }
2778            } else if !peer.is_healthy() {
2779                continue;
2780            }
2781
2782            let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2783                continue;
2784            };
2785            let distance = peer_coords.distance_to(dest_coords);
2786            if distance >= my_distance {
2787                continue;
2788            }
2789
2790            let dominated = match &best {
2791                None => true,
2792                Some((best_id, best_dist)) => {
2793                    distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2794                }
2795            };
2796            if dominated {
2797                best = Some((*peer_addr, distance));
2798            }
2799        }
2800
2801        best.map(|(peer_addr, _)| peer_addr)
2802    }
2803
2804    pub(in crate::node) fn session_direct_path_is_degraded(
2805        &mut self,
2806        dest: &NodeAddr,
2807        now_ms: u64,
2808    ) -> bool {
2809        match self.session_direct_degraded_until_ms.get(dest).copied() {
2810            Some(until_ms) if until_ms > now_ms => true,
2811            Some(_) => {
2812                self.session_direct_degraded_until_ms.remove(dest);
2813                false
2814            }
2815            None => false,
2816        }
2817    }
2818
2819    pub(in crate::node) fn session_direct_path_blocks_direct_payload(
2820        &mut self,
2821        dest: &NodeAddr,
2822        now_ms: u64,
2823    ) -> bool {
2824        self.session_direct_path_is_degraded(dest, now_ms)
2825            && !self.active_peer_uses_configured_static_udp_path(dest)
2826    }
2827
2828    pub(in crate::node) fn mark_session_direct_path_degraded(
2829        &mut self,
2830        dest: NodeAddr,
2831        now_ms: u64,
2832    ) -> bool {
2833        let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2834        let entry = self
2835            .session_direct_degraded_until_ms
2836            .entry(dest)
2837            .or_insert(0);
2838        let was_degraded = *entry > now_ms;
2839        *entry = (*entry).max(until_ms);
2840        !was_degraded
2841    }
2842
2843    pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2844        self.session_direct_degraded_until_ms.remove(dest).is_some()
2845    }
2846
2847    pub(in crate::node) fn learn_reverse_route(
2848        &mut self,
2849        destination: NodeAddr,
2850        next_hop: NodeAddr,
2851    ) {
2852        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2853            || destination == *self.node_addr()
2854        {
2855            return;
2856        }
2857        let now_ms = Self::now_ms();
2858        self.learned_routes.learn(
2859            destination,
2860            next_hop,
2861            now_ms,
2862            self.config.node.routing.learned_ttl_secs,
2863            self.config.node.routing.max_learned_routes_per_dest,
2864        );
2865    }
2866
2867    pub(in crate::node) fn record_route_failure(
2868        &mut self,
2869        destination: NodeAddr,
2870        next_hop: NodeAddr,
2871    ) {
2872        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2873            return;
2874        }
2875        self.learned_routes.record_failure(&destination, &next_hop);
2876    }
2877
2878    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2879        self.learned_routes.snapshot(now_ms)
2880    }
2881
2882    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2883        self.learned_routes.purge_expired(now_ms);
2884    }
2885
2886    /// Select the best peer from a set of bloom filter candidates.
2887    ///
2888    /// Uses distance from each candidate's tree coordinates to the destination
2889    /// as the primary metric (after link_cost). Only selects peers that are
2890    /// strictly closer to the destination than we are (self-distance check
2891    /// prevents routing loops).
2892    ///
2893    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2894    fn select_best_candidate<'a>(
2895        &'a self,
2896        candidates: &[&'a ActivePeer],
2897        dest_coords: &crate::tree::TreeCoordinate,
2898    ) -> Option<&'a ActivePeer> {
2899        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2900
2901        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2902
2903        for &candidate in candidates {
2904            if !candidate.can_send() {
2905                continue;
2906            }
2907
2908            let cost = candidate.link_cost();
2909
2910            let dist = self
2911                .tree_state
2912                .peer_coords(candidate.node_addr())
2913                .map(|pc| pc.distance_to(dest_coords))
2914                .unwrap_or(usize::MAX);
2915
2916            // Self-distance check: only consider peers strictly closer
2917            // to the destination than we are (prevents routing loops)
2918            if dist >= my_distance {
2919                continue;
2920            }
2921
2922            let dominated = match &best {
2923                None => true,
2924                Some((_, best_cost, best_dist)) => {
2925                    cost < *best_cost
2926                        || (cost == *best_cost && dist < *best_dist)
2927                        || (cost == *best_cost
2928                            && dist == *best_dist
2929                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2930                }
2931            };
2932
2933            if dominated {
2934                best = Some((candidate, cost, dist));
2935            }
2936        }
2937
2938        best.map(|(peer, _, _)| peer)
2939    }
2940
2941    /// Check if a destination is in any peer's bloom filter.
2942    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2943        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2944    }
2945
2946    /// Get the TUN packet sender channel.
2947    ///
2948    /// Returns None if TUN is not active or the node hasn't been started.
2949    pub fn tun_tx(&self) -> Option<&TunTx> {
2950        self.tun_tx.as_ref()
2951    }
2952
2953    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2954    ///
2955    /// This must be called before [`Node::start`] and requires `tun.enabled =
2956    /// false`. Outbound packets sent to the returned sender are processed by the
2957    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2958    /// sent to the returned receiver with source attribution.
2959    pub fn attach_external_packet_io(
2960        &mut self,
2961        capacity: usize,
2962    ) -> Result<ExternalPacketIo, NodeError> {
2963        if self.state != NodeState::Created {
2964            return Err(NodeError::Config(ConfigError::Validation(
2965                "external packet I/O must be attached before node start".to_string(),
2966            )));
2967        }
2968        if self.config.tun.enabled {
2969            return Err(NodeError::Config(ConfigError::Validation(
2970                "external packet I/O requires tun.enabled=false".to_string(),
2971            )));
2972        }
2973
2974        let capacity = capacity.max(1);
2975        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2976        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2977        self.tun_outbound_rx = Some(outbound_rx);
2978        self.external_packet_tx = Some(inbound_tx);
2979
2980        Ok(ExternalPacketIo {
2981            outbound_tx,
2982            inbound_rx,
2983        })
2984    }
2985
2986    /// Attach app-owned endpoint data I/O for embedded operation.
2987    ///
2988    /// Commands sent to the returned sender are processed by the node RX loop.
2989    /// Incoming endpoint data is emitted as source-attributed events.
2990    pub(crate) fn attach_endpoint_data_io(
2991        &mut self,
2992        capacity: usize,
2993    ) -> Result<EndpointDataIo, NodeError> {
2994        if self.state != NodeState::Created {
2995            return Err(NodeError::Config(ConfigError::Validation(
2996                "endpoint data I/O must be attached before node start".to_string(),
2997            )));
2998        }
2999
3000        let command_capacity = endpoint_data_command_capacity(capacity);
3001        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
3002        // Inbound endpoint-data events use an unbounded channel — see
3003        // `EndpointDataIo::event_rx` docs for the rationale (kills the
3004        // per-packet semaphore + the cross-task relay task that used to
3005        // sit on top of this channel).
3006        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
3007        self.endpoint_command_rx = Some(command_rx);
3008        self.endpoint_event_tx = Some(event_tx.clone());
3009
3010        Ok(EndpointDataIo {
3011            command_tx,
3012            event_rx,
3013            event_tx,
3014        })
3015    }
3016
3017    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
3018        let mut prefix = [0u8; 15];
3019        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3020        self.identity_cache
3021            .get(&prefix)
3022            .filter(|entry| &entry.node_addr == addr)
3023            .map(|entry| entry.pubkey)
3024    }
3025
3026    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
3027        let mut prefix = [0u8; 15];
3028        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3029        self.identity_cache
3030            .get(&prefix)
3031            .filter(|entry| &entry.node_addr == addr)
3032            .map(|entry| entry.npub.clone())
3033    }
3034
3035    pub(in crate::node) fn deliver_external_ipv6_packet(
3036        &self,
3037        src_addr: &NodeAddr,
3038        packet: Vec<u8>,
3039    ) {
3040        let Some(external_packet_tx) = &self.external_packet_tx else {
3041            return;
3042        };
3043        if packet.len() < 40 {
3044            return;
3045        }
3046        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
3047            return;
3048        };
3049        let delivered = NodeDeliveredPacket {
3050            source_node_addr: *src_addr,
3051            source_npub: self.npub_for_node_addr(src_addr),
3052            destination,
3053            packet,
3054        };
3055        if let Err(error) = external_packet_tx.try_send(delivered) {
3056            debug!(error = %error, "Failed to deliver packet to external app sink");
3057        }
3058    }
3059
3060    // === Sending ===
3061
3062    /// Encrypt and send a link-layer message to an authenticated peer.
3063    ///
3064    /// The plaintext should include the message type byte followed by the
3065    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
3066    ///
3067    /// The send path prepends a 4-byte session-relative timestamp (inner
3068    /// header) before encryption. The full 16-byte outer header is used
3069    /// as AAD for the AEAD construction.
3070    ///
3071    /// This is the standard path for sending any link-layer control message
3072    /// to a peer over their encrypted Noise session.
3073    pub(super) async fn send_encrypted_link_message(
3074        &mut self,
3075        node_addr: &NodeAddr,
3076        plaintext: &[u8],
3077    ) -> Result<(), NodeError> {
3078        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
3079            .await
3080    }
3081
3082    /// Update one peer's local-outbound-broken signal from a `transport.send`
3083    /// outcome. Sets a per-peer timestamp on local-side io errors
3084    /// (NetworkUnreachable / HostUnreachable / AddrNotAvailable); clears that
3085    /// peer on success. The reaper consults this in `check_link_heartbeats` to
3086    /// switch only that peer to `fast_link_dead_timeout_secs`.
3087    pub(in crate::node) fn note_local_send_outcome(
3088        &mut self,
3089        node_addr: &NodeAddr,
3090        result: &Result<usize, TransportError>,
3091    ) {
3092        match result {
3093            Ok(_) => {
3094                self.local_send_failure_at_by_peer.remove(node_addr);
3095            }
3096            Err(error) if error.is_local_route_unavailable() => {
3097                self.local_send_failure_at_by_peer
3098                    .insert(*node_addr, std::time::Instant::now());
3099            }
3100            Err(_) => {}
3101        }
3102    }
3103
3104    /// Return the active dead-timeout for one peer after considering recent
3105    /// local route failures. The fast-dead signal is intentionally short-lived:
3106    /// on the UDP worker path a send call can return before the kernel result
3107    /// is observed, so a stale route error must not compress liveness for the
3108    /// whole normal dead-timeout window.
3109    pub(in crate::node) fn local_send_failure_dead_timeout_for_peer(
3110        &self,
3111        node_addr: &NodeAddr,
3112        now: std::time::Instant,
3113        dead_timeout: std::time::Duration,
3114        fast_dead_timeout: std::time::Duration,
3115    ) -> std::time::Duration {
3116        match self.local_send_failure_at_by_peer.get(node_addr).copied() {
3117            Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3118                fast_dead_timeout.min(dead_timeout)
3119            }
3120            None => dead_timeout,
3121            Some(_) => dead_timeout,
3122        }
3123    }
3124
3125    pub(in crate::node) fn purge_expired_local_send_failures(&mut self, now: std::time::Instant) {
3126        self.local_send_failure_at_by_peer
3127            .retain(|_, at| now.duration_since(*at) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW);
3128    }
3129
3130    pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3131        self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3132    }
3133
3134    pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3135        let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3136            return false;
3137        };
3138        let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3139        std::time::Instant::now().duration_since(t) <= grace
3140    }
3141
3142    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
3143    ///
3144    /// Used by the forwarding path to relay congestion signals hop-by-hop.
3145    pub(super) async fn send_encrypted_link_message_with_ce(
3146        &mut self,
3147        node_addr: &NodeAddr,
3148        plaintext: &[u8],
3149        ce_flag: bool,
3150    ) -> Result<(), NodeError> {
3151        let peer = self
3152            .peers
3153            .get_mut(node_addr)
3154            .ok_or(NodeError::PeerNotFound(*node_addr))?;
3155
3156        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3157            node_addr: *node_addr,
3158            reason: "no their_index".into(),
3159        })?;
3160        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3161            node_addr: *node_addr,
3162            reason: "no transport_id".into(),
3163        })?;
3164        let remote_addr = peer
3165            .current_addr()
3166            .cloned()
3167            .ok_or_else(|| NodeError::SendFailed {
3168                node_addr: *node_addr,
3169                reason: "no current_addr".into(),
3170            })?;
3171        #[cfg(any(target_os = "linux", target_os = "macos"))]
3172        let connected_socket = peer.connected_udp();
3173
3174        // Prepend 4-byte session-relative timestamp (inner header)
3175        let timestamp_ms = peer.session_elapsed_ms();
3176
3177        // MMP: read spin bit value before entering session borrow
3178        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3179        let mut flags = if sp_flag { FLAG_SP } else { 0 };
3180        if ce_flag {
3181            flags |= FLAG_CE;
3182        }
3183        if peer.current_k_bit() {
3184            flags |= FLAG_KEY_EPOCH;
3185        }
3186
3187        let session = peer
3188            .noise_session_mut()
3189            .ok_or_else(|| NodeError::SendFailed {
3190                node_addr: *node_addr,
3191                reason: "no noise session".into(),
3192            })?;
3193
3194        // Build 16-byte outer header upfront. The inner-plaintext
3195        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
3196        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
3197        // just to measure it. The worker path uses this length to size
3198        // the wire buffer directly; the legacy path below still
3199        // materialises a separate `inner_plaintext` Vec for the inline
3200        // encrypt-and-send call.
3201        const INNER_TS_LEN: usize = 4;
3202        let counter = session.current_send_counter();
3203        let inner_len = INNER_TS_LEN + plaintext.len();
3204        let payload_len = inner_len as u16;
3205        let header = build_established_header(their_index, counter, flags, payload_len);
3206
3207        // **Unix UDP send fast path.** On Unix, the encrypt-worker pool
3208        // is spawned at lifecycle start (workers = num_cpus) in
3209        // production, so this branch is taken for every authentic send on
3210        // every UDP-transported established session. The AEAD work +
3211        // sendmsg syscall run on a dedicated OS thread; the rx_loop only
3212        // builds the wire buffer + reserves the counter inline.
3213        //
3214        // Other transport kinds (BLE, TCP, sim, ethernet) fall
3215        // through to the inline encrypt + transport.send path
3216        // below — those don't have raw-fd / sendmmsg / UDP_GSO
3217        // benefits to expose through the worker pool, so the simpler
3218        // synchronous send is the right shape for them.
3219        //
3220        // Windows intentionally stays on the inline tokio UDP send path:
3221        // lifecycle::start does not spawn these raw-fd workers there, and
3222        // tests may still set `encrypt_workers` manually.
3223        //
3224        // The `encrypt_workers.is_some()` check below is true in Unix
3225        // production (lifecycle::start spawns the pool); it stays checked
3226        // rather than `expect()`-ed because unit tests construct `Node`
3227        // without calling `start()`.
3228        let transport_for_send = self
3229            .transports
3230            .get(&transport_id)
3231            .ok_or(NodeError::TransportNotFound(transport_id))?;
3232        match transport_for_send.connection_state(&remote_addr) {
3233            ConnectionState::Connected => {}
3234            other => {
3235                if matches!(other, ConnectionState::None) {
3236                    let _ = transport_for_send.connect(&remote_addr).await;
3237                }
3238                return Err(NodeError::SendFailed {
3239                    node_addr: *node_addr,
3240                    reason: format!("transport connection not ready: {:?}", other),
3241                });
3242            }
3243        }
3244        #[cfg(unix)]
3245        {
3246            let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3247            if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3248                && is_udp
3249                && let Some(cipher_clone) = session.send_cipher_clone()
3250            {
3251                // Reserve the counter on the session so subsequent
3252                // sends don't reuse it. `current_send_counter` only
3253                // peeks; we advance via `take_send_counter`.
3254                let reserved_counter =
3255                    session
3256                        .take_send_counter()
3257                        .map_err(|e| NodeError::SendFailed {
3258                            node_addr: *node_addr,
3259                            reason: format!("counter reservation failed: {}", e),
3260                        })?;
3261                debug_assert_eq!(reserved_counter, counter);
3262                // Re-derive the header with the now-locked-in counter
3263                // value (same value, but the call sequence is more
3264                // explicit).
3265                let header =
3266                    build_established_header(their_index, reserved_counter, flags, payload_len);
3267                let transport = transport_for_send;
3268                // Snapshot the per-peer connected UDP socket before
3269                // resolving the fallback address. On the established
3270                // steady-state path this socket already carries the
3271                // kernel peer address, so re-parsing the configured
3272                // transport address and touching the DNS cache on every
3273                // packet is pure overhead on the sender hot path.
3274                let send_target = {
3275                    if let TransportHandle::Udp(udp) = transport {
3276                        let socket_addr = {
3277                            #[cfg(any(target_os = "linux", target_os = "macos"))]
3278                            {
3279                                match connected_socket.as_ref() {
3280                                    Some(socket) => Some(socket.peer_addr()),
3281                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3282                                }
3283                            }
3284                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3285                            {
3286                                udp.resolve_for_off_task(&remote_addr).await.ok()
3287                            }
3288                        };
3289                        match (udp.async_socket(), socket_addr) {
3290                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3291                            _ => None,
3292                        }
3293                    } else {
3294                        None
3295                    }
3296                };
3297                if let Some((socket, socket_addr)) = send_target {
3298                    // Build the wire buffer **directly** from
3299                    // `plaintext` with a single allocation:
3300                    //   `[16 header][4 ts][plaintext...]` with
3301                    // +16 trailing capacity for the AEAD tag.
3302                    // The worker seals `wire_buf[16..]` in
3303                    // place and appends the tag — no second
3304                    // alloc, no second memcpy.
3305                    //
3306                    // Previous design built `inner_plaintext`
3307                    // via `prepend_inner_header` (1 alloc + 1
3308                    // copy) and then let the worker memcpy
3309                    // header + plaintext into a fresh Vec
3310                    // (another alloc + copy). At ~100 kpps the
3311                    // saved alloc/copy is ~150 MB/sec of memory
3312                    // bandwidth on the hot rx_loop + worker.
3313                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3314                    let mut wire_buf = Vec::with_capacity(wire_capacity);
3315                    wire_buf.extend_from_slice(&header);
3316                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
3317                    wire_buf.extend_from_slice(plaintext);
3318                    let predicted_bytes = wire_capacity;
3319                    // Stats / MMP update inline — predicted size
3320                    // is exact for ChaCha20-Poly1305 (tag is
3321                    // constant 16 bytes). When `connected_socket` is
3322                    // `Some`, the worker sends on it without a
3323                    // destination sockaddr — the kernel skips the
3324                    // per-packet sockaddr + route + neighbor resolve.
3325                    if let Some(peer) = self.peers.get_mut(node_addr) {
3326                        peer.link_stats_mut().record_sent(predicted_bytes);
3327                        if let Some(mmp) = peer.mmp_mut() {
3328                            mmp.sender
3329                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3330                        }
3331                    }
3332                    let scheduling_weight = self.send_weight_for_peer(node_addr);
3333                    let bulk_endpoint_data = fmp_plaintext_is_bulk_session_datagram(plaintext);
3334                    workers.dispatch(self::encrypt_worker::FmpSendJob {
3335                        cipher: cipher_clone,
3336                        counter: reserved_counter,
3337                        wire_buf,
3338                        fsp_seal: None,
3339                        socket,
3340                        dest_addr: socket_addr,
3341                        #[cfg(any(target_os = "linux", target_os = "macos"))]
3342                        connected_socket,
3343                        bulk_endpoint_data,
3344                        drop_on_backpressure: bulk_endpoint_data,
3345                        scheduling_weight,
3346                        queued_at: crate::perf_profile::stamp(),
3347                    });
3348                    return Ok(());
3349                }
3350            }
3351        }
3352
3353        // Inline (legacy) path: encrypt + send on the rx_loop.
3354        // Build the inner plaintext lazily here — the worker path
3355        // above never reaches this point, so the prepend_inner_header
3356        // alloc is avoided in the fast path.
3357        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3358        // Encrypt with AAD binding to the outer header
3359        let ciphertext = {
3360            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3361            session
3362                .encrypt_with_aad(&inner_plaintext, &header)
3363                .map_err(|e| NodeError::SendFailed {
3364                    node_addr: *node_addr,
3365                    reason: format!("encryption failed: {}", e),
3366                })?
3367        };
3368
3369        let wire_packet = build_encrypted(&header, &ciphertext);
3370
3371        // Re-borrow peer for stats update after sending
3372        let send_result = {
3373            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3374            let transport = self
3375                .transports
3376                .get(&transport_id)
3377                .ok_or(NodeError::TransportNotFound(transport_id))?;
3378            transport.send(&remote_addr, &wire_packet).await
3379        };
3380        self.note_local_send_outcome(node_addr, &send_result);
3381        let bytes_sent = send_result.map_err(|e| match e {
3382            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3383                node_addr: *node_addr,
3384                packet_size,
3385                mtu,
3386            },
3387            other => NodeError::SendFailed {
3388                node_addr: *node_addr,
3389                reason: format!("transport send: {}", other),
3390            },
3391        })?;
3392
3393        // Update send statistics
3394        if let Some(peer) = self.peers.get_mut(node_addr) {
3395            peer.link_stats_mut().record_sent(bytes_sent);
3396            // MMP: record sent frame for sender report generation
3397            if let Some(mmp) = peer.mmp_mut() {
3398                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3399            }
3400        }
3401
3402        Ok(())
3403    }
3404}
3405
3406impl fmt::Debug for Node {
3407    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3408        f.debug_struct("Node")
3409            .field("node_addr", self.node_addr())
3410            .field("state", &self.state)
3411            .field("is_leaf_only", &self.is_leaf_only)
3412            .field("connections", &self.connection_count())
3413            .field("peers", &self.peer_count())
3414            .field("links", &self.link_count())
3415            .field("transports", &self.transport_count())
3416            .finish()
3417    }
3418}