Skip to main content

fips_core/node/
mod.rs

1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
38use crate::node::session::SessionEntry;
39use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
40use crate::peer::{ActivePeer, PeerConnection};
41#[cfg(any(target_os = "linux", target_os = "macos"))]
42use crate::transport::ethernet::EthernetTransport;
43use crate::transport::tcp::TcpTransport;
44use crate::transport::tor::TorTransport;
45use crate::transport::udp::UdpTransport;
46#[cfg(feature = "webrtc-transport")]
47use crate::transport::webrtc::WebRtcTransport;
48use crate::transport::{
49    ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
50    TransportHandle, TransportId,
51};
52use crate::tree::TreeState;
53use crate::upper::hosts::HostMap;
54use crate::upper::icmp_rate_limit::IcmpRateLimiter;
55use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
56use crate::utils::index::IndexAllocator;
57use crate::{
58    Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
59    PeerIdentity, encode_npub,
60};
61use rand::Rng;
62use std::collections::{HashMap, HashSet, VecDeque};
63use std::fmt;
64use std::sync::Arc;
65use std::thread::JoinHandle;
66use thiserror::Error;
67use tracing::{debug, warn};
68
// NOTE(review): none of the constants below are referenced in the visible
// part of this file. Their descriptions are inferred from names and values
// only and should be confirmed against the use sites.

/// Three-second window; presumably how recent a local send failure must be for
/// the fast dead-peer path to apply — TODO confirm.
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
/// Hold time in milliseconds (20 s); presumably how long a destination stays
/// marked as direct-path degraded — TODO confirm.
const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
/// Minimum sample count (16); presumably required before a loss ratio is acted
/// on. The unit of a "sample" is not visible here — TODO confirm.
const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
/// Loss threshold 0.08 for entering the degraded state; presumably a fraction
/// in `[0, 1]` (8%) — TODO confirm.
const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
/// Loss threshold 0.02 for recovery. It is lower than the degraded threshold,
/// which suggests hysteresis — TODO confirm.
const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
/// Minimum cost advantage (0.25) for a routing fallback; whether this is a
/// relative or absolute margin is not visible here — TODO confirm.
const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
75
76fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
77    if plaintext
78        .first()
79        .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
80    {
81        return false;
82    }
83    let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
84        return false;
85    };
86    FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
87        prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
88    })
89}
90
91fn endpoint_payload_is_tcp(payload: &[u8]) -> bool {
92    const IPPROTO_TCP: u8 = 6;
93    const IPV4_MIN_HEADER_LEN: usize = 20;
94
95    let Some(version_ihl) = payload.first().copied() else {
96        return false;
97    };
98
99    match version_ihl >> 4 {
100        4 => {
101            if payload.len() < IPV4_MIN_HEADER_LEN {
102                return false;
103            }
104            let header_len = usize::from(version_ihl & 0x0f) * 4;
105            header_len >= IPV4_MIN_HEADER_LEN
106                && payload.len() >= header_len
107                && payload[9] == IPPROTO_TCP
108        }
109        6 => ipv6_payload_next_header(payload).is_some_and(|proto| proto == IPPROTO_TCP),
110        _ => false,
111    }
112}
113
/// Resolves the protocol number that follows the IPv6 extension-header chain.
///
/// Starting from the fixed 40-byte IPv6 header, the function walks every
/// header accepted by [`ipv6_extension_header_is_skippable`] and returns the
/// first Next Header value that is not one of them (for example 6 for TCP or
/// 17 for UDP).
///
/// Returns `None` when:
/// - the buffer is shorter than the fixed header or is not IP version 6,
/// - an extension header is truncated,
/// - more than eight extension headers are chained, or
/// - a non-first fragment (fragment offset != 0) names another extension
///   header as its next header. In that case the bytes after the Fragment
///   header are mid-packet data rather than headers, so the upper-layer
///   protocol cannot be determined from this fragment.
fn ipv6_payload_next_header(payload: &[u8]) -> Option<u8> {
    const IPV6_HEADER_LEN: usize = 40;
    const IPV6_FRAGMENT_HEADER_LEN: usize = 8;
    const MAX_EXTENSION_HEADERS: usize = 8;
    const NEXT_HEADER_FRAGMENT: u8 = 44;
    const NEXT_HEADER_AUTH: u8 = 51;

    if payload.len() < IPV6_HEADER_LEN || payload[0] >> 4 != 6 {
        return None;
    }

    let mut next_header = payload[6];
    let mut offset = IPV6_HEADER_LEN;
    let mut extension_count = 0usize;
    while ipv6_extension_header_is_skippable(next_header) {
        // Bound the walk so a crafted chain cannot make this arbitrarily long.
        if extension_count == MAX_EXTENSION_HEADERS {
            return None;
        }
        extension_count += 1;

        let header_len = if next_header == NEXT_HEADER_FRAGMENT {
            // The Fragment header has a fixed size and no length field.
            IPV6_FRAGMENT_HEADER_LEN
        } else {
            let len_field = usize::from(*payload.get(offset + 1)?);
            if next_header == NEXT_HEADER_AUTH {
                // AH counts its length in 32-bit words, minus two.
                (len_field + 2) * 4
            } else {
                // Generic extension headers count 8-byte units, minus one.
                (len_field + 1) * 8
            }
        };
        // Every header length above is at least 8, so indices 0..=3 are valid.
        let header = payload.get(offset..offset + header_len)?;
        let following = header[0];

        if next_header == NEXT_HEADER_FRAGMENT {
            // Upper 13 bits of bytes 2-3 hold the fragment offset.
            let fragment_offset = u16::from_be_bytes([header[2], header[3]]) >> 3;
            if fragment_offset != 0 {
                // Non-first fragment: what follows is payload data, so do not
                // interpret it as further extension headers.
                return (!ipv6_extension_header_is_skippable(following)).then_some(following);
            }
        }

        next_header = following;
        offset += header_len;
    }

    Some(next_header)
}

/// Reports whether `next_header` is an IPv6 extension header this parser
/// steps over: Hop-by-Hop (0), Routing (43), Fragment (44), Authentication
/// Header (51), Destination Options (60), or Mobility (135).
fn ipv6_extension_header_is_skippable(next_header: u8) -> bool {
    matches!(next_header, 0 | 43 | 44 | 51 | 60 | 135)
}
165
166/// Half-range of the symmetric jitter applied to per-session rekey timers.
167///
168/// Each FMP/FSP session draws an offset uniformly from
169/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` seconds at construction and
170/// after each cutover. This preserves the configured mean interval while
171/// reducing dual-initiation bursts in symmetric-start meshes.
172pub(crate) const REKEY_JITTER_SECS: i64 = 15;
173
/// Errors related to node operations.
///
/// Variants marked `#[from]` can be produced by `?` conversion from the
/// wrapped error type. The remaining variants carry the context shown in
/// their display message.
#[derive(Debug, Error)]
pub enum NodeError {
    /// The node has not been started.
    #[error("node not started")]
    NotStarted,

    /// The node has already been started.
    #[error("node already started")]
    AlreadyStarted,

    /// The node has already been stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport is registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No transport is available for the named transport type.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link exists with the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No connection exists for the given link id.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No peer is known under the given node address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// A peer with the given node address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection already exists for the given link id.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A peer npub was rejected; `reason` explains why.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Discovery failure, described by the message.
    #[error("discovery error: {0}")]
    Discovery(String),

    /// Access was denied, described by the message.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// The connection limit `max` was exceeded.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// The peer limit `max` was exceeded.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// The link limit `max` was exceeded.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// The handshake on the given link has not completed.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No session is available for the given link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// Promotion failed for `link_id`; `reason` explains why.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to `node_addr` failed; `reason` explains why.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// A packet of `packet_size` bytes exceeded `mtu` while forwarding to
    /// `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Configuration error (converted via `From<ConfigError>`).
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Identity error (converted via `From<IdentityError>`).
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// TUN error (converted via `From<TunError>`).
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Index allocation failed, described by the message.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// Handshake failed, described by the message.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// Transport failure, carried as the display string of the underlying
    /// `TransportError` (see `NodeError::from_transport_error`).
    #[error("transport error: {0}")]
    TransportError(String),

    /// Transport failure for which `TransportError::is_local_route_unavailable`
    /// returned `true`, carried as its display string.
    #[error("local route unavailable: {0}")]
    LocalRouteUnavailable(String),

    /// Bootstrap handoff failed, described by the message.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
268
269impl NodeError {
270    pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
271        if error.is_local_route_unavailable() {
272            Self::LocalRouteUnavailable(error.to_string())
273        } else {
274            Self::TransportError(error.to_string())
275        }
276    }
277
278    pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
279        matches!(self, Self::LocalRouteUnavailable(_))
280    }
281}
282
/// Source-attributed packet delivered by a node running without a system TUN.
///
/// This is the item type of the `inbound_rx` channel in [`ExternalPacketIo`]
/// and of the node's `external_packet_tx` sender.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// FIPS node address that originated the packet.
    pub source_node_addr: NodeAddr,
    /// Source Nostr public key when the node has learned it; `None` otherwise.
    pub source_npub: Option<String>,
    /// Destination FIPS address from the IPv6 packet.
    pub destination: FipsAddress,
    /// Full IPv6 packet after FIPS session decapsulation.
    pub packet: Vec<u8>,
}
295
/// Cached identity data for one peer.
///
/// Used as the value type of the node's `identity_cache` map.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Node address of the cached peer.
    node_addr: NodeAddr,
    /// secp256k1 public key of the cached peer.
    pubkey: secp256k1::PublicKey,
    /// The peer's npub string (presumably the encoded form of `pubkey` —
    /// TODO confirm at the construction sites).
    npub: String,
    /// Last-seen timestamp in milliseconds (the epoch is not visible here;
    /// presumably Unix time — TODO confirm).
    last_seen_ms: u64,
}

impl IdentityCacheEntry {
    /// Builds an entry from its four fields, stored as given.
    fn new(
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
        npub: String,
        last_seen_ms: u64,
    ) -> Self {
        Self {
            node_addr,
            pubkey,
            npub,
            last_seen_ms,
        }
    }
}
319
/// App-owned packet channels for embedding FIPS without a system TUN.
///
/// Holds the application's ends of the two channels: a sender for outbound
/// packets and a receiver for delivered packets.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Send outbound IPv6 packets into the node.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receive inbound IPv6 packets delivered by FIPS sessions, each wrapped
    /// in a [`NodeDeliveredPacket`] that records its source.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
328
/// App-owned endpoint data channels for embedding FIPS without a daemon.
///
/// Commands flow into the node through the bounded `command_tx`; events flow
/// out through the unbounded `event_rx`. `event_tx` is a sender for that same
/// event queue, kept for in-process loopback.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Send endpoint data commands into the node RX loop.
    ///
    /// Bounded with a generous default so normal sender bursts do not
    /// stall on semaphore acquisition. macOS pacing happens at the UDP
    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
    /// constraining this app queue instead caused the inner TCP flow to
    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
    /// default for benches.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receive endpoint data delivered by FIPS sessions.
    ///
    /// Unbounded so the rx_loop's send on inbound packet delivery is a
    /// wait-free push (no semaphore acquire), and so we can drop the
    /// per-packet cross-task relay that previously sat between the node
    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
    /// naturally bounded — the rx_loop both produces here and runs the
    /// same runtime that schedules the consumer, so a stalled consumer
    /// stalls production too.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Clone of the event_tx exposed for in-process loopback (e.g.
    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
    /// event into the same queue without going through the encrypt /
    /// decrypt path, while keeping every consumer reading from a single
    /// channel.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
358
/// Picks the capacity for the endpoint data command queue.
///
/// If the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable is set to a
/// positive integer (surrounding whitespace is ignored), that value wins.
/// Otherwise the result is `requested`, raised to a floor of 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const DEFAULT_FLOOR: usize = 32_768;

    // Unset, unparsable, or zero overrides all fall through to the default.
    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&capacity| capacity > 0);

    match env_override {
        Some(capacity) => capacity,
        // The floor is far above 1, so it also guarantees a non-zero capacity.
        None => requested.max(DEFAULT_FLOOR),
    }
}
369
/// Commands accepted by the node endpoint data service.
///
/// Every variant except `SendOneway` carries a oneshot `response_tx` on which
/// the reply for that command is delivered.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send with an explicit response channel — used by callers that
    /// care whether the local-stack handoff succeeded (e.g.
    /// `blocking_send` waits for the runtime to accept the send).
    Send {
        /// Identity of the peer the payload is addressed to.
        remote: PeerIdentity,
        /// Bytes to send.
        payload: Vec<u8>,
        /// Optional instant attached by the sender (presumably when the
        /// command was enqueued, for latency accounting — TODO confirm).
        queued_at: Option<std::time::Instant>,
        /// Receives the outcome of the send.
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
    /// no per-packet result channel. Used by the data-plane fast path
    /// (`FipsEndpoint::send`) where the caller already discards the
    /// result. Saves one oneshot::channel() allocation per outbound
    /// packet on the application's send hot path.
    SendOneway {
        /// Identity of the peer the payload is addressed to.
        remote: PeerIdentity,
        /// Bytes to send.
        payload: Vec<u8>,
        /// Optional instant attached by the sender (see `Send::queued_at`).
        queued_at: Option<std::time::Instant>,
    },
    /// Request a list of [`NodeEndpointPeer`] entries, delivered on
    /// `response_tx`.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Request a list of [`NodeEndpointRelayStatus`] entries, delivered on
    /// `response_tx`.
    RelaySnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
    },
    /// Update the relay lists; the outcome is delivered on `response_tx`.
    UpdateRelays {
        /// New advert relay list (presumably relay URLs — TODO confirm).
        advert_relays: Vec<String>,
        /// New DM relay list (presumably relay URLs — TODO confirm).
        dm_relays: Vec<String>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// Replace the runtime peer list. Newly added auto-connect peers get
    /// `initiate_peer_connection` immediately; removed peers are dropped
    /// from the retry queue (the regular liveness timeout reaps any active
    /// session). Existing entries are kept and their `addresses` field is
    /// refreshed so the next retry sees the latest hints.
    UpdatePeers {
        /// The complete replacement peer list.
        peers: Vec<crate::config::PeerConfig>,
        /// Receives an [`UpdatePeersOutcome`] summary, or an error.
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
413
/// Reports what changed in response to `UpdatePeers`.
///
/// Each field is a count of peers. The `Default` value is all zeros.
/// NOTE(review): the exact rule separating `updated` from `unchanged` lives in
/// the `UpdatePeers` handler, which is not visible here — confirm there.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Peers newly added by the update.
    pub(crate) added: usize,
    /// Peers removed by the update.
    pub(crate) removed: usize,
    /// Existing peers whose entry was changed.
    pub(crate) updated: usize,
    /// Existing peers left as they were.
    pub(crate) unchanged: usize,
}
422
/// Endpoint data events emitted by the node session receive path.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload attributed to its source node.
    Data {
        /// Node address of the sender.
        source_node_addr: NodeAddr,
        /// Sender's npub, if available.
        source_npub: Option<String>,
        /// The received bytes.
        payload: Vec<u8>,
        /// Optional instant attached by the producer (presumably when the
        /// event was queued, for latency accounting — TODO confirm).
        queued_at: Option<std::time::Instant>,
    },
}
433
/// Authenticated peer state exposed to embedded endpoint callers.
///
/// Returned in bulk as the reply to `NodeEndpointCommand::PeerSnapshot`.
/// NOTE(review): field meanings below are read from names and types only;
/// confirm against the code that fills the snapshot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// The peer's npub string.
    pub(crate) npub: String,
    /// Whether the peer is currently connected.
    pub(crate) connected: bool,
    /// Textual transport address of the peer, if known.
    pub(crate) transport_addr: Option<String>,
    /// Textual transport type of the peer's link, if known.
    pub(crate) transport_type: Option<String>,
    /// Numeric id of the peer's link.
    pub(crate) link_id: u64,
    /// Round-trip time in milliseconds, if available (presumably the smoothed
    /// RTT — TODO confirm).
    pub(crate) srtt_ms: Option<u64>,
    /// Packet counter, send direction.
    pub(crate) packets_sent: u64,
    /// Packet counter, receive direction.
    pub(crate) packets_recv: u64,
    /// Byte counter, send direction.
    pub(crate) bytes_sent: u64,
    /// Byte counter, receive direction.
    pub(crate) bytes_recv: u64,
    /// Whether a direct probe is pending for this peer.
    pub(crate) direct_probe_pending: bool,
    /// Millisecond value associated with the pending direct probe, if any
    /// (whether this is a delay or a timestamp is not visible here — TODO
    /// confirm).
    pub(crate) direct_probe_after_ms: Option<u64>,
}
450
/// Live Nostr relay state exposed to embedded endpoint callers.
///
/// Returned in bulk as the reply to `NodeEndpointCommand::RelaySnapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status as free-form text (the set of possible values is not
    /// visible here).
    pub(crate) status: String,
}
457
/// Operational state of a node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not started.
    Created,
    /// Starting up (initializing transports).
    Starting,
    /// Fully operational.
    Running,
    /// Shutting down.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// Returns `true` only while the node is [`NodeState::Running`].
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// Returns `true` when a start is allowed: from [`NodeState::Created`]
    /// or [`NodeState::Stopped`].
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// Returns `true` when a stop is allowed: only from [`NodeState::Running`].
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase name of the state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        };
        f.write_str(label)
    }
}
502
503/// Recent request tracking for dedup and reverse-path forwarding.
504///
505/// When a LookupRequest is forwarded through a node, the node stores the
506/// request_id and which peer sent it. When the corresponding LookupResponse
507/// arrives, it's forwarded back to that peer (reverse-path forwarding).
508/// The `response_forwarded` flag prevents response routing loops.
509#[derive(Clone, Debug)]
510pub(crate) struct RecentRequest {
511    /// The peer who sent this request to us.
512    pub(crate) from_peer: NodeAddr,
513    /// When we received this request (Unix milliseconds).
514    pub(crate) timestamp_ms: u64,
515    /// Whether we've already forwarded a response for this request.
516    /// Prevents response routing loops when convergent request paths
517    /// create bidirectional entries in recent_requests.
518    pub(crate) response_forwarded: bool,
519}
520
521impl RecentRequest {
522    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
523        Self {
524            from_peer,
525            timestamp_ms,
526            response_forwarded: false,
527        }
528    }
529
530    /// Check if this entry has expired (older than expiry_ms).
531    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
532        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
533    }
534}
535
/// Key for addr_to_link reverse lookup: the transport a packet arrived on
/// paired with the remote address on that transport.
type AddrKey = (TransportId, TransportAddr);
538
/// Per-transport kernel drop tracking for congestion detection.
///
/// Sampled every tick (1s). The `dropping` flag indicates whether new
/// kernel drops were observed since the previous sample.
///
/// The derived `Default` starts with zero recorded drops and `dropping`
/// cleared.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previous `recv_drops` sample (cumulative counter).
    prev_drops: u64,
    /// True if drops increased since the last sample.
    dropping: bool,
}
550
/// State for a link waiting for transport-level connection establishment.
///
/// For connection-oriented transports (TCP, Tor), the transport connect runs
/// asynchronously. This struct holds the data needed to complete the handshake
/// once the connection is ready.
///
/// Entries are kept in the node's `pending_connects` list.
struct PendingConnect {
    /// The link that was created for this connection.
    link_id: LinkId,
    /// Which transport is being used.
    transport_id: TransportId,
    /// The remote address being connected to.
    remote_addr: TransportAddr,
    /// The peer identity (for handshake initiation).
    peer_identity: PeerIdentity,
}
566
567/// A running FIPS node instance.
568///
569/// This is the top-level container holding all node state.
570///
571/// ## Peer Lifecycle
572///
573/// Peers go through two phases:
574/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
575/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
576///
577/// The `addr_to_link` map enables dispatching incoming packets to the right
578/// connection before authentication completes.
579// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
580pub struct Node {
581    // === Identity ===
582    /// This node's cryptographic identity.
583    identity: Identity,
584
585    /// Random epoch generated at startup for peer restart detection.
586    /// Exchanged inside Noise handshake messages so peers can detect restarts.
587    startup_epoch: [u8; 8],
588
589    /// Instant when the node was created, for uptime reporting.
590    started_at: std::time::Instant,
591
592    // === Configuration ===
593    /// Loaded configuration.
594    config: Config,
595
596    // === State ===
597    /// Node operational state.
598    state: NodeState,
599
600    /// Whether this is a leaf-only node.
601    is_leaf_only: bool,
602
603    // === Spanning Tree ===
604    /// Local spanning tree state.
605    tree_state: TreeState,
606
607    // === Bloom Filter ===
608    /// Local Bloom filter state.
609    bloom_state: BloomState,
610
611    // === Routing ===
612    /// Address -> coordinates cache (from session setup and discovery).
613    coord_cache: CoordCache,
614    /// Locally learned reverse-path next-hop hints.
615    learned_routes: LearnedRouteTable,
616    /// Destinations whose direct first-hop path is temporarily suspect because
617    /// session-layer MMP observed sustained loss while using that direct path.
618    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
619    /// Recent discovery requests (dedup + reverse-path forwarding).
620    /// Maps request_id → RecentRequest.
621    recent_requests: HashMap<u64, RecentRequest>,
622    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
623    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
624    /// the TUN reader/writer threads at TCP MSS clamp time so the
625    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
626    /// and the learned per-destination path MTU.
627    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
628
629    // === Transports & Links ===
630    /// Active transports (owned by Node).
631    transports: HashMap<TransportId, TransportHandle>,
632    /// Per-transport kernel drop tracking for congestion detection.
633    transport_drops: HashMap<TransportId, TransportDropState>,
634    /// Active links.
635    links: HashMap<LinkId, Link>,
636    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
637    addr_to_link: HashMap<AddrKey, LinkId>,
638
639    // === Packet Channel ===
640    /// Packet sender for transports.
641    packet_tx: Option<PacketTx>,
642    /// Packet receiver (for event loop).
643    packet_rx: Option<PacketRx>,
644
645    // === Connections (Handshake Phase) ===
646    /// Pending connections (handshake in progress).
647    /// Indexed by LinkId since we don't know the peer's identity yet.
648    connections: HashMap<LinkId, PeerConnection>,
649
650    // === Peers (Active Phase) ===
651    /// Authenticated peers.
652    /// Indexed by NodeAddr (verified identity).
653    peers: HashMap<NodeAddr, ActivePeer>,
654
655    // === End-to-End Sessions ===
656    /// Session table for end-to-end encrypted sessions.
657    /// Keyed by remote NodeAddr.
658    sessions: HashMap<NodeAddr, SessionEntry>,
659
660    // === Identity Cache ===
661    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
662    /// Enables reverse lookup from IPv6 destination to session/routing identity.
663    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
664
665    // === Pending TUN Packets ===
666    /// Packets queued while waiting for session establishment.
667    /// Keyed by destination NodeAddr, bounded per-dest and total.
668    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
669    /// Endpoint data payloads queued while waiting for session establishment.
670    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
671    // === Pending Discovery Lookups ===
672    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
673    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
674    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
675
676    // === Resource Limits ===
677    /// Maximum connections (0 = unlimited).
678    max_connections: usize,
679    /// Maximum peers (0 = unlimited).
680    max_peers: usize,
681    /// Maximum links (0 = unlimited).
682    max_links: usize,
683
684    // === Counters ===
685    /// Next link ID to allocate.
686    next_link_id: u64,
687    /// Next transport ID to allocate.
688    next_transport_id: u32,
689
690    // === Node Statistics ===
691    /// Routing, forwarding, discovery, and error signal counters.
692    stats: stats::NodeStats,
693
694    /// Time-series history of node-level metrics (1s/1m rings).
695    stats_history: stats_history::StatsHistory,
696
697    // === TUN Interface ===
698    /// TUN device state.
699    tun_state: TunState,
700    /// TUN interface name (for cleanup).
701    tun_name: Option<String>,
702    /// TUN packet sender channel.
703    tun_tx: Option<TunTx>,
704    /// Receiver for outbound packets from the TUN reader.
705    tun_outbound_rx: Option<TunOutboundRx>,
706    /// App-owned packet sink used by embedded/no-TUN integrations.
707    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
708    /// Endpoint data command receiver used by embedded/no-daemon integrations.
709    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
710    /// Endpoint data event sink used by embedded/no-daemon integrations.
711    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
712    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
713    /// spawned (set up in `start()` once transports are running).
714    /// `Some(pool)` once available; the pool internally holds
715    /// per-worker mpsc senders and round-robins jobs across them.
716    /// See `node::encrypt_worker` for the rationale and layout.
717    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
718    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
719    /// `encrypt_workers` for the receive side.
720    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
721    /// Set of sessions that have been registered with the decrypt
722    /// shard worker pool. Used by rx_loop to decide between fast-path
723    /// dispatch (worker owns the session) and legacy in-place decrypt
724    /// (worker doesn't have it yet). Per the data-plane restructure,
725    /// the worker owns its session state directly — there's no shared
726    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
727    /// this set tracks **whether** the worker has been told about a
728    /// given session.
729    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
730    /// Fallback channel: decrypt worker bounces non-fast-path packets
731    /// (anything that's not bulk EndpointData) back here for rx_loop
732    /// to handle via the legacy path. Drained by a new rx_loop arm.
733    decrypt_fallback_rx:
734        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
735    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
736    /// TUN reader thread handle.
737    tun_reader_handle: Option<JoinHandle<()>>,
738    /// TUN writer thread handle.
739    tun_writer_handle: Option<JoinHandle<()>>,
740    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
741    /// On Linux, deleting the interface via netlink serves the same purpose.
742    #[cfg(target_os = "macos")]
743    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
744
745    // === DNS Responder ===
746    /// Receiver for resolved identities from the DNS responder.
747    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
748    /// DNS responder task handle.
749    dns_task: Option<tokio::task::JoinHandle<()>>,
750
751    // === Index-Based Session Dispatch ===
752    /// Allocator for session indices.
753    index_allocator: IndexAllocator,
754    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
755    /// This maps our session index to the peer that uses it.
756    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
757    /// Pending outbound handshakes by our sender_idx.
758    /// Tracks which LinkId corresponds to which session index.
759    pending_outbound: HashMap<(TransportId, u32), LinkId>,
760
761    // === Rate Limiting ===
762    /// Rate limiter for msg1 processing (DoS protection).
763    msg1_rate_limiter: HandshakeRateLimiter,
764    /// Rate limiter for ICMP Packet Too Big messages.
765    icmp_rate_limiter: IcmpRateLimiter,
766    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
767    routing_error_rate_limiter: RoutingErrorRateLimiter,
768    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
769    coords_response_rate_limiter: RoutingErrorRateLimiter,
770    /// Backoff for failed discovery lookups (originator-side).
771    discovery_backoff: DiscoveryBackoff,
772    /// Rate limiter for forwarded discovery requests (transit-side).
773    discovery_forward_limiter: DiscoveryForwardRateLimiter,
774
775    // === Pending Transport Connects ===
776    /// Links waiting for transport-level connection establishment before
777    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
778    /// the transport connect runs in the background; the tick handler polls
779    /// connection_state() and initiates the handshake when connected.
780    pending_connects: Vec<PendingConnect>,
781
782    // === Connection Retry ===
783    /// Retry state for peers whose outbound connections have failed.
784    /// Keyed by NodeAddr. Entries are created when a handshake times out
785    /// or fails, and removed on successful promotion or when max retries
786    /// are exhausted.
787    retry_pending: HashMap<NodeAddr, retry::RetryState>,
788
789    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
790    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
791    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
792    /// Identity is unverified at this layer — the Noise XX handshake
793    /// initiated against an mDNS-observed endpoint is what proves the
794    /// peer holds the matching private key.
795    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
796    /// Same-host JSON registry under `~/.fips/instances`. Records are
797    /// loopback routing hints only; peer identity is still verified by the
798    /// Noise handshake.
799    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
800    local_instance_started_at_ms: Option<u64>,
801    last_local_instance_publish_ms: Option<u64>,
802    last_local_instance_scan_ms: Option<u64>,
803    /// Wall-clock ms when Nostr discovery successfully started, used to
804    /// schedule the one-shot startup advert sweep after a settle delay.
805    /// `None` until discovery comes up; remains `None` if discovery is
806    /// disabled or failed to start.
807    nostr_discovery_started_at_ms: Option<u64>,
808    /// Whether the one-shot startup advert sweep has run. Set to true
809    /// after the first sweep fires (under `policy: open`); thereafter
810    /// only the per-tick `queue_open_discovery_retries` continues.
811    startup_open_discovery_sweep_done: bool,
812    /// Per-peer UDP transports adopted from NAT traversal handoff.
813    bootstrap_transports: HashSet<TransportId>,
814    /// Originating peer npub (bech32) for each adopted bootstrap
815    /// transport, captured at `adopt_established_traversal` time.
816    /// Populated alongside `bootstrap_transports`; cleared in
817    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
818    /// route fatal-protocol-mismatch observations back to the
819    /// Nostr-discovery `failure_state` for long cooldown application.
820    bootstrap_transport_npubs: HashMap<TransportId, String>,
821    /// Peers that should not be used as reply-learned fallback transit for
822    /// other destinations. Direct lookups to the peer are still permitted.
823    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
824
825    // === Periodic Parent Re-evaluation ===
826    /// Timestamp of last periodic parent re-evaluation (for pacing).
827    last_parent_reeval: Option<crate::time::Instant>,
828
829    // === Congestion Logging ===
830    /// Timestamp of last congestion detection log (rate-limited to 5s).
831    last_congestion_log: Option<std::time::Instant>,
832
833    // === Mesh Size Estimate ===
834    /// Cached estimated mesh size (computed once per tick from bloom filters).
835    estimated_mesh_size: Option<u64>,
836    /// Timestamp of last mesh size log emission.
837    last_mesh_size_log: Option<std::time::Instant>,
838
839    // === Bloom Self-Plausibility ===
840    /// Rate-limit state for the self-plausibility WARN. Fires at most
841    /// once per 60s globally when our own outgoing FilterAnnounce has
842    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
843    /// aggregation drift or an ingress bypass.
844    last_self_warn: Option<std::time::Instant>,
845
846    // === Local Outbound Liveness ===
847    /// Set per peer when a `transport.send` returned a local-side io error
848    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
849    /// cleared on the next successful send to that peer. Used by
850    /// `check_link_heartbeats` to compress only that peer's dead-timeout to
851    /// `fast_link_dead_timeout_secs` while its outbound is observed broken.
852    local_send_failure_at_by_peer: HashMap<NodeAddr, std::time::Instant>,
853    /// Set when the rx loop could not complete its 1s maintenance work
854    /// inside the watchdog timeout. Link-dead detection may be valid during
855    /// overload, but traversal cooldown should not punish a path just because
856    /// our own scheduler/worker queue was late.
857    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
858
859    // === Display Names ===
860    /// Human-readable names for configured peers (alias or short npub).
861    /// Populated at startup from peer config.
862    peer_aliases: HashMap<NodeAddr, String>,
863    /// Scheduler weight for explicitly configured peers. Built when config
864    /// changes so the packet hot path only does a NodeAddr hash lookup.
865    configured_peer_send_weights: HashMap<NodeAddr, u8>,
866
867    /// Reloadable peer ACL state from standard allow/deny files.
868    peer_acl: acl::PeerAclReloader,
869
870    // === Host Map ===
871    /// Static hostname → npub mapping for DNS resolution.
872    /// Built at construction from peer aliases and /etc/fips/hosts.
873    host_map: Arc<HostMap>,
874}
875
876impl Node {
877    /// Create a new node from configuration.
878    pub fn new(config: Config) -> Result<Self, NodeError> {
879        config.validate()?;
880        let identity = config.create_identity()?;
881        let node_addr = *identity.node_addr();
882        let is_leaf_only = config.is_leaf_only();
883
884        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
885        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
886
887        let mut startup_epoch = [0u8; 8];
888        rand::rng().fill_bytes(&mut startup_epoch);
889
890        let mut bloom_state = if is_leaf_only {
891            BloomState::leaf_only(node_addr)
892        } else {
893            BloomState::new(node_addr)
894        };
895        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
896
897        let tun_state = if config.tun.enabled {
898            TunState::Configured
899        } else {
900            TunState::Disabled
901        };
902
903        // Initialize tree state with signed self-declaration
904        let mut tree_state = TreeState::new(node_addr);
905        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
906        tree_state.set_hold_down(config.node.tree.hold_down_secs);
907        tree_state.set_flap_dampening(
908            config.node.tree.flap_threshold,
909            config.node.tree.flap_window_secs,
910            config.node.tree.flap_dampening_secs,
911        );
912        tree_state
913            .sign_declaration(&identity)
914            .expect("signing own declaration should never fail");
915
916        let coord_cache = CoordCache::new(
917            config.node.cache.coord_size,
918            config.node.cache.coord_ttl_secs * 1000,
919        );
920        let rl = &config.node.rate_limit;
921        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
922            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
923            config.node.limits.max_pending_inbound,
924        );
925
926        let max_connections = config.node.limits.max_connections;
927        let max_peers = config.node.limits.max_peers;
928        let max_links = config.node.limits.max_links;
929        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
930        let backoff_base_secs = config.node.discovery.backoff_base_secs;
931        let backoff_max_secs = config.node.discovery.backoff_max_secs;
932        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
933
934        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
935        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
936
937        Ok(Self {
938            identity,
939            startup_epoch,
940            started_at: std::time::Instant::now(),
941            config,
942            state: NodeState::Created,
943            is_leaf_only,
944            tree_state,
945            bloom_state,
946            coord_cache,
947            learned_routes: LearnedRouteTable::default(),
948            session_direct_degraded_until_ms: HashMap::new(),
949            recent_requests: HashMap::new(),
950            transports: HashMap::new(),
951            transport_drops: HashMap::new(),
952            links: HashMap::new(),
953            addr_to_link: HashMap::new(),
954            packet_tx: None,
955            packet_rx: None,
956            connections: HashMap::new(),
957            peers: HashMap::new(),
958            sessions: HashMap::new(),
959            identity_cache: HashMap::new(),
960            pending_tun_packets: HashMap::new(),
961            pending_endpoint_data: HashMap::new(),
962            pending_lookups: HashMap::new(),
963            max_connections,
964            max_peers,
965            max_links,
966            next_link_id: 1,
967            next_transport_id: 1,
968            stats: stats::NodeStats::new(),
969            stats_history: stats_history::StatsHistory::new(),
970            tun_state,
971            tun_name: None,
972            tun_tx: None,
973            tun_outbound_rx: None,
974            external_packet_tx: None,
975            endpoint_command_rx: None,
976            endpoint_event_tx: None,
977            encrypt_workers: None,
978            decrypt_workers: None,
979            decrypt_registered_sessions: std::collections::HashSet::new(),
980            decrypt_fallback_tx,
981            decrypt_fallback_rx,
982            tun_reader_handle: None,
983            tun_writer_handle: None,
984            #[cfg(target_os = "macos")]
985            tun_shutdown_fd: None,
986            dns_identity_rx: None,
987            dns_task: None,
988            index_allocator: IndexAllocator::new(),
989            peers_by_index: HashMap::new(),
990            pending_outbound: HashMap::new(),
991            msg1_rate_limiter,
992            icmp_rate_limiter: IcmpRateLimiter::new(),
993            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
994            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
995                std::time::Duration::from_millis(coords_response_interval_ms),
996            ),
997            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
998            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
999                std::time::Duration::from_secs(forward_min_interval_secs),
1000            ),
1001            pending_connects: Vec::new(),
1002            retry_pending: HashMap::new(),
1003            nostr_discovery: None,
1004            nostr_discovery_started_at_ms: None,
1005            lan_discovery: None,
1006            local_instance_registry: None,
1007            local_instance_started_at_ms: None,
1008            last_local_instance_publish_ms: None,
1009            last_local_instance_scan_ms: None,
1010            startup_open_discovery_sweep_done: false,
1011            bootstrap_transports: HashSet::new(),
1012            bootstrap_transport_npubs: HashMap::new(),
1013            discovery_fallback_transit_blocked_peers: HashSet::new(),
1014            last_parent_reeval: None,
1015            last_congestion_log: None,
1016            estimated_mesh_size: None,
1017            last_mesh_size_log: None,
1018            last_self_warn: None,
1019            local_send_failure_at_by_peer: HashMap::new(),
1020            last_rx_loop_maintenance_timeout_at: None,
1021            peer_aliases: HashMap::new(),
1022            configured_peer_send_weights,
1023            peer_acl,
1024            host_map,
1025            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1026        })
1027    }
1028
1029    /// Create a node with a specific identity.
1030    ///
1031    /// This constructor validates cross-field config invariants before
1032    /// constructing the node, same as [`Node::new`].
1033    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
1034        config.validate()?;
1035        let node_addr = *identity.node_addr();
1036
1037        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1038        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1039
1040        let mut startup_epoch = [0u8; 8];
1041        rand::rng().fill_bytes(&mut startup_epoch);
1042
1043        let tun_state = if config.tun.enabled {
1044            TunState::Configured
1045        } else {
1046            TunState::Disabled
1047        };
1048
1049        // Initialize tree state with signed self-declaration
1050        let mut tree_state = TreeState::new(node_addr);
1051        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1052        tree_state.set_hold_down(config.node.tree.hold_down_secs);
1053        tree_state.set_flap_dampening(
1054            config.node.tree.flap_threshold,
1055            config.node.tree.flap_window_secs,
1056            config.node.tree.flap_dampening_secs,
1057        );
1058        tree_state
1059            .sign_declaration(&identity)
1060            .expect("signing own declaration should never fail");
1061
1062        let mut bloom_state = BloomState::new(node_addr);
1063        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1064
1065        let coord_cache = CoordCache::new(
1066            config.node.cache.coord_size,
1067            config.node.cache.coord_ttl_secs * 1000,
1068        );
1069        let rl = &config.node.rate_limit;
1070        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1071            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1072            config.node.limits.max_pending_inbound,
1073        );
1074
1075        let max_connections = config.node.limits.max_connections;
1076        let max_peers = config.node.limits.max_peers;
1077        let max_links = config.node.limits.max_links;
1078        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1079
1080        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1081        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1082
1083        Ok(Self {
1084            identity,
1085            startup_epoch,
1086            started_at: std::time::Instant::now(),
1087            config,
1088            state: NodeState::Created,
1089            is_leaf_only: false,
1090            tree_state,
1091            bloom_state,
1092            coord_cache,
1093            learned_routes: LearnedRouteTable::default(),
1094            session_direct_degraded_until_ms: HashMap::new(),
1095            recent_requests: HashMap::new(),
1096            transports: HashMap::new(),
1097            transport_drops: HashMap::new(),
1098            links: HashMap::new(),
1099            addr_to_link: HashMap::new(),
1100            packet_tx: None,
1101            packet_rx: None,
1102            connections: HashMap::new(),
1103            peers: HashMap::new(),
1104            sessions: HashMap::new(),
1105            identity_cache: HashMap::new(),
1106            pending_tun_packets: HashMap::new(),
1107            pending_endpoint_data: HashMap::new(),
1108            pending_lookups: HashMap::new(),
1109            max_connections,
1110            max_peers,
1111            max_links,
1112            next_link_id: 1,
1113            next_transport_id: 1,
1114            stats: stats::NodeStats::new(),
1115            stats_history: stats_history::StatsHistory::new(),
1116            tun_state,
1117            tun_name: None,
1118            tun_tx: None,
1119            tun_outbound_rx: None,
1120            external_packet_tx: None,
1121            endpoint_command_rx: None,
1122            endpoint_event_tx: None,
1123            encrypt_workers: None,
1124            decrypt_workers: None,
1125            decrypt_registered_sessions: std::collections::HashSet::new(),
1126            decrypt_fallback_tx,
1127            decrypt_fallback_rx,
1128            tun_reader_handle: None,
1129            tun_writer_handle: None,
1130            #[cfg(target_os = "macos")]
1131            tun_shutdown_fd: None,
1132            dns_identity_rx: None,
1133            dns_task: None,
1134            index_allocator: IndexAllocator::new(),
1135            peers_by_index: HashMap::new(),
1136            pending_outbound: HashMap::new(),
1137            msg1_rate_limiter,
1138            icmp_rate_limiter: IcmpRateLimiter::new(),
1139            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1140            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1141                std::time::Duration::from_millis(coords_response_interval_ms),
1142            ),
1143            discovery_backoff: DiscoveryBackoff::new(),
1144            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1145            pending_connects: Vec::new(),
1146            retry_pending: HashMap::new(),
1147            nostr_discovery: None,
1148            nostr_discovery_started_at_ms: None,
1149            lan_discovery: None,
1150            local_instance_registry: None,
1151            local_instance_started_at_ms: None,
1152            last_local_instance_publish_ms: None,
1153            last_local_instance_scan_ms: None,
1154            startup_open_discovery_sweep_done: false,
1155            bootstrap_transports: HashSet::new(),
1156            bootstrap_transport_npubs: HashMap::new(),
1157            discovery_fallback_transit_blocked_peers: HashSet::new(),
1158            last_parent_reeval: None,
1159            last_congestion_log: None,
1160            estimated_mesh_size: None,
1161            last_mesh_size_log: None,
1162            last_self_warn: None,
1163            local_send_failure_at_by_peer: HashMap::new(),
1164            last_rx_loop_maintenance_timeout_at: None,
1165            peer_aliases: HashMap::new(),
1166            configured_peer_send_weights,
1167            peer_acl,
1168            host_map,
1169            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1170        })
1171    }
1172
1173    /// Create a leaf-only node (simplified state).
1174    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1175        let mut node = Self::new(config)?;
1176        node.is_leaf_only = true;
1177        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1178        Ok(node)
1179    }
1180
1181    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1182        let base_host_map = HostMap::from_peer_configs(config.peers());
1183        if !config.node.system_files_enabled {
1184            return (
1185                Arc::new(base_host_map.clone()),
1186                acl::PeerAclReloader::memory_only(base_host_map),
1187            );
1188        }
1189
1190        let mut host_map = base_host_map.clone();
1191        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1192        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1193            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1194        ));
1195        host_map.merge(hosts_file);
1196        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1197            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1198            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1199            base_host_map,
1200            hosts_path,
1201        );
1202        (Arc::new(host_map), peer_acl)
1203    }
1204
1205    fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1206        config
1207            .peers()
1208            .iter()
1209            .filter_map(|peer| {
1210                PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1211                    (
1212                        *identity.node_addr(),
1213                        encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1214                    )
1215                })
1216            })
1217            .collect()
1218    }
1219
1220    fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1221        self.configured_peer_send_weights
1222            .get(peer_addr)
1223            .copied()
1224            .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1225    }
1226
1227    /// Create transport instances from configuration.
1228    ///
1229    /// Returns a vector of TransportHandles for all configured transports.
1230    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1231        let mut transports = Vec::new();
1232
1233        // Collect UDP configs with optional names to avoid borrow conflicts
1234        let udp_instances: Vec<_> = self
1235            .config
1236            .transports
1237            .udp
1238            .iter()
1239            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1240            .collect();
1241
1242        // Create UDP transport instances
1243        for (name, udp_config) in udp_instances {
1244            let transport_id = self.allocate_transport_id();
1245            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1246            transports.push(TransportHandle::Udp(udp));
1247        }
1248
1249        #[cfg(feature = "sim-transport")]
1250        {
1251            let sim_instances: Vec<_> = self
1252                .config
1253                .transports
1254                .sim
1255                .iter()
1256                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1257                .collect();
1258
1259            for (name, sim_config) in sim_instances {
1260                let transport_id = self.allocate_transport_id();
1261                let sim = crate::transport::sim::SimTransport::new(
1262                    transport_id,
1263                    name,
1264                    sim_config,
1265                    packet_tx.clone(),
1266                );
1267                transports.push(TransportHandle::Sim(sim));
1268            }
1269        }
1270
1271        // Create Ethernet transport instances where raw-socket support exists.
1272        #[cfg(any(target_os = "linux", target_os = "macos"))]
1273        {
1274            let eth_instances: Vec<_> = self
1275                .config
1276                .transports
1277                .ethernet
1278                .iter()
1279                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1280                .collect();
1281            let xonly = self.identity.pubkey();
1282            for (name, eth_config) in eth_instances {
1283                let mut eth_config = eth_config;
1284                if eth_config.discovery_scope.is_none() {
1285                    eth_config.discovery_scope = self.lan_discovery_scope();
1286                }
1287                let transport_id = self.allocate_transport_id();
1288                let mut eth =
1289                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1290                eth.set_local_pubkey(xonly);
1291                transports.push(TransportHandle::Ethernet(eth));
1292            }
1293        }
1294
1295        // Create TCP transport instances
1296        let tcp_instances: Vec<_> = self
1297            .config
1298            .transports
1299            .tcp
1300            .iter()
1301            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1302            .collect();
1303
1304        for (name, tcp_config) in tcp_instances {
1305            let transport_id = self.allocate_transport_id();
1306            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1307            transports.push(TransportHandle::Tcp(tcp));
1308        }
1309
1310        // Create Tor transport instances
1311        let tor_instances: Vec<_> = self
1312            .config
1313            .transports
1314            .tor
1315            .iter()
1316            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1317            .collect();
1318
1319        for (name, tor_config) in tor_instances {
1320            let transport_id = self.allocate_transport_id();
1321            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1322            transports.push(TransportHandle::Tor(tor));
1323        }
1324
1325        let webrtc_instances: Vec<_> = self
1326            .config
1327            .transports
1328            .webrtc
1329            .iter()
1330            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1331            .collect();
1332
1333        #[cfg(feature = "webrtc-transport")]
1334        {
1335            for (name, webrtc_config) in webrtc_instances {
1336                let transport_id = self.allocate_transport_id();
1337                match WebRtcTransport::new(
1338                    transport_id,
1339                    name,
1340                    webrtc_config,
1341                    packet_tx.clone(),
1342                    &self.identity,
1343                    &self.config.node.discovery.nostr,
1344                ) {
1345                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1346                    Err(err) => {
1347                        warn!(
1348                            transport_id = %transport_id,
1349                            error = %err,
1350                            "failed to initialize WebRTC transport"
1351                        );
1352                    }
1353                }
1354            }
1355        }
1356        #[cfg(not(feature = "webrtc-transport"))]
1357        if !webrtc_instances.is_empty() {
1358            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1359        }
1360
1361        // Create BLE transport instances
1362        #[cfg(bluer_available)]
1363        {
1364            let ble_instances: Vec<_> = self
1365                .config
1366                .transports
1367                .ble
1368                .iter()
1369                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1370                .collect();
1371
1372            #[cfg(all(bluer_available, not(test)))]
1373            for (name, ble_config) in ble_instances {
1374                let transport_id = self.allocate_transport_id();
1375                let adapter = ble_config.adapter().to_string();
1376                let mtu = ble_config.mtu();
1377                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1378                    Ok(io) => {
1379                        let mut ble = crate::transport::ble::BleTransport::new(
1380                            transport_id,
1381                            name,
1382                            ble_config,
1383                            io,
1384                            packet_tx.clone(),
1385                        );
1386                        ble.set_local_pubkey(self.identity.pubkey().serialize());
1387                        transports.push(TransportHandle::Ble(ble));
1388                    }
1389                    Err(e) => {
1390                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1391                    }
1392                }
1393            }
1394
1395            #[cfg(any(not(bluer_available), test))]
1396            if !ble_instances.is_empty() {
1397                #[cfg(not(test))]
1398                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1399            }
1400        }
1401
1402        transports
1403    }
1404
1405    /// Find an operational transport that matches the given transport type name.
1406    ///
1407    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1408    /// from Nostr/STUN traversal. They must not be reused for ordinary
1409    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1410    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1411    /// with `EINVAL`, and even on platforms that allow it the packet would use
1412    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1413    /// choice deterministic by lowest transport id instead of HashMap order.
1414    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1415        self.transports
1416            .iter()
1417            .filter(|(id, handle)| {
1418                handle.transport_type().name == transport_type
1419                    && handle.is_operational()
1420                    && !self.bootstrap_transports.contains(id)
1421            })
1422            .min_by_key(|(id, _)| id.as_u32())
1423            .map(|(id, _)| *id)
1424    }
1425
1426    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1427    /// and binary TransportAddr.
1428    ///
1429    /// Finds the Ethernet transport instance bound to the named interface
1430    /// and parses the MAC portion into a 6-byte TransportAddr.
1431    #[allow(unused_variables)]
1432    fn resolve_ethernet_addr(
1433        &self,
1434        addr_str: &str,
1435    ) -> Result<(TransportId, TransportAddr), NodeError> {
1436        #[cfg(any(target_os = "linux", target_os = "macos"))]
1437        {
1438            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1439                NodeError::NoTransportForType(format!(
1440                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1441                    addr_str
1442                ))
1443            })?;
1444
1445            // Find the Ethernet transport bound to this interface
1446            let transport_id = self
1447                .transports
1448                .iter()
1449                .find(|(_, handle)| {
1450                    handle.transport_type().name == "ethernet"
1451                        && handle.is_operational()
1452                        && handle.interface_name() == Some(iface)
1453                })
1454                .map(|(id, _)| *id)
1455                .ok_or_else(|| {
1456                    NodeError::NoTransportForType(format!(
1457                        "no operational Ethernet transport for interface '{}'",
1458                        iface
1459                    ))
1460                })?;
1461
1462            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1463                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1464            })?;
1465
1466            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1467        }
1468        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1469        {
1470            Err(NodeError::NoTransportForType(
1471                "Ethernet transport is not supported on this platform".to_string(),
1472            ))
1473        }
1474    }
1475
1476    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1477    /// (TransportId, TransportAddr) pair by finding the BLE transport
1478    /// instance matching the adapter name.
1479    #[cfg(bluer_available)]
1480    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1481        let ta = TransportAddr::from_string(addr_str);
1482        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1483            NodeError::NoTransportForType(format!(
1484                "invalid BLE address format '{}': expected 'adapter/mac'",
1485                addr_str
1486            ))
1487        })?;
1488
1489        // Find the BLE transport for this adapter
1490        let transport_id = self
1491            .transports
1492            .iter()
1493            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1494            .map(|(id, _)| *id)
1495            .ok_or_else(|| {
1496                NodeError::NoTransportForType(format!(
1497                    "no operational BLE transport for adapter '{}'",
1498                    adapter
1499                ))
1500            })?;
1501
1502        // Validate the address format
1503        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1504            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1505        })?;
1506
1507        Ok((transport_id, TransportAddr::from_string(addr_str)))
1508    }
1509
1510    // === Identity Accessors ===
1511
1512    /// Get this node's identity.
1513    pub fn identity(&self) -> &Identity {
1514        &self.identity
1515    }
1516
1517    /// Get this node's NodeAddr.
1518    pub fn node_addr(&self) -> &NodeAddr {
1519        self.identity.node_addr()
1520    }
1521
1522    /// Get this node's npub.
1523    pub fn npub(&self) -> String {
1524        self.identity.npub()
1525    }
1526
1527    /// Return a human-readable display name for a NodeAddr.
1528    ///
1529    /// Lookup order:
1530    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1531    /// 2. Configured peer alias or short npub (from startup map)
1532    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1533    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1534    /// 5. Truncated NodeAddr hex (unknown address)
1535    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1536        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1537            return hostname.to_string();
1538        }
1539        if let Some(name) = self.peer_aliases.get(addr) {
1540            return name.clone();
1541        }
1542        if let Some(peer) = self.peers.get(addr) {
1543            return peer.identity().short_npub();
1544        }
1545        if let Some(entry) = self.sessions.get(addr) {
1546            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1547            return PeerIdentity::from_pubkey(xonly).short_npub();
1548        }
1549        addr.short_hex()
1550    }
1551
1552    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1553    /// decrypt-worker state coherent: removes the same `cache_key`
1554    /// from the registered-sessions tracking set and tells the
1555    /// assigned shard worker to drop its `OwnedSessionState` entry.
1556    ///
1557    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1558    /// at every session-lifecycle teardown site (rekey cross-connection
1559    /// swap, peer disconnect, dispatch session-rotation) so the worker
1560    /// doesn't keep stale ciphers / replay windows around. The
1561    /// follow-up `RegisterSession` for the NEW key (if any) will then
1562    /// install the fresh state on the same shard.
1563    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1564        // Find the peer that owns this index BEFORE removing it from
1565        // the index map, so we can decide whether the deregistration
1566        // also tears down the peer's connected UDP socket.
1567        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1568        self.peers_by_index.remove(&cache_key);
1569        if self.decrypt_registered_sessions.remove(&cache_key)
1570            && let Some(workers) = self.decrypt_workers.as_ref()
1571        {
1572            workers.unregister_session(cache_key);
1573        }
1574        // Tear down the per-peer connected UDP socket *only* if no
1575        // other peers_by_index entry still resolves to this peer.
1576        // Rekey drain calls into this helper with the OLD session
1577        // index while the NEW index is already installed and points
1578        // at the same peer — there the connect()-ed 5-tuple is
1579        // still valid for the new session and we must not close it.
1580        // Peer-teardown sites (CrossConnection swap, stale-index
1581        // fall-through in encrypted.rs, disconnect handler) call
1582        // here when this is the peer's last index, so the connected
1583        // socket goes away with the peer.
1584        if let Some(peer_addr) = owning_peer {
1585            let peer_has_other_index = self
1586                .peers_by_index
1587                .values()
1588                .any(|other| *other == peer_addr);
1589            if !peer_has_other_index {
1590                self.clear_connected_udp_for_peer(&peer_addr);
1591            }
1592        }
1593    }
1594
1595    /// Ensure the current FMP receive index resolves to this peer.
1596    ///
1597    /// Rekey msg1/msg2 handlers pre-register the pending index before
1598    /// cutover, but losing that registration in a debug build used to
1599    /// panic in the cutover path. Repairing the map here is safe: the
1600    /// peer has already promoted the pending session, and the decrypt
1601    /// worker registration immediately after cutover depends on the
1602    /// same `(transport_id, our_index)` key.
1603    pub(in crate::node) fn ensure_current_session_index_registered(
1604        &mut self,
1605        node_addr: &NodeAddr,
1606        context: &'static str,
1607    ) -> bool {
1608        let Some(peer) = self.peers.get(node_addr) else {
1609            return false;
1610        };
1611        let Some(transport_id) = peer.transport_id() else {
1612            warn!(
1613                peer = %self.peer_display_name(node_addr),
1614                context,
1615                "Cannot register current session index without transport id"
1616            );
1617            return false;
1618        };
1619        let Some(our_index) = peer.our_index() else {
1620            warn!(
1621                peer = %self.peer_display_name(node_addr),
1622                context,
1623                "Cannot register current session index without local index"
1624            );
1625            return false;
1626        };
1627
1628        let cache_key = (transport_id, our_index.as_u32());
1629        match self.peers_by_index.get(&cache_key).copied() {
1630            Some(existing) if existing == *node_addr => true,
1631            Some(existing) => {
1632                warn!(
1633                    peer = %self.peer_display_name(node_addr),
1634                    previous_owner = %self.peer_display_name(&existing),
1635                    transport_id = %transport_id,
1636                    our_index = %our_index,
1637                    context,
1638                    "Repairing current session index with stale owner"
1639                );
1640                self.peers_by_index.insert(cache_key, *node_addr);
1641                true
1642            }
1643            None => {
1644                warn!(
1645                    peer = %self.peer_display_name(node_addr),
1646                    transport_id = %transport_id,
1647                    our_index = %our_index,
1648                    context,
1649                    "Repairing missing current session index"
1650                );
1651                self.peers_by_index.insert(cache_key, *node_addr);
1652                true
1653            }
1654        }
1655    }
1656
1657    // === Configuration ===
1658
1659    /// Get the configuration.
1660    pub fn config(&self) -> &Config {
1661        &self.config
1662    }
1663
1664    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1665    ///
1666    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1667    /// transport MTU. Returns the maximum IPv6 packet size (including
1668    /// IPv6 header) that can be transmitted through the FIPS mesh.
1669    pub fn effective_ipv6_mtu(&self) -> u16 {
1670        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1671    }
1672
1673    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1674    ///
1675    /// Returns the **minimum** MTU across all operational transports, or
1676    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1677    /// where a specific egress transport isn't yet known: the resulting
1678    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1679    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1680    /// configured-transport's egress, eliminating PMTU-D black holes that
1681    /// would otherwise occur when a flow's actual egress is smaller than
1682    /// the clamp ceiling assumed at TUN init.
1683    ///
1684    /// Returning the smallest (rather than the first-iterated, which used
1685    /// to vary across HashMap iteration order + async-startup race) makes
1686    /// the clamp deterministic across daemon restarts.
1687    ///
1688    /// See `ISSUE-2026-0011` for the empirical investigation.
1689    pub fn transport_mtu(&self) -> u16 {
1690        let min_operational = self
1691            .transports
1692            .values()
1693            .filter(|h| h.is_operational())
1694            .map(|h| h.mtu())
1695            .min();
1696        if let Some(mtu) = min_operational {
1697            return mtu;
1698        }
1699        // Fallback to config: try UDP first, then Ethernet
1700        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1701            return cfg.mtu();
1702        }
1703        1280
1704    }
1705
1706    // === State ===
1707
1708    /// Get the node state.
1709    pub fn state(&self) -> NodeState {
1710        self.state
1711    }
1712
1713    /// Get the node uptime.
1714    pub fn uptime(&self) -> std::time::Duration {
1715        self.started_at.elapsed()
1716    }
1717
1718    /// Check if node is operational.
1719    pub fn is_running(&self) -> bool {
1720        self.state.is_operational()
1721    }
1722
1723    /// Check if this is a leaf-only node.
1724    pub fn is_leaf_only(&self) -> bool {
1725        self.is_leaf_only
1726    }
1727
1728    // === Tree State ===
1729
1730    /// Get the tree state.
1731    pub fn tree_state(&self) -> &TreeState {
1732        &self.tree_state
1733    }
1734
1735    /// Get mutable tree state.
1736    pub fn tree_state_mut(&mut self) -> &mut TreeState {
1737        &mut self.tree_state
1738    }
1739
1740    // === Bloom State ===
1741
1742    /// Get the Bloom filter state.
1743    pub fn bloom_state(&self) -> &BloomState {
1744        &self.bloom_state
1745    }
1746
1747    /// Get mutable Bloom filter state.
1748    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1749        &mut self.bloom_state
1750    }
1751
1752    // === Mesh Size Estimate ===
1753
1754    /// Get the cached estimated mesh size.
1755    pub fn estimated_mesh_size(&self) -> Option<u64> {
1756        self.estimated_mesh_size
1757    }
1758
1759    /// Compute and cache the estimated mesh size from bloom filters.
1760    ///
1761    /// Uses the spanning tree partition: parent's filter covers nodes reachable
1762    /// upward, children's filters cover disjoint subtrees downward. The sum
1763    /// of estimated entry counts plus one (self) approximates total network size.
1764    pub(crate) fn compute_mesh_size(&mut self) {
1765        let my_addr = *self.tree_state.my_node_addr();
1766        let parent_id = *self.tree_state.my_declaration().parent_id();
1767        let is_root = self.tree_state.is_root();
1768
1769        let max_fpr = self.config.node.bloom.max_inbound_fpr;
1770        let mut total: f64 = 1.0; // count self
1771        let mut child_count: u32 = 0;
1772        let mut has_data = false;
1773
1774        // Parent's filter: nodes reachable upward through the tree.
1775        // If any contributing filter is above the FPR cap, we refuse to
1776        // estimate rather than substitute a partial/biased aggregate —
1777        // Node.estimated_mesh_size is already Option<u64> and consumers
1778        // (control socket, fipstop, periodic debug log) handle None.
1779        if !is_root
1780            && let Some(parent) = self.peers.get(&parent_id)
1781            && let Some(filter) = parent.inbound_filter()
1782        {
1783            match filter.estimated_count(max_fpr) {
1784                Some(n) => {
1785                    total += n;
1786                    has_data = true;
1787                }
1788                None => {
1789                    self.estimated_mesh_size = None;
1790                    return;
1791                }
1792            }
1793        }
1794
1795        // Children's filters: each child's subtree is disjoint
1796        for (peer_addr, peer) in &self.peers {
1797            if peer_addr == &parent_id {
1798                continue;
1799            }
1800            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1801                && *decl.parent_id() == my_addr
1802            {
1803                child_count += 1;
1804                if let Some(filter) = peer.inbound_filter() {
1805                    match filter.estimated_count(max_fpr) {
1806                        Some(n) => {
1807                            total += n;
1808                            has_data = true;
1809                        }
1810                        None => {
1811                            self.estimated_mesh_size = None;
1812                            return;
1813                        }
1814                    }
1815                }
1816            }
1817        }
1818
1819        if !has_data {
1820            self.estimated_mesh_size = None;
1821            return;
1822        }
1823
1824        let size = total.round() as u64;
1825        self.estimated_mesh_size = Some(size);
1826
1827        // Periodic logging (reuse MMP default interval: 30s)
1828        let now = std::time::Instant::now();
1829        let should_log = match self.last_mesh_size_log {
1830            None => true,
1831            Some(last) => {
1832                now.duration_since(last)
1833                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1834            }
1835        };
1836        if should_log {
1837            tracing::debug!(
1838                estimated_mesh_size = size,
1839                peers = self.peers.len(),
1840                children = child_count,
1841                "Mesh size estimate"
1842            );
1843            self.last_mesh_size_log = Some(now);
1844        }
1845    }
1846
1847    // === Coord Cache ===
1848
1849    /// Get the coordinate cache.
1850    pub fn coord_cache(&self) -> &CoordCache {
1851        &self.coord_cache
1852    }
1853
1854    /// Get mutable coordinate cache.
1855    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1856        &mut self.coord_cache
1857    }
1858
1859    // === Node Statistics ===
1860
1861    /// Get the node statistics.
1862    pub fn stats(&self) -> &stats::NodeStats {
1863        &self.stats
1864    }
1865
1866    /// Get mutable node statistics.
1867    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1868        &mut self.stats
1869    }
1870
1871    /// Get the stats history collector.
1872    pub fn stats_history(&self) -> &stats_history::StatsHistory {
1873        &self.stats_history
1874    }
1875
1876    /// Sample the current node state into the stats history ring.
1877    /// Called once per tick from the RX loop.
1878    pub(crate) fn record_stats_history(&mut self) {
1879        let fwd = &self.stats.forwarding;
1880        let peers_with_mmp: Vec<f64> = self
1881            .peers
1882            .values()
1883            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1884            .collect();
1885        let loss_rate = if peers_with_mmp.is_empty() {
1886            0.0
1887        } else {
1888            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1889        };
1890
1891        let snap = stats_history::Snapshot {
1892            mesh_size: self.estimated_mesh_size,
1893            tree_depth: self.tree_state.my_coords().depth() as u32,
1894            peer_count: self.peers.len() as u64,
1895            parent_switches_total: self.stats.tree.parent_switches,
1896            bytes_in_total: fwd.received_bytes,
1897            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1898            packets_in_total: fwd.received_packets,
1899            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1900            loss_rate,
1901            active_sessions: self.sessions.len() as u64,
1902        };
1903
1904        let now = std::time::Instant::now();
1905        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1906            .peers
1907            .values()
1908            .map(|p| {
1909                let stats = p.link_stats();
1910                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1911                    Some(m) => (
1912                        m.metrics.srtt_ms(),
1913                        Some(m.metrics.loss_rate()),
1914                        m.receiver.ecn_ce_count() as u64,
1915                    ),
1916                    None => (None, None, 0),
1917                };
1918                stats_history::PeerSnapshot {
1919                    node_addr: *p.node_addr(),
1920                    last_seen: now,
1921                    srtt_ms,
1922                    loss_rate,
1923                    bytes_in_total: stats.bytes_recv,
1924                    bytes_out_total: stats.bytes_sent,
1925                    packets_in_total: stats.packets_recv,
1926                    packets_out_total: stats.packets_sent,
1927                    ecn_ce_total: ecn_ce,
1928                }
1929            })
1930            .collect();
1931
1932        self.stats_history.tick(now, &snap, &peer_snaps);
1933    }
1934
1935    // === TUN Interface ===
1936
1937    /// Get the TUN state.
1938    pub fn tun_state(&self) -> TunState {
1939        self.tun_state
1940    }
1941
1942    /// Get the TUN interface name, if active.
1943    pub fn tun_name(&self) -> Option<&str> {
1944        self.tun_name.as_deref()
1945    }
1946
1947    // === Resource Limits ===
1948
1949    /// Set the maximum number of connections (handshake phase).
1950    pub fn set_max_connections(&mut self, max: usize) {
1951        self.max_connections = max;
1952    }
1953
1954    /// Set the maximum number of peers (authenticated).
1955    pub fn set_max_peers(&mut self, max: usize) {
1956        self.max_peers = max;
1957    }
1958
1959    /// Returns false when starting more outbound work would exceed a resource
1960    /// cap. A cap of `0` means uncapped.
1961    pub(crate) fn outbound_admission_check(&self) -> bool {
1962        let connection_used = self
1963            .connections
1964            .len()
1965            .saturating_add(self.pending_connects.len());
1966        let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
1967        let connection_allowed =
1968            self.max_connections == 0 || connection_used < self.max_connections;
1969        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
1970        peer_allowed && connection_allowed && link_allowed
1971    }
1972
1973    /// Admission for public/open-discovery outbound work. This includes the
1974    /// general connection/link caps and, when open Nostr discovery is enabled,
1975    /// the configured non-peer budget.
1976    pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
1977        if !self.outbound_admission_check() {
1978            return false;
1979        }
1980
1981        let nostr = &self.config.node.discovery.nostr;
1982        if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
1983            return true;
1984        }
1985
1986        let configured_npubs = self
1987            .config
1988            .peers()
1989            .iter()
1990            .map(|peer| peer.npub.clone())
1991            .collect::<HashSet<_>>();
1992        self.open_discovery_enqueue_budget(&configured_npubs) > 0
1993    }
1994
1995    /// Like `outbound_admission_check`, but for racing a better path to a
1996    /// peer that is already authenticated. This may temporarily add a
1997    /// connection/link, but it does not consume a new peer slot.
1998    pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
1999        let connection_used = self
2000            .connections
2001            .len()
2002            .saturating_add(self.pending_connects.len());
2003        let connection_allowed =
2004            self.max_connections == 0 || connection_used < self.max_connections;
2005        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2006        connection_allowed && link_allowed
2007    }
2008
2009    /// Set the maximum number of links.
2010    pub fn set_max_links(&mut self, max: usize) {
2011        self.max_links = max;
2012    }
2013
2014    // === Counts ===
2015
2016    /// Number of pending connections (handshake in progress).
2017    pub fn connection_count(&self) -> usize {
2018        self.connections.len()
2019    }
2020
2021    /// Number of authenticated peers.
2022    pub fn peer_count(&self) -> usize {
2023        self.peers.len()
2024    }
2025
2026    /// Number of active links.
2027    pub fn link_count(&self) -> usize {
2028        self.links.len()
2029    }
2030
2031    /// Number of active transports.
2032    pub fn transport_count(&self) -> usize {
2033        self.transports.len()
2034    }
2035
2036    // === Transport Management ===
2037
2038    /// Allocate a new transport ID.
2039    pub fn allocate_transport_id(&mut self) -> TransportId {
2040        let id = TransportId::new(self.next_transport_id);
2041        self.next_transport_id += 1;
2042        id
2043    }
2044
2045    /// Get a transport by ID.
2046    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
2047        self.transports.get(id)
2048    }
2049
2050    /// Get mutable transport by ID.
2051    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
2052        self.transports.get_mut(id)
2053    }
2054
2055    /// Iterate over transport IDs.
2056    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
2057        self.transports.keys()
2058    }
2059
2060    /// Get the packet receiver for the event loop.
2061    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
2062        self.packet_rx.as_mut()
2063    }
2064
2065    // === Link Management ===
2066
2067    /// Allocate a new link ID.
2068    pub fn allocate_link_id(&mut self) -> LinkId {
2069        let id = LinkId::new(self.next_link_id);
2070        self.next_link_id += 1;
2071        id
2072    }
2073
2074    /// Add a link.
2075    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2076        if self.max_links > 0 && self.links.len() >= self.max_links {
2077            return Err(NodeError::MaxLinksExceeded {
2078                max: self.max_links,
2079            });
2080        }
2081        let link_id = link.link_id();
2082        let transport_id = link.transport_id();
2083        let remote_addr = link.remote_addr().clone();
2084
2085        self.links.insert(link_id, link);
2086        self.addr_to_link
2087            .insert((transport_id, remote_addr), link_id);
2088        Ok(())
2089    }
2090
2091    /// Get a link by ID.
2092    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2093        self.links.get(link_id)
2094    }
2095
2096    /// Get a mutable link by ID.
2097    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2098        self.links.get_mut(link_id)
2099    }
2100
2101    /// Find link ID by transport address.
2102    pub fn find_link_by_addr(
2103        &self,
2104        transport_id: TransportId,
2105        addr: &TransportAddr,
2106    ) -> Option<LinkId> {
2107        self.addr_to_link
2108            .get(&(transport_id, addr.clone()))
2109            .copied()
2110    }
2111
2112    /// Remove a link.
2113    ///
2114    /// Only removes the addr_to_link reverse lookup if it still points to this
2115    /// link. In cross-connection scenarios, a newer link may have replaced the
2116    /// entry for the same address.
2117    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2118        if let Some(link) = self.links.remove(link_id) {
2119            // Clean up reverse lookup only if it still maps to this link
2120            let key = (link.transport_id(), link.remote_addr().clone());
2121            if self.addr_to_link.get(&key) == Some(link_id) {
2122                self.addr_to_link.remove(&key);
2123            }
2124            Some(link)
2125        } else {
2126            None
2127        }
2128    }
2129
2130    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2131        if !self.bootstrap_transports.contains(&transport_id) {
2132            return;
2133        }
2134
2135        let transport_in_use = self
2136            .links
2137            .values()
2138            .any(|link| link.transport_id() == transport_id)
2139            || self
2140                .connections
2141                .values()
2142                .any(|conn| conn.transport_id() == Some(transport_id))
2143            || self
2144                .peers
2145                .values()
2146                .any(|peer| peer.transport_id() == Some(transport_id))
2147            || self
2148                .pending_connects
2149                .iter()
2150                .any(|pending| pending.transport_id == transport_id);
2151
2152        if transport_in_use {
2153            return;
2154        }
2155
2156        tracing::debug!(
2157            transport_id = %transport_id,
2158            "bootstrap transport has no remaining references; dropping"
2159        );
2160
2161        self.bootstrap_transports.remove(&transport_id);
2162        self.bootstrap_transport_npubs.remove(&transport_id);
2163        self.transport_drops.remove(&transport_id);
2164        self.transports.remove(&transport_id);
2165    }
2166
2167    /// Iterate over all links.
2168    pub fn links(&self) -> impl Iterator<Item = &Link> {
2169        self.links.values()
2170    }
2171
2172    // === Connection Management (Handshake Phase) ===
2173
2174    /// Add a pending connection.
2175    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2176        let link_id = connection.link_id();
2177
2178        if self.connections.contains_key(&link_id) {
2179            return Err(NodeError::ConnectionAlreadyExists(link_id));
2180        }
2181
2182        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2183            return Err(NodeError::MaxConnectionsExceeded {
2184                max: self.max_connections,
2185            });
2186        }
2187
2188        self.connections.insert(link_id, connection);
2189        Ok(())
2190    }
2191
2192    /// Get a connection by LinkId.
2193    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2194        self.connections.get(link_id)
2195    }
2196
2197    /// Get a mutable connection by LinkId.
2198    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2199        self.connections.get_mut(link_id)
2200    }
2201
2202    /// Remove a connection.
2203    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2204        self.connections.remove(link_id)
2205    }
2206
2207    /// Iterate over all connections.
2208    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2209        self.connections.values()
2210    }
2211
2212    // === Peer Management (Active Phase) ===
2213
2214    /// Get a peer by NodeAddr.
2215    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2216        self.peers.get(node_addr)
2217    }
2218
2219    /// Get a mutable peer by NodeAddr.
2220    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2221        self.peers.get_mut(node_addr)
2222    }
2223
2224    /// Remove a peer.
2225    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2226        self.peers.remove(node_addr)
2227    }
2228
2229    /// Iterate over all peers.
2230    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2231        self.peers.values()
2232    }
2233
2234    /// Reference to the Nostr discovery handle if discovery is enabled.
2235    /// Used by control queries (`show_peers` per-peer Nostr-traversal
2236    /// state) to read failure-state without taking shared ownership.
2237    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2238        self.nostr_discovery.as_deref()
2239    }
2240
2241    /// Iterate over all peer node IDs.
2242    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2243        self.peers.keys()
2244    }
2245
2246    /// Iterate over peers that can send traffic.
2247    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2248        self.peers.values().filter(|p| p.can_send())
2249    }
2250
2251    /// Number of peers that can send traffic.
2252    pub fn sendable_peer_count(&self) -> usize {
2253        self.peers.values().filter(|p| p.can_send()).count()
2254    }
2255
2256    pub(crate) fn set_discovery_fallback_transit_allowed(
2257        &mut self,
2258        peer_addr: NodeAddr,
2259        allowed: bool,
2260    ) {
2261        if allowed {
2262            self.discovery_fallback_transit_blocked_peers
2263                .remove(&peer_addr);
2264        } else {
2265            self.discovery_fallback_transit_blocked_peers
2266                .insert(peer_addr);
2267        }
2268    }
2269
2270    pub(crate) fn configured_discovery_fallback_transit(
2271        &self,
2272        peer_addr: &NodeAddr,
2273    ) -> Option<bool> {
2274        self.configured_peer(peer_addr)
2275            .map(|peer| peer.discovery_fallback_transit)
2276    }
2277
2278    pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2279        self.config.peers().iter().find(|peer| {
2280            PeerIdentity::from_npub(&peer.npub)
2281                .ok()
2282                .is_some_and(|identity| identity.node_addr() == peer_addr)
2283        })
2284    }
2285
2286    pub(in crate::node) fn active_peer_uses_configured_static_udp_path(
2287        &self,
2288        peer_addr: &NodeAddr,
2289    ) -> bool {
2290        let Some(peer_config) = self.configured_peer(peer_addr) else {
2291            return false;
2292        };
2293
2294        peer_config.addresses.iter().any(|candidate| {
2295            candidate.seen_at_ms.is_none()
2296                && candidate.transport.eq_ignore_ascii_case("udp")
2297                && self.active_peer_matches_candidate(peer_addr, candidate)
2298        })
2299    }
2300
2301    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2302        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2303            return retry_state.peer_config.discovery_fallback_transit;
2304        }
2305
2306        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2307            return allowed;
2308        }
2309
2310        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2311    }
2312
2313    // === End-to-End Sessions ===
2314
2315    /// Get a session by remote NodeAddr.
2316    /// Disable the discovery forward rate limiter (for tests).
2317    #[cfg(test)]
2318    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2319        self.discovery_forward_limiter
2320            .set_interval(std::time::Duration::ZERO);
2321    }
2322
2323    #[cfg(test)]
2324    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2325        self.sessions.get(remote)
2326    }
2327
2328    /// Get a mutable session by remote NodeAddr.
2329    #[cfg(test)]
2330    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2331        self.sessions.get_mut(remote)
2332    }
2333
2334    /// Remove a session.
2335    #[cfg(test)]
2336    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2337        self.sessions.remove(remote)
2338    }
2339
2340    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2341    #[cfg(test)]
2342    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2343        self.path_mtu_lookup
2344            .read()
2345            .ok()
2346            .and_then(|map| map.get(fips_addr).copied())
2347    }
2348
2349    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2350    #[cfg(test)]
2351    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2352        if let Ok(mut map) = self.path_mtu_lookup.write() {
2353            map.insert(fips_addr, mtu);
2354        }
2355    }
2356
2357    /// Number of end-to-end sessions.
2358    pub fn session_count(&self) -> usize {
2359        self.sessions.len()
2360    }
2361
2362    /// Iterate over all session entries (for control queries).
2363    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2364        self.sessions.iter()
2365    }
2366
2367    // === Identity Cache ===
2368
2369    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2370    pub(crate) fn register_identity(
2371        &mut self,
2372        node_addr: NodeAddr,
2373        pubkey: secp256k1::PublicKey,
2374    ) -> bool {
2375        let mut prefix = [0u8; 15];
2376        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2377        if let Some(entry) = self.identity_cache.get(&prefix)
2378            && entry.node_addr == node_addr
2379            && entry.pubkey == pubkey
2380        {
2381            // Endpoint sends pass the same PeerIdentity on every packet. Once
2382            // validated, avoid re-deriving NodeAddr from the public key in the
2383            // data path; that hash showed up in macOS sender profiles.
2384            return true;
2385        }
2386
2387        let (xonly, _) = pubkey.x_only_public_key();
2388        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2389        if derived_node_addr != node_addr {
2390            debug!(
2391                claimed_node_addr = %node_addr,
2392                derived_node_addr = %derived_node_addr,
2393                "Rejected identity cache entry with mismatched public key"
2394            );
2395            return false;
2396        }
2397
2398        let now_ms = Self::now_ms();
2399        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2400            && entry.node_addr == node_addr
2401        {
2402            entry.pubkey = pubkey;
2403            entry.last_seen_ms = now_ms;
2404            return true;
2405        }
2406
2407        let npub = encode_npub(&xonly);
2408        self.identity_cache.insert(
2409            prefix,
2410            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2411        );
2412        // LRU eviction
2413        let max = self.config.node.cache.identity_size;
2414        if self.identity_cache.len() > max
2415            && let Some(oldest_key) = self
2416                .identity_cache
2417                .iter()
2418                .min_by_key(|(_, entry)| entry.last_seen_ms)
2419                .map(|(k, _)| *k)
2420        {
2421            self.identity_cache.remove(&oldest_key);
2422        }
2423        true
2424    }
2425
2426    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2427    pub(crate) fn lookup_by_fips_prefix(
2428        &mut self,
2429        prefix: &[u8; 15],
2430    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2431        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2432            entry.last_seen_ms = Self::now_ms(); // LRU touch
2433            Some((entry.node_addr, entry.pubkey))
2434        } else {
2435            None
2436        }
2437    }
2438
2439    /// Check if a node's identity is in the cache (without LRU touch).
2440    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2441        let mut prefix = [0u8; 15];
2442        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2443        self.identity_cache.contains_key(&prefix)
2444    }
2445
2446    /// Number of identity cache entries.
2447    pub fn identity_cache_len(&self) -> usize {
2448        self.identity_cache.len()
2449    }
2450
2451    /// Iterate over identity cache entries.
2452    ///
2453    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2454    /// Used by the `show_identity_cache` control query.
2455    pub fn identity_cache_iter(
2456        &self,
2457    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2458        self.identity_cache
2459            .values()
2460            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2461    }
2462
2463    /// Configured maximum identity cache size.
2464    pub fn identity_cache_max(&self) -> usize {
2465        self.config.node.cache.identity_size
2466    }
2467
2468    /// Number of pending discovery lookups.
2469    pub fn pending_lookup_count(&self) -> usize {
2470        self.pending_lookups.len()
2471    }
2472
2473    /// Iterate over pending discovery lookups for diagnostics.
2474    pub fn pending_lookups_iter(
2475        &self,
2476    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2477        self.pending_lookups.iter()
2478    }
2479
2480    /// Number of recent discovery requests tracked.
2481    pub fn recent_request_count(&self) -> usize {
2482        self.recent_requests.len()
2483    }
2484
2485    /// Count of destinations with queued TUN packets awaiting session setup.
2486    pub fn pending_tun_destinations(&self) -> usize {
2487        self.pending_tun_packets.len()
2488    }
2489
2490    /// Total TUN packets queued across all destinations.
2491    pub fn pending_tun_total_packets(&self) -> usize {
2492        self.pending_tun_packets.values().map(|q| q.len()).sum()
2493    }
2494
2495    /// Iterate over retry state for diagnostics.
2496    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2497        self.retry_pending.iter()
2498    }
2499
2500    // === Routing ===
2501
2502    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2503    ///
2504    /// Returns true if the peer is our current tree parent, or if the peer
2505    /// has declared us as their parent (making them our child).
2506    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2507        // Peer is our parent
2508        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2509            return true;
2510        }
2511        // Peer is our child (their declaration names us as parent)
2512        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2513            && decl.parent_id() == self.node_addr()
2514        {
2515            return true;
2516        }
2517        false
2518    }
2519
    /// Find next hop for a destination node address.
    ///
    /// Routing priority:
    /// 1. Destination is self → `None` (local delivery)
    /// 2. Destination is a healthy direct peer → that peer, unless a known
    ///    fallback next-hop has a meaningful link-quality advantage.
    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
    ///    observed reverse paths, selected with weighted multipath plus
    ///    periodic coordinate/tree exploration.
    /// 4. Bloom filter candidates with cached dest coords → among peers whose
    ///    bloom filter contains the destination, pick the one that minimizes
    ///    tree distance to the destination, with
    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
    ///    The self-distance check ensures only peers strictly closer to the
    ///    destination than us are considered (prevents routing loops).
    /// 5. Greedy tree routing fallback (requires cached dest coords)
    /// 6. No route → `None`
    ///
    /// Both the bloom filter and tree routing paths require cached destination
    /// coordinates (checked in `coord_cache`). Without coordinates, the node
    /// cannot make loop-free forwarding decisions. The caller should signal
    /// `CoordsRequired` back to the source when `None` is returned for a
    /// non-local destination.
    ///
    /// Takes `&mut self` because the lookup has side effects: it may drop an
    /// expired session-degraded marker, it touches the coordinate cache entry
    /// (`get_and_touch`), and it calls into the learned-route table.
    ///
    /// Every exit resolves the chosen address through `self.peers.get(..)`,
    /// so the returned reference always points into the peer table.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // 1. Local delivery
        if dest_node_addr == self.node_addr() {
            return None;
        }
        let now_ms = Self::now_ms();
        // A degraded session-level direct path disqualifies the direct peer
        // as a payload next hop, except when the peer uses a configured
        // static UDP path (see `session_direct_path_blocks_direct_payload`).
        let direct_session_degraded =
            self.session_direct_path_blocks_direct_payload(dest_node_addr, now_ms);

        // 2. Direct peer candidate: `Some(dest)` only when the destination is
        // a healthy peer whose direct path is not blocked. It is not returned
        // right away; fallback candidates below get a chance to beat it.
        let healthy_direct_route = self
            .peers
            .get(dest_node_addr)
            .filter(|peer| peer.is_healthy() && !direct_session_degraded)
            .map(|_| *dest_node_addr);
        let direct_payload_eligible = healthy_direct_route.is_some();
        // Sendability test shared by the learned and bloom candidate filters:
        // the destination peer itself must pass the direct-eligibility test,
        // every other peer only needs to be healthy.
        let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
            if addr == dest_node_addr {
                direct_payload_eligible
            } else {
                peer.is_healthy()
            }
        };

        // A healthy direct path is not automatically the best path. A
        // hotspot/NAT hairpin can remain sendable with high RTT or mild loss;
        // in that case a lower-cost mesh next-hop should carry traffic while
        // direct probes continue in the background.
        let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
            node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
        };

        // Snapshot of sendable peer addresses; `Some` only in reply-learned
        // mode, so every learned-route step below is skipped in other modes.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // 3. Optional reply-learned routing. These entries are not peer
        // claims; they are local observations of which peer carried traffic
        // or a verified lookup response back from the destination. Most
        // packets use weighted multipath over learned routes, but periodic
        // fallback exploration lets coord/bloom/tree routes discover better
        // candidates.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
        {
            // Only learned next hops that beat the direct route (or any
            // sendable one when there is no direct route) are eligible here.
            let eligible = sendable
                .iter()
                .copied()
                .filter(|addr| fallback_beats_direct(self, *addr))
                .collect::<HashSet<_>>();
            if !eligible.is_empty()
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
        }

        // Look up cached destination coordinates (required by both bloom and tree paths).
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: bloom and tree routing are unavailable. Prefer
            // any sendable learned route when there is no direct route or an
            // exploration round is due, then the direct route, then give up.
            if (healthy_direct_route.is_none() || explore_fallback)
                && let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            if let Some(direct_addr) = healthy_direct_route {
                return self.peers.get(&direct_addr);
            }
            return None;
        };

        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
        //    If no candidate is strictly closer, fall through to tree routing.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self
                .peers
                .iter()
                .filter(|(addr, peer)| {
                    payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
                })
                .map(|(_, peer)| peer)
                .collect();
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        // The bloom pick is used only if it beats the direct route.
        if let Some(next_hop_addr) = coordinate_route_addr
            && fallback_beats_direct(self, next_hop_addr)
        {
            return self.peers.get(&next_hop_addr);
        }

        // 5. Greedy tree routing fallback
        let tree_route_addr = self.select_tree_payload_candidate(
            &dest_coords,
            dest_node_addr,
            direct_payload_eligible,
        );
        if let Some(next_hop_addr) = tree_route_addr
            && fallback_beats_direct(self, next_hop_addr)
        {
            return self.peers.get(&next_hop_addr);
        }

        // Exploration round found no usable coordinate/tree route: go back to
        // the learned routes without the beats-direct filter.
        // NOTE(review): this returns `None` when `select_next_hop` yields
        // nothing, even if `healthy_direct_route` is `Some`, unlike the
        // no-coordinates branch above, which falls back to the direct route.
        // Confirm `should_explore_fallback` guarantees a selectable route.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // No fallback beat the direct route: use the direct route.
        if let Some(direct_addr) = healthy_direct_route {
            return self.peers.get(&direct_addr);
        }

        // Last resort: any sendable learned route.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // 6. No route
        None
    }
2695
2696    pub(in crate::node) fn find_transit_next_hop(
2697        &mut self,
2698        dest_node_addr: &NodeAddr,
2699        previous_hop: &NodeAddr,
2700    ) -> Option<NodeAddr> {
2701        if dest_node_addr == self.node_addr() {
2702            return None;
2703        }
2704
2705        if dest_node_addr != previous_hop
2706            && self
2707                .peers
2708                .get(dest_node_addr)
2709                .is_some_and(|peer| peer.is_healthy())
2710        {
2711            return Some(*dest_node_addr);
2712        }
2713
2714        let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2715        if &next_hop_addr == previous_hop {
2716            self.record_route_failure(*dest_node_addr, next_hop_addr);
2717            return None;
2718        }
2719        Some(next_hop_addr)
2720    }
2721
2722    fn route_candidate_beats_direct(
2723        &self,
2724        healthy_direct_route: Option<NodeAddr>,
2725        candidate_addr: NodeAddr,
2726    ) -> bool {
2727        let Some(direct_addr) = healthy_direct_route else {
2728            return true;
2729        };
2730        if candidate_addr == direct_addr {
2731            return false;
2732        }
2733
2734        let Some(direct) = self.peers.get(&direct_addr) else {
2735            return true;
2736        };
2737        let Some(candidate) = self.peers.get(&candidate_addr) else {
2738            return false;
2739        };
2740        if !candidate.is_healthy() {
2741            return false;
2742        }
2743
2744        let direct_cost = direct.link_cost();
2745        let candidate_cost = candidate.link_cost();
2746        candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2747    }
2748
2749    fn select_tree_payload_candidate(
2750        &self,
2751        dest_coords: &crate::tree::TreeCoordinate,
2752        direct_dest: &NodeAddr,
2753        direct_payload_eligible: bool,
2754    ) -> Option<NodeAddr> {
2755        if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2756            return None;
2757        }
2758
2759        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2760        let mut best: Option<(NodeAddr, usize)> = None;
2761
2762        for (peer_addr, peer) in &self.peers {
2763            if peer_addr == direct_dest {
2764                if !direct_payload_eligible {
2765                    continue;
2766                }
2767            } else if !peer.is_healthy() {
2768                continue;
2769            }
2770
2771            let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2772                continue;
2773            };
2774            let distance = peer_coords.distance_to(dest_coords);
2775            if distance >= my_distance {
2776                continue;
2777            }
2778
2779            let dominated = match &best {
2780                None => true,
2781                Some((best_id, best_dist)) => {
2782                    distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2783                }
2784            };
2785            if dominated {
2786                best = Some((*peer_addr, distance));
2787            }
2788        }
2789
2790        best.map(|(peer_addr, _)| peer_addr)
2791    }
2792
2793    pub(in crate::node) fn session_direct_path_is_degraded(
2794        &mut self,
2795        dest: &NodeAddr,
2796        now_ms: u64,
2797    ) -> bool {
2798        match self.session_direct_degraded_until_ms.get(dest).copied() {
2799            Some(until_ms) if until_ms > now_ms => true,
2800            Some(_) => {
2801                self.session_direct_degraded_until_ms.remove(dest);
2802                false
2803            }
2804            None => false,
2805        }
2806    }
2807
2808    pub(in crate::node) fn session_direct_path_blocks_direct_payload(
2809        &mut self,
2810        dest: &NodeAddr,
2811        now_ms: u64,
2812    ) -> bool {
2813        self.session_direct_path_is_degraded(dest, now_ms)
2814            && !self.active_peer_uses_configured_static_udp_path(dest)
2815    }
2816
2817    pub(in crate::node) fn mark_session_direct_path_degraded(
2818        &mut self,
2819        dest: NodeAddr,
2820        now_ms: u64,
2821    ) -> bool {
2822        let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2823        let entry = self
2824            .session_direct_degraded_until_ms
2825            .entry(dest)
2826            .or_insert(0);
2827        let was_degraded = *entry > now_ms;
2828        *entry = (*entry).max(until_ms);
2829        !was_degraded
2830    }
2831
2832    pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2833        self.session_direct_degraded_until_ms.remove(dest).is_some()
2834    }
2835
2836    pub(in crate::node) fn learn_reverse_route(
2837        &mut self,
2838        destination: NodeAddr,
2839        next_hop: NodeAddr,
2840    ) {
2841        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2842            || destination == *self.node_addr()
2843        {
2844            return;
2845        }
2846        let now_ms = Self::now_ms();
2847        self.learned_routes.learn(
2848            destination,
2849            next_hop,
2850            now_ms,
2851            self.config.node.routing.learned_ttl_secs,
2852            self.config.node.routing.max_learned_routes_per_dest,
2853        );
2854    }
2855
2856    pub(in crate::node) fn record_route_failure(
2857        &mut self,
2858        destination: NodeAddr,
2859        next_hop: NodeAddr,
2860    ) {
2861        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2862            return;
2863        }
2864        self.learned_routes.record_failure(&destination, &next_hop);
2865    }
2866
2867    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2868        self.learned_routes.snapshot(now_ms)
2869    }
2870
2871    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2872        self.learned_routes.purge_expired(now_ms);
2873    }
2874
2875    /// Select the best peer from a set of bloom filter candidates.
2876    ///
2877    /// Uses distance from each candidate's tree coordinates to the destination
2878    /// as the primary metric (after link_cost). Only selects peers that are
2879    /// strictly closer to the destination than we are (self-distance check
2880    /// prevents routing loops).
2881    ///
2882    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2883    fn select_best_candidate<'a>(
2884        &'a self,
2885        candidates: &[&'a ActivePeer],
2886        dest_coords: &crate::tree::TreeCoordinate,
2887    ) -> Option<&'a ActivePeer> {
2888        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2889
2890        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2891
2892        for &candidate in candidates {
2893            if !candidate.can_send() {
2894                continue;
2895            }
2896
2897            let cost = candidate.link_cost();
2898
2899            let dist = self
2900                .tree_state
2901                .peer_coords(candidate.node_addr())
2902                .map(|pc| pc.distance_to(dest_coords))
2903                .unwrap_or(usize::MAX);
2904
2905            // Self-distance check: only consider peers strictly closer
2906            // to the destination than we are (prevents routing loops)
2907            if dist >= my_distance {
2908                continue;
2909            }
2910
2911            let dominated = match &best {
2912                None => true,
2913                Some((_, best_cost, best_dist)) => {
2914                    cost < *best_cost
2915                        || (cost == *best_cost && dist < *best_dist)
2916                        || (cost == *best_cost
2917                            && dist == *best_dist
2918                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2919                }
2920            };
2921
2922            if dominated {
2923                best = Some((candidate, cost, dist));
2924            }
2925        }
2926
2927        best.map(|(peer, _, _)| peer)
2928    }
2929
2930    /// Check if a destination is in any peer's bloom filter.
2931    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2932        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2933    }
2934
2935    /// Get the TUN packet sender channel.
2936    ///
2937    /// Returns None if TUN is not active or the node hasn't been started.
2938    pub fn tun_tx(&self) -> Option<&TunTx> {
2939        self.tun_tx.as_ref()
2940    }
2941
2942    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2943    ///
2944    /// This must be called before [`Node::start`] and requires `tun.enabled =
2945    /// false`. Outbound packets sent to the returned sender are processed by the
2946    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2947    /// sent to the returned receiver with source attribution.
2948    pub fn attach_external_packet_io(
2949        &mut self,
2950        capacity: usize,
2951    ) -> Result<ExternalPacketIo, NodeError> {
2952        if self.state != NodeState::Created {
2953            return Err(NodeError::Config(ConfigError::Validation(
2954                "external packet I/O must be attached before node start".to_string(),
2955            )));
2956        }
2957        if self.config.tun.enabled {
2958            return Err(NodeError::Config(ConfigError::Validation(
2959                "external packet I/O requires tun.enabled=false".to_string(),
2960            )));
2961        }
2962
2963        let capacity = capacity.max(1);
2964        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2965        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2966        self.tun_outbound_rx = Some(outbound_rx);
2967        self.external_packet_tx = Some(inbound_tx);
2968
2969        Ok(ExternalPacketIo {
2970            outbound_tx,
2971            inbound_rx,
2972        })
2973    }
2974
2975    /// Attach app-owned endpoint data I/O for embedded operation.
2976    ///
2977    /// Commands sent to the returned sender are processed by the node RX loop.
2978    /// Incoming endpoint data is emitted as source-attributed events.
2979    pub(crate) fn attach_endpoint_data_io(
2980        &mut self,
2981        capacity: usize,
2982    ) -> Result<EndpointDataIo, NodeError> {
2983        if self.state != NodeState::Created {
2984            return Err(NodeError::Config(ConfigError::Validation(
2985                "endpoint data I/O must be attached before node start".to_string(),
2986            )));
2987        }
2988
2989        let command_capacity = endpoint_data_command_capacity(capacity);
2990        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2991        // Inbound endpoint-data events use an unbounded channel — see
2992        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2993        // per-packet semaphore + the cross-task relay task that used to
2994        // sit on top of this channel).
2995        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2996        self.endpoint_command_rx = Some(command_rx);
2997        self.endpoint_event_tx = Some(event_tx.clone());
2998
2999        Ok(EndpointDataIo {
3000            command_tx,
3001            event_rx,
3002            event_tx,
3003        })
3004    }
3005
3006    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
3007        let mut prefix = [0u8; 15];
3008        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3009        self.identity_cache
3010            .get(&prefix)
3011            .filter(|entry| &entry.node_addr == addr)
3012            .map(|entry| entry.pubkey)
3013    }
3014
3015    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
3016        let mut prefix = [0u8; 15];
3017        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3018        self.identity_cache
3019            .get(&prefix)
3020            .filter(|entry| &entry.node_addr == addr)
3021            .map(|entry| entry.npub.clone())
3022    }
3023
3024    pub(in crate::node) fn deliver_external_ipv6_packet(
3025        &self,
3026        src_addr: &NodeAddr,
3027        packet: Vec<u8>,
3028    ) {
3029        let Some(external_packet_tx) = &self.external_packet_tx else {
3030            return;
3031        };
3032        if packet.len() < 40 {
3033            return;
3034        }
3035        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
3036            return;
3037        };
3038        let delivered = NodeDeliveredPacket {
3039            source_node_addr: *src_addr,
3040            source_npub: self.npub_for_node_addr(src_addr),
3041            destination,
3042            packet,
3043        };
3044        if let Err(error) = external_packet_tx.try_send(delivered) {
3045            debug!(error = %error, "Failed to deliver packet to external app sink");
3046        }
3047    }
3048
3049    // === Sending ===
3050
3051    /// Encrypt and send a link-layer message to an authenticated peer.
3052    ///
3053    /// The plaintext should include the message type byte followed by the
3054    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
3055    ///
3056    /// The send path prepends a 4-byte session-relative timestamp (inner
3057    /// header) before encryption. The full 16-byte outer header is used
3058    /// as AAD for the AEAD construction.
3059    ///
3060    /// This is the standard path for sending any link-layer control message
3061    /// to a peer over their encrypted Noise session.
3062    pub(super) async fn send_encrypted_link_message(
3063        &mut self,
3064        node_addr: &NodeAddr,
3065        plaintext: &[u8],
3066    ) -> Result<(), NodeError> {
3067        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
3068            .await
3069    }
3070
3071    /// Update one peer's local-outbound-broken signal from a `transport.send`
3072    /// outcome. Sets a per-peer timestamp on local-side io errors
3073    /// (NetworkUnreachable / HostUnreachable / AddrNotAvailable); clears that
3074    /// peer on success. The reaper consults this in `check_link_heartbeats` to
3075    /// switch only that peer to `fast_link_dead_timeout_secs`.
3076    pub(in crate::node) fn note_local_send_outcome(
3077        &mut self,
3078        node_addr: &NodeAddr,
3079        result: &Result<usize, TransportError>,
3080    ) {
3081        match result {
3082            Ok(_) => {
3083                self.local_send_failure_at_by_peer.remove(node_addr);
3084            }
3085            Err(error) if error.is_local_route_unavailable() => {
3086                self.local_send_failure_at_by_peer
3087                    .insert(*node_addr, std::time::Instant::now());
3088            }
3089            Err(_) => {}
3090        }
3091    }
3092
3093    /// Return the active dead-timeout for one peer after considering recent
3094    /// local route failures. The fast-dead signal is intentionally short-lived:
3095    /// on the UDP worker path a send call can return before the kernel result
3096    /// is observed, so a stale route error must not compress liveness for the
3097    /// whole normal dead-timeout window.
3098    pub(in crate::node) fn local_send_failure_dead_timeout_for_peer(
3099        &self,
3100        node_addr: &NodeAddr,
3101        now: std::time::Instant,
3102        dead_timeout: std::time::Duration,
3103        fast_dead_timeout: std::time::Duration,
3104    ) -> std::time::Duration {
3105        match self.local_send_failure_at_by_peer.get(node_addr).copied() {
3106            Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3107                fast_dead_timeout.min(dead_timeout)
3108            }
3109            None => dead_timeout,
3110            Some(_) => dead_timeout,
3111        }
3112    }
3113
3114    pub(in crate::node) fn purge_expired_local_send_failures(&mut self, now: std::time::Instant) {
3115        self.local_send_failure_at_by_peer
3116            .retain(|_, at| now.duration_since(*at) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW);
3117    }
3118
3119    pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3120        self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3121    }
3122
3123    pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3124        let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3125            return false;
3126        };
3127        let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3128        std::time::Instant::now().duration_since(t) <= grace
3129    }
3130
3131    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
3132    ///
3133    /// Used by the forwarding path to relay congestion signals hop-by-hop.
3134    pub(super) async fn send_encrypted_link_message_with_ce(
3135        &mut self,
3136        node_addr: &NodeAddr,
3137        plaintext: &[u8],
3138        ce_flag: bool,
3139    ) -> Result<(), NodeError> {
3140        let peer = self
3141            .peers
3142            .get_mut(node_addr)
3143            .ok_or(NodeError::PeerNotFound(*node_addr))?;
3144
3145        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3146            node_addr: *node_addr,
3147            reason: "no their_index".into(),
3148        })?;
3149        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3150            node_addr: *node_addr,
3151            reason: "no transport_id".into(),
3152        })?;
3153        let remote_addr = peer
3154            .current_addr()
3155            .cloned()
3156            .ok_or_else(|| NodeError::SendFailed {
3157                node_addr: *node_addr,
3158                reason: "no current_addr".into(),
3159            })?;
3160        #[cfg(any(target_os = "linux", target_os = "macos"))]
3161        let connected_socket = peer.connected_udp();
3162
3163        // Prepend 4-byte session-relative timestamp (inner header)
3164        let timestamp_ms = peer.session_elapsed_ms();
3165
3166        // MMP: read spin bit value before entering session borrow
3167        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3168        let mut flags = if sp_flag { FLAG_SP } else { 0 };
3169        if ce_flag {
3170            flags |= FLAG_CE;
3171        }
3172        if peer.current_k_bit() {
3173            flags |= FLAG_KEY_EPOCH;
3174        }
3175
3176        let session = peer
3177            .noise_session_mut()
3178            .ok_or_else(|| NodeError::SendFailed {
3179                node_addr: *node_addr,
3180                reason: "no noise session".into(),
3181            })?;
3182
3183        // Build 16-byte outer header upfront. The inner-plaintext
3184        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
3185        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
3186        // just to measure it. The worker path uses this length to size
3187        // the wire buffer directly; the legacy path below still
3188        // materialises a separate `inner_plaintext` Vec for the inline
3189        // encrypt-and-send call.
3190        const INNER_TS_LEN: usize = 4;
3191        let counter = session.current_send_counter();
3192        let inner_len = INNER_TS_LEN + plaintext.len();
3193        let payload_len = inner_len as u16;
3194        let header = build_established_header(their_index, counter, flags, payload_len);
3195
3196        // **UDP send fast path.** The encrypt-worker pool is always
3197        // spawned at lifecycle start (workers = num_cpus) in
3198        // production, so this branch is taken for every authentic
3199        // send on every UDP-transported established session. The
3200        // AEAD work + sendmsg syscall run on a dedicated OS thread;
3201        // the rx_loop only builds the wire buffer + reserves the
3202        // counter inline.
3203        //
3204        // Other transport kinds (BLE, TCP, sim, ethernet) fall
3205        // through to the inline encrypt + transport.send path
3206        // below — those don't have raw-fd / sendmmsg / UDP_GSO
3207        // benefits to expose through the worker pool, so the simpler
3208        // synchronous send is the right shape for them.
3209        //
3210        // The `encrypt_workers.is_some()` check below is true in
3211        // production (lifecycle::start spawns the pool); it stays
3212        // checked rather than `expect()`-ed because unit tests
3213        // construct `Node` without calling `start()`.
3214        let transport_for_send = self
3215            .transports
3216            .get(&transport_id)
3217            .ok_or(NodeError::TransportNotFound(transport_id))?;
3218        match transport_for_send.connection_state(&remote_addr) {
3219            ConnectionState::Connected => {}
3220            other => {
3221                if matches!(other, ConnectionState::None) {
3222                    let _ = transport_for_send.connect(&remote_addr).await;
3223                }
3224                return Err(NodeError::SendFailed {
3225                    node_addr: *node_addr,
3226                    reason: format!("transport connection not ready: {:?}", other),
3227                });
3228            }
3229        }
3230        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3231        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3232            && is_udp
3233            && let Some(cipher_clone) = session.send_cipher_clone()
3234        {
3235            {
3236                // Reserve the counter on the session so subsequent
3237                // sends don't reuse it. `current_send_counter` only
3238                // peeks; we advance via `take_send_counter`.
3239                let reserved_counter =
3240                    session
3241                        .take_send_counter()
3242                        .map_err(|e| NodeError::SendFailed {
3243                            node_addr: *node_addr,
3244                            reason: format!("counter reservation failed: {}", e),
3245                        })?;
3246                debug_assert_eq!(reserved_counter, counter);
3247                // Re-derive the header with the now-locked-in counter
3248                // value (same value, but the call sequence is more
3249                // explicit).
3250                let header =
3251                    build_established_header(their_index, reserved_counter, flags, payload_len);
3252                let transport = transport_for_send;
3253                // Snapshot the per-peer connected UDP socket before
3254                // resolving the fallback address. On the established
3255                // steady-state path this socket already carries the
3256                // kernel peer address, so re-parsing the configured
3257                // transport address and touching the DNS cache on every
3258                // packet is pure overhead on the sender hot path.
3259                let send_target = {
3260                    if let TransportHandle::Udp(udp) = transport {
3261                        let socket_addr = {
3262                            #[cfg(any(target_os = "linux", target_os = "macos"))]
3263                            {
3264                                match connected_socket.as_ref() {
3265                                    Some(socket) => Some(socket.peer_addr()),
3266                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3267                                }
3268                            }
3269                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3270                            {
3271                                udp.resolve_for_off_task(&remote_addr).await.ok()
3272                            }
3273                        };
3274                        match (udp.async_socket(), socket_addr) {
3275                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3276                            _ => None,
3277                        }
3278                    } else {
3279                        None
3280                    }
3281                };
3282                if let Some((socket, socket_addr)) = send_target {
3283                    // Build the wire buffer **directly** from
3284                    // `plaintext` with a single allocation:
3285                    //   `[16 header][4 ts][plaintext...]` with
3286                    // +16 trailing capacity for the AEAD tag.
3287                    // The worker seals `wire_buf[16..]` in
3288                    // place and appends the tag — no second
3289                    // alloc, no second memcpy.
3290                    //
3291                    // Previous design built `inner_plaintext`
3292                    // via `prepend_inner_header` (1 alloc + 1
3293                    // copy) and then let the worker memcpy
3294                    // header + plaintext into a fresh Vec
3295                    // (another alloc + copy). At ~100 kpps the
3296                    // saved alloc/copy is ~150 MB/sec of memory
3297                    // bandwidth on the hot rx_loop + worker.
3298                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3299                    let mut wire_buf = Vec::with_capacity(wire_capacity);
3300                    wire_buf.extend_from_slice(&header);
3301                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
3302                    wire_buf.extend_from_slice(plaintext);
3303                    let predicted_bytes = wire_capacity;
3304                    // Stats / MMP update inline — predicted size
3305                    // is exact for ChaCha20-Poly1305 (tag is
3306                    // constant 16 bytes). When `connected_socket` is
3307                    // `Some`, the worker sends on it without a
3308                    // destination sockaddr — the kernel skips the
3309                    // per-packet sockaddr + route + neighbor resolve.
3310                    if let Some(peer) = self.peers.get_mut(node_addr) {
3311                        peer.link_stats_mut().record_sent(predicted_bytes);
3312                        if let Some(mmp) = peer.mmp_mut() {
3313                            mmp.sender
3314                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3315                        }
3316                    }
3317                    let scheduling_weight = self.send_weight_for_peer(node_addr);
3318                    let bulk_endpoint_data = fmp_plaintext_is_bulk_session_datagram(plaintext);
3319                    workers.dispatch(self::encrypt_worker::FmpSendJob {
3320                        cipher: cipher_clone,
3321                        counter: reserved_counter,
3322                        wire_buf,
3323                        fsp_seal: None,
3324                        socket,
3325                        dest_addr: socket_addr,
3326                        #[cfg(any(target_os = "linux", target_os = "macos"))]
3327                        connected_socket,
3328                        bulk_endpoint_data,
3329                        drop_on_backpressure: bulk_endpoint_data,
3330                        scheduling_weight,
3331                        queued_at: crate::perf_profile::stamp(),
3332                    });
3333                    return Ok(());
3334                }
3335            }
3336        }
3337
3338        // Inline (legacy) path: encrypt + send on the rx_loop.
3339        // Build the inner plaintext lazily here — the worker path
3340        // above never reaches this point, so the prepend_inner_header
3341        // alloc is avoided in the fast path.
3342        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3343        // Encrypt with AAD binding to the outer header
3344        let ciphertext = {
3345            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3346            session
3347                .encrypt_with_aad(&inner_plaintext, &header)
3348                .map_err(|e| NodeError::SendFailed {
3349                    node_addr: *node_addr,
3350                    reason: format!("encryption failed: {}", e),
3351                })?
3352        };
3353
3354        let wire_packet = build_encrypted(&header, &ciphertext);
3355
3356        // Re-borrow peer for stats update after sending
3357        let send_result = {
3358            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3359            let transport = self
3360                .transports
3361                .get(&transport_id)
3362                .ok_or(NodeError::TransportNotFound(transport_id))?;
3363            transport.send(&remote_addr, &wire_packet).await
3364        };
3365        self.note_local_send_outcome(node_addr, &send_result);
3366        let bytes_sent = send_result.map_err(|e| match e {
3367            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3368                node_addr: *node_addr,
3369                packet_size,
3370                mtu,
3371            },
3372            other => NodeError::SendFailed {
3373                node_addr: *node_addr,
3374                reason: format!("transport send: {}", other),
3375            },
3376        })?;
3377
3378        // Update send statistics
3379        if let Some(peer) = self.peers.get_mut(node_addr) {
3380            peer.link_stats_mut().record_sent(bytes_sent);
3381            // MMP: record sent frame for sender report generation
3382            if let Some(mmp) = peer.mmp_mut() {
3383                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3384            }
3385        }
3386
3387        Ok(())
3388    }
3389}
3390
3391impl fmt::Debug for Node {
3392    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3393        f.debug_struct("Node")
3394            .field("node_addr", self.node_addr())
3395            .field("state", &self.state)
3396            .field("is_leaf_only", &self.is_leaf_only)
3397            .field("connections", &self.connection_count())
3398            .field("peers", &self.peer_count())
3399            .field("links", &self.link_count())
3400            .field("transports", &self.transport_count())
3401            .finish()
3402    }
3403}