Skip to main content

fips_core/node/
mod.rs

1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31#[cfg(unix)]
32use self::wire::ESTABLISHED_HEADER_SIZE;
33use self::wire::{
34    FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted, build_established_header,
35    prepend_inner_header,
36};
37use crate::bloom::BloomState;
38use crate::cache::CoordCache;
39use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
40use crate::node::session::SessionEntry;
41use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
42use crate::peer::{ActivePeer, PeerConnection};
43#[cfg(any(target_os = "linux", target_os = "macos"))]
44use crate::transport::ethernet::EthernetTransport;
45use crate::transport::tcp::TcpTransport;
46use crate::transport::tor::TorTransport;
47use crate::transport::udp::UdpTransport;
48#[cfg(feature = "webrtc-transport")]
49use crate::transport::webrtc::WebRtcTransport;
50use crate::transport::{
51    ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
52    TransportHandle, TransportId,
53};
54use crate::tree::TreeState;
55use crate::upper::hosts::HostMap;
56use crate::upper::icmp_rate_limit::IcmpRateLimiter;
57use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
58use crate::utils::index::IndexAllocator;
59use crate::{
60    Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
61    PeerIdentity, encode_npub,
62};
63use rand::Rng;
64use std::collections::{HashMap, HashSet, VecDeque};
65use std::fmt;
66use std::sync::Arc;
67use std::thread::JoinHandle;
68use thiserror::Error;
69use tracing::{debug, warn};
70
// NOTE(review): the constants below are consumed outside this chunk; the descriptions are
// inferred from their names and from the `session_direct_degraded_until_ms` field on `Node`.
// Confirm against their call sites.

/// Window associated with fast dead-marking after local send failures
/// (presumably: repeated local send failures within this window mark a peer dead early).
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration =
    std::time::Duration::from_millis(3_000);
/// How long (ms) a destination's direct first-hop path is presumably held as degraded.
const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20 * 1_000;
/// Minimum sample count presumably required before the loss thresholds below apply.
const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
/// Loss ratio at or above which the direct path is presumably considered degraded.
const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
/// Loss ratio presumably required to consider the direct path recovered. It is lower than
/// the degraded threshold, which gives hysteresis.
const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
/// Minimum relative cost advantage presumably required before a fallback route is preferred.
const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
77
/// Scheduling hints derived from a decrypted FMP link-layer plaintext.
///
/// `Default` yields a control-lane, never-drop classification (both flags `false`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct FmpPlaintextTrafficClass {
    /// The frame carries bulk endpoint data and should stay out of the control lane.
    bulk_endpoint_data: bool,
    /// The frame may be discarded when the sender is under backpressure.
    drop_on_backpressure: bool,
}
83
/// Scheduling hints derived from an endpoint IP packet before FSP encryption.
///
/// `Default` yields a control-lane, never-drop classification (both flags `false`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct EndpointPayloadTrafficClass {
    /// The packet is bulk endpoint data rather than latency-sensitive traffic.
    bulk_endpoint_data: bool,
    /// The packet may be discarded when the sender is under backpressure.
    drop_on_backpressure: bool,
}
89
90fn classify_fmp_plaintext_traffic(plaintext: &[u8]) -> FmpPlaintextTrafficClass {
91    let bulk_endpoint_data = fmp_plaintext_is_bulk_session_datagram(plaintext);
92    // At this layer established FSP payloads are already end-to-end encrypted,
93    // so a bulk SessionDatagram may still be TCP endpoint traffic. Keep it out
94    // of the control lane, but only the pre-FSP endpoint path may mark known
95    // non-TCP packets as discardable under sender backpressure.
96    FmpPlaintextTrafficClass {
97        bulk_endpoint_data,
98        drop_on_backpressure: false,
99    }
100}
101
102fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
103    if plaintext
104        .first()
105        .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
106    {
107        return false;
108    }
109    let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
110        return false;
111    };
112    FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
113        prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
114    })
115}
116
117fn classify_endpoint_payload(payload: &[u8]) -> EndpointPayloadTrafficClass {
118    const IPPROTO_TCP: u8 = 6;
119    const IPPROTO_ICMPV6: u8 = 58;
120
121    match parse_endpoint_payload_ip_proto(payload) {
122        Some((IPPROTO_ICMPV6, _)) => EndpointPayloadTrafficClass::default(),
123        Some((IPPROTO_TCP, offset)) => {
124            let latency_sensitive = endpoint_tcp_payload_is_latency_sensitive(payload, offset);
125            EndpointPayloadTrafficClass {
126                bulk_endpoint_data: !latency_sensitive,
127                drop_on_backpressure: false,
128            }
129        }
130        _ => EndpointPayloadTrafficClass {
131            bulk_endpoint_data: true,
132            drop_on_backpressure: true,
133        },
134    }
135}
136
/// Decide whether a TCP segment should be kept out of the bulk lane.
///
/// `payload` is a complete IP packet. `tcp_offset` is the byte index of its TCP header, as
/// reported by `parse_endpoint_payload_ip_proto`.
///
/// Returns `true` when any of these hold:
/// - the TCP header is truncated, or its data-offset field is invalid (errs towards the
///   low-latency lane);
/// - the segment has SYN, FIN or RST set;
/// - the segment carries at most `INTERACTIVE_TCP_PAYLOAD_MAX` bytes of TCP data.
///
/// The TCP data length is derived from the IP header's length field when it is usable, so
/// bytes beyond the declared IP datagram are not counted. Otherwise the rest of the slice
/// after the TCP header is used.
fn endpoint_tcp_payload_is_latency_sensitive(payload: &[u8], tcp_offset: usize) -> bool {
    const IPV6_HEADER_LEN: usize = 40;
    const TCP_MIN_HEADER_LEN: usize = 20;
    const TCP_FLAG_FIN: u8 = 0x01;
    const TCP_FLAG_SYN: u8 = 0x02;
    const TCP_FLAG_RST: u8 = 0x04;
    const INTERACTIVE_TCP_PAYLOAD_MAX: usize = 256;

    if payload.len() < tcp_offset + TCP_MIN_HEADER_LEN {
        return true;
    }

    // The upper nibble of TCP header byte 12 is the data offset in 32-bit words.
    let tcp_header_len = usize::from(payload[tcp_offset + 12] >> 4) * 4;
    if tcp_header_len < TCP_MIN_HEADER_LEN || payload.len() < tcp_offset + tcp_header_len {
        return true;
    }

    // Connection setup/teardown segments always take the latency-sensitive path.
    let flags = payload[tcp_offset + 13];
    if flags & (TCP_FLAG_FIN | TCP_FLAG_SYN | TCP_FLAG_RST) != 0 {
        return true;
    }

    // `endpoint_ip_payload_len` counts the bytes after the IPv4 header, or after the fixed
    // 40-byte IPv6 header. For IPv6 that count also includes any extension headers sitting
    // between the fixed header and `tcp_offset`. Subtracting only the TCP header length
    // would therefore over-count TCP data. Convert the count to an end-of-datagram offset
    // first. For IPv4, `tcp_offset` is the IPv4 header length, so this reduces to
    // `ip_payload_len - tcp_header_len` as before.
    let ip_base_header_len = if payload[0] >> 4 == 6 {
        IPV6_HEADER_LEN
    } else {
        tcp_offset
    };
    let tcp_data_start = tcp_offset + tcp_header_len;
    let payload_len = endpoint_ip_payload_len(payload)
        .and_then(|ip_payload_len| (ip_base_header_len + ip_payload_len).checked_sub(tcp_data_start))
        .unwrap_or_else(|| payload.len().saturating_sub(tcp_data_start));
    payload_len <= INTERACTIVE_TCP_PAYLOAD_MAX
}

/// Return the payload length declared by an IP header, or `None` if it cannot be read.
///
/// - IPv4: Total Length minus the header length (IHL × 4). Returns `None` if the packet is
///   shorter than 20 bytes, the IHL is below 20 bytes or exceeds the slice, or Total Length
///   is smaller than the header.
/// - IPv6: the Payload Length field. Per the IPv6 header format, this counts everything
///   after the fixed 40-byte header, including extension headers. Returns `None` if the
///   packet is shorter than 40 bytes.
///
/// The declared value is returned as-is; it is not checked against `payload.len()`.
fn endpoint_ip_payload_len(payload: &[u8]) -> Option<usize> {
    const IPV4_MIN_HEADER_LEN: usize = 20;
    const IPV6_HEADER_LEN: usize = 40;

    let version_ihl = payload.first().copied()?;
    match version_ihl >> 4 {
        4 => {
            if payload.len() < IPV4_MIN_HEADER_LEN {
                return None;
            }
            let header_len = usize::from(version_ihl & 0x0f) * 4;
            if header_len < IPV4_MIN_HEADER_LEN || payload.len() < header_len {
                return None;
            }
            // Bytes 2..4: Total Length (header + data), big-endian.
            let total_len = usize::from(u16::from_be_bytes([payload[2], payload[3]]));
            total_len.checked_sub(header_len)
        }
        6 => {
            if payload.len() < IPV6_HEADER_LEN {
                return None;
            }
            // Bytes 4..6: Payload Length, big-endian.
            Some(usize::from(u16::from_be_bytes([payload[4], payload[5]])))
        }
        _ => None,
    }
}
190
/// Find the transport protocol number and transport-header offset of an IP packet.
///
/// Returns `Some((protocol, offset))`, where `offset` is the byte index at which the
/// transport header starts:
/// - IPv4: the header length from the IHL field (options included). The IHL must be at
///   least 20 bytes and fit inside `payload`.
/// - IPv6: delegated to `ipv6_payload_next_header`, which walks extension headers.
///
/// Any other IP version, or a truncated header, yields `None`.
fn parse_endpoint_payload_ip_proto(payload: &[u8]) -> Option<(u8, usize)> {
    const IPV4_MIN_HEADER_LEN: usize = 20;

    let version_ihl = *payload.first()?;
    match version_ihl >> 4 {
        6 => ipv6_payload_next_header(payload),
        4 if payload.len() >= IPV4_MIN_HEADER_LEN => {
            let ihl_bytes = usize::from(version_ihl & 0x0f) * 4;
            let header_fits = ihl_bytes >= IPV4_MIN_HEADER_LEN && ihl_bytes <= payload.len();
            // Byte 9 of the IPv4 header is the Protocol field.
            header_fits.then(|| (payload[9], ihl_bytes))
        }
        _ => None,
    }
}

/// Test helper: `true` when `payload` parses as an IP packet whose transport is TCP.
#[cfg(test)]
fn endpoint_payload_is_tcp(payload: &[u8]) -> bool {
    const IPPROTO_TCP: u8 = 6;
    matches!(parse_endpoint_payload_ip_proto(payload), Some((IPPROTO_TCP, _)))
}

/// Walk the IPv6 extension-header chain to the upper-layer protocol.
///
/// Starts from the Next Header field of the fixed 40-byte header and steps over every
/// header accepted by `ipv6_extension_header_is_skippable`. Returns
/// `Some((next_header, offset))`: the first non-skippable protocol number and the byte
/// offset where that header begins.
///
/// Returns `None` in any of these cases:
/// - the packet is shorter than 40 bytes or is not IPv6;
/// - an extension header runs past the end of `payload`;
/// - more than eight extension headers are chained.
///
/// A Fragment header is skipped without looking at its fragment offset. For a non-first
/// fragment, `offset` therefore points into fragment data rather than at a transport header.
fn ipv6_payload_next_header(payload: &[u8]) -> Option<(u8, usize)> {
    const IPV6_HEADER_LEN: usize = 40;
    const IPV6_FRAGMENT_HEADER_LEN: usize = 8;
    const MAX_EXTENSION_HEADERS: usize = 8;

    if payload.len() < IPV6_HEADER_LEN || payload[0] >> 4 != 6 {
        return None;
    }

    let mut next_header = payload[6];
    let mut offset = IPV6_HEADER_LEN;
    let mut skipped = 0usize;
    while ipv6_extension_header_is_skippable(next_header) {
        // Every extension header keeps its own Next Header in byte 0. Only the encoding of
        // its length (byte 1) differs.
        let ext_len = match next_header {
            // Fragment header: fixed size.
            44 => IPV6_FRAGMENT_HEADER_LEN,
            // Authentication Header: length in 4-octet units, minus 2.
            51 => (usize::from(*payload.get(offset + 1)?) + 2) * 4,
            // Generic extension header: length in 8-octet units, excluding the first 8.
            _ => (usize::from(*payload.get(offset + 1)?) + 1) * 8,
        };
        if payload.len() < offset + ext_len {
            return None;
        }
        next_header = payload[offset];
        offset += ext_len;

        skipped += 1;
        if skipped > MAX_EXTENSION_HEADERS {
            return None;
        }
    }

    Some((next_header, offset))
}

/// Whether `next_header` names an IPv6 extension header the walker can step over.
///
/// The set is Hop-by-Hop Options (0), Routing (43), Fragment (44), Authentication Header
/// (51), Destination Options (60) and Mobility (135). ESP (50) is not included.
fn ipv6_extension_header_is_skippable(next_header: u8) -> bool {
    const SKIPPABLE: [u8; 6] = [0, 43, 44, 51, 60, 135];
    SKIPPABLE.contains(&next_header)
}
270
/// Symmetric jitter, in seconds, applied to each session's rekey timer.
///
/// Per the original design notes, every FMP/FSP session picks a uniform offset in
/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` when it is built and again after each
/// cutover. The mean rekey interval stays at the configured value, while peers that
/// started together are less likely to initiate rekeys against each other at the same
/// moment.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
278
279/// Errors related to node operations.
280#[derive(Debug, Error)]
281pub enum NodeError {
282    #[error("node not started")]
283    NotStarted,
284
285    #[error("node already started")]
286    AlreadyStarted,
287
288    #[error("node already stopped")]
289    AlreadyStopped,
290
291    #[error("transport not found: {0}")]
292    TransportNotFound(TransportId),
293
294    #[error("no transport available for type: {0}")]
295    NoTransportForType(String),
296
297    #[error("link not found: {0}")]
298    LinkNotFound(LinkId),
299
300    #[error("connection not found: {0}")]
301    ConnectionNotFound(LinkId),
302
303    #[error("peer not found: {0:?}")]
304    PeerNotFound(NodeAddr),
305
306    #[error("peer already exists: {0:?}")]
307    PeerAlreadyExists(NodeAddr),
308
309    #[error("connection already exists for link: {0}")]
310    ConnectionAlreadyExists(LinkId),
311
312    #[error("invalid peer npub '{npub}': {reason}")]
313    InvalidPeerNpub { npub: String, reason: String },
314
315    #[error("discovery error: {0}")]
316    Discovery(String),
317
318    #[error("access denied: {0}")]
319    AccessDenied(String),
320
321    #[error("max connections exceeded: {max}")]
322    MaxConnectionsExceeded { max: usize },
323
324    #[error("max peers exceeded: {max}")]
325    MaxPeersExceeded { max: usize },
326
327    #[error("max links exceeded: {max}")]
328    MaxLinksExceeded { max: usize },
329
330    #[error("handshake incomplete for link {0}")]
331    HandshakeIncomplete(LinkId),
332
333    #[error("no session available for link {0}")]
334    NoSession(LinkId),
335
336    #[error("promotion failed for link {link_id}: {reason}")]
337    PromotionFailed { link_id: LinkId, reason: String },
338
339    #[error("send failed to {node_addr}: {reason}")]
340    SendFailed { node_addr: NodeAddr, reason: String },
341
342    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
343    MtuExceeded {
344        node_addr: NodeAddr,
345        packet_size: usize,
346        mtu: u16,
347    },
348
349    #[error("config error: {0}")]
350    Config(#[from] ConfigError),
351
352    #[error("identity error: {0}")]
353    Identity(#[from] IdentityError),
354
355    #[error("TUN error: {0}")]
356    Tun(#[from] TunError),
357
358    #[error("index allocation failed: {0}")]
359    IndexAllocationFailed(String),
360
361    #[error("handshake failed: {0}")]
362    HandshakeFailed(String),
363
364    #[error("transport error: {0}")]
365    TransportError(String),
366
367    #[error("local route unavailable: {0}")]
368    LocalRouteUnavailable(String),
369
370    #[error("bootstrap handoff failed: {0}")]
371    BootstrapHandoff(String),
372}
373
374impl NodeError {
375    pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
376        if error.is_local_route_unavailable() {
377            Self::LocalRouteUnavailable(error.to_string())
378        } else {
379            Self::TransportError(error.to_string())
380        }
381    }
382
383    pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
384        matches!(self, Self::LocalRouteUnavailable(_))
385    }
386}
387
388/// Source-attributed packet delivered by a node running without a system TUN.
389#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct NodeDeliveredPacket {
391    /// FIPS node address that originated the packet.
392    pub source_node_addr: NodeAddr,
393    /// Source Nostr public key when the node has learned it.
394    pub source_npub: Option<String>,
395    /// Destination FIPS address from the IPv6 packet.
396    pub destination: FipsAddress,
397    /// Full IPv6 packet after FIPS session decapsulation.
398    pub packet: Vec<u8>,
399}
400
401#[derive(Debug, Clone)]
402struct IdentityCacheEntry {
403    node_addr: NodeAddr,
404    pubkey: secp256k1::PublicKey,
405    npub: String,
406    last_seen_ms: u64,
407}
408
409impl IdentityCacheEntry {
410    fn new(
411        node_addr: NodeAddr,
412        pubkey: secp256k1::PublicKey,
413        npub: String,
414        last_seen_ms: u64,
415    ) -> Self {
416        Self {
417            node_addr,
418            pubkey,
419            npub,
420            last_seen_ms,
421        }
422    }
423}
424
425/// App-owned packet channels for embedding FIPS without a system TUN.
426#[derive(Debug)]
427pub struct ExternalPacketIo {
428    /// Send outbound IPv6 packets into the node.
429    pub outbound_tx: crate::upper::tun::TunOutboundTx,
430    /// Receive inbound IPv6 packets delivered by FIPS sessions.
431    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
432}
433
434/// App-owned endpoint data channels for embedding FIPS without a daemon.
435#[derive(Debug)]
436pub(crate) struct EndpointDataIo {
437    /// Send endpoint data commands into the node RX loop.
438    ///
439    /// Bounded with a generous default so normal sender bursts do not
440    /// stall on semaphore acquisition. macOS pacing happens at the UDP
441    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
442    /// constraining this app queue instead caused the inner TCP flow to
443    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
444    /// default for benches.
445    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
446    /// Receive endpoint data delivered by FIPS sessions.
447    ///
448    /// Unbounded so the rx_loop's send on inbound packet delivery is a
449    /// wait-free push (no semaphore acquire), and so we can drop the
450    /// per-packet cross-task relay that previously sat between the node
451    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
452    /// naturally bounded — the rx_loop both produces here and runs the
453    /// same runtime that schedules the consumer, so a stalled consumer
454    /// stalls production too.
455    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
456    /// Clone of the event_tx exposed for in-process loopback (e.g.
457    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
458    /// event into the same queue without going through the encrypt /
459    /// decrypt path, while keeping every consumer reading from a single
460    /// channel.
461    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
462}
463
/// Capacity for the endpoint data command channel.
///
/// A positive integer in `FIPS_ENDPOINT_DATA_QUEUE_CAP` (surrounding whitespace allowed)
/// wins unconditionally, so it may also lower the capacity. Otherwise the result is
/// `requested`, floored at 32,768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const MIN_COMMAND_QUEUE_CAP: usize = 32_768;

    std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&cap| cap > 0)
        .unwrap_or_else(|| requested.max(MIN_COMMAND_QUEUE_CAP))
}
474
475/// Commands accepted by the node endpoint data service.
476#[derive(Debug)]
477pub(crate) enum NodeEndpointCommand {
478    /// Send with an explicit response channel — used by callers that
479    /// care whether the local-stack handoff succeeded (e.g.
480    /// `blocking_send` waits for the runtime to accept the send).
481    Send {
482        remote: PeerIdentity,
483        payload: Vec<u8>,
484        queued_at: Option<std::time::Instant>,
485        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
486    },
487    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
488    /// no per-packet result channel. Used by the data-plane fast path
489    /// (`FipsEndpoint::send`) where the caller already discards the
490    /// result. Saves one oneshot::channel() allocation per outbound
491    /// packet on the application's send hot path.
492    SendOneway {
493        remote: PeerIdentity,
494        payload: Vec<u8>,
495        queued_at: Option<std::time::Instant>,
496    },
497    PeerSnapshot {
498        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
499    },
500    RelaySnapshot {
501        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
502    },
503    UpdateRelays {
504        advert_relays: Vec<String>,
505        dm_relays: Vec<String>,
506        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
507    },
508    /// Replace the runtime peer list. Newly added auto-connect peers get
509    /// `initiate_peer_connection` immediately; removed peers are dropped
510    /// from the retry queue (the regular liveness timeout reaps any active
511    /// session). Existing entries are kept and their `addresses` field is
512    /// refreshed so the next retry sees the latest hints.
513    UpdatePeers {
514        peers: Vec<crate::config::PeerConfig>,
515        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
516    },
517}
518
/// Per-category counts describing how an `UpdatePeers` request changed the peer list.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Peers that were not present before.
    pub(crate) added: usize,
    /// Previously configured peers that are no longer listed.
    pub(crate) removed: usize,
    /// Existing peers whose entry was refreshed.
    pub(crate) updated: usize,
    /// Existing peers left as they were.
    pub(crate) unchanged: usize,
}
527
528/// Endpoint data events emitted by the node session receive path.
529#[derive(Debug)]
530pub(crate) enum NodeEndpointEvent {
531    Data {
532        source_node_addr: NodeAddr,
533        source_npub: Option<String>,
534        payload: Vec<u8>,
535        queued_at: Option<std::time::Instant>,
536    },
537}
538
/// Snapshot of one authenticated peer, as reported to embedded endpoint callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// Peer's npub.
    pub(crate) npub: String,
    /// Whether the peer is currently connected.
    pub(crate) connected: bool,
    /// Transport address in display form, if known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, if known.
    pub(crate) transport_type: Option<String>,
    /// Numeric link identifier.
    pub(crate) link_id: u64,
    /// Presumably the smoothed round-trip time in ms, when measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent to this peer.
    pub(crate) packets_sent: u64,
    /// Packets received from this peer.
    pub(crate) packets_recv: u64,
    /// Bytes sent to this peer.
    pub(crate) bytes_sent: u64,
    /// Bytes received from this peer.
    pub(crate) bytes_recv: u64,
    /// Whether a direct-path probe is pending.
    pub(crate) direct_probe_pending: bool,
    /// Delay in ms before the pending direct probe (presumably), if scheduled.
    pub(crate) direct_probe_after_ms: Option<u64>,
}
555
/// Current state of one Nostr relay, as reported to embedded endpoint callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status in textual form.
    pub(crate) status: String,
}
562
/// Lifecycle phase of a FIPS node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed, not yet started.
    Created,
    /// Start-up in progress (transports being initialized).
    Starting,
    /// Fully operational.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Shut down.
    Stopped,
}

impl NodeState {
    /// `true` only while the node is `Running`.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// `true` for `Created` and `Stopped`, the only phases from which `start` may proceed.
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// `true` only while the node is `Running`.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase phase name, e.g. `running`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
607
608/// Recent request tracking for dedup and reverse-path forwarding.
609///
610/// When a LookupRequest is forwarded through a node, the node stores the
611/// request_id and which peer sent it. When the corresponding LookupResponse
612/// arrives, it's forwarded back to that peer (reverse-path forwarding).
613/// The `response_forwarded` flag prevents response routing loops.
614#[derive(Clone, Debug)]
615pub(crate) struct RecentRequest {
616    /// The peer who sent this request to us.
617    pub(crate) from_peer: NodeAddr,
618    /// When we received this request (Unix milliseconds).
619    pub(crate) timestamp_ms: u64,
620    /// Whether we've already forwarded a response for this request.
621    /// Prevents response routing loops when convergent request paths
622    /// create bidirectional entries in recent_requests.
623    pub(crate) response_forwarded: bool,
624}
625
626impl RecentRequest {
627    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
628        Self {
629            from_peer,
630            timestamp_ms,
631            response_forwarded: false,
632        }
633    }
634
635    /// Check if this entry has expired (older than expiry_ms).
636    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
637        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
638    }
639}
640
641/// Key for addr_to_link reverse lookup.
642type AddrKey = (TransportId, TransportAddr);
643
/// Kernel receive-drop tracker for one transport, used to detect congestion.
///
/// Sampled once per tick (1 s). `dropping` reports whether the cumulative drop counter
/// grew since the previous sample.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Cumulative `recv_drops` value seen at the previous sample.
    prev_drops: u64,
    /// Whether new drops appeared between the last two samples.
    dropping: bool,
}
655
656/// State for a link waiting for transport-level connection establishment.
657///
658/// For connection-oriented transports (TCP, Tor), the transport connect runs
659/// asynchronously. This struct holds the data needed to complete the handshake
660/// once the connection is ready.
661struct PendingConnect {
662    /// The link that was created for this connection.
663    link_id: LinkId,
664    /// Which transport is being used.
665    transport_id: TransportId,
666    /// The remote address being connected to.
667    remote_addr: TransportAddr,
668    /// The peer identity (for handshake initiation).
669    peer_identity: PeerIdentity,
670}
671
672/// A running FIPS node instance.
673///
674/// This is the top-level container holding all node state.
675///
676/// ## Peer Lifecycle
677///
678/// Peers go through two phases:
679/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
680/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
681///
682/// The `addr_to_link` map enables dispatching incoming packets to the right
683/// connection before authentication completes.
684// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
685pub struct Node {
686    // === Identity ===
687    /// This node's cryptographic identity.
688    identity: Identity,
689
690    /// Random epoch generated at startup for peer restart detection.
691    /// Exchanged inside Noise handshake messages so peers can detect restarts.
692    startup_epoch: [u8; 8],
693
694    /// Instant when the node was created, for uptime reporting.
695    started_at: std::time::Instant,
696
697    // === Configuration ===
698    /// Loaded configuration.
699    config: Config,
700
701    // === State ===
702    /// Node operational state.
703    state: NodeState,
704
705    /// Whether this is a leaf-only node.
706    is_leaf_only: bool,
707
708    // === Spanning Tree ===
709    /// Local spanning tree state.
710    tree_state: TreeState,
711
712    // === Bloom Filter ===
713    /// Local Bloom filter state.
714    bloom_state: BloomState,
715
716    // === Routing ===
717    /// Address -> coordinates cache (from session setup and discovery).
718    coord_cache: CoordCache,
719    /// Locally learned reverse-path next-hop hints.
720    learned_routes: LearnedRouteTable,
721    /// Destinations whose direct first-hop path is temporarily suspect because
722    /// session-layer MMP observed sustained loss while using that direct path.
723    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
724    /// Recent discovery requests (dedup + reverse-path forwarding).
725    /// Maps request_id → RecentRequest.
726    recent_requests: HashMap<u64, RecentRequest>,
727    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
728    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
729    /// the TUN reader/writer threads at TCP MSS clamp time so the
730    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
731    /// and the learned per-destination path MTU.
732    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
733
734    // === Transports & Links ===
735    /// Active transports (owned by Node).
736    transports: HashMap<TransportId, TransportHandle>,
737    /// Per-transport kernel drop tracking for congestion detection.
738    transport_drops: HashMap<TransportId, TransportDropState>,
739    /// Active links.
740    links: HashMap<LinkId, Link>,
741    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
742    addr_to_link: HashMap<AddrKey, LinkId>,
743
744    // === Packet Channel ===
745    /// Packet sender for transports.
746    packet_tx: Option<PacketTx>,
747    /// Packet receiver (for event loop).
748    packet_rx: Option<PacketRx>,
749
750    // === Connections (Handshake Phase) ===
751    /// Pending connections (handshake in progress).
752    /// Indexed by LinkId since we don't know the peer's identity yet.
753    connections: HashMap<LinkId, PeerConnection>,
754
755    // === Peers (Active Phase) ===
756    /// Authenticated peers.
757    /// Indexed by NodeAddr (verified identity).
758    peers: HashMap<NodeAddr, ActivePeer>,
759
760    // === End-to-End Sessions ===
761    /// Session table for end-to-end encrypted sessions.
762    /// Keyed by remote NodeAddr.
763    sessions: HashMap<NodeAddr, SessionEntry>,
764
765    // === Identity Cache ===
766    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
767    /// Enables reverse lookup from IPv6 destination to session/routing identity.
768    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
769
770    // === Pending TUN Packets ===
771    /// Packets queued while waiting for session establishment.
772    /// Keyed by destination NodeAddr, bounded per-dest and total.
773    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
774    /// Endpoint data payloads queued while waiting for session establishment.
775    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
776    // === Pending Discovery Lookups ===
777    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
778    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
779    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
780
781    // === Resource Limits ===
782    /// Maximum connections (0 = unlimited).
783    max_connections: usize,
784    /// Maximum peers (0 = unlimited).
785    max_peers: usize,
786    /// Maximum links (0 = unlimited).
787    max_links: usize,
788
789    // === Counters ===
790    /// Next link ID to allocate.
791    next_link_id: u64,
792    /// Next transport ID to allocate.
793    next_transport_id: u32,
794
795    // === Node Statistics ===
796    /// Routing, forwarding, discovery, and error signal counters.
797    stats: stats::NodeStats,
798
799    /// Time-series history of node-level metrics (1s/1m rings).
800    stats_history: stats_history::StatsHistory,
801
802    // === TUN Interface ===
803    /// TUN device state.
804    tun_state: TunState,
805    /// TUN interface name (for cleanup).
806    tun_name: Option<String>,
807    /// TUN packet sender channel.
808    tun_tx: Option<TunTx>,
809    /// Receiver for outbound packets from the TUN reader.
810    tun_outbound_rx: Option<TunOutboundRx>,
811    /// App-owned packet sink used by embedded/no-TUN integrations.
812    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
813    /// Endpoint data command receiver used by embedded/no-daemon integrations.
814    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
815    /// Endpoint data event sink used by embedded/no-daemon integrations.
816    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
817    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
818    /// spawned (set up in `start()` once transports are running).
819    /// `Some(pool)` once available; the pool internally holds
820    /// per-worker mpsc senders and round-robins jobs across them.
821    /// See `node::encrypt_worker` for the rationale and layout.
822    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
823    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
824    /// `encrypt_workers` for the receive side.
825    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
826    /// Set of sessions that have been registered with the decrypt
827    /// shard worker pool. Used by rx_loop to decide between fast-path
828    /// dispatch (worker owns the session) and legacy in-place decrypt
829    /// (worker doesn't have it yet). Per the data-plane restructure,
830    /// the worker owns its session state directly — there's no shared
831    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
832    /// this set tracks **whether** the worker has been told about a
833    /// given session.
834    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
835    /// Fallback channel: decrypt worker bounces non-fast-path packets
836    /// (anything that's not bulk EndpointData) back here for rx_loop
837    /// to handle via the legacy path. Drained by a new rx_loop arm.
838    decrypt_fallback_rx:
839        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
840    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
841    /// TUN reader thread handle.
842    tun_reader_handle: Option<JoinHandle<()>>,
843    /// TUN writer thread handle.
844    tun_writer_handle: Option<JoinHandle<()>>,
845    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
846    /// On Linux, deleting the interface via netlink serves the same purpose.
847    #[cfg(target_os = "macos")]
848    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
849
850    // === DNS Responder ===
851    /// Receiver for resolved identities from the DNS responder.
852    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
853    /// DNS responder task handle.
854    dns_task: Option<tokio::task::JoinHandle<()>>,
855
856    // === Index-Based Session Dispatch ===
857    /// Allocator for session indices.
858    index_allocator: IndexAllocator,
859    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
860    /// This maps our session index to the peer that uses it.
861    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
862    /// Pending outbound handshakes by our sender_idx.
863    /// Tracks which LinkId corresponds to which session index.
864    pending_outbound: HashMap<(TransportId, u32), LinkId>,
865
866    // === Rate Limiting ===
867    /// Rate limiter for msg1 processing (DoS protection).
868    msg1_rate_limiter: HandshakeRateLimiter,
869    /// Rate limiter for ICMP Packet Too Big messages.
870    icmp_rate_limiter: IcmpRateLimiter,
871    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
872    routing_error_rate_limiter: RoutingErrorRateLimiter,
873    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
874    coords_response_rate_limiter: RoutingErrorRateLimiter,
875    /// Backoff for failed discovery lookups (originator-side).
876    discovery_backoff: DiscoveryBackoff,
877    /// Rate limiter for forwarded discovery requests (transit-side).
878    discovery_forward_limiter: DiscoveryForwardRateLimiter,
879
880    // === Pending Transport Connects ===
881    /// Links waiting for transport-level connection establishment before
882    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
883    /// the transport connect runs in the background; the tick handler polls
884    /// connection_state() and initiates the handshake when connected.
885    pending_connects: Vec<PendingConnect>,
886
887    // === Connection Retry ===
888    /// Retry state for peers whose outbound connections have failed.
889    /// Keyed by NodeAddr. Entries are created when a handshake times out
890    /// or fails, and removed on successful promotion or when max retries
891    /// are exhausted.
892    retry_pending: HashMap<NodeAddr, retry::RetryState>,
893
894    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
895    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
896    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
897    /// Identity is unverified at this layer — the Noise XX handshake
898    /// initiated against an mDNS-observed endpoint is what proves the
899    /// peer holds the matching private key.
900    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
901    /// Same-host JSON registry under `~/.fips/instances`. Records are
902    /// loopback routing hints only; peer identity is still verified by the
903    /// Noise handshake.
904    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
905    local_instance_started_at_ms: Option<u64>,
906    last_local_instance_publish_ms: Option<u64>,
907    last_local_instance_scan_ms: Option<u64>,
908    /// Wall-clock ms when Nostr discovery successfully started, used to
909    /// schedule the one-shot startup advert sweep after a settle delay.
910    /// `None` until discovery comes up; remains `None` if discovery is
911    /// disabled or failed to start.
912    nostr_discovery_started_at_ms: Option<u64>,
913    /// Whether the one-shot startup advert sweep has run. Set to true
914    /// after the first sweep fires (under `policy: open`); thereafter
915    /// only the per-tick `queue_open_discovery_retries` continues.
916    startup_open_discovery_sweep_done: bool,
917    /// Per-peer UDP transports adopted from NAT traversal handoff.
918    bootstrap_transports: HashSet<TransportId>,
919    /// Originating peer npub (bech32) for each adopted bootstrap
920    /// transport, captured at `adopt_established_traversal` time.
921    /// Populated alongside `bootstrap_transports`; cleared in
922    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
923    /// route fatal-protocol-mismatch observations back to the
924    /// Nostr-discovery `failure_state` for long cooldown application.
925    bootstrap_transport_npubs: HashMap<TransportId, String>,
926    /// Peers that should not be used as reply-learned fallback transit for
927    /// other destinations. Direct lookups to the peer are still permitted.
928    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
929
930    // === Periodic Parent Re-evaluation ===
931    /// Timestamp of last periodic parent re-evaluation (for pacing).
932    last_parent_reeval: Option<crate::time::Instant>,
933
934    // === Congestion Logging ===
935    /// Timestamp of last congestion detection log (rate-limited to 5s).
936    last_congestion_log: Option<std::time::Instant>,
937
938    // === Mesh Size Estimate ===
939    /// Cached estimated mesh size (computed once per tick from bloom filters).
940    estimated_mesh_size: Option<u64>,
941    /// Timestamp of last mesh size log emission.
942    last_mesh_size_log: Option<std::time::Instant>,
943
944    // === Bloom Self-Plausibility ===
945    /// Rate-limit state for the self-plausibility WARN. Fires at most
946    /// once per 60s globally when our own outgoing FilterAnnounce has
947    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
948    /// aggregation drift or an ingress bypass.
949    last_self_warn: Option<std::time::Instant>,
950
951    // === Local Outbound Liveness ===
952    /// Set per peer when a `transport.send` returned a local-side io error
953    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
954    /// cleared on the next successful send to that peer. Used by
955    /// `check_link_heartbeats` to compress only that peer's dead-timeout to
956    /// `fast_link_dead_timeout_secs` while its outbound is observed broken.
957    local_send_failure_at_by_peer: HashMap<NodeAddr, std::time::Instant>,
958    /// Set when the rx loop could not complete its 1s maintenance work
959    /// inside the watchdog timeout. Link-dead detection may be valid during
960    /// overload, but traversal cooldown should not punish a path just because
961    /// our own scheduler/worker queue was late.
962    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
963
964    // === Display Names ===
965    /// Human-readable names for configured peers (alias or short npub).
966    /// Populated at startup from peer config.
967    peer_aliases: HashMap<NodeAddr, String>,
968    /// Scheduler weight for explicitly configured peers. Built when config
969    /// changes so the packet hot path only does a NodeAddr hash lookup.
970    configured_peer_send_weights: HashMap<NodeAddr, u8>,
971
972    /// Reloadable peer ACL state from standard allow/deny files.
973    peer_acl: acl::PeerAclReloader,
974
975    // === Host Map ===
976    /// Static hostname → npub mapping for DNS resolution.
977    /// Built at construction from peer aliases and /etc/fips/hosts.
978    host_map: Arc<HostMap>,
979}
980
981impl Node {
982    /// Create a new node from configuration.
983    pub fn new(config: Config) -> Result<Self, NodeError> {
984        config.validate()?;
985        let identity = config.create_identity()?;
986        let node_addr = *identity.node_addr();
987        let is_leaf_only = config.is_leaf_only();
988
989        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
990        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
991
992        let mut startup_epoch = [0u8; 8];
993        rand::rng().fill_bytes(&mut startup_epoch);
994
995        let mut bloom_state = if is_leaf_only {
996            BloomState::leaf_only(node_addr)
997        } else {
998            BloomState::new(node_addr)
999        };
1000        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1001
1002        let tun_state = if config.tun.enabled {
1003            TunState::Configured
1004        } else {
1005            TunState::Disabled
1006        };
1007
1008        // Initialize tree state with signed self-declaration
1009        let mut tree_state = TreeState::new(node_addr);
1010        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1011        tree_state.set_hold_down(config.node.tree.hold_down_secs);
1012        tree_state.set_flap_dampening(
1013            config.node.tree.flap_threshold,
1014            config.node.tree.flap_window_secs,
1015            config.node.tree.flap_dampening_secs,
1016        );
1017        tree_state
1018            .sign_declaration(&identity)
1019            .expect("signing own declaration should never fail");
1020
1021        let coord_cache = CoordCache::new(
1022            config.node.cache.coord_size,
1023            config.node.cache.coord_ttl_secs * 1000,
1024        );
1025        let rl = &config.node.rate_limit;
1026        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1027            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1028            config.node.limits.max_pending_inbound,
1029        );
1030
1031        let max_connections = config.node.limits.max_connections;
1032        let max_peers = config.node.limits.max_peers;
1033        let max_links = config.node.limits.max_links;
1034        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1035        let backoff_base_secs = config.node.discovery.backoff_base_secs;
1036        let backoff_max_secs = config.node.discovery.backoff_max_secs;
1037        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
1038
1039        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1040        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1041
1042        Ok(Self {
1043            identity,
1044            startup_epoch,
1045            started_at: std::time::Instant::now(),
1046            config,
1047            state: NodeState::Created,
1048            is_leaf_only,
1049            tree_state,
1050            bloom_state,
1051            coord_cache,
1052            learned_routes: LearnedRouteTable::default(),
1053            session_direct_degraded_until_ms: HashMap::new(),
1054            recent_requests: HashMap::new(),
1055            transports: HashMap::new(),
1056            transport_drops: HashMap::new(),
1057            links: HashMap::new(),
1058            addr_to_link: HashMap::new(),
1059            packet_tx: None,
1060            packet_rx: None,
1061            connections: HashMap::new(),
1062            peers: HashMap::new(),
1063            sessions: HashMap::new(),
1064            identity_cache: HashMap::new(),
1065            pending_tun_packets: HashMap::new(),
1066            pending_endpoint_data: HashMap::new(),
1067            pending_lookups: HashMap::new(),
1068            max_connections,
1069            max_peers,
1070            max_links,
1071            next_link_id: 1,
1072            next_transport_id: 1,
1073            stats: stats::NodeStats::new(),
1074            stats_history: stats_history::StatsHistory::new(),
1075            tun_state,
1076            tun_name: None,
1077            tun_tx: None,
1078            tun_outbound_rx: None,
1079            external_packet_tx: None,
1080            endpoint_command_rx: None,
1081            endpoint_event_tx: None,
1082            encrypt_workers: None,
1083            decrypt_workers: None,
1084            decrypt_registered_sessions: std::collections::HashSet::new(),
1085            decrypt_fallback_tx,
1086            decrypt_fallback_rx,
1087            tun_reader_handle: None,
1088            tun_writer_handle: None,
1089            #[cfg(target_os = "macos")]
1090            tun_shutdown_fd: None,
1091            dns_identity_rx: None,
1092            dns_task: None,
1093            index_allocator: IndexAllocator::new(),
1094            peers_by_index: HashMap::new(),
1095            pending_outbound: HashMap::new(),
1096            msg1_rate_limiter,
1097            icmp_rate_limiter: IcmpRateLimiter::new(),
1098            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1099            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1100                std::time::Duration::from_millis(coords_response_interval_ms),
1101            ),
1102            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
1103            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
1104                std::time::Duration::from_secs(forward_min_interval_secs),
1105            ),
1106            pending_connects: Vec::new(),
1107            retry_pending: HashMap::new(),
1108            nostr_discovery: None,
1109            nostr_discovery_started_at_ms: None,
1110            lan_discovery: None,
1111            local_instance_registry: None,
1112            local_instance_started_at_ms: None,
1113            last_local_instance_publish_ms: None,
1114            last_local_instance_scan_ms: None,
1115            startup_open_discovery_sweep_done: false,
1116            bootstrap_transports: HashSet::new(),
1117            bootstrap_transport_npubs: HashMap::new(),
1118            discovery_fallback_transit_blocked_peers: HashSet::new(),
1119            last_parent_reeval: None,
1120            last_congestion_log: None,
1121            estimated_mesh_size: None,
1122            last_mesh_size_log: None,
1123            last_self_warn: None,
1124            local_send_failure_at_by_peer: HashMap::new(),
1125            last_rx_loop_maintenance_timeout_at: None,
1126            peer_aliases: HashMap::new(),
1127            configured_peer_send_weights,
1128            peer_acl,
1129            host_map,
1130            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1131        })
1132    }
1133
1134    /// Create a node with a specific identity.
1135    ///
1136    /// This constructor validates cross-field config invariants before
1137    /// constructing the node, same as [`Node::new`].
1138    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
1139        config.validate()?;
1140        let node_addr = *identity.node_addr();
1141
1142        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1143        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1144
1145        let mut startup_epoch = [0u8; 8];
1146        rand::rng().fill_bytes(&mut startup_epoch);
1147
1148        let tun_state = if config.tun.enabled {
1149            TunState::Configured
1150        } else {
1151            TunState::Disabled
1152        };
1153
1154        // Initialize tree state with signed self-declaration
1155        let mut tree_state = TreeState::new(node_addr);
1156        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1157        tree_state.set_hold_down(config.node.tree.hold_down_secs);
1158        tree_state.set_flap_dampening(
1159            config.node.tree.flap_threshold,
1160            config.node.tree.flap_window_secs,
1161            config.node.tree.flap_dampening_secs,
1162        );
1163        tree_state
1164            .sign_declaration(&identity)
1165            .expect("signing own declaration should never fail");
1166
1167        let mut bloom_state = BloomState::new(node_addr);
1168        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1169
1170        let coord_cache = CoordCache::new(
1171            config.node.cache.coord_size,
1172            config.node.cache.coord_ttl_secs * 1000,
1173        );
1174        let rl = &config.node.rate_limit;
1175        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1176            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1177            config.node.limits.max_pending_inbound,
1178        );
1179
1180        let max_connections = config.node.limits.max_connections;
1181        let max_peers = config.node.limits.max_peers;
1182        let max_links = config.node.limits.max_links;
1183        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1184
1185        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1186        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1187
1188        Ok(Self {
1189            identity,
1190            startup_epoch,
1191            started_at: std::time::Instant::now(),
1192            config,
1193            state: NodeState::Created,
1194            is_leaf_only: false,
1195            tree_state,
1196            bloom_state,
1197            coord_cache,
1198            learned_routes: LearnedRouteTable::default(),
1199            session_direct_degraded_until_ms: HashMap::new(),
1200            recent_requests: HashMap::new(),
1201            transports: HashMap::new(),
1202            transport_drops: HashMap::new(),
1203            links: HashMap::new(),
1204            addr_to_link: HashMap::new(),
1205            packet_tx: None,
1206            packet_rx: None,
1207            connections: HashMap::new(),
1208            peers: HashMap::new(),
1209            sessions: HashMap::new(),
1210            identity_cache: HashMap::new(),
1211            pending_tun_packets: HashMap::new(),
1212            pending_endpoint_data: HashMap::new(),
1213            pending_lookups: HashMap::new(),
1214            max_connections,
1215            max_peers,
1216            max_links,
1217            next_link_id: 1,
1218            next_transport_id: 1,
1219            stats: stats::NodeStats::new(),
1220            stats_history: stats_history::StatsHistory::new(),
1221            tun_state,
1222            tun_name: None,
1223            tun_tx: None,
1224            tun_outbound_rx: None,
1225            external_packet_tx: None,
1226            endpoint_command_rx: None,
1227            endpoint_event_tx: None,
1228            encrypt_workers: None,
1229            decrypt_workers: None,
1230            decrypt_registered_sessions: std::collections::HashSet::new(),
1231            decrypt_fallback_tx,
1232            decrypt_fallback_rx,
1233            tun_reader_handle: None,
1234            tun_writer_handle: None,
1235            #[cfg(target_os = "macos")]
1236            tun_shutdown_fd: None,
1237            dns_identity_rx: None,
1238            dns_task: None,
1239            index_allocator: IndexAllocator::new(),
1240            peers_by_index: HashMap::new(),
1241            pending_outbound: HashMap::new(),
1242            msg1_rate_limiter,
1243            icmp_rate_limiter: IcmpRateLimiter::new(),
1244            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1245            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1246                std::time::Duration::from_millis(coords_response_interval_ms),
1247            ),
1248            discovery_backoff: DiscoveryBackoff::new(),
1249            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1250            pending_connects: Vec::new(),
1251            retry_pending: HashMap::new(),
1252            nostr_discovery: None,
1253            nostr_discovery_started_at_ms: None,
1254            lan_discovery: None,
1255            local_instance_registry: None,
1256            local_instance_started_at_ms: None,
1257            last_local_instance_publish_ms: None,
1258            last_local_instance_scan_ms: None,
1259            startup_open_discovery_sweep_done: false,
1260            bootstrap_transports: HashSet::new(),
1261            bootstrap_transport_npubs: HashMap::new(),
1262            discovery_fallback_transit_blocked_peers: HashSet::new(),
1263            last_parent_reeval: None,
1264            last_congestion_log: None,
1265            estimated_mesh_size: None,
1266            last_mesh_size_log: None,
1267            last_self_warn: None,
1268            local_send_failure_at_by_peer: HashMap::new(),
1269            last_rx_loop_maintenance_timeout_at: None,
1270            peer_aliases: HashMap::new(),
1271            configured_peer_send_weights,
1272            peer_acl,
1273            host_map,
1274            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1275        })
1276    }
1277
1278    /// Create a leaf-only node (simplified state).
1279    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1280        let mut node = Self::new(config)?;
1281        node.is_leaf_only = true;
1282        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1283        Ok(node)
1284    }
1285
1286    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1287        let base_host_map = HostMap::from_peer_configs(config.peers());
1288        if !config.node.system_files_enabled {
1289            return (
1290                Arc::new(base_host_map.clone()),
1291                acl::PeerAclReloader::memory_only(base_host_map),
1292            );
1293        }
1294
1295        let mut host_map = base_host_map.clone();
1296        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1297        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1298            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1299        ));
1300        host_map.merge(hosts_file);
1301        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1302            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1303            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1304            base_host_map,
1305            hosts_path,
1306        );
1307        (Arc::new(host_map), peer_acl)
1308    }
1309
1310    fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1311        config
1312            .peers()
1313            .iter()
1314            .filter_map(|peer| {
1315                PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1316                    (
1317                        *identity.node_addr(),
1318                        encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1319                    )
1320                })
1321            })
1322            .collect()
1323    }
1324
1325    #[cfg(unix)]
1326    fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1327        self.configured_peer_send_weights
1328            .get(peer_addr)
1329            .copied()
1330            .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1331    }
1332
    /// Create transport instances from configuration.
    ///
    /// Returns a vector of TransportHandles for all configured transports.
    ///
    /// This only constructs the handles through each transport's
    /// constructor; nothing here starts them.
    ///
    /// Instances are created in a fixed order:
    /// - UDP;
    /// - sim (`sim-transport` feature);
    /// - Ethernet (Linux/macOS only);
    /// - TCP;
    /// - Tor;
    /// - WebRTC (`webrtc-transport` feature);
    /// - BLE (`bluer_available` cfg, non-test builds only).
    ///
    /// Each instance gets an id from `allocate_transport_id` and its own
    /// clone of `packet_tx`. WebRTC and BLE instances whose initialization
    /// fails are logged and left out of the result. Their id is allocated
    /// before the constructor runs, so a failed instance still consumes one.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Collect UDP configs with optional names to avoid borrow conflicts
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        // Create UDP transport instances
        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Create Ethernet transport instances where raw-socket support exists.
        // Instances without an explicit `discovery_scope` inherit the node's
        // `lan_discovery_scope()`; each is told our public key.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let mut eth_config = eth_config;
                if eth_config.discovery_scope.is_none() {
                    eth_config.discovery_scope = self.lan_discovery_scope();
                }
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        // Create TCP transport instances
        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        // Create Tor transport instances
        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // WebRTC configs are collected regardless of the feature flag so a
        // build without `webrtc-transport` can warn that they are ignored.
        let webrtc_instances: Vec<_> = self
            .config
            .transports
            .webrtc
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        #[cfg(feature = "webrtc-transport")]
        {
            for (name, webrtc_config) in webrtc_instances {
                let transport_id = self.allocate_transport_id();
                match WebRtcTransport::new(
                    transport_id,
                    name,
                    webrtc_config,
                    packet_tx.clone(),
                    &self.identity,
                    &self.config.node.discovery.nostr,
                ) {
                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
                    Err(err) => {
                        warn!(
                            transport_id = %transport_id,
                            error = %err,
                            "failed to initialize WebRTC transport"
                        );
                    }
                }
            }
        }
        #[cfg(not(feature = "webrtc-transport"))]
        if !webrtc_instances.is_empty() {
            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
        }

        // Create BLE transport instances
        //
        // NOTE(review): this whole block is compiled only under
        // `bluer_available`, so the inner `any(not(bluer_available), test)`
        // branch can only be active in test builds. There the `warn!` inside
        // it is itself compiled out, so the "lacks BlueZ support" warning
        // never fires, and builds without bluer ignore BLE config silently
        // (unlike WebRTC above). TODO confirm whether `config.transports.ble`
        // exists without `bluer_available`; if it does, hoist the warning
        // outside this block.
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real BLE adapters are opened only in non-test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // In practice (see NOTE above) this only compiles in test builds,
            // where it merely consumes `ble_instances`.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1510
1511    /// Find an operational transport that matches the given transport type name.
1512    ///
1513    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1514    /// from Nostr/STUN traversal. They must not be reused for ordinary
1515    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1516    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1517    /// with `EINVAL`, and even on platforms that allow it the packet would use
1518    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1519    /// choice deterministic by lowest transport id instead of HashMap order.
1520    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1521        self.transports
1522            .iter()
1523            .filter(|(id, handle)| {
1524                handle.transport_type().name == transport_type
1525                    && handle.is_operational()
1526                    && !self.bootstrap_transports.contains(id)
1527            })
1528            .min_by_key(|(id, _)| id.as_u32())
1529            .map(|(id, _)| *id)
1530    }
1531
1532    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1533    /// and binary TransportAddr.
1534    ///
1535    /// Finds the Ethernet transport instance bound to the named interface
1536    /// and parses the MAC portion into a 6-byte TransportAddr.
1537    #[allow(unused_variables)]
1538    fn resolve_ethernet_addr(
1539        &self,
1540        addr_str: &str,
1541    ) -> Result<(TransportId, TransportAddr), NodeError> {
1542        #[cfg(any(target_os = "linux", target_os = "macos"))]
1543        {
1544            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1545                NodeError::NoTransportForType(format!(
1546                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1547                    addr_str
1548                ))
1549            })?;
1550
1551            // Find the Ethernet transport bound to this interface
1552            let transport_id = self
1553                .transports
1554                .iter()
1555                .find(|(_, handle)| {
1556                    handle.transport_type().name == "ethernet"
1557                        && handle.is_operational()
1558                        && handle.interface_name() == Some(iface)
1559                })
1560                .map(|(id, _)| *id)
1561                .ok_or_else(|| {
1562                    NodeError::NoTransportForType(format!(
1563                        "no operational Ethernet transport for interface '{}'",
1564                        iface
1565                    ))
1566                })?;
1567
1568            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1569                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1570            })?;
1571
1572            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1573        }
1574        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1575        {
1576            Err(NodeError::NoTransportForType(
1577                "Ethernet transport is not supported on this platform".to_string(),
1578            ))
1579        }
1580    }
1581
1582    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1583    /// (TransportId, TransportAddr) pair by finding the BLE transport
1584    /// instance matching the adapter name.
1585    #[cfg(bluer_available)]
1586    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1587        let ta = TransportAddr::from_string(addr_str);
1588        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1589            NodeError::NoTransportForType(format!(
1590                "invalid BLE address format '{}': expected 'adapter/mac'",
1591                addr_str
1592            ))
1593        })?;
1594
1595        // Find the BLE transport for this adapter
1596        let transport_id = self
1597            .transports
1598            .iter()
1599            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1600            .map(|(id, _)| *id)
1601            .ok_or_else(|| {
1602                NodeError::NoTransportForType(format!(
1603                    "no operational BLE transport for adapter '{}'",
1604                    adapter
1605                ))
1606            })?;
1607
1608        // Validate the address format
1609        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1610            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1611        })?;
1612
1613        Ok((transport_id, TransportAddr::from_string(addr_str)))
1614    }
1615
1616    // === Identity Accessors ===
1617
1618    /// Get this node's identity.
1619    pub fn identity(&self) -> &Identity {
1620        &self.identity
1621    }
1622
1623    /// Get this node's NodeAddr.
1624    pub fn node_addr(&self) -> &NodeAddr {
1625        self.identity.node_addr()
1626    }
1627
1628    /// Get this node's npub.
1629    pub fn npub(&self) -> String {
1630        self.identity.npub()
1631    }
1632
1633    /// Return a human-readable display name for a NodeAddr.
1634    ///
1635    /// Lookup order:
1636    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1637    /// 2. Configured peer alias or short npub (from startup map)
1638    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1639    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1640    /// 5. Truncated NodeAddr hex (unknown address)
1641    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1642        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1643            return hostname.to_string();
1644        }
1645        if let Some(name) = self.peer_aliases.get(addr) {
1646            return name.clone();
1647        }
1648        if let Some(peer) = self.peers.get(addr) {
1649            return peer.identity().short_npub();
1650        }
1651        if let Some(entry) = self.sessions.get(addr) {
1652            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1653            return PeerIdentity::from_pubkey(xonly).short_npub();
1654        }
1655        addr.short_hex()
1656    }
1657
1658    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1659    /// decrypt-worker state coherent: removes the same `cache_key`
1660    /// from the registered-sessions tracking set and tells the
1661    /// assigned shard worker to drop its `OwnedSessionState` entry.
1662    ///
1663    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1664    /// at every session-lifecycle teardown site (rekey cross-connection
1665    /// swap, peer disconnect, dispatch session-rotation) so the worker
1666    /// doesn't keep stale ciphers / replay windows around. The
1667    /// follow-up `RegisterSession` for the NEW key (if any) will then
1668    /// install the fresh state on the same shard.
1669    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1670        // Find the peer that owns this index BEFORE removing it from
1671        // the index map, so we can decide whether the deregistration
1672        // also tears down the peer's connected UDP socket.
1673        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1674        self.peers_by_index.remove(&cache_key);
1675        if self.decrypt_registered_sessions.remove(&cache_key)
1676            && let Some(workers) = self.decrypt_workers.as_ref()
1677        {
1678            workers.unregister_session(cache_key);
1679        }
1680        // Tear down the per-peer connected UDP socket *only* if no
1681        // other peers_by_index entry still resolves to this peer.
1682        // Rekey drain calls into this helper with the OLD session
1683        // index while the NEW index is already installed and points
1684        // at the same peer — there the connect()-ed 5-tuple is
1685        // still valid for the new session and we must not close it.
1686        // Peer-teardown sites (CrossConnection swap, stale-index
1687        // fall-through in encrypted.rs, disconnect handler) call
1688        // here when this is the peer's last index, so the connected
1689        // socket goes away with the peer.
1690        if let Some(peer_addr) = owning_peer {
1691            let peer_has_other_index = self
1692                .peers_by_index
1693                .values()
1694                .any(|other| *other == peer_addr);
1695            if !peer_has_other_index {
1696                self.clear_connected_udp_for_peer(&peer_addr);
1697            }
1698        }
1699    }
1700
1701    /// Ensure the current FMP receive index resolves to this peer.
1702    ///
1703    /// Rekey msg1/msg2 handlers pre-register the pending index before
1704    /// cutover, but losing that registration in a debug build used to
1705    /// panic in the cutover path. Repairing the map here is safe: the
1706    /// peer has already promoted the pending session, and the decrypt
1707    /// worker registration immediately after cutover depends on the
1708    /// same `(transport_id, our_index)` key.
1709    pub(in crate::node) fn ensure_current_session_index_registered(
1710        &mut self,
1711        node_addr: &NodeAddr,
1712        context: &'static str,
1713    ) -> bool {
1714        let Some(peer) = self.peers.get(node_addr) else {
1715            return false;
1716        };
1717        let Some(transport_id) = peer.transport_id() else {
1718            warn!(
1719                peer = %self.peer_display_name(node_addr),
1720                context,
1721                "Cannot register current session index without transport id"
1722            );
1723            return false;
1724        };
1725        let Some(our_index) = peer.our_index() else {
1726            warn!(
1727                peer = %self.peer_display_name(node_addr),
1728                context,
1729                "Cannot register current session index without local index"
1730            );
1731            return false;
1732        };
1733
1734        let cache_key = (transport_id, our_index.as_u32());
1735        match self.peers_by_index.get(&cache_key).copied() {
1736            Some(existing) if existing == *node_addr => true,
1737            Some(existing) => {
1738                warn!(
1739                    peer = %self.peer_display_name(node_addr),
1740                    previous_owner = %self.peer_display_name(&existing),
1741                    transport_id = %transport_id,
1742                    our_index = %our_index,
1743                    context,
1744                    "Repairing current session index with stale owner"
1745                );
1746                self.peers_by_index.insert(cache_key, *node_addr);
1747                true
1748            }
1749            None => {
1750                warn!(
1751                    peer = %self.peer_display_name(node_addr),
1752                    transport_id = %transport_id,
1753                    our_index = %our_index,
1754                    context,
1755                    "Repairing missing current session index"
1756                );
1757                self.peers_by_index.insert(cache_key, *node_addr);
1758                true
1759            }
1760        }
1761    }
1762
1763    // === Configuration ===
1764
1765    /// Get the configuration.
1766    pub fn config(&self) -> &Config {
1767        &self.config
1768    }
1769
1770    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1771    ///
1772    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1773    /// transport MTU. Returns the maximum IPv6 packet size (including
1774    /// IPv6 header) that can be transmitted through the FIPS mesh.
1775    pub fn effective_ipv6_mtu(&self) -> u16 {
1776        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1777    }
1778
1779    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1780    ///
1781    /// Returns the **minimum** MTU across all operational transports, or
1782    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1783    /// where a specific egress transport isn't yet known: the resulting
1784    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1785    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1786    /// configured-transport's egress, eliminating PMTU-D black holes that
1787    /// would otherwise occur when a flow's actual egress is smaller than
1788    /// the clamp ceiling assumed at TUN init.
1789    ///
1790    /// Returning the smallest (rather than the first-iterated, which used
1791    /// to vary across HashMap iteration order + async-startup race) makes
1792    /// the clamp deterministic across daemon restarts.
1793    ///
1794    /// See `ISSUE-2026-0011` for the empirical investigation.
1795    pub fn transport_mtu(&self) -> u16 {
1796        let min_operational = self
1797            .transports
1798            .values()
1799            .filter(|h| h.is_operational())
1800            .map(|h| h.mtu())
1801            .min();
1802        if let Some(mtu) = min_operational {
1803            return mtu;
1804        }
1805        // Fallback to config: try UDP first, then Ethernet
1806        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1807            return cfg.mtu();
1808        }
1809        1280
1810    }
1811
1812    // === State ===
1813
1814    /// Get the node state.
1815    pub fn state(&self) -> NodeState {
1816        self.state
1817    }
1818
1819    /// Get the node uptime.
1820    pub fn uptime(&self) -> std::time::Duration {
1821        self.started_at.elapsed()
1822    }
1823
1824    /// Check if node is operational.
1825    pub fn is_running(&self) -> bool {
1826        self.state.is_operational()
1827    }
1828
1829    /// Check if this is a leaf-only node.
1830    pub fn is_leaf_only(&self) -> bool {
1831        self.is_leaf_only
1832    }
1833
1834    // === Tree State ===
1835
1836    /// Get the tree state.
1837    pub fn tree_state(&self) -> &TreeState {
1838        &self.tree_state
1839    }
1840
1841    /// Get mutable tree state.
1842    pub fn tree_state_mut(&mut self) -> &mut TreeState {
1843        &mut self.tree_state
1844    }
1845
1846    // === Bloom State ===
1847
1848    /// Get the Bloom filter state.
1849    pub fn bloom_state(&self) -> &BloomState {
1850        &self.bloom_state
1851    }
1852
1853    /// Get mutable Bloom filter state.
1854    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1855        &mut self.bloom_state
1856    }
1857
1858    // === Mesh Size Estimate ===
1859
1860    /// Get the cached estimated mesh size.
1861    pub fn estimated_mesh_size(&self) -> Option<u64> {
1862        self.estimated_mesh_size
1863    }
1864
1865    /// Compute and cache the estimated mesh size from bloom filters.
1866    ///
1867    /// Uses the spanning tree partition: parent's filter covers nodes reachable
1868    /// upward, children's filters cover disjoint subtrees downward. The sum
1869    /// of estimated entry counts plus one (self) approximates total network size.
1870    pub(crate) fn compute_mesh_size(&mut self) {
1871        let my_addr = *self.tree_state.my_node_addr();
1872        let parent_id = *self.tree_state.my_declaration().parent_id();
1873        let is_root = self.tree_state.is_root();
1874
1875        let max_fpr = self.config.node.bloom.max_inbound_fpr;
1876        let mut total: f64 = 1.0; // count self
1877        let mut child_count: u32 = 0;
1878        let mut has_data = false;
1879
1880        // Parent's filter: nodes reachable upward through the tree.
1881        // If any contributing filter is above the FPR cap, we refuse to
1882        // estimate rather than substitute a partial/biased aggregate —
1883        // Node.estimated_mesh_size is already Option<u64> and consumers
1884        // (control socket, fipstop, periodic debug log) handle None.
1885        if !is_root
1886            && let Some(parent) = self.peers.get(&parent_id)
1887            && let Some(filter) = parent.inbound_filter()
1888        {
1889            match filter.estimated_count(max_fpr) {
1890                Some(n) => {
1891                    total += n;
1892                    has_data = true;
1893                }
1894                None => {
1895                    self.estimated_mesh_size = None;
1896                    return;
1897                }
1898            }
1899        }
1900
1901        // Children's filters: each child's subtree is disjoint
1902        for (peer_addr, peer) in &self.peers {
1903            if peer_addr == &parent_id {
1904                continue;
1905            }
1906            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1907                && *decl.parent_id() == my_addr
1908            {
1909                child_count += 1;
1910                if let Some(filter) = peer.inbound_filter() {
1911                    match filter.estimated_count(max_fpr) {
1912                        Some(n) => {
1913                            total += n;
1914                            has_data = true;
1915                        }
1916                        None => {
1917                            self.estimated_mesh_size = None;
1918                            return;
1919                        }
1920                    }
1921                }
1922            }
1923        }
1924
1925        if !has_data {
1926            self.estimated_mesh_size = None;
1927            return;
1928        }
1929
1930        let size = total.round() as u64;
1931        self.estimated_mesh_size = Some(size);
1932
1933        // Periodic logging (reuse MMP default interval: 30s)
1934        let now = std::time::Instant::now();
1935        let should_log = match self.last_mesh_size_log {
1936            None => true,
1937            Some(last) => {
1938                now.duration_since(last)
1939                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1940            }
1941        };
1942        if should_log {
1943            tracing::debug!(
1944                estimated_mesh_size = size,
1945                peers = self.peers.len(),
1946                children = child_count,
1947                "Mesh size estimate"
1948            );
1949            self.last_mesh_size_log = Some(now);
1950        }
1951    }
1952
1953    // === Coord Cache ===
1954
1955    /// Get the coordinate cache.
1956    pub fn coord_cache(&self) -> &CoordCache {
1957        &self.coord_cache
1958    }
1959
1960    /// Get mutable coordinate cache.
1961    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1962        &mut self.coord_cache
1963    }
1964
1965    // === Node Statistics ===
1966
1967    /// Get the node statistics.
1968    pub fn stats(&self) -> &stats::NodeStats {
1969        &self.stats
1970    }
1971
1972    /// Get mutable node statistics.
1973    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1974        &mut self.stats
1975    }
1976
1977    /// Get the stats history collector.
1978    pub fn stats_history(&self) -> &stats_history::StatsHistory {
1979        &self.stats_history
1980    }
1981
1982    /// Sample the current node state into the stats history ring.
1983    /// Called once per tick from the RX loop.
1984    pub(crate) fn record_stats_history(&mut self) {
1985        let fwd = &self.stats.forwarding;
1986        let peers_with_mmp: Vec<f64> = self
1987            .peers
1988            .values()
1989            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1990            .collect();
1991        let loss_rate = if peers_with_mmp.is_empty() {
1992            0.0
1993        } else {
1994            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1995        };
1996
1997        let snap = stats_history::Snapshot {
1998            mesh_size: self.estimated_mesh_size,
1999            tree_depth: self.tree_state.my_coords().depth() as u32,
2000            peer_count: self.peers.len() as u64,
2001            parent_switches_total: self.stats.tree.parent_switches,
2002            bytes_in_total: fwd.received_bytes,
2003            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
2004            packets_in_total: fwd.received_packets,
2005            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
2006            loss_rate,
2007            active_sessions: self.sessions.len() as u64,
2008        };
2009
2010        let now = std::time::Instant::now();
2011        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
2012            .peers
2013            .values()
2014            .map(|p| {
2015                let stats = p.link_stats();
2016                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
2017                    Some(m) => (
2018                        m.metrics.srtt_ms(),
2019                        Some(m.metrics.loss_rate()),
2020                        m.receiver.ecn_ce_count() as u64,
2021                    ),
2022                    None => (None, None, 0),
2023                };
2024                stats_history::PeerSnapshot {
2025                    node_addr: *p.node_addr(),
2026                    last_seen: now,
2027                    srtt_ms,
2028                    loss_rate,
2029                    bytes_in_total: stats.bytes_recv,
2030                    bytes_out_total: stats.bytes_sent,
2031                    packets_in_total: stats.packets_recv,
2032                    packets_out_total: stats.packets_sent,
2033                    ecn_ce_total: ecn_ce,
2034                }
2035            })
2036            .collect();
2037
2038        self.stats_history.tick(now, &snap, &peer_snaps);
2039    }
2040
2041    // === TUN Interface ===
2042
2043    /// Get the TUN state.
2044    pub fn tun_state(&self) -> TunState {
2045        self.tun_state
2046    }
2047
2048    /// Get the TUN interface name, if active.
2049    pub fn tun_name(&self) -> Option<&str> {
2050        self.tun_name.as_deref()
2051    }
2052
2053    // === Resource Limits ===
2054
2055    /// Set the maximum number of connections (handshake phase).
2056    pub fn set_max_connections(&mut self, max: usize) {
2057        self.max_connections = max;
2058    }
2059
2060    /// Set the maximum number of peers (authenticated).
2061    pub fn set_max_peers(&mut self, max: usize) {
2062        self.max_peers = max;
2063    }
2064
2065    /// Returns false when starting more outbound work would exceed a resource
2066    /// cap. A cap of `0` means uncapped.
2067    pub(crate) fn outbound_admission_check(&self) -> bool {
2068        let connection_used = self
2069            .connections
2070            .len()
2071            .saturating_add(self.pending_connects.len());
2072        let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
2073        let connection_allowed =
2074            self.max_connections == 0 || connection_used < self.max_connections;
2075        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2076        peer_allowed && connection_allowed && link_allowed
2077    }
2078
2079    /// Admission for public/open-discovery outbound work. This includes the
2080    /// general connection/link caps and, when open Nostr discovery is enabled,
2081    /// the configured non-peer budget.
2082    pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
2083        if !self.outbound_admission_check() {
2084            return false;
2085        }
2086
2087        let nostr = &self.config.node.discovery.nostr;
2088        if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
2089            return true;
2090        }
2091
2092        let configured_npubs = self
2093            .config
2094            .peers()
2095            .iter()
2096            .map(|peer| peer.npub.clone())
2097            .collect::<HashSet<_>>();
2098        self.open_discovery_enqueue_budget(&configured_npubs) > 0
2099    }
2100
2101    /// Like `outbound_admission_check`, but for racing a better path to a
2102    /// peer that is already authenticated. This may temporarily add a
2103    /// connection/link, but it does not consume a new peer slot.
2104    pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
2105        let connection_used = self
2106            .connections
2107            .len()
2108            .saturating_add(self.pending_connects.len());
2109        let connection_allowed =
2110            self.max_connections == 0 || connection_used < self.max_connections;
2111        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2112        connection_allowed && link_allowed
2113    }
2114
2115    /// Set the maximum number of links.
2116    pub fn set_max_links(&mut self, max: usize) {
2117        self.max_links = max;
2118    }
2119
2120    // === Counts ===
2121
2122    /// Number of pending connections (handshake in progress).
2123    pub fn connection_count(&self) -> usize {
2124        self.connections.len()
2125    }
2126
2127    /// Number of authenticated peers.
2128    pub fn peer_count(&self) -> usize {
2129        self.peers.len()
2130    }
2131
2132    /// Number of active links.
2133    pub fn link_count(&self) -> usize {
2134        self.links.len()
2135    }
2136
2137    /// Number of active transports.
2138    pub fn transport_count(&self) -> usize {
2139        self.transports.len()
2140    }
2141
2142    // === Transport Management ===
2143
2144    /// Allocate a new transport ID.
2145    pub fn allocate_transport_id(&mut self) -> TransportId {
2146        let id = TransportId::new(self.next_transport_id);
2147        self.next_transport_id += 1;
2148        id
2149    }
2150
2151    /// Get a transport by ID.
2152    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
2153        self.transports.get(id)
2154    }
2155
2156    /// Get mutable transport by ID.
2157    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
2158        self.transports.get_mut(id)
2159    }
2160
2161    /// Iterate over transport IDs.
2162    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
2163        self.transports.keys()
2164    }
2165
2166    /// Get the packet receiver for the event loop.
2167    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
2168        self.packet_rx.as_mut()
2169    }
2170
2171    // === Link Management ===
2172
2173    /// Allocate a new link ID.
2174    pub fn allocate_link_id(&mut self) -> LinkId {
2175        let id = LinkId::new(self.next_link_id);
2176        self.next_link_id += 1;
2177        id
2178    }
2179
2180    /// Add a link.
2181    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2182        if self.max_links > 0 && self.links.len() >= self.max_links {
2183            return Err(NodeError::MaxLinksExceeded {
2184                max: self.max_links,
2185            });
2186        }
2187        let link_id = link.link_id();
2188        let transport_id = link.transport_id();
2189        let remote_addr = link.remote_addr().clone();
2190
2191        self.links.insert(link_id, link);
2192        self.addr_to_link
2193            .insert((transport_id, remote_addr), link_id);
2194        Ok(())
2195    }
2196
2197    /// Get a link by ID.
2198    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
2199        self.links.get(link_id)
2200    }
2201
2202    /// Get a mutable link by ID.
2203    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
2204        self.links.get_mut(link_id)
2205    }
2206
2207    /// Find link ID by transport address.
2208    pub fn find_link_by_addr(
2209        &self,
2210        transport_id: TransportId,
2211        addr: &TransportAddr,
2212    ) -> Option<LinkId> {
2213        self.addr_to_link
2214            .get(&(transport_id, addr.clone()))
2215            .copied()
2216    }
2217
2218    /// Remove a link.
2219    ///
2220    /// Only removes the addr_to_link reverse lookup if it still points to this
2221    /// link. In cross-connection scenarios, a newer link may have replaced the
2222    /// entry for the same address.
2223    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2224        if let Some(link) = self.links.remove(link_id) {
2225            // Clean up reverse lookup only if it still maps to this link
2226            let key = (link.transport_id(), link.remote_addr().clone());
2227            if self.addr_to_link.get(&key) == Some(link_id) {
2228                self.addr_to_link.remove(&key);
2229            }
2230            Some(link)
2231        } else {
2232            None
2233        }
2234    }
2235
2236    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2237        if !self.bootstrap_transports.contains(&transport_id) {
2238            return;
2239        }
2240
2241        let transport_in_use = self
2242            .links
2243            .values()
2244            .any(|link| link.transport_id() == transport_id)
2245            || self
2246                .connections
2247                .values()
2248                .any(|conn| conn.transport_id() == Some(transport_id))
2249            || self
2250                .peers
2251                .values()
2252                .any(|peer| peer.transport_id() == Some(transport_id))
2253            || self
2254                .pending_connects
2255                .iter()
2256                .any(|pending| pending.transport_id == transport_id);
2257
2258        if transport_in_use {
2259            return;
2260        }
2261
2262        tracing::debug!(
2263            transport_id = %transport_id,
2264            "bootstrap transport has no remaining references; dropping"
2265        );
2266
2267        self.bootstrap_transports.remove(&transport_id);
2268        self.bootstrap_transport_npubs.remove(&transport_id);
2269        self.transport_drops.remove(&transport_id);
2270        self.transports.remove(&transport_id);
2271    }
2272
2273    /// Iterate over all links.
2274    pub fn links(&self) -> impl Iterator<Item = &Link> {
2275        self.links.values()
2276    }
2277
2278    // === Connection Management (Handshake Phase) ===
2279
2280    /// Add a pending connection.
2281    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2282        let link_id = connection.link_id();
2283
2284        if self.connections.contains_key(&link_id) {
2285            return Err(NodeError::ConnectionAlreadyExists(link_id));
2286        }
2287
2288        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2289            return Err(NodeError::MaxConnectionsExceeded {
2290                max: self.max_connections,
2291            });
2292        }
2293
2294        self.connections.insert(link_id, connection);
2295        Ok(())
2296    }
2297
2298    /// Get a connection by LinkId.
2299    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
2300        self.connections.get(link_id)
2301    }
2302
2303    /// Get a mutable connection by LinkId.
2304    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
2305        self.connections.get_mut(link_id)
2306    }
2307
2308    /// Remove a connection.
2309    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
2310        self.connections.remove(link_id)
2311    }
2312
2313    /// Iterate over all connections.
2314    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
2315        self.connections.values()
2316    }
2317
2318    // === Peer Management (Active Phase) ===
2319
2320    /// Get a peer by NodeAddr.
2321    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2322        self.peers.get(node_addr)
2323    }
2324
2325    /// Get a mutable peer by NodeAddr.
2326    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2327        self.peers.get_mut(node_addr)
2328    }
2329
2330    /// Remove a peer.
2331    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2332        self.peers.remove(node_addr)
2333    }
2334
2335    /// Iterate over all peers.
2336    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2337        self.peers.values()
2338    }
2339
2340    /// Reference to the Nostr discovery handle if discovery is enabled.
2341    /// Used by control queries (`show_peers` per-peer Nostr-traversal
2342    /// state) to read failure-state without taking shared ownership.
2343    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2344        self.nostr_discovery.as_deref()
2345    }
2346
2347    /// Iterate over all peer node IDs.
2348    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2349        self.peers.keys()
2350    }
2351
2352    /// Iterate over peers that can send traffic.
2353    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2354        self.peers.values().filter(|p| p.can_send())
2355    }
2356
2357    /// Number of peers that can send traffic.
2358    pub fn sendable_peer_count(&self) -> usize {
2359        self.peers.values().filter(|p| p.can_send()).count()
2360    }
2361
2362    pub(crate) fn set_discovery_fallback_transit_allowed(
2363        &mut self,
2364        peer_addr: NodeAddr,
2365        allowed: bool,
2366    ) {
2367        if allowed {
2368            self.discovery_fallback_transit_blocked_peers
2369                .remove(&peer_addr);
2370        } else {
2371            self.discovery_fallback_transit_blocked_peers
2372                .insert(peer_addr);
2373        }
2374    }
2375
2376    pub(crate) fn configured_discovery_fallback_transit(
2377        &self,
2378        peer_addr: &NodeAddr,
2379    ) -> Option<bool> {
2380        self.configured_peer(peer_addr)
2381            .map(|peer| peer.discovery_fallback_transit)
2382    }
2383
2384    pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2385        self.config.peers().iter().find(|peer| {
2386            PeerIdentity::from_npub(&peer.npub)
2387                .ok()
2388                .is_some_and(|identity| identity.node_addr() == peer_addr)
2389        })
2390    }
2391
2392    pub(in crate::node) fn active_peer_uses_configured_static_udp_path(
2393        &self,
2394        peer_addr: &NodeAddr,
2395    ) -> bool {
2396        let Some(peer_config) = self.configured_peer(peer_addr) else {
2397            return false;
2398        };
2399
2400        peer_config.addresses.iter().any(|candidate| {
2401            candidate.seen_at_ms.is_none()
2402                && candidate.transport.eq_ignore_ascii_case("udp")
2403                && self.active_peer_matches_candidate(peer_addr, candidate)
2404        })
2405    }
2406
2407    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2408        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2409            return retry_state.peer_config.discovery_fallback_transit;
2410        }
2411
2412        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2413            return allowed;
2414        }
2415
2416        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2417    }
2418
2419    // === End-to-End Sessions ===
2420
2421    /// Get a session by remote NodeAddr.
2422    /// Disable the discovery forward rate limiter (for tests).
2423    #[cfg(test)]
2424    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2425        self.discovery_forward_limiter
2426            .set_interval(std::time::Duration::ZERO);
2427    }
2428
2429    #[cfg(test)]
2430    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2431        self.sessions.get(remote)
2432    }
2433
2434    /// Get a mutable session by remote NodeAddr.
2435    #[cfg(test)]
2436    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2437        self.sessions.get_mut(remote)
2438    }
2439
2440    /// Remove a session.
2441    #[cfg(test)]
2442    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2443        self.sessions.remove(remote)
2444    }
2445
2446    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2447    #[cfg(test)]
2448    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2449        self.path_mtu_lookup
2450            .read()
2451            .ok()
2452            .and_then(|map| map.get(fips_addr).copied())
2453    }
2454
2455    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2456    #[cfg(test)]
2457    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2458        if let Ok(mut map) = self.path_mtu_lookup.write() {
2459            map.insert(fips_addr, mtu);
2460        }
2461    }
2462
2463    /// Number of end-to-end sessions.
2464    pub fn session_count(&self) -> usize {
2465        self.sessions.len()
2466    }
2467
2468    /// Iterate over all session entries (for control queries).
2469    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2470        self.sessions.iter()
2471    }
2472
2473    // === Identity Cache ===
2474
2475    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2476    pub(crate) fn register_identity(
2477        &mut self,
2478        node_addr: NodeAddr,
2479        pubkey: secp256k1::PublicKey,
2480    ) -> bool {
2481        let mut prefix = [0u8; 15];
2482        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2483        if let Some(entry) = self.identity_cache.get(&prefix)
2484            && entry.node_addr == node_addr
2485            && entry.pubkey == pubkey
2486        {
2487            // Endpoint sends pass the same PeerIdentity on every packet. Once
2488            // validated, avoid re-deriving NodeAddr from the public key in the
2489            // data path; that hash showed up in macOS sender profiles.
2490            return true;
2491        }
2492
2493        let (xonly, _) = pubkey.x_only_public_key();
2494        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2495        if derived_node_addr != node_addr {
2496            debug!(
2497                claimed_node_addr = %node_addr,
2498                derived_node_addr = %derived_node_addr,
2499                "Rejected identity cache entry with mismatched public key"
2500            );
2501            return false;
2502        }
2503
2504        let now_ms = Self::now_ms();
2505        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2506            && entry.node_addr == node_addr
2507        {
2508            entry.pubkey = pubkey;
2509            entry.last_seen_ms = now_ms;
2510            return true;
2511        }
2512
2513        let npub = encode_npub(&xonly);
2514        self.identity_cache.insert(
2515            prefix,
2516            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2517        );
2518        // LRU eviction
2519        let max = self.config.node.cache.identity_size;
2520        if self.identity_cache.len() > max
2521            && let Some(oldest_key) = self
2522                .identity_cache
2523                .iter()
2524                .min_by_key(|(_, entry)| entry.last_seen_ms)
2525                .map(|(k, _)| *k)
2526        {
2527            self.identity_cache.remove(&oldest_key);
2528        }
2529        true
2530    }
2531
2532    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2533    pub(crate) fn lookup_by_fips_prefix(
2534        &mut self,
2535        prefix: &[u8; 15],
2536    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2537        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2538            entry.last_seen_ms = Self::now_ms(); // LRU touch
2539            Some((entry.node_addr, entry.pubkey))
2540        } else {
2541            None
2542        }
2543    }
2544
2545    /// Check if a node's identity is in the cache (without LRU touch).
2546    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2547        let mut prefix = [0u8; 15];
2548        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2549        self.identity_cache.contains_key(&prefix)
2550    }
2551
2552    /// Number of identity cache entries.
2553    pub fn identity_cache_len(&self) -> usize {
2554        self.identity_cache.len()
2555    }
2556
2557    /// Iterate over identity cache entries.
2558    ///
2559    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2560    /// Used by the `show_identity_cache` control query.
2561    pub fn identity_cache_iter(
2562        &self,
2563    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2564        self.identity_cache
2565            .values()
2566            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2567    }
2568
2569    /// Configured maximum identity cache size.
2570    pub fn identity_cache_max(&self) -> usize {
2571        self.config.node.cache.identity_size
2572    }
2573
2574    /// Number of pending discovery lookups.
2575    pub fn pending_lookup_count(&self) -> usize {
2576        self.pending_lookups.len()
2577    }
2578
2579    /// Iterate over pending discovery lookups for diagnostics.
2580    pub fn pending_lookups_iter(
2581        &self,
2582    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2583        self.pending_lookups.iter()
2584    }
2585
2586    /// Number of recent discovery requests tracked.
2587    pub fn recent_request_count(&self) -> usize {
2588        self.recent_requests.len()
2589    }
2590
2591    /// Count of destinations with queued TUN packets awaiting session setup.
2592    pub fn pending_tun_destinations(&self) -> usize {
2593        self.pending_tun_packets.len()
2594    }
2595
2596    /// Total TUN packets queued across all destinations.
2597    pub fn pending_tun_total_packets(&self) -> usize {
2598        self.pending_tun_packets.values().map(|q| q.len()).sum()
2599    }
2600
2601    /// Iterate over retry state for diagnostics.
2602    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2603        self.retry_pending.iter()
2604    }
2605
2606    // === Routing ===
2607
2608    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2609    ///
2610    /// Returns true if the peer is our current tree parent, or if the peer
2611    /// has declared us as their parent (making them our child).
2612    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2613        // Peer is our parent
2614        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2615            return true;
2616        }
2617        // Peer is our child (their declaration names us as parent)
2618        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2619            && decl.parent_id() == self.node_addr()
2620        {
2621            return true;
2622        }
2623        false
2624    }
2625
    /// Find next hop for a destination node address.
    ///
    /// Routing priority:
    /// 1. Destination is self → `None` (local delivery)
    /// 2. Destination is a healthy direct peer → that peer, unless a known
    ///    fallback next-hop has a meaningful link-quality advantage
    ///    (`route_candidate_beats_direct`: lower link cost by more than
    ///    `ROUTING_FALLBACK_MIN_COST_ADVANTAGE`). The direct peer is not
    ///    eligible while its session direct path is marked degraded, unless
    ///    it uses a configured static UDP path.
    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
    ///    observed reverse paths, selected with weighted multipath plus
    ///    periodic coordinate/tree exploration.
    /// 4. Bloom filter candidates with cached dest coords → among peers whose
    ///    bloom filter contains the destination and which are strictly closer
    ///    to the destination than we are, pick the lowest
    ///    `(link_cost, tree_distance_to_dest, node_addr)` (link cost is the
    ///    primary key). The self-distance check prevents routing loops.
    /// 5. Greedy tree routing fallback (requires cached dest coords)
    /// 6. No route → `None`
    ///
    /// Both the bloom filter and tree routing paths require cached destination
    /// coordinates (checked in `coord_cache`). Without coordinates, the node
    /// cannot make loop-free forwarding decisions. The caller should signal
    /// `CoordsRequired` back to the source when `None` is returned for a
    /// non-local destination.
    ///
    /// Takes `&mut self` because the coord-cache lookup touches the entry
    /// (`get_and_touch`) and the degraded-path check may drop an expired
    /// degradation entry.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // 1. Local delivery
        if dest_node_addr == self.node_addr() {
            return None;
        }
        let now_ms = Self::now_ms();
        let direct_session_degraded =
            self.session_direct_path_blocks_direct_payload(dest_node_addr, now_ms);

        // 2. Direct peer. Short-circuit when its link cost is already within
        // ROUTING_FALLBACK_MIN_COST_ADVANTAGE of 1.0 (presumably the
        // best achievable link cost — confirm against `ActivePeer::link_cost`),
        // since no fallback could then beat it by the required margin.
        let healthy_direct_route = self
            .peers
            .get(dest_node_addr)
            .filter(|peer| peer.is_healthy() && !direct_session_degraded)
            .map(|_| *dest_node_addr);
        if let Some(direct_addr) = healthy_direct_route
            && self
                .peers
                .get(&direct_addr)
                .is_some_and(|peer| peer.link_cost() <= 1.0 + ROUTING_FALLBACK_MIN_COST_ADVANTAGE)
        {
            return self.peers.get(&direct_addr);
        }
        // The destination itself may only carry payload when it passed the
        // direct-route checks above; every other peer just needs to be healthy.
        let direct_payload_eligible = healthy_direct_route.is_some();
        let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
            if addr == dest_node_addr {
                direct_payload_eligible
            } else {
                peer.is_healthy()
            }
        };

        // A healthy direct path is not automatically the best path. A
        // hotspot/NAT hairpin can remain sendable with high RTT or mild loss;
        // in that case a lower-cost mesh next-hop should carry traffic while
        // direct probes continue in the background.
        let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
            node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
        };

        // Only built in reply-learned mode; `None` disables every
        // learned-route branch below.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // 3. Optional reply-learned routing. These entries are not peer
        // claims; they are local observations of which peer carried traffic
        // or a verified lookup response back from the destination. Most
        // packets use weighted multipath over learned routes, but periodic
        // fallback exploration lets coord/bloom/tree routes discover better
        // candidates.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
        {
            // With a healthy direct route, a learned next hop must also beat
            // the direct peer on link cost (and can never be the direct peer).
            let eligible = sendable
                .iter()
                .copied()
                .filter(|addr| fallback_beats_direct(self, *addr))
                .collect::<HashSet<_>>();
            if !eligible.is_empty()
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
        }

        // Look up cached destination coordinates (required by both bloom and tree paths).
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: only learned routes or the direct peer remain.
            if (healthy_direct_route.is_none() || explore_fallback)
                && let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            if let Some(direct_addr) = healthy_direct_route {
                return self.peers.get(&direct_addr);
            }
            return None;
        };

        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
        //    If no candidate is strictly closer, fall through to tree routing.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self
                .peers
                .iter()
                .filter(|(addr, peer)| {
                    payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
                })
                .map(|(_, peer)| peer)
                .collect();
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr
            && fallback_beats_direct(self, next_hop_addr)
        {
            return self.peers.get(&next_hop_addr);
        }

        // 5. Greedy tree routing fallback
        let tree_route_addr = self.select_tree_payload_candidate(
            &dest_coords,
            dest_node_addr,
            direct_payload_eligible,
        );
        if let Some(next_hop_addr) = tree_route_addr
            && fallback_beats_direct(self, next_hop_addr)
        {
            return self.peers.get(&next_hop_addr);
        }

        // NOTE(review): this returns whatever the learned table yields, even
        // `None`, without falling back to `healthy_direct_route` below. That
        // is only safe if `should_explore_fallback` implies a selectable
        // learned route exists — confirm in `LearnedRouteTable`.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // No fallback beat the direct peer: use it.
        if let Some(direct_addr) = healthy_direct_route {
            return self.peers.get(&direct_addr);
        }

        // Last resort: any learned route through a sendable peer.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // 6. No route
        None
    }
2809
2810    pub(in crate::node) fn find_transit_next_hop(
2811        &mut self,
2812        dest_node_addr: &NodeAddr,
2813        previous_hop: &NodeAddr,
2814    ) -> Option<NodeAddr> {
2815        if dest_node_addr == self.node_addr() {
2816            return None;
2817        }
2818
2819        if dest_node_addr != previous_hop
2820            && self
2821                .peers
2822                .get(dest_node_addr)
2823                .is_some_and(|peer| peer.is_healthy())
2824        {
2825            return Some(*dest_node_addr);
2826        }
2827
2828        let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2829        if &next_hop_addr == previous_hop {
2830            self.record_route_failure(*dest_node_addr, next_hop_addr);
2831            return None;
2832        }
2833        Some(next_hop_addr)
2834    }
2835
2836    fn route_candidate_beats_direct(
2837        &self,
2838        healthy_direct_route: Option<NodeAddr>,
2839        candidate_addr: NodeAddr,
2840    ) -> bool {
2841        let Some(direct_addr) = healthy_direct_route else {
2842            return true;
2843        };
2844        if candidate_addr == direct_addr {
2845            return false;
2846        }
2847
2848        let Some(direct) = self.peers.get(&direct_addr) else {
2849            return true;
2850        };
2851        let Some(candidate) = self.peers.get(&candidate_addr) else {
2852            return false;
2853        };
2854        if !candidate.is_healthy() {
2855            return false;
2856        }
2857
2858        let direct_cost = direct.link_cost();
2859        let candidate_cost = candidate.link_cost();
2860        candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2861    }
2862
2863    fn select_tree_payload_candidate(
2864        &self,
2865        dest_coords: &crate::tree::TreeCoordinate,
2866        direct_dest: &NodeAddr,
2867        direct_payload_eligible: bool,
2868    ) -> Option<NodeAddr> {
2869        if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2870            return None;
2871        }
2872
2873        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2874        let mut best: Option<(NodeAddr, usize)> = None;
2875
2876        for (peer_addr, peer) in &self.peers {
2877            if peer_addr == direct_dest {
2878                if !direct_payload_eligible {
2879                    continue;
2880                }
2881            } else if !peer.is_healthy() {
2882                continue;
2883            }
2884
2885            let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2886                continue;
2887            };
2888            let distance = peer_coords.distance_to(dest_coords);
2889            if distance >= my_distance {
2890                continue;
2891            }
2892
2893            let dominated = match &best {
2894                None => true,
2895                Some((best_id, best_dist)) => {
2896                    distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2897                }
2898            };
2899            if dominated {
2900                best = Some((*peer_addr, distance));
2901            }
2902        }
2903
2904        best.map(|(peer_addr, _)| peer_addr)
2905    }
2906
2907    pub(in crate::node) fn session_direct_path_is_degraded(
2908        &mut self,
2909        dest: &NodeAddr,
2910        now_ms: u64,
2911    ) -> bool {
2912        match self.session_direct_degraded_until_ms.get(dest).copied() {
2913            Some(until_ms) if until_ms > now_ms => true,
2914            Some(_) => {
2915                self.session_direct_degraded_until_ms.remove(dest);
2916                false
2917            }
2918            None => false,
2919        }
2920    }
2921
2922    pub(in crate::node) fn session_direct_path_blocks_direct_payload(
2923        &mut self,
2924        dest: &NodeAddr,
2925        now_ms: u64,
2926    ) -> bool {
2927        self.session_direct_path_is_degraded(dest, now_ms)
2928            && !self.active_peer_uses_configured_static_udp_path(dest)
2929    }
2930
2931    pub(in crate::node) fn mark_session_direct_path_degraded(
2932        &mut self,
2933        dest: NodeAddr,
2934        now_ms: u64,
2935    ) -> bool {
2936        let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2937        let entry = self
2938            .session_direct_degraded_until_ms
2939            .entry(dest)
2940            .or_insert(0);
2941        let was_degraded = *entry > now_ms;
2942        *entry = (*entry).max(until_ms);
2943        !was_degraded
2944    }
2945
2946    pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2947        self.session_direct_degraded_until_ms.remove(dest).is_some()
2948    }
2949
2950    pub(in crate::node) fn learn_reverse_route(
2951        &mut self,
2952        destination: NodeAddr,
2953        next_hop: NodeAddr,
2954    ) {
2955        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2956            || destination == *self.node_addr()
2957        {
2958            return;
2959        }
2960        let now_ms = Self::now_ms();
2961        self.learned_routes.learn(
2962            destination,
2963            next_hop,
2964            now_ms,
2965            self.config.node.routing.learned_ttl_secs,
2966            self.config.node.routing.max_learned_routes_per_dest,
2967        );
2968    }
2969
2970    pub(in crate::node) fn record_route_failure(
2971        &mut self,
2972        destination: NodeAddr,
2973        next_hop: NodeAddr,
2974    ) {
2975        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2976            return;
2977        }
2978        self.learned_routes.record_failure(&destination, &next_hop);
2979    }
2980
2981    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2982        self.learned_routes.snapshot(now_ms)
2983    }
2984
2985    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2986        self.learned_routes.purge_expired(now_ms);
2987    }
2988
2989    /// Select the best peer from a set of bloom filter candidates.
2990    ///
2991    /// Uses distance from each candidate's tree coordinates to the destination
2992    /// as the primary metric (after link_cost). Only selects peers that are
2993    /// strictly closer to the destination than we are (self-distance check
2994    /// prevents routing loops).
2995    ///
2996    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2997    fn select_best_candidate<'a>(
2998        &'a self,
2999        candidates: &[&'a ActivePeer],
3000        dest_coords: &crate::tree::TreeCoordinate,
3001    ) -> Option<&'a ActivePeer> {
3002        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
3003
3004        let mut best: Option<(&ActivePeer, f64, usize)> = None;
3005
3006        for &candidate in candidates {
3007            if !candidate.can_send() {
3008                continue;
3009            }
3010
3011            let cost = candidate.link_cost();
3012
3013            let dist = self
3014                .tree_state
3015                .peer_coords(candidate.node_addr())
3016                .map(|pc| pc.distance_to(dest_coords))
3017                .unwrap_or(usize::MAX);
3018
3019            // Self-distance check: only consider peers strictly closer
3020            // to the destination than we are (prevents routing loops)
3021            if dist >= my_distance {
3022                continue;
3023            }
3024
3025            let dominated = match &best {
3026                None => true,
3027                Some((_, best_cost, best_dist)) => {
3028                    cost < *best_cost
3029                        || (cost == *best_cost && dist < *best_dist)
3030                        || (cost == *best_cost
3031                            && dist == *best_dist
3032                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
3033                }
3034            };
3035
3036            if dominated {
3037                best = Some((candidate, cost, dist));
3038            }
3039        }
3040
3041        best.map(|(peer, _, _)| peer)
3042    }
3043
3044    /// Check if a destination is in any peer's bloom filter.
3045    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
3046        self.peers.values().filter(|p| p.may_reach(dest)).collect()
3047    }
3048
3049    /// Get the TUN packet sender channel.
3050    ///
3051    /// Returns None if TUN is not active or the node hasn't been started.
3052    pub fn tun_tx(&self) -> Option<&TunTx> {
3053        self.tun_tx.as_ref()
3054    }
3055
3056    /// Attach app-owned packet I/O for embedded operation without a system TUN.
3057    ///
3058    /// This must be called before [`Node::start`] and requires `tun.enabled =
3059    /// false`. Outbound packets sent to the returned sender are processed by the
3060    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
3061    /// sent to the returned receiver with source attribution.
3062    pub fn attach_external_packet_io(
3063        &mut self,
3064        capacity: usize,
3065    ) -> Result<ExternalPacketIo, NodeError> {
3066        if self.state != NodeState::Created {
3067            return Err(NodeError::Config(ConfigError::Validation(
3068                "external packet I/O must be attached before node start".to_string(),
3069            )));
3070        }
3071        if self.config.tun.enabled {
3072            return Err(NodeError::Config(ConfigError::Validation(
3073                "external packet I/O requires tun.enabled=false".to_string(),
3074            )));
3075        }
3076
3077        let capacity = capacity.max(1);
3078        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
3079        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
3080        self.tun_outbound_rx = Some(outbound_rx);
3081        self.external_packet_tx = Some(inbound_tx);
3082
3083        Ok(ExternalPacketIo {
3084            outbound_tx,
3085            inbound_rx,
3086        })
3087    }
3088
3089    /// Attach app-owned endpoint data I/O for embedded operation.
3090    ///
3091    /// Commands sent to the returned sender are processed by the node RX loop.
3092    /// Incoming endpoint data is emitted as source-attributed events.
3093    pub(crate) fn attach_endpoint_data_io(
3094        &mut self,
3095        capacity: usize,
3096    ) -> Result<EndpointDataIo, NodeError> {
3097        if self.state != NodeState::Created {
3098            return Err(NodeError::Config(ConfigError::Validation(
3099                "endpoint data I/O must be attached before node start".to_string(),
3100            )));
3101        }
3102
3103        let command_capacity = endpoint_data_command_capacity(capacity);
3104        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
3105        // Inbound endpoint-data events use an unbounded channel — see
3106        // `EndpointDataIo::event_rx` docs for the rationale (kills the
3107        // per-packet semaphore + the cross-task relay task that used to
3108        // sit on top of this channel).
3109        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
3110        self.endpoint_command_rx = Some(command_rx);
3111        self.endpoint_event_tx = Some(event_tx.clone());
3112
3113        Ok(EndpointDataIo {
3114            command_tx,
3115            event_rx,
3116            event_tx,
3117        })
3118    }
3119
3120    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
3121        let mut prefix = [0u8; 15];
3122        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3123        self.identity_cache
3124            .get(&prefix)
3125            .filter(|entry| &entry.node_addr == addr)
3126            .map(|entry| entry.pubkey)
3127    }
3128
3129    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
3130        let mut prefix = [0u8; 15];
3131        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3132        self.identity_cache
3133            .get(&prefix)
3134            .filter(|entry| &entry.node_addr == addr)
3135            .map(|entry| entry.npub.clone())
3136    }
3137
3138    pub(in crate::node) fn deliver_external_ipv6_packet(
3139        &self,
3140        src_addr: &NodeAddr,
3141        packet: Vec<u8>,
3142    ) {
3143        let Some(external_packet_tx) = &self.external_packet_tx else {
3144            return;
3145        };
3146        if packet.len() < 40 {
3147            return;
3148        }
3149        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
3150            return;
3151        };
3152        let delivered = NodeDeliveredPacket {
3153            source_node_addr: *src_addr,
3154            source_npub: self.npub_for_node_addr(src_addr),
3155            destination,
3156            packet,
3157        };
3158        if let Err(error) = external_packet_tx.try_send(delivered) {
3159            debug!(error = %error, "Failed to deliver packet to external app sink");
3160        }
3161    }
3162
3163    // === Sending ===
3164
3165    /// Encrypt and send a link-layer message to an authenticated peer.
3166    ///
3167    /// The plaintext should include the message type byte followed by the
3168    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
3169    ///
3170    /// The send path prepends a 4-byte session-relative timestamp (inner
3171    /// header) before encryption. The full 16-byte outer header is used
3172    /// as AAD for the AEAD construction.
3173    ///
3174    /// This is the standard path for sending any link-layer control message
3175    /// to a peer over their encrypted Noise session.
3176    pub(super) async fn send_encrypted_link_message(
3177        &mut self,
3178        node_addr: &NodeAddr,
3179        plaintext: &[u8],
3180    ) -> Result<(), NodeError> {
3181        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
3182            .await
3183    }
3184
3185    /// Update one peer's local-outbound-broken signal from a `transport.send`
3186    /// outcome. Sets a per-peer timestamp on local-side io errors
3187    /// (NetworkUnreachable / HostUnreachable / AddrNotAvailable); clears that
3188    /// peer on success. The reaper consults this in `check_link_heartbeats` to
3189    /// switch only that peer to `fast_link_dead_timeout_secs`.
3190    pub(in crate::node) fn note_local_send_outcome(
3191        &mut self,
3192        node_addr: &NodeAddr,
3193        result: &Result<usize, TransportError>,
3194    ) {
3195        match result {
3196            Ok(_) => {
3197                self.local_send_failure_at_by_peer.remove(node_addr);
3198            }
3199            Err(error) if error.is_local_route_unavailable() => {
3200                self.local_send_failure_at_by_peer
3201                    .insert(*node_addr, std::time::Instant::now());
3202            }
3203            Err(_) => {}
3204        }
3205    }
3206
3207    /// Return the active dead-timeout for one peer after considering recent
3208    /// local route failures. The fast-dead signal is intentionally short-lived:
3209    /// on the UDP worker path a send call can return before the kernel result
3210    /// is observed, so a stale route error must not compress liveness for the
3211    /// whole normal dead-timeout window.
3212    pub(in crate::node) fn local_send_failure_dead_timeout_for_peer(
3213        &self,
3214        node_addr: &NodeAddr,
3215        now: std::time::Instant,
3216        dead_timeout: std::time::Duration,
3217        fast_dead_timeout: std::time::Duration,
3218    ) -> std::time::Duration {
3219        match self.local_send_failure_at_by_peer.get(node_addr).copied() {
3220            Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3221                fast_dead_timeout.min(dead_timeout)
3222            }
3223            None => dead_timeout,
3224            Some(_) => dead_timeout,
3225        }
3226    }
3227
3228    pub(in crate::node) fn purge_expired_local_send_failures(&mut self, now: std::time::Instant) {
3229        self.local_send_failure_at_by_peer
3230            .retain(|_, at| now.duration_since(*at) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW);
3231    }
3232
3233    pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3234        self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3235    }
3236
3237    pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3238        let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3239            return false;
3240        };
3241        let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3242        std::time::Instant::now().duration_since(t) <= grace
3243    }
3244
3245    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
3246    ///
3247    /// Used by the forwarding path to relay congestion signals hop-by-hop.
3248    pub(super) async fn send_encrypted_link_message_with_ce(
3249        &mut self,
3250        node_addr: &NodeAddr,
3251        plaintext: &[u8],
3252        ce_flag: bool,
3253    ) -> Result<(), NodeError> {
3254        let peer = self
3255            .peers
3256            .get_mut(node_addr)
3257            .ok_or(NodeError::PeerNotFound(*node_addr))?;
3258
3259        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3260            node_addr: *node_addr,
3261            reason: "no their_index".into(),
3262        })?;
3263        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3264            node_addr: *node_addr,
3265            reason: "no transport_id".into(),
3266        })?;
3267        let remote_addr = peer
3268            .current_addr()
3269            .cloned()
3270            .ok_or_else(|| NodeError::SendFailed {
3271                node_addr: *node_addr,
3272                reason: "no current_addr".into(),
3273            })?;
3274        #[cfg(any(target_os = "linux", target_os = "macos"))]
3275        let connected_socket = peer.connected_udp();
3276
3277        // Prepend 4-byte session-relative timestamp (inner header)
3278        let timestamp_ms = peer.session_elapsed_ms();
3279
3280        // MMP: read spin bit value before entering session borrow
3281        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3282        let mut flags = if sp_flag { FLAG_SP } else { 0 };
3283        if ce_flag {
3284            flags |= FLAG_CE;
3285        }
3286        if peer.current_k_bit() {
3287            flags |= FLAG_KEY_EPOCH;
3288        }
3289
3290        let session = peer
3291            .noise_session_mut()
3292            .ok_or_else(|| NodeError::SendFailed {
3293                node_addr: *node_addr,
3294                reason: "no noise session".into(),
3295            })?;
3296
3297        // Build 16-byte outer header upfront. The inner-plaintext
3298        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
3299        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
3300        // just to measure it. The worker path uses this length to size
3301        // the wire buffer directly; the legacy path below still
3302        // materialises a separate `inner_plaintext` Vec for the inline
3303        // encrypt-and-send call.
3304        const INNER_TS_LEN: usize = 4;
3305        let counter = session.current_send_counter();
3306        let inner_len = INNER_TS_LEN + plaintext.len();
3307        let payload_len = inner_len as u16;
3308        let header = build_established_header(their_index, counter, flags, payload_len);
3309
3310        // **Unix UDP send fast path.** On Unix, the encrypt-worker pool
3311        // is spawned at lifecycle start (workers = num_cpus) in
3312        // production, so this branch is taken for every authentic send on
3313        // every UDP-transported established session. The AEAD work +
3314        // sendmsg syscall run on a dedicated OS thread; the rx_loop only
3315        // builds the wire buffer + reserves the counter inline.
3316        //
3317        // Other transport kinds (BLE, TCP, sim, ethernet) fall
3318        // through to the inline encrypt + transport.send path
3319        // below — those don't have raw-fd / sendmmsg / UDP_GSO
3320        // benefits to expose through the worker pool, so the simpler
3321        // synchronous send is the right shape for them.
3322        //
3323        // Windows intentionally stays on the inline tokio UDP send path:
3324        // lifecycle::start does not spawn these raw-fd workers there, and
3325        // tests may still set `encrypt_workers` manually.
3326        //
3327        // The `encrypt_workers.is_some()` check below is true in Unix
3328        // production (lifecycle::start spawns the pool); it stays checked
3329        // rather than `expect()`-ed because unit tests construct `Node`
3330        // without calling `start()`.
3331        let transport_for_send = self
3332            .transports
3333            .get(&transport_id)
3334            .ok_or(NodeError::TransportNotFound(transport_id))?;
3335        match transport_for_send.connection_state(&remote_addr) {
3336            ConnectionState::Connected => {}
3337            other => {
3338                if matches!(other, ConnectionState::None) {
3339                    let _ = transport_for_send.connect(&remote_addr).await;
3340                }
3341                return Err(NodeError::SendFailed {
3342                    node_addr: *node_addr,
3343                    reason: format!("transport connection not ready: {:?}", other),
3344                });
3345            }
3346        }
3347        #[cfg(unix)]
3348        {
3349            let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3350            if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3351                && is_udp
3352                && let Some(cipher_clone) = session.send_cipher_clone()
3353            {
3354                // Reserve the counter on the session so subsequent
3355                // sends don't reuse it. `current_send_counter` only
3356                // peeks; we advance via `take_send_counter`.
3357                let reserved_counter =
3358                    session
3359                        .take_send_counter()
3360                        .map_err(|e| NodeError::SendFailed {
3361                            node_addr: *node_addr,
3362                            reason: format!("counter reservation failed: {}", e),
3363                        })?;
3364                debug_assert_eq!(reserved_counter, counter);
3365                // Re-derive the header with the now-locked-in counter
3366                // value (same value, but the call sequence is more
3367                // explicit).
3368                let header =
3369                    build_established_header(their_index, reserved_counter, flags, payload_len);
3370                let transport = transport_for_send;
3371                // Snapshot the per-peer connected UDP socket before
3372                // resolving the fallback address. On the established
3373                // steady-state path this socket already carries the
3374                // kernel peer address, so re-parsing the configured
3375                // transport address and touching the DNS cache on every
3376                // packet is pure overhead on the sender hot path.
3377                let send_target = {
3378                    if let TransportHandle::Udp(udp) = transport {
3379                        let socket_addr = {
3380                            #[cfg(any(target_os = "linux", target_os = "macos"))]
3381                            {
3382                                match connected_socket.as_ref() {
3383                                    Some(socket) => Some(socket.peer_addr()),
3384                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3385                                }
3386                            }
3387                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3388                            {
3389                                udp.resolve_for_off_task(&remote_addr).await.ok()
3390                            }
3391                        };
3392                        match (udp.async_socket(), socket_addr) {
3393                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3394                            _ => None,
3395                        }
3396                    } else {
3397                        None
3398                    }
3399                };
3400                if let Some((socket, socket_addr)) = send_target {
3401                    // Build the wire buffer **directly** from
3402                    // `plaintext` with a single allocation:
3403                    //   `[16 header][4 ts][plaintext...]` with
3404                    // +16 trailing capacity for the AEAD tag.
3405                    // The worker seals `wire_buf[16..]` in
3406                    // place and appends the tag — no second
3407                    // alloc, no second memcpy.
3408                    //
3409                    // Previous design built `inner_plaintext`
3410                    // via `prepend_inner_header` (1 alloc + 1
3411                    // copy) and then let the worker memcpy
3412                    // header + plaintext into a fresh Vec
3413                    // (another alloc + copy). At ~100 kpps the
3414                    // saved alloc/copy is ~150 MB/sec of memory
3415                    // bandwidth on the hot rx_loop + worker.
3416                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3417                    let mut wire_buf = Vec::with_capacity(wire_capacity);
3418                    wire_buf.extend_from_slice(&header);
3419                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
3420                    wire_buf.extend_from_slice(plaintext);
3421                    let predicted_bytes = wire_capacity;
3422                    // Stats / MMP update inline — predicted size
3423                    // is exact for ChaCha20-Poly1305 (tag is
3424                    // constant 16 bytes). When `connected_socket` is
3425                    // `Some`, the worker sends on it without a
3426                    // destination sockaddr — the kernel skips the
3427                    // per-packet sockaddr + route + neighbor resolve.
3428                    if let Some(peer) = self.peers.get_mut(node_addr) {
3429                        peer.link_stats_mut().record_sent(predicted_bytes);
3430                        if let Some(mmp) = peer.mmp_mut() {
3431                            mmp.sender
3432                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3433                        }
3434                    }
3435                    let scheduling_weight = self.send_weight_for_peer(node_addr);
3436                    let traffic_class = classify_fmp_plaintext_traffic(plaintext);
3437                    workers.dispatch(self::encrypt_worker::FmpSendJob {
3438                        cipher: cipher_clone,
3439                        counter: reserved_counter,
3440                        wire_buf,
3441                        fsp_seal: None,
3442                        socket,
3443                        dest_addr: socket_addr,
3444                        #[cfg(any(target_os = "linux", target_os = "macos"))]
3445                        connected_socket,
3446                        bulk_endpoint_data: traffic_class.bulk_endpoint_data,
3447                        drop_on_backpressure: traffic_class.drop_on_backpressure,
3448                        scheduling_weight,
3449                        queued_at: crate::perf_profile::stamp(),
3450                    });
3451                    return Ok(());
3452                }
3453            }
3454        }
3455
3456        // Inline (legacy) path: encrypt + send on the rx_loop.
3457        // Build the inner plaintext lazily here — the worker path
3458        // above never reaches this point, so the prepend_inner_header
3459        // alloc is avoided in the fast path.
3460        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3461        // Encrypt with AAD binding to the outer header
3462        let ciphertext = {
3463            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3464            session
3465                .encrypt_with_aad(&inner_plaintext, &header)
3466                .map_err(|e| NodeError::SendFailed {
3467                    node_addr: *node_addr,
3468                    reason: format!("encryption failed: {}", e),
3469                })?
3470        };
3471
3472        let wire_packet = build_encrypted(&header, &ciphertext);
3473
3474        // Re-borrow peer for stats update after sending
3475        let send_result = {
3476            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3477            let transport = self
3478                .transports
3479                .get(&transport_id)
3480                .ok_or(NodeError::TransportNotFound(transport_id))?;
3481            transport.send(&remote_addr, &wire_packet).await
3482        };
3483        self.note_local_send_outcome(node_addr, &send_result);
3484        let bytes_sent = send_result.map_err(|e| match e {
3485            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3486                node_addr: *node_addr,
3487                packet_size,
3488                mtu,
3489            },
3490            other => NodeError::SendFailed {
3491                node_addr: *node_addr,
3492                reason: format!("transport send: {}", other),
3493            },
3494        })?;
3495
3496        // Update send statistics
3497        if let Some(peer) = self.peers.get_mut(node_addr) {
3498            peer.link_stats_mut().record_sent(bytes_sent);
3499            // MMP: record sent frame for sender report generation
3500            if let Some(mmp) = peer.mmp_mut() {
3501                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3502            }
3503        }
3504
3505        Ok(())
3506    }
3507}
3508
3509impl fmt::Debug for Node {
3510    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3511        f.debug_struct("Node")
3512            .field("node_addr", self.node_addr())
3513            .field("state", &self.state)
3514            .field("is_leaf_only", &self.is_leaf_only)
3515            .field("connections", &self.connection_count())
3516            .field("peers", &self.peer_count())
3517            .field("links", &self.link_count())
3518            .field("transports", &self.transport_count())
3519            .finish()
3520    }
3521}