Skip to main content

fips_core/node/
mod.rs

1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31#[cfg(unix)]
32use self::wire::ESTABLISHED_HEADER_SIZE;
33use self::wire::{
34    FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted, build_established_header,
35    prepend_inner_header,
36};
37use crate::bloom::{BloomFilter, BloomState};
38use crate::cache::CoordCache;
39use crate::config::{NostrDiscoveryPolicy, PeerConfig, RoutingMode};
40use crate::node::session::SessionEntry;
41use crate::node::session_wire::{FSP_PHASE_ESTABLISHED, FspCommonPrefix};
42use crate::peer::{ActivePeer, PeerConnection};
43#[cfg(any(target_os = "linux", target_os = "macos"))]
44use crate::transport::ethernet::EthernetTransport;
45use crate::transport::tcp::TcpTransport;
46use crate::transport::tor::TorTransport;
47use crate::transport::udp::UdpTransport;
48#[cfg(feature = "webrtc-transport")]
49use crate::transport::webrtc::WebRtcTransport;
50use crate::transport::{
51    ConnectionState, Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError,
52    TransportHandle, TransportId,
53};
54use crate::tree::TreeState;
55use crate::upper::hosts::HostMap;
56use crate::upper::icmp_rate_limit::IcmpRateLimiter;
57use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
58use crate::utils::index::IndexAllocator;
59use crate::{
60    Config, ConfigError, FipsAddress, Identity, IdentityError, LinkMessageType, NodeAddr,
61    PeerIdentity, encode_npub,
62};
63use rand::Rng;
64use std::collections::{HashMap, HashSet, VecDeque};
65use std::fmt;
66use std::sync::Arc;
67use std::thread::JoinHandle;
68use thiserror::Error;
69use tracing::{debug, warn};
70
// NOTE(review): none of these tunables is referenced in this part of the
// file, so the summaries below are inferred from the identifiers alone.
// Confirm them against the call sites before relying on them.

/// Three-second window; presumably bounds how quickly repeated local send
/// failures may cause a path to be declared dead.
const LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW: std::time::Duration = std::time::Duration::from_secs(3);
/// 20 000 ms; presumably how long a session's direct path stays flagged as
/// degraded once marked.
const SESSION_DIRECT_DEGRADED_HOLD_MS: u64 = 20_000;
/// Presumably the minimum sample count required before loss is evaluated.
const SESSION_DIRECT_DEGRADED_MIN_SAMPLE: u64 = 16;
/// Loss ratio (0.08); presumably the level at which a direct path is
/// considered degraded.
const SESSION_DIRECT_DEGRADED_LOSS_THRESHOLD: f64 = 0.08;
/// Loss ratio (0.02); presumably the level at which a degraded direct path
/// is considered recovered.
const SESSION_DIRECT_RECOVERY_LOSS_THRESHOLD: f64 = 0.02;
/// Ratio (0.25); presumably the minimum cost advantage a fallback route must
/// offer before it is preferred.
const ROUTING_FALLBACK_MIN_COST_ADVANTAGE: f64 = 0.25;
77
/// Scheduling hints derived from an FMP plaintext frame.
///
/// Both flags are `false` in the `Default` value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct FmpPlaintextTrafficClass {
    /// Set when the frame was recognised as a bulk session datagram.
    bulk_endpoint_data: bool,
    /// Set when the frame may be discarded under sender backpressure.
    drop_on_backpressure: bool,
}
83
/// Scheduling hints derived from an endpoint (pre-FSP) IP payload.
///
/// Both flags are `false` in the `Default` value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct EndpointPayloadTrafficClass {
    /// Set when the payload is bulk rather than latency-sensitive traffic.
    bulk_endpoint_data: bool,
    /// Set when the payload may be discarded under sender backpressure.
    drop_on_backpressure: bool,
}
89
/// Queue lane chosen for an endpoint command.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum EndpointCommandLane {
    /// Lane for latency-sensitive payloads.
    Priority,
    /// Lane for everything else.
    Bulk,
}
95
96fn classify_fmp_plaintext_traffic(plaintext: &[u8]) -> FmpPlaintextTrafficClass {
97    let bulk_endpoint_data = fmp_plaintext_is_bulk_session_datagram(plaintext);
98    // At this layer established FSP payloads are already end-to-end encrypted,
99    // so a bulk SessionDatagram may still be TCP endpoint traffic. Keep it out
100    // of the control lane, but only the pre-FSP endpoint path may mark known
101    // non-TCP packets as discardable under sender backpressure.
102    FmpPlaintextTrafficClass {
103        bulk_endpoint_data,
104        drop_on_backpressure: false,
105    }
106}
107
108fn fmp_plaintext_is_bulk_session_datagram(plaintext: &[u8]) -> bool {
109    if plaintext
110        .first()
111        .is_none_or(|ty| *ty != LinkMessageType::SessionDatagram.to_byte())
112    {
113        return false;
114    }
115    let Some(fsp_payload) = plaintext.get(crate::protocol::SESSION_DATAGRAM_HEADER_SIZE..) else {
116        return false;
117    };
118    FspCommonPrefix::parse(fsp_payload).is_some_and(|prefix| {
119        prefix.phase == FSP_PHASE_ESTABLISHED && !prefix.is_unencrypted() && !prefix.has_coords()
120    })
121}
122
123fn classify_endpoint_payload(payload: &[u8]) -> EndpointPayloadTrafficClass {
124    const IPPROTO_TCP: u8 = 6;
125    const IPPROTO_ICMPV6: u8 = 58;
126
127    match parse_endpoint_payload_ip_proto(payload) {
128        Some((IPPROTO_ICMPV6, _)) => EndpointPayloadTrafficClass::default(),
129        Some((IPPROTO_TCP, offset)) => {
130            let latency_sensitive = endpoint_tcp_payload_is_latency_sensitive(payload, offset);
131            EndpointPayloadTrafficClass {
132                bulk_endpoint_data: !latency_sensitive,
133                drop_on_backpressure: false,
134            }
135        }
136        _ => EndpointPayloadTrafficClass {
137            bulk_endpoint_data: true,
138            drop_on_backpressure: true,
139        },
140    }
141}
142
143pub(crate) fn endpoint_payload_is_latency_sensitive(payload: &[u8]) -> bool {
144    !classify_endpoint_payload(payload).bulk_endpoint_data
145}
146
147pub(crate) fn endpoint_command_lane_for_payload(payload: &[u8]) -> EndpointCommandLane {
148    if endpoint_payload_is_latency_sensitive(payload) {
149        EndpointCommandLane::Priority
150    } else {
151        EndpointCommandLane::Bulk
152    }
153}
154
/// Reports whether a TCP segment inside an endpoint IP packet should be
/// treated as latency-sensitive rather than bulk.
///
/// `payload` is the whole IP packet and `tcp_offset` is the byte offset of the
/// TCP header within it. The segment counts as latency-sensitive when:
///
/// - the TCP header is truncated or declares an invalid data offset,
/// - any of FIN, SYN or RST is set, or
/// - it carries at most 256 bytes of TCP payload.
///
/// The TCP payload size is taken from the length declared in the IP header.
/// For IPv6 that declared length also covers extension headers, so the bytes
/// between the fixed 40-byte header and `tcp_offset` are subtracted first;
/// otherwise a small segment behind extension headers would be over-counted
/// and misclassified as bulk. When the declared length is unusable, the size
/// of the buffer past the TCP header is used instead.
fn endpoint_tcp_payload_is_latency_sensitive(payload: &[u8], tcp_offset: usize) -> bool {
    const TCP_MIN_HEADER_LEN: usize = 20;
    const TCP_FLAG_FIN: u8 = 0x01;
    const TCP_FLAG_SYN: u8 = 0x02;
    const TCP_FLAG_RST: u8 = 0x04;
    const INTERACTIVE_TCP_PAYLOAD_MAX: usize = 256;
    const IPV6_HEADER_LEN: usize = 40;

    // A header we cannot read is treated as latency-sensitive so that it is
    // never queued behind bulk data.
    if payload.len() < tcp_offset.saturating_add(TCP_MIN_HEADER_LEN) {
        return true;
    }

    // Data offset: upper nibble of byte 12, in 32-bit words.
    let tcp_header_len = usize::from(payload[tcp_offset + 12] >> 4) * 4;
    if tcp_header_len < TCP_MIN_HEADER_LEN || payload.len() < tcp_offset + tcp_header_len {
        return true;
    }

    let flags = payload[tcp_offset + 13];
    if flags & (TCP_FLAG_FIN | TCP_FLAG_SYN | TCP_FLAG_RST) != 0 {
        return true;
    }

    // Extension-header bytes sitting between the fixed IPv6 header and TCP.
    // IPv4 options are already excluded by `endpoint_ip_payload_len`.
    let extension_len = match payload[0] >> 4 {
        6 => tcp_offset.saturating_sub(IPV6_HEADER_LEN),
        _ => 0,
    };

    let payload_len = endpoint_ip_payload_len(payload)
        .and_then(|ip_payload_len| ip_payload_len.checked_sub(extension_len))
        .and_then(|segment_len| segment_len.checked_sub(tcp_header_len))
        .unwrap_or_else(|| payload.len().saturating_sub(tcp_offset + tcp_header_len));
    payload_len <= INTERACTIVE_TCP_PAYLOAD_MAX
}

/// Returns the payload length declared by the packet's IP header.
///
/// - IPv4: Total Length minus the header length taken from the IHL field.
/// - IPv6: the Payload Length field verbatim, which includes any extension
///   headers that follow the fixed header.
///
/// Returns `None` for an empty or truncated header, an IHL below 20 bytes, an
/// IPv4 Total Length smaller than the header, or an unknown IP version.
fn endpoint_ip_payload_len(payload: &[u8]) -> Option<usize> {
    const IPV4_MIN_HEADER_LEN: usize = 20;
    const IPV6_HEADER_LEN: usize = 40;

    let version_ihl = payload.first().copied()?;
    match version_ihl >> 4 {
        4 => {
            if payload.len() < IPV4_MIN_HEADER_LEN {
                return None;
            }
            let header_len = usize::from(version_ihl & 0x0f) * 4;
            if header_len < IPV4_MIN_HEADER_LEN || payload.len() < header_len {
                return None;
            }
            let total_len = usize::from(u16::from_be_bytes([payload[2], payload[3]]));
            total_len.checked_sub(header_len)
        }
        6 => {
            if payload.len() < IPV6_HEADER_LEN {
                return None;
            }
            Some(usize::from(u16::from_be_bytes([payload[4], payload[5]])))
        }
        _ => None,
    }
}
208
209fn parse_endpoint_payload_ip_proto(payload: &[u8]) -> Option<(u8, usize)> {
210    const IPV4_MIN_HEADER_LEN: usize = 20;
211
212    let version_ihl = payload.first().copied()?;
213
214    match version_ihl >> 4 {
215        4 => {
216            if payload.len() < IPV4_MIN_HEADER_LEN {
217                return None;
218            }
219            let header_len = usize::from(version_ihl & 0x0f) * 4;
220            if header_len >= IPV4_MIN_HEADER_LEN && payload.len() >= header_len {
221                Some((payload[9], header_len))
222            } else {
223                None
224            }
225        }
226        6 => ipv6_payload_next_header(payload),
227        _ => None,
228    }
229}
230
/// Test-only helper: `true` when the packet's transport protocol is TCP (6).
#[cfg(test)]
fn endpoint_payload_is_tcp(payload: &[u8]) -> bool {
    const IPPROTO_TCP: u8 = 6;
    matches!(
        parse_endpoint_payload_ip_proto(payload),
        Some((IPPROTO_TCP, _))
    )
}
236
/// Walks the IPv6 extension-header chain and returns the first Next Header
/// value that is not skippable, together with the offset of that header.
///
/// Header sizes are computed per type:
/// - Fragment (44): fixed 8 bytes.
/// - Authentication Header (51): `(len + 2) * 4` bytes.
/// - All other skippable headers: `(len + 1) * 8` bytes.
///
/// Returns `None` when the buffer is shorter than the fixed 40-byte header,
/// the version nibble is not 6, an extension header is truncated, or more
/// than eight extension headers are chained.
///
/// NOTE(review): the Fragment Offset is not inspected, so for a non-first
/// fragment the returned offset points at fragment data rather than at a
/// real upper-layer header.
fn ipv6_payload_next_header(payload: &[u8]) -> Option<(u8, usize)> {
    const IPV6_HEADER_LEN: usize = 40;
    const IPV6_FRAGMENT_HEADER_LEN: usize = 8;
    const NEXT_HEADER_FRAGMENT: u8 = 44;
    const NEXT_HEADER_AUTH: u8 = 51;
    const MAX_EXTENSION_HEADERS: usize = 8;

    if payload.len() < IPV6_HEADER_LEN || payload[0] >> 4 != 6 {
        return None;
    }

    let mut protocol = payload[6];
    let mut cursor = IPV6_HEADER_LEN;
    let mut walked = 0usize;

    while ipv6_extension_header_is_skippable(protocol) {
        let header_len = match protocol {
            NEXT_HEADER_FRAGMENT => IPV6_FRAGMENT_HEADER_LEN,
            other => {
                // Second octet of the extension header is its length field.
                let len_octet = usize::from(*payload.get(cursor + 1)?);
                if other == NEXT_HEADER_AUTH {
                    (len_octet + 2) * 4
                } else {
                    (len_octet + 1) * 8
                }
            }
        };
        if payload.len() < cursor + header_len {
            return None;
        }

        // First octet of every extension header names the header after it.
        protocol = payload[cursor];
        cursor += header_len;

        walked += 1;
        if walked > MAX_EXTENSION_HEADERS {
            return None;
        }
    }

    Some((protocol, cursor))
}

/// Reports whether a Next Header value names an extension header that the
/// parser steps over: 0, 43, 44, 51, 60 or 135.
fn ipv6_extension_header_is_skippable(next_header: u8) -> bool {
    const SKIPPABLE: [u8; 6] = [0, 43, 44, 51, 60, 135];
    SKIPPABLE.contains(&next_header)
}
288
/// Half-width, in seconds, of the symmetric jitter on per-session rekey timers.
///
/// At construction and after every cutover, each FMP/FSP session picks an
/// offset uniformly from `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]`. Because
/// the range is symmetric the configured mean interval is preserved, while
/// meshes whose nodes start in lockstep see fewer simultaneous
/// dual-initiation bursts.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
296
297/// Errors related to node operations.
298#[derive(Debug, Error)]
299pub enum NodeError {
300    #[error("node not started")]
301    NotStarted,
302
303    #[error("node already started")]
304    AlreadyStarted,
305
306    #[error("node already stopped")]
307    AlreadyStopped,
308
309    #[error("transport not found: {0}")]
310    TransportNotFound(TransportId),
311
312    #[error("no transport available for type: {0}")]
313    NoTransportForType(String),
314
315    #[error("link not found: {0}")]
316    LinkNotFound(LinkId),
317
318    #[error("connection not found: {0}")]
319    ConnectionNotFound(LinkId),
320
321    #[error("peer not found: {0:?}")]
322    PeerNotFound(NodeAddr),
323
324    #[error("peer already exists: {0:?}")]
325    PeerAlreadyExists(NodeAddr),
326
327    #[error("connection already exists for link: {0}")]
328    ConnectionAlreadyExists(LinkId),
329
330    #[error("invalid peer npub '{npub}': {reason}")]
331    InvalidPeerNpub { npub: String, reason: String },
332
333    #[error("discovery error: {0}")]
334    Discovery(String),
335
336    #[error("access denied: {0}")]
337    AccessDenied(String),
338
339    #[error("max connections exceeded: {max}")]
340    MaxConnectionsExceeded { max: usize },
341
342    #[error("max peers exceeded: {max}")]
343    MaxPeersExceeded { max: usize },
344
345    #[error("max links exceeded: {max}")]
346    MaxLinksExceeded { max: usize },
347
348    #[error("handshake incomplete for link {0}")]
349    HandshakeIncomplete(LinkId),
350
351    #[error("no session available for link {0}")]
352    NoSession(LinkId),
353
354    #[error("promotion failed for link {link_id}: {reason}")]
355    PromotionFailed { link_id: LinkId, reason: String },
356
357    #[error("send failed to {node_addr}: {reason}")]
358    SendFailed { node_addr: NodeAddr, reason: String },
359
360    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
361    MtuExceeded {
362        node_addr: NodeAddr,
363        packet_size: usize,
364        mtu: u16,
365    },
366
367    #[error("config error: {0}")]
368    Config(#[from] ConfigError),
369
370    #[error("identity error: {0}")]
371    Identity(#[from] IdentityError),
372
373    #[error("TUN error: {0}")]
374    Tun(#[from] TunError),
375
376    #[error("index allocation failed: {0}")]
377    IndexAllocationFailed(String),
378
379    #[error("handshake failed: {0}")]
380    HandshakeFailed(String),
381
382    #[error("transport error: {0}")]
383    TransportError(String),
384
385    #[error("local route unavailable: {0}")]
386    LocalRouteUnavailable(String),
387
388    #[error("bootstrap handoff failed: {0}")]
389    BootstrapHandoff(String),
390}
391
392impl NodeError {
393    pub(in crate::node) fn from_transport_error(error: TransportError) -> Self {
394        if error.is_local_route_unavailable() {
395            Self::LocalRouteUnavailable(error.to_string())
396        } else {
397            Self::TransportError(error.to_string())
398        }
399    }
400
401    pub(in crate::node) fn is_local_route_unavailable(&self) -> bool {
402        matches!(self, Self::LocalRouteUnavailable(_))
403    }
404}
405
406/// Source-attributed packet delivered by a node running without a system TUN.
407#[derive(Debug, Clone, PartialEq, Eq)]
408pub struct NodeDeliveredPacket {
409    /// FIPS node address that originated the packet.
410    pub source_node_addr: NodeAddr,
411    /// Source Nostr public key when the node has learned it.
412    pub source_npub: Option<String>,
413    /// Destination FIPS address from the IPv6 packet.
414    pub destination: FipsAddress,
415    /// Full IPv6 packet after FIPS session decapsulation.
416    pub packet: Vec<u8>,
417}
418
419#[derive(Debug, Clone)]
420struct IdentityCacheEntry {
421    node_addr: NodeAddr,
422    pubkey: secp256k1::PublicKey,
423    npub: String,
424    last_seen_ms: u64,
425}
426
427impl IdentityCacheEntry {
428    fn new(
429        node_addr: NodeAddr,
430        pubkey: secp256k1::PublicKey,
431        npub: String,
432        last_seen_ms: u64,
433    ) -> Self {
434        Self {
435            node_addr,
436            pubkey,
437            npub,
438            last_seen_ms,
439        }
440    }
441}
442
443/// App-owned packet channels for embedding FIPS without a system TUN.
444#[derive(Debug)]
445pub struct ExternalPacketIo {
446    /// Send outbound IPv6 packets into the node.
447    pub outbound_tx: crate::upper::tun::TunOutboundTx,
448    /// Receive inbound IPv6 packets delivered by FIPS sessions.
449    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
450}
451
452/// App-owned endpoint data channels for embedding FIPS without a daemon.
453#[derive(Debug)]
454pub(crate) struct EndpointDataIo {
455    /// Send latency-sensitive endpoint data and management commands into the
456    /// node RX loop ahead of queued bulk endpoint data.
457    pub(crate) priority_command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
458    /// Send endpoint data commands into the node RX loop.
459    ///
460    /// Bounded with a generous default so normal sender bursts do not
461    /// stall on semaphore acquisition. macOS pacing happens at the UDP
462    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
463    /// constraining this app queue instead caused the inner TCP flow to
464    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
465    /// default for benches.
466    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
467    /// Receive endpoint data delivered by FIPS sessions.
468    ///
469    /// Unbounded so the rx_loop's send on inbound packet delivery is a
470    /// wait-free push (no semaphore acquire), and so we can drop the
471    /// per-packet cross-task relay that previously sat between the node
472    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
473    /// naturally bounded — the rx_loop both produces here and runs the
474    /// same runtime that schedules the consumer, so a stalled consumer
475    /// stalls production too.
476    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
477    /// Clone of the event_tx exposed for in-process loopback (e.g.
478    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
479    /// event into the same queue without going through the encrypt /
480    /// decrypt path, while keeping every consumer reading from a single
481    /// channel.
482    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
483}
484
/// Resolves the capacity of the bulk endpoint command queue.
///
/// If `FIPS_ENDPOINT_DATA_QUEUE_CAP` is set to a positive integer
/// (surrounding whitespace is ignored), that value is returned as-is.
/// Otherwise the result is `requested`, raised to at least 32 768. An unset,
/// unparsable or zero override falls back to that default.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    const DEFAULT_FLOOR: usize = 32_768;

    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&value| value > 0);

    match env_override {
        Some(capacity) => capacity,
        None => requested.max(1).max(DEFAULT_FLOOR),
    }
}
495
496/// Commands accepted by the node endpoint data service.
497#[derive(Debug)]
498pub(crate) enum NodeEndpointCommand {
499    /// Send with an explicit response channel — used by callers that
500    /// care whether the local-stack handoff succeeded (e.g.
501    /// `blocking_send` waits for the runtime to accept the send).
502    Send {
503        remote: PeerIdentity,
504        payload: Vec<u8>,
505        queued_at: Option<std::time::Instant>,
506        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
507    },
508    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
509    /// no per-packet result channel. Used by the data-plane fast path
510    /// (`FipsEndpoint::send`) where the caller already discards the
511    /// result. Saves one oneshot::channel() allocation per outbound
512    /// packet on the application's send hot path.
513    SendOneway {
514        remote: PeerIdentity,
515        payload: Vec<u8>,
516        queued_at: Option<std::time::Instant>,
517    },
518    PeerSnapshot {
519        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
520    },
521    RelaySnapshot {
522        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
523    },
524    UpdateRelays {
525        advert_relays: Vec<String>,
526        dm_relays: Vec<String>,
527        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
528    },
529    /// Replace the runtime peer list. Newly added auto-connect peers get
530    /// `initiate_peer_connection` immediately; removed peers are dropped
531    /// from the retry queue (the regular liveness timeout reaps any active
532    /// session). Existing entries are kept and their `addresses` field is
533    /// refreshed so the next retry sees the latest hints.
534    UpdatePeers {
535        peers: Vec<crate::config::PeerConfig>,
536        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
537    },
538}
539
/// Summary of what an `UpdatePeers` command changed.
///
/// All counters are zero in the `Default` value.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Count of peers reported as added.
    pub(crate) added: usize,
    /// Count of peers reported as removed.
    pub(crate) removed: usize,
    /// Count of peers reported as updated.
    pub(crate) updated: usize,
    /// Count of peers reported as unchanged.
    pub(crate) unchanged: usize,
}
548
549/// Endpoint data events emitted by the node session receive path.
550#[derive(Debug)]
551pub(crate) enum NodeEndpointEvent {
552    Data {
553        source_node_addr: NodeAddr,
554        source_npub: Option<String>,
555        payload: Vec<u8>,
556        queued_at: Option<std::time::Instant>,
557    },
558}
559
/// Snapshot of an authenticated peer as exposed to embedded endpoint callers.
///
/// NOTE(review): the field summaries are derived from the field names; the
/// code that fills them in is not visible here.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// The peer's npub string.
    pub(crate) npub: String,
    /// Whether the peer is reported as connected.
    pub(crate) connected: bool,
    /// Transport address in string form, when known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type in string form, when known.
    pub(crate) transport_type: Option<String>,
    /// Numeric identifier of the peer's link.
    pub(crate) link_id: u64,
    /// Presumably the smoothed round-trip time in milliseconds, when measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets-sent counter.
    pub(crate) packets_sent: u64,
    /// Packets-received counter.
    pub(crate) packets_recv: u64,
    /// Bytes-sent counter.
    pub(crate) bytes_sent: u64,
    /// Bytes-received counter.
    pub(crate) bytes_recv: u64,
    /// Whether a direct probe is pending for this peer.
    pub(crate) direct_probe_pending: bool,
    /// Presumably the delay in milliseconds before the next direct probe,
    /// when one is scheduled.
    pub(crate) direct_probe_after_ms: Option<u64>,
}
576
/// Live state of a Nostr relay as exposed to embedded endpoint callers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// Relay URL.
    pub(crate) url: String,
    /// Relay status in string form.
    pub(crate) status: String,
}
583
/// Lifecycle state of a node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeState {
    /// Constructed, but `start` has not run yet.
    Created,
    /// Start-up in progress (transports are being initialised).
    Starting,
    /// Up and fully operational.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Shut down.
    Stopped,
}

impl NodeState {
    /// Returns `true` only while the node is `Running`.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// Returns `true` when the node may be started: from `Created` or from
    /// `Stopped`.
    pub fn can_start(&self) -> bool {
        match self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// Returns `true` when the node may be stopped: only from `Running`.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase name of the state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
628
629/// Recent request tracking for dedup and reverse-path forwarding.
630///
631/// When a LookupRequest is forwarded through a node, the node stores the
632/// request_id and which peer sent it. When the corresponding LookupResponse
633/// arrives, it's forwarded back to that peer (reverse-path forwarding).
634/// The `response_forwarded` flag prevents response routing loops.
635#[derive(Clone, Debug)]
636pub(crate) struct RecentRequest {
637    /// The peer who sent this request to us.
638    pub(crate) from_peer: NodeAddr,
639    /// When we received this request (Unix milliseconds).
640    pub(crate) timestamp_ms: u64,
641    /// Whether we've already forwarded a response for this request.
642    /// Prevents response routing loops when convergent request paths
643    /// create bidirectional entries in recent_requests.
644    pub(crate) response_forwarded: bool,
645}
646
647impl RecentRequest {
648    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
649        Self {
650            from_peer,
651            timestamp_ms,
652            response_forwarded: false,
653        }
654    }
655
656    /// Check if this entry has expired (older than expiry_ms).
657    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
658        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
659    }
660}
661
662/// Key for addr_to_link reverse lookup.
663type AddrKey = (TransportId, TransportAddr);
664
/// Kernel drop tracking for one transport, used for congestion detection.
///
/// A sample is taken on every tick (1s). `dropping` records whether the
/// kernel drop counter moved since the sample before it.
#[derive(Default, Debug)]
struct TransportDropState {
    /// `recv_drops` value seen at the previous sample (cumulative counter).
    prev_drops: u64,
    /// `true` when drops increased since the previous sample.
    dropping: bool,
}
676
677/// State for a link waiting for transport-level connection establishment.
678///
679/// For connection-oriented transports (TCP, Tor), the transport connect runs
680/// asynchronously. This struct holds the data needed to complete the handshake
681/// once the connection is ready.
682struct PendingConnect {
683    /// The link that was created for this connection.
684    link_id: LinkId,
685    /// Which transport is being used.
686    transport_id: TransportId,
687    /// The remote address being connected to.
688    remote_addr: TransportAddr,
689    /// The peer identity (for handshake initiation).
690    peer_identity: PeerIdentity,
691}
692
693/// A running FIPS node instance.
694///
695/// This is the top-level container holding all node state.
696///
697/// ## Peer Lifecycle
698///
699/// Peers go through two phases:
700/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
701/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
702///
703/// The `addr_to_link` map enables dispatching incoming packets to the right
704/// connection before authentication completes.
705// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
706pub struct Node {
707    // === Identity ===
708    /// This node's cryptographic identity.
709    identity: Identity,
710
711    /// Random epoch generated at startup for peer restart detection.
712    /// Exchanged inside Noise handshake messages so peers can detect restarts.
713    startup_epoch: [u8; 8],
714
715    /// Instant when the node was created, for uptime reporting.
716    started_at: std::time::Instant,
717
718    // === Configuration ===
719    /// Loaded configuration.
720    config: Config,
721
722    // === State ===
723    /// Node operational state.
724    state: NodeState,
725
726    /// Whether this is a leaf-only node.
727    is_leaf_only: bool,
728
729    // === Spanning Tree ===
730    /// Local spanning tree state.
731    tree_state: TreeState,
732
733    // === Bloom Filter ===
734    /// Local Bloom filter state.
735    bloom_state: BloomState,
736
737    // === Routing ===
738    /// Address -> coordinates cache (from session setup and discovery).
739    coord_cache: CoordCache,
740    /// Locally learned reverse-path next-hop hints.
741    learned_routes: LearnedRouteTable,
742    /// Destinations whose direct first-hop path is temporarily suspect because
743    /// session-layer MMP observed sustained loss while using that direct path.
744    session_direct_degraded_until_ms: HashMap<NodeAddr, u64>,
745    /// Recent discovery requests (dedup + reverse-path forwarding).
746    /// Maps request_id → RecentRequest.
747    recent_requests: HashMap<u64, RecentRequest>,
748    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
749    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
750    /// the TUN reader/writer threads at TCP MSS clamp time so the
751    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
752    /// and the learned per-destination path MTU.
753    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
754
755    // === Transports & Links ===
756    /// Active transports (owned by Node).
757    transports: HashMap<TransportId, TransportHandle>,
758    /// Per-transport kernel drop tracking for congestion detection.
759    transport_drops: HashMap<TransportId, TransportDropState>,
760    /// Active links.
761    links: HashMap<LinkId, Link>,
762    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
763    addr_to_link: HashMap<AddrKey, LinkId>,
764
765    // === Packet Channel ===
766    /// Packet sender for transports.
767    packet_tx: Option<PacketTx>,
768    /// Packet receiver (for event loop).
769    packet_rx: Option<PacketRx>,
770
771    // === Connections (Handshake Phase) ===
772    /// Pending connections (handshake in progress).
773    /// Indexed by LinkId since we don't know the peer's identity yet.
774    connections: HashMap<LinkId, PeerConnection>,
775
776    // === Peers (Active Phase) ===
777    /// Authenticated peers.
778    /// Indexed by NodeAddr (verified identity).
779    peers: HashMap<NodeAddr, ActivePeer>,
780
781    // === End-to-End Sessions ===
782    /// Session table for end-to-end encrypted sessions.
783    /// Keyed by remote NodeAddr.
784    sessions: HashMap<NodeAddr, SessionEntry>,
785
786    // === Identity Cache ===
787    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
788    /// Enables reverse lookup from IPv6 destination to session/routing identity.
789    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
790
791    // === Pending TUN Packets ===
792    /// Packets queued while waiting for session establishment.
793    /// Keyed by destination NodeAddr, bounded per-dest and total.
794    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
795    /// Endpoint data payloads queued while waiting for session establishment.
796    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
797    // === Pending Discovery Lookups ===
798    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
799    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
800    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
801
802    // === Resource Limits ===
803    /// Maximum connections (0 = unlimited).
804    max_connections: usize,
805    /// Maximum peers (0 = unlimited).
806    max_peers: usize,
807    /// Maximum links (0 = unlimited).
808    max_links: usize,
809
810    // === Counters ===
811    /// Next link ID to allocate.
812    next_link_id: u64,
813    /// Next transport ID to allocate.
814    next_transport_id: u32,
815
816    // === Node Statistics ===
817    /// Routing, forwarding, discovery, and error signal counters.
818    stats: stats::NodeStats,
819
820    /// Time-series history of node-level metrics (1s/1m rings).
821    stats_history: stats_history::StatsHistory,
822
823    // === TUN Interface ===
824    /// TUN device state.
825    tun_state: TunState,
826    /// TUN interface name (for cleanup).
827    tun_name: Option<String>,
828    /// TUN packet sender channel.
829    tun_tx: Option<TunTx>,
830    /// Receiver for outbound packets from the TUN reader.
831    tun_outbound_rx: Option<TunOutboundRx>,
832    /// App-owned packet sink used by embedded/no-TUN integrations.
833    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
834    /// Endpoint data command receiver used by embedded/no-daemon integrations.
835    endpoint_priority_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
836    /// Bulk endpoint data command receiver used by embedded/no-daemon integrations.
837    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
838    /// Endpoint data event sink used by embedded/no-daemon integrations.
839    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
840    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
841    /// spawned (set up in `start()` once transports are running).
842    /// `Some(pool)` once available; the pool internally holds
843    /// per-worker mpsc senders and round-robins jobs across them.
844    /// See `node::encrypt_worker` for the rationale and layout.
845    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
846    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
847    /// `encrypt_workers` for the receive side.
848    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
849    /// Set of sessions that have been registered with the decrypt
850    /// shard worker pool. Used by rx_loop to decide between fast-path
851    /// dispatch (worker owns the session) and legacy in-place decrypt
852    /// (worker doesn't have it yet). Per the data-plane restructure,
853    /// the worker owns its session state directly — there's no shared
854    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
855    /// this set tracks **whether** the worker has been told about a
856    /// given session.
857    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
858    /// Fallback channel: decrypt worker bounces non-fast-path packets
859    /// (anything that's not bulk EndpointData) back here for rx_loop
860    /// to handle via the legacy path. Drained by a new rx_loop arm.
861    decrypt_fallback_rx:
862        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
863    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
864    /// TUN reader thread handle.
865    tun_reader_handle: Option<JoinHandle<()>>,
866    /// TUN writer thread handle.
867    tun_writer_handle: Option<JoinHandle<()>>,
868    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
869    /// On Linux, deleting the interface via netlink serves the same purpose.
870    #[cfg(target_os = "macos")]
871    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
872
873    // === DNS Responder ===
874    /// Receiver for resolved identities from the DNS responder.
875    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
876    /// DNS responder task handle.
877    dns_task: Option<tokio::task::JoinHandle<()>>,
878
879    // === Index-Based Session Dispatch ===
880    /// Allocator for session indices.
881    index_allocator: IndexAllocator,
882    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
883    /// This maps our session index to the peer that uses it.
884    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
885    /// Pending outbound handshakes by our sender_idx.
886    /// Tracks which LinkId corresponds to which session index.
887    pending_outbound: HashMap<(TransportId, u32), LinkId>,
888
889    // === Rate Limiting ===
890    /// Rate limiter for msg1 processing (DoS protection).
891    msg1_rate_limiter: HandshakeRateLimiter,
892    /// Rate limiter for ICMP Packet Too Big messages.
893    icmp_rate_limiter: IcmpRateLimiter,
894    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
895    routing_error_rate_limiter: RoutingErrorRateLimiter,
896    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
897    coords_response_rate_limiter: RoutingErrorRateLimiter,
898    /// Backoff for failed discovery lookups (originator-side).
899    discovery_backoff: DiscoveryBackoff,
900    /// Rate limiter for forwarded discovery requests (transit-side).
901    discovery_forward_limiter: DiscoveryForwardRateLimiter,
902
903    // === Pending Transport Connects ===
904    /// Links waiting for transport-level connection establishment before
905    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
906    /// the transport connect runs in the background; the tick handler polls
907    /// connection_state() and initiates the handshake when connected.
908    pending_connects: Vec<PendingConnect>,
909
910    // === Connection Retry ===
911    /// Retry state for peers whose outbound connections have failed.
912    /// Keyed by NodeAddr. Entries are created when a handshake times out
913    /// or fails, and removed on successful promotion or when max retries
914    /// are exhausted.
915    retry_pending: HashMap<NodeAddr, retry::RetryState>,
916
917    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
918    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
919    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
920    /// Identity is unverified at this layer — the Noise XX handshake
921    /// initiated against an mDNS-observed endpoint is what proves the
922    /// peer holds the matching private key.
923    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
924    /// Same-host JSON registry under `~/.fips/instances`. Records are
925    /// loopback routing hints only; peer identity is still verified by the
926    /// Noise handshake.
927    local_instance_registry: Option<crate::discovery::local::LocalInstanceRegistry>,
928    local_instance_started_at_ms: Option<u64>,
929    last_local_instance_publish_ms: Option<u64>,
930    last_local_instance_scan_ms: Option<u64>,
931    /// Wall-clock ms when Nostr discovery successfully started, used to
932    /// schedule the one-shot startup advert sweep after a settle delay.
933    /// `None` until discovery comes up; remains `None` if discovery is
934    /// disabled or failed to start.
935    nostr_discovery_started_at_ms: Option<u64>,
936    /// Whether the one-shot startup advert sweep has run. Set to true
937    /// after the first sweep fires (under `policy: open`); thereafter
938    /// only the per-tick `queue_open_discovery_retries` continues.
939    startup_open_discovery_sweep_done: bool,
940    /// Per-peer UDP transports adopted from NAT traversal handoff.
941    bootstrap_transports: HashSet<TransportId>,
942    /// Originating peer npub (bech32) for each adopted bootstrap
943    /// transport, captured at `adopt_established_traversal` time.
944    /// Populated alongside `bootstrap_transports`; cleared in
945    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
946    /// route fatal-protocol-mismatch observations back to the
947    /// Nostr-discovery `failure_state` for long cooldown application.
948    bootstrap_transport_npubs: HashMap<TransportId, String>,
949    /// Peers that should not be used as reply-learned fallback transit for
950    /// other destinations. Direct lookups to the peer are still permitted.
951    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
952
953    // === Periodic Parent Re-evaluation ===
954    /// Timestamp of last periodic parent re-evaluation (for pacing).
955    last_parent_reeval: Option<crate::time::Instant>,
956
957    // === Congestion Logging ===
958    /// Timestamp of last congestion detection log (rate-limited to 5s).
959    last_congestion_log: Option<std::time::Instant>,
960
961    // === Mesh Size Estimate ===
962    /// Cached estimated mesh size (computed once per tick from bloom filters).
963    estimated_mesh_size: Option<u64>,
964    /// Timestamp of last mesh size log emission.
965    last_mesh_size_log: Option<std::time::Instant>,
966
967    // === Bloom Self-Plausibility ===
968    /// Rate-limit state for the self-plausibility WARN. Fires at most
969    /// once per 60s globally when our own outgoing FilterAnnounce has
970    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
971    /// aggregation drift or an ingress bypass.
972    last_self_warn: Option<std::time::Instant>,
973
974    // === Local Outbound Liveness ===
975    /// Set per peer when a `transport.send` returned a local-side io error
976    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
977    /// cleared on the next successful send to that peer. Used by
978    /// `check_link_heartbeats` to compress only that peer's dead-timeout to
979    /// `fast_link_dead_timeout_secs` while its outbound is observed broken.
980    local_send_failure_at_by_peer: HashMap<NodeAddr, std::time::Instant>,
981    /// Set when the rx loop could not complete its 1s maintenance work
982    /// inside the watchdog timeout. Link-dead detection may be valid during
983    /// overload, but traversal cooldown should not punish a path just because
984    /// our own scheduler/worker queue was late.
985    last_rx_loop_maintenance_timeout_at: Option<std::time::Instant>,
986
987    // === Display Names ===
988    /// Human-readable names for configured peers (alias or short npub).
989    /// Populated at startup from peer config.
990    peer_aliases: HashMap<NodeAddr, String>,
991    /// Scheduler weight for explicitly configured peers. Built when config
992    /// changes so the packet hot path only does a NodeAddr hash lookup.
993    configured_peer_send_weights: HashMap<NodeAddr, u8>,
994
995    /// Reloadable peer ACL state from standard allow/deny files.
996    peer_acl: acl::PeerAclReloader,
997
998    // === Host Map ===
999    /// Static hostname → npub mapping for DNS resolution.
1000    /// Built at construction from peer aliases and /etc/fips/hosts.
1001    host_map: Arc<HostMap>,
1002}
1003
1004impl Node {
1005    /// Create a new node from configuration.
1006    pub fn new(config: Config) -> Result<Self, NodeError> {
1007        config.validate()?;
1008        let identity = config.create_identity()?;
1009        let node_addr = *identity.node_addr();
1010        let is_leaf_only = config.is_leaf_only();
1011
1012        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1013        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1014
1015        let mut startup_epoch = [0u8; 8];
1016        rand::rng().fill_bytes(&mut startup_epoch);
1017
1018        let mut bloom_state = if is_leaf_only {
1019            BloomState::leaf_only(node_addr)
1020        } else {
1021            BloomState::new(node_addr)
1022        };
1023        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1024
1025        let tun_state = if config.tun.enabled {
1026            TunState::Configured
1027        } else {
1028            TunState::Disabled
1029        };
1030
1031        // Initialize tree state with signed self-declaration
1032        let mut tree_state = TreeState::new(node_addr);
1033        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1034        tree_state.set_hold_down(config.node.tree.hold_down_secs);
1035        tree_state.set_flap_dampening(
1036            config.node.tree.flap_threshold,
1037            config.node.tree.flap_window_secs,
1038            config.node.tree.flap_dampening_secs,
1039        );
1040        tree_state
1041            .sign_declaration(&identity)
1042            .expect("signing own declaration should never fail");
1043
1044        let coord_cache = CoordCache::new(
1045            config.node.cache.coord_size,
1046            config.node.cache.coord_ttl_secs * 1000,
1047        );
1048        let rl = &config.node.rate_limit;
1049        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1050            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1051            config.node.limits.max_pending_inbound,
1052        );
1053
1054        let max_connections = config.node.limits.max_connections;
1055        let max_peers = config.node.limits.max_peers;
1056        let max_links = config.node.limits.max_links;
1057        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1058        let backoff_base_secs = config.node.discovery.backoff_base_secs;
1059        let backoff_max_secs = config.node.discovery.backoff_max_secs;
1060        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
1061
1062        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1063        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1064
1065        Ok(Self {
1066            identity,
1067            startup_epoch,
1068            started_at: std::time::Instant::now(),
1069            config,
1070            state: NodeState::Created,
1071            is_leaf_only,
1072            tree_state,
1073            bloom_state,
1074            coord_cache,
1075            learned_routes: LearnedRouteTable::default(),
1076            session_direct_degraded_until_ms: HashMap::new(),
1077            recent_requests: HashMap::new(),
1078            transports: HashMap::new(),
1079            transport_drops: HashMap::new(),
1080            links: HashMap::new(),
1081            addr_to_link: HashMap::new(),
1082            packet_tx: None,
1083            packet_rx: None,
1084            connections: HashMap::new(),
1085            peers: HashMap::new(),
1086            sessions: HashMap::new(),
1087            identity_cache: HashMap::new(),
1088            pending_tun_packets: HashMap::new(),
1089            pending_endpoint_data: HashMap::new(),
1090            pending_lookups: HashMap::new(),
1091            max_connections,
1092            max_peers,
1093            max_links,
1094            next_link_id: 1,
1095            next_transport_id: 1,
1096            stats: stats::NodeStats::new(),
1097            stats_history: stats_history::StatsHistory::new(),
1098            tun_state,
1099            tun_name: None,
1100            tun_tx: None,
1101            tun_outbound_rx: None,
1102            external_packet_tx: None,
1103            endpoint_priority_command_rx: None,
1104            endpoint_command_rx: None,
1105            endpoint_event_tx: None,
1106            encrypt_workers: None,
1107            decrypt_workers: None,
1108            decrypt_registered_sessions: std::collections::HashSet::new(),
1109            decrypt_fallback_tx,
1110            decrypt_fallback_rx,
1111            tun_reader_handle: None,
1112            tun_writer_handle: None,
1113            #[cfg(target_os = "macos")]
1114            tun_shutdown_fd: None,
1115            dns_identity_rx: None,
1116            dns_task: None,
1117            index_allocator: IndexAllocator::new(),
1118            peers_by_index: HashMap::new(),
1119            pending_outbound: HashMap::new(),
1120            msg1_rate_limiter,
1121            icmp_rate_limiter: IcmpRateLimiter::new(),
1122            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1123            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1124                std::time::Duration::from_millis(coords_response_interval_ms),
1125            ),
1126            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
1127            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
1128                std::time::Duration::from_secs(forward_min_interval_secs),
1129            ),
1130            pending_connects: Vec::new(),
1131            retry_pending: HashMap::new(),
1132            nostr_discovery: None,
1133            nostr_discovery_started_at_ms: None,
1134            lan_discovery: None,
1135            local_instance_registry: None,
1136            local_instance_started_at_ms: None,
1137            last_local_instance_publish_ms: None,
1138            last_local_instance_scan_ms: None,
1139            startup_open_discovery_sweep_done: false,
1140            bootstrap_transports: HashSet::new(),
1141            bootstrap_transport_npubs: HashMap::new(),
1142            discovery_fallback_transit_blocked_peers: HashSet::new(),
1143            last_parent_reeval: None,
1144            last_congestion_log: None,
1145            estimated_mesh_size: None,
1146            last_mesh_size_log: None,
1147            last_self_warn: None,
1148            local_send_failure_at_by_peer: HashMap::new(),
1149            last_rx_loop_maintenance_timeout_at: None,
1150            peer_aliases: HashMap::new(),
1151            configured_peer_send_weights,
1152            peer_acl,
1153            host_map,
1154            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1155        })
1156    }
1157
1158    /// Create a node with a specific identity.
1159    ///
1160    /// This constructor validates cross-field config invariants before
1161    /// constructing the node, same as [`Node::new`].
1162    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
1163        config.validate()?;
1164        let node_addr = *identity.node_addr();
1165
1166        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
1167        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
1168
1169        let mut startup_epoch = [0u8; 8];
1170        rand::rng().fill_bytes(&mut startup_epoch);
1171
1172        let tun_state = if config.tun.enabled {
1173            TunState::Configured
1174        } else {
1175            TunState::Disabled
1176        };
1177
1178        // Initialize tree state with signed self-declaration
1179        let mut tree_state = TreeState::new(node_addr);
1180        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
1181        tree_state.set_hold_down(config.node.tree.hold_down_secs);
1182        tree_state.set_flap_dampening(
1183            config.node.tree.flap_threshold,
1184            config.node.tree.flap_window_secs,
1185            config.node.tree.flap_dampening_secs,
1186        );
1187        tree_state
1188            .sign_declaration(&identity)
1189            .expect("signing own declaration should never fail");
1190
1191        let mut bloom_state = BloomState::new(node_addr);
1192        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
1193
1194        let coord_cache = CoordCache::new(
1195            config.node.cache.coord_size,
1196            config.node.cache.coord_ttl_secs * 1000,
1197        );
1198        let rl = &config.node.rate_limit;
1199        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
1200            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
1201            config.node.limits.max_pending_inbound,
1202        );
1203
1204        let max_connections = config.node.limits.max_connections;
1205        let max_peers = config.node.limits.max_peers;
1206        let max_links = config.node.limits.max_links;
1207        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
1208
1209        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
1210        let configured_peer_send_weights = Self::configured_peer_send_weights(&config);
1211
1212        Ok(Self {
1213            identity,
1214            startup_epoch,
1215            started_at: std::time::Instant::now(),
1216            config,
1217            state: NodeState::Created,
1218            is_leaf_only: false,
1219            tree_state,
1220            bloom_state,
1221            coord_cache,
1222            learned_routes: LearnedRouteTable::default(),
1223            session_direct_degraded_until_ms: HashMap::new(),
1224            recent_requests: HashMap::new(),
1225            transports: HashMap::new(),
1226            transport_drops: HashMap::new(),
1227            links: HashMap::new(),
1228            addr_to_link: HashMap::new(),
1229            packet_tx: None,
1230            packet_rx: None,
1231            connections: HashMap::new(),
1232            peers: HashMap::new(),
1233            sessions: HashMap::new(),
1234            identity_cache: HashMap::new(),
1235            pending_tun_packets: HashMap::new(),
1236            pending_endpoint_data: HashMap::new(),
1237            pending_lookups: HashMap::new(),
1238            max_connections,
1239            max_peers,
1240            max_links,
1241            next_link_id: 1,
1242            next_transport_id: 1,
1243            stats: stats::NodeStats::new(),
1244            stats_history: stats_history::StatsHistory::new(),
1245            tun_state,
1246            tun_name: None,
1247            tun_tx: None,
1248            tun_outbound_rx: None,
1249            external_packet_tx: None,
1250            endpoint_priority_command_rx: None,
1251            endpoint_command_rx: None,
1252            endpoint_event_tx: None,
1253            encrypt_workers: None,
1254            decrypt_workers: None,
1255            decrypt_registered_sessions: std::collections::HashSet::new(),
1256            decrypt_fallback_tx,
1257            decrypt_fallback_rx,
1258            tun_reader_handle: None,
1259            tun_writer_handle: None,
1260            #[cfg(target_os = "macos")]
1261            tun_shutdown_fd: None,
1262            dns_identity_rx: None,
1263            dns_task: None,
1264            index_allocator: IndexAllocator::new(),
1265            peers_by_index: HashMap::new(),
1266            pending_outbound: HashMap::new(),
1267            msg1_rate_limiter,
1268            icmp_rate_limiter: IcmpRateLimiter::new(),
1269            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
1270            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
1271                std::time::Duration::from_millis(coords_response_interval_ms),
1272            ),
1273            discovery_backoff: DiscoveryBackoff::new(),
1274            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
1275            pending_connects: Vec::new(),
1276            retry_pending: HashMap::new(),
1277            nostr_discovery: None,
1278            nostr_discovery_started_at_ms: None,
1279            lan_discovery: None,
1280            local_instance_registry: None,
1281            local_instance_started_at_ms: None,
1282            last_local_instance_publish_ms: None,
1283            last_local_instance_scan_ms: None,
1284            startup_open_discovery_sweep_done: false,
1285            bootstrap_transports: HashSet::new(),
1286            bootstrap_transport_npubs: HashMap::new(),
1287            discovery_fallback_transit_blocked_peers: HashSet::new(),
1288            last_parent_reeval: None,
1289            last_congestion_log: None,
1290            estimated_mesh_size: None,
1291            last_mesh_size_log: None,
1292            last_self_warn: None,
1293            local_send_failure_at_by_peer: HashMap::new(),
1294            last_rx_loop_maintenance_timeout_at: None,
1295            peer_aliases: HashMap::new(),
1296            configured_peer_send_weights,
1297            peer_acl,
1298            host_map,
1299            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1300        })
1301    }
1302
1303    /// Create a leaf-only node (simplified state).
1304    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1305        let mut node = Self::new(config)?;
1306        node.is_leaf_only = true;
1307        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1308        Ok(node)
1309    }
1310
1311    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1312        let base_host_map = HostMap::from_peer_configs(config.peers());
1313        if !config.node.system_files_enabled {
1314            return (
1315                Arc::new(base_host_map.clone()),
1316                acl::PeerAclReloader::memory_only(base_host_map),
1317            );
1318        }
1319
1320        let mut host_map = base_host_map.clone();
1321        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1322        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1323            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1324        ));
1325        host_map.merge(hosts_file);
1326        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1327            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1328            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1329            base_host_map,
1330            hosts_path,
1331        );
1332        (Arc::new(host_map), peer_acl)
1333    }
1334
1335    fn configured_peer_send_weights(config: &Config) -> HashMap<NodeAddr, u8> {
1336        config
1337            .peers()
1338            .iter()
1339            .filter_map(|peer| {
1340                PeerIdentity::from_npub(&peer.npub).ok().map(|identity| {
1341                    (
1342                        *identity.node_addr(),
1343                        encrypt_worker::EXPLICIT_PEER_SEND_WEIGHT,
1344                    )
1345                })
1346            })
1347            .collect()
1348    }
1349
1350    #[cfg(unix)]
1351    fn send_weight_for_peer(&self, peer_addr: &NodeAddr) -> u8 {
1352        self.configured_peer_send_weights
1353            .get(peer_addr)
1354            .copied()
1355            .unwrap_or(encrypt_worker::DEFAULT_SEND_WEIGHT)
1356    }
1357
1358    /// Create transport instances from configuration.
1359    ///
1360    /// Returns a vector of TransportHandles for all configured transports.
1361    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
1362        let mut transports = Vec::new();
1363
1364        // Collect UDP configs with optional names to avoid borrow conflicts
1365        let udp_instances: Vec<_> = self
1366            .config
1367            .transports
1368            .udp
1369            .iter()
1370            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1371            .collect();
1372
1373        // Create UDP transport instances
1374        for (name, udp_config) in udp_instances {
1375            let transport_id = self.allocate_transport_id();
1376            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
1377            transports.push(TransportHandle::Udp(udp));
1378        }
1379
1380        #[cfg(feature = "sim-transport")]
1381        {
1382            let sim_instances: Vec<_> = self
1383                .config
1384                .transports
1385                .sim
1386                .iter()
1387                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1388                .collect();
1389
1390            for (name, sim_config) in sim_instances {
1391                let transport_id = self.allocate_transport_id();
1392                let sim = crate::transport::sim::SimTransport::new(
1393                    transport_id,
1394                    name,
1395                    sim_config,
1396                    packet_tx.clone(),
1397                );
1398                transports.push(TransportHandle::Sim(sim));
1399            }
1400        }
1401
1402        // Create Ethernet transport instances where raw-socket support exists.
1403        #[cfg(any(target_os = "linux", target_os = "macos"))]
1404        {
1405            let eth_instances: Vec<_> = self
1406                .config
1407                .transports
1408                .ethernet
1409                .iter()
1410                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1411                .collect();
1412            let xonly = self.identity.pubkey();
1413            for (name, eth_config) in eth_instances {
1414                let mut eth_config = eth_config;
1415                if eth_config.discovery_scope.is_none() {
1416                    eth_config.discovery_scope = self.lan_discovery_scope();
1417                }
1418                let transport_id = self.allocate_transport_id();
1419                let mut eth =
1420                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
1421                eth.set_local_pubkey(xonly);
1422                transports.push(TransportHandle::Ethernet(eth));
1423            }
1424        }
1425
1426        // Create TCP transport instances
1427        let tcp_instances: Vec<_> = self
1428            .config
1429            .transports
1430            .tcp
1431            .iter()
1432            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1433            .collect();
1434
1435        for (name, tcp_config) in tcp_instances {
1436            let transport_id = self.allocate_transport_id();
1437            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
1438            transports.push(TransportHandle::Tcp(tcp));
1439        }
1440
1441        // Create Tor transport instances
1442        let tor_instances: Vec<_> = self
1443            .config
1444            .transports
1445            .tor
1446            .iter()
1447            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1448            .collect();
1449
1450        for (name, tor_config) in tor_instances {
1451            let transport_id = self.allocate_transport_id();
1452            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
1453            transports.push(TransportHandle::Tor(tor));
1454        }
1455
1456        let webrtc_instances: Vec<_> = self
1457            .config
1458            .transports
1459            .webrtc
1460            .iter()
1461            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1462            .collect();
1463
1464        #[cfg(feature = "webrtc-transport")]
1465        {
1466            for (name, webrtc_config) in webrtc_instances {
1467                let transport_id = self.allocate_transport_id();
1468                match WebRtcTransport::new(
1469                    transport_id,
1470                    name,
1471                    webrtc_config,
1472                    packet_tx.clone(),
1473                    &self.identity,
1474                    &self.config.node.discovery.nostr,
1475                ) {
1476                    Ok(webrtc) => transports.push(TransportHandle::WebRtc(Box::new(webrtc))),
1477                    Err(err) => {
1478                        warn!(
1479                            transport_id = %transport_id,
1480                            error = %err,
1481                            "failed to initialize WebRTC transport"
1482                        );
1483                    }
1484                }
1485            }
1486        }
1487        #[cfg(not(feature = "webrtc-transport"))]
1488        if !webrtc_instances.is_empty() {
1489            warn!("WebRTC transport configured but this build lacks WebRTC transport support");
1490        }
1491
1492        // Create BLE transport instances
1493        #[cfg(bluer_available)]
1494        {
1495            let ble_instances: Vec<_> = self
1496                .config
1497                .transports
1498                .ble
1499                .iter()
1500                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
1501                .collect();
1502
1503            #[cfg(all(bluer_available, not(test)))]
1504            for (name, ble_config) in ble_instances {
1505                let transport_id = self.allocate_transport_id();
1506                let adapter = ble_config.adapter().to_string();
1507                let mtu = ble_config.mtu();
1508                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
1509                    Ok(io) => {
1510                        let mut ble = crate::transport::ble::BleTransport::new(
1511                            transport_id,
1512                            name,
1513                            ble_config,
1514                            io,
1515                            packet_tx.clone(),
1516                        );
1517                        ble.set_local_pubkey(self.identity.pubkey().serialize());
1518                        transports.push(TransportHandle::Ble(ble));
1519                    }
1520                    Err(e) => {
1521                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
1522                    }
1523                }
1524            }
1525
1526            #[cfg(any(not(bluer_available), test))]
1527            if !ble_instances.is_empty() {
1528                #[cfg(not(test))]
1529                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
1530            }
1531        }
1532
1533        transports
1534    }
1535
1536    /// Find an operational transport that matches the given transport type name.
1537    ///
1538    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1539    /// from Nostr/STUN traversal. They must not be reused for ordinary
1540    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1541    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1542    /// with `EINVAL`, and even on platforms that allow it the packet would use
1543    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1544    /// choice deterministic by lowest transport id instead of HashMap order.
1545    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1546        self.transports
1547            .iter()
1548            .filter(|(id, handle)| {
1549                handle.transport_type().name == transport_type
1550                    && handle.is_operational()
1551                    && !self.bootstrap_transports.contains(id)
1552            })
1553            .min_by_key(|(id, _)| id.as_u32())
1554            .map(|(id, _)| *id)
1555    }
1556
1557    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1558    /// and binary TransportAddr.
1559    ///
1560    /// Finds the Ethernet transport instance bound to the named interface
1561    /// and parses the MAC portion into a 6-byte TransportAddr.
1562    #[allow(unused_variables)]
1563    fn resolve_ethernet_addr(
1564        &self,
1565        addr_str: &str,
1566    ) -> Result<(TransportId, TransportAddr), NodeError> {
1567        #[cfg(any(target_os = "linux", target_os = "macos"))]
1568        {
1569            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1570                NodeError::NoTransportForType(format!(
1571                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1572                    addr_str
1573                ))
1574            })?;
1575
1576            // Find the Ethernet transport bound to this interface
1577            let transport_id = self
1578                .transports
1579                .iter()
1580                .find(|(_, handle)| {
1581                    handle.transport_type().name == "ethernet"
1582                        && handle.is_operational()
1583                        && handle.interface_name() == Some(iface)
1584                })
1585                .map(|(id, _)| *id)
1586                .ok_or_else(|| {
1587                    NodeError::NoTransportForType(format!(
1588                        "no operational Ethernet transport for interface '{}'",
1589                        iface
1590                    ))
1591                })?;
1592
1593            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1594                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1595            })?;
1596
1597            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1598        }
1599        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1600        {
1601            Err(NodeError::NoTransportForType(
1602                "Ethernet transport is not supported on this platform".to_string(),
1603            ))
1604        }
1605    }
1606
1607    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1608    /// (TransportId, TransportAddr) pair by finding the BLE transport
1609    /// instance matching the adapter name.
1610    #[cfg(bluer_available)]
1611    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1612        let ta = TransportAddr::from_string(addr_str);
1613        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1614            NodeError::NoTransportForType(format!(
1615                "invalid BLE address format '{}': expected 'adapter/mac'",
1616                addr_str
1617            ))
1618        })?;
1619
1620        // Find the BLE transport for this adapter
1621        let transport_id = self
1622            .transports
1623            .iter()
1624            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1625            .map(|(id, _)| *id)
1626            .ok_or_else(|| {
1627                NodeError::NoTransportForType(format!(
1628                    "no operational BLE transport for adapter '{}'",
1629                    adapter
1630                ))
1631            })?;
1632
1633        // Validate the address format
1634        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1635            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1636        })?;
1637
1638        Ok((transport_id, TransportAddr::from_string(addr_str)))
1639    }
1640
1641    // === Identity Accessors ===
1642
    /// Borrow this node's [`Identity`].
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1647
    /// Borrow this node's [`NodeAddr`], as reported by its identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
1652
    /// Return this node's npub as an owned string, as produced by its
    /// identity.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1657
1658    /// Return a human-readable display name for a NodeAddr.
1659    ///
1660    /// Lookup order:
1661    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1662    /// 2. Configured peer alias or short npub (from startup map)
1663    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1664    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1665    /// 5. Truncated NodeAddr hex (unknown address)
1666    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1667        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1668            return hostname.to_string();
1669        }
1670        if let Some(name) = self.peer_aliases.get(addr) {
1671            return name.clone();
1672        }
1673        if let Some(peer) = self.peers.get(addr) {
1674            return peer.identity().short_npub();
1675        }
1676        if let Some(entry) = self.sessions.get(addr) {
1677            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1678            return PeerIdentity::from_pubkey(xonly).short_npub();
1679        }
1680        addr.short_hex()
1681    }
1682
1683    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1684    /// decrypt-worker state coherent: removes the same `cache_key`
1685    /// from the registered-sessions tracking set and tells the
1686    /// assigned shard worker to drop its `OwnedSessionState` entry.
1687    ///
1688    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1689    /// at every session-lifecycle teardown site (rekey cross-connection
1690    /// swap, peer disconnect, dispatch session-rotation) so the worker
1691    /// doesn't keep stale ciphers / replay windows around. The
1692    /// follow-up `RegisterSession` for the NEW key (if any) will then
1693    /// install the fresh state on the same shard.
1694    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1695        // Find the peer that owns this index BEFORE removing it from
1696        // the index map, so we can decide whether the deregistration
1697        // also tears down the peer's connected UDP socket.
1698        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1699        self.peers_by_index.remove(&cache_key);
1700        if self.decrypt_registered_sessions.remove(&cache_key)
1701            && let Some(workers) = self.decrypt_workers.as_ref()
1702        {
1703            workers.unregister_session(cache_key);
1704        }
1705        // Tear down the per-peer connected UDP socket *only* if no
1706        // other peers_by_index entry still resolves to this peer.
1707        // Rekey drain calls into this helper with the OLD session
1708        // index while the NEW index is already installed and points
1709        // at the same peer — there the connect()-ed 5-tuple is
1710        // still valid for the new session and we must not close it.
1711        // Peer-teardown sites (CrossConnection swap, stale-index
1712        // fall-through in encrypted.rs, disconnect handler) call
1713        // here when this is the peer's last index, so the connected
1714        // socket goes away with the peer.
1715        if let Some(peer_addr) = owning_peer {
1716            let peer_has_other_index = self
1717                .peers_by_index
1718                .values()
1719                .any(|other| *other == peer_addr);
1720            if !peer_has_other_index {
1721                self.clear_connected_udp_for_peer(&peer_addr);
1722            }
1723        }
1724    }
1725
1726    /// Ensure the current FMP receive index resolves to this peer.
1727    ///
1728    /// Rekey msg1/msg2 handlers pre-register the pending index before
1729    /// cutover, but losing that registration in a debug build used to
1730    /// panic in the cutover path. Repairing the map here is safe: the
1731    /// peer has already promoted the pending session, and the decrypt
1732    /// worker registration immediately after cutover depends on the
1733    /// same `(transport_id, our_index)` key.
1734    pub(in crate::node) fn ensure_current_session_index_registered(
1735        &mut self,
1736        node_addr: &NodeAddr,
1737        context: &'static str,
1738    ) -> bool {
1739        let Some(peer) = self.peers.get(node_addr) else {
1740            return false;
1741        };
1742        let Some(transport_id) = peer.transport_id() else {
1743            warn!(
1744                peer = %self.peer_display_name(node_addr),
1745                context,
1746                "Cannot register current session index without transport id"
1747            );
1748            return false;
1749        };
1750        let Some(our_index) = peer.our_index() else {
1751            warn!(
1752                peer = %self.peer_display_name(node_addr),
1753                context,
1754                "Cannot register current session index without local index"
1755            );
1756            return false;
1757        };
1758
1759        let cache_key = (transport_id, our_index.as_u32());
1760        match self.peers_by_index.get(&cache_key).copied() {
1761            Some(existing) if existing == *node_addr => true,
1762            Some(existing) => {
1763                warn!(
1764                    peer = %self.peer_display_name(node_addr),
1765                    previous_owner = %self.peer_display_name(&existing),
1766                    transport_id = %transport_id,
1767                    our_index = %our_index,
1768                    context,
1769                    "Repairing current session index with stale owner"
1770                );
1771                self.peers_by_index.insert(cache_key, *node_addr);
1772                true
1773            }
1774            None => {
1775                warn!(
1776                    peer = %self.peer_display_name(node_addr),
1777                    transport_id = %transport_id,
1778                    our_index = %our_index,
1779                    context,
1780                    "Repairing missing current session index"
1781                );
1782                self.peers_by_index.insert(cache_key, *node_addr);
1783                true
1784            }
1785        }
1786    }
1787
1788    // === Configuration ===
1789
    /// Borrow the node's [`Config`].
    pub fn config(&self) -> &Config {
        &self.config
    }
1794
    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
    ///
    /// Delegates to `upper::icmp::effective_ipv6_mtu()`, passing this node's
    /// [`Self::transport_mtu`]. The result is the maximum IPv6 packet size
    /// (including the IPv6 header) that can be transmitted through the FIPS
    /// mesh.
    pub fn effective_ipv6_mtu(&self) -> u16 {
        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
    }
1803
1804    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1805    ///
1806    /// Returns the **minimum** MTU across all operational transports, or
1807    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1808    /// where a specific egress transport isn't yet known: the resulting
1809    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1810    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1811    /// configured-transport's egress, eliminating PMTU-D black holes that
1812    /// would otherwise occur when a flow's actual egress is smaller than
1813    /// the clamp ceiling assumed at TUN init.
1814    ///
1815    /// Returning the smallest (rather than the first-iterated, which used
1816    /// to vary across HashMap iteration order + async-startup race) makes
1817    /// the clamp deterministic across daemon restarts.
1818    ///
1819    /// See `ISSUE-2026-0011` for the empirical investigation.
1820    pub fn transport_mtu(&self) -> u16 {
1821        let min_operational = self
1822            .transports
1823            .values()
1824            .filter(|h| h.is_operational())
1825            .map(|h| h.mtu())
1826            .min();
1827        if let Some(mtu) = min_operational {
1828            return mtu;
1829        }
1830        // Fallback to config: try UDP first, then Ethernet
1831        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1832            return cfg.mtu();
1833        }
1834        1280
1835    }
1836
1837    // === State ===
1838
    /// Return the node's current [`NodeState`] by value.
    pub fn state(&self) -> NodeState {
        self.state
    }
1843
    /// Return the time elapsed since the node's recorded `started_at`
    /// instant.
    pub fn uptime(&self) -> std::time::Duration {
        self.started_at.elapsed()
    }
1848
    /// Report whether the node's current state counts as operational, as
    /// decided by `NodeState::is_operational`.
    pub fn is_running(&self) -> bool {
        self.state.is_operational()
    }
1853
    /// Return the node's `is_leaf_only` flag.
    pub fn is_leaf_only(&self) -> bool {
        self.is_leaf_only
    }
1858
1859    // === Tree State ===
1860
    /// Borrow the spanning-tree state ([`TreeState`]) read-only.
    pub fn tree_state(&self) -> &TreeState {
        &self.tree_state
    }
1865
    /// Borrow the spanning-tree state ([`TreeState`]) mutably.
    pub fn tree_state_mut(&mut self) -> &mut TreeState {
        &mut self.tree_state
    }
1870
1871    // === Bloom State ===
1872
    /// Borrow the Bloom filter state ([`BloomState`]) read-only.
    pub fn bloom_state(&self) -> &BloomState {
        &self.bloom_state
    }
1877
    /// Borrow the Bloom filter state ([`BloomState`]) mutably.
    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
        &mut self.bloom_state
    }
1882
1883    // === Mesh Size Estimate ===
1884
    /// Return the cached mesh-size estimate without recomputing it.
    ///
    /// The value is whatever `compute_mesh_size` last stored; that function
    /// stores `None` when it has no filters to combine or declines to
    /// estimate.
    pub fn estimated_mesh_size(&self) -> Option<u64> {
        self.estimated_mesh_size
    }
1889
1890    /// Compute and cache the estimated mesh size from bloom filters.
1891    ///
1892    /// Uses the spanning tree partition: parent's filter covers nodes reachable
1893    /// upward, children's filters cover subtrees downward. The OR-union of
1894    /// those filters plus self approximates total network size without
1895    /// double-counting overlapping filters.
1896    pub(crate) fn compute_mesh_size(&mut self) {
1897        let my_addr = *self.tree_state.my_node_addr();
1898        let parent_id = *self.tree_state.my_declaration().parent_id();
1899        let is_root = self.tree_state.is_root();
1900
1901        let max_fpr = self.config.node.bloom.max_inbound_fpr;
1902        let mut child_count: u32 = 0;
1903        let mut union: Option<BloomFilter> = None;
1904
1905        let add_to_union = |union: &mut Option<BloomFilter>, filter: &BloomFilter| match union {
1906            None => *union = Some(filter.clone()),
1907            Some(existing) => {
1908                // Size-class mismatch is skipped rather than fatal.
1909                let _ = existing.merge(filter);
1910            }
1911        };
1912
1913        // Parent's filter: nodes reachable upward through the tree.
1914        if !is_root
1915            && let Some(parent) = self.peers.get(&parent_id)
1916            && let Some(filter) = parent.inbound_filter()
1917        {
1918            add_to_union(&mut union, filter);
1919        }
1920
1921        // Children's filters: each child's subtree is ideally disjoint; OR is
1922        // idempotent when filters overlap.
1923        for (peer_addr, peer) in &self.peers {
1924            if peer_addr == &parent_id {
1925                continue;
1926            }
1927            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1928                && *decl.parent_id() == my_addr
1929            {
1930                child_count += 1;
1931                if let Some(filter) = peer.inbound_filter() {
1932                    add_to_union(&mut union, filter);
1933                }
1934            }
1935        }
1936
1937        let Some(mut union) = union else {
1938            self.estimated_mesh_size = None;
1939            return;
1940        };
1941        union.insert(&my_addr);
1942
1943        // If the union is saturated or above the FPR cap, refuse to estimate
1944        // rather than publish a biased aggregate.
1945        let Some(union_estimate) = union.estimated_count(max_fpr) else {
1946            self.estimated_mesh_size = None;
1947            return;
1948        };
1949
1950        let size = union_estimate.round() as u64;
1951        self.estimated_mesh_size = Some(size);
1952
1953        // Periodic logging (reuse MMP default interval: 30s)
1954        let now = std::time::Instant::now();
1955        let should_log = match self.last_mesh_size_log {
1956            None => true,
1957            Some(last) => {
1958                now.duration_since(last)
1959                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1960            }
1961        };
1962        if should_log {
1963            tracing::debug!(
1964                estimated_mesh_size = size,
1965                peers = self.peers.len(),
1966                children = child_count,
1967                "Mesh size estimate"
1968            );
1969            self.last_mesh_size_log = Some(now);
1970        }
1971    }
1972
1973    // === Coord Cache ===
1974
    /// Borrow the coordinate cache ([`CoordCache`]) read-only.
    pub fn coord_cache(&self) -> &CoordCache {
        &self.coord_cache
    }
1979
    /// Borrow the coordinate cache ([`CoordCache`]) mutably.
    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
        &mut self.coord_cache
    }
1984
1985    // === Node Statistics ===
1986
    /// Borrow the node statistics ([`stats::NodeStats`]) read-only.
    pub fn stats(&self) -> &stats::NodeStats {
        &self.stats
    }
1991
    /// Borrow the node statistics ([`stats::NodeStats`]) mutably.
    /// Crate-internal, unlike the read-only [`Self::stats`].
    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
        &mut self.stats
    }
1996
    /// Borrow the stats history collector
    /// ([`stats_history::StatsHistory`]) read-only.
    pub fn stats_history(&self) -> &stats_history::StatsHistory {
        &self.stats_history
    }
2001
2002    /// Sample the current node state into the stats history ring.
2003    /// Called once per tick from the RX loop.
2004    pub(crate) fn record_stats_history(&mut self) {
2005        let fwd = &self.stats.forwarding;
2006        let peers_with_mmp: Vec<f64> = self
2007            .peers
2008            .values()
2009            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
2010            .collect();
2011        let loss_rate = if peers_with_mmp.is_empty() {
2012            0.0
2013        } else {
2014            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
2015        };
2016
2017        let snap = stats_history::Snapshot {
2018            mesh_size: self.estimated_mesh_size,
2019            tree_depth: self.tree_state.my_coords().depth() as u32,
2020            peer_count: self.peers.len() as u64,
2021            parent_switches_total: self.stats.tree.parent_switches,
2022            bytes_in_total: fwd.received_bytes,
2023            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
2024            packets_in_total: fwd.received_packets,
2025            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
2026            loss_rate,
2027            active_sessions: self.sessions.len() as u64,
2028        };
2029
2030        let now = std::time::Instant::now();
2031        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
2032            .peers
2033            .values()
2034            .map(|p| {
2035                let stats = p.link_stats();
2036                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
2037                    Some(m) => (
2038                        m.metrics.srtt_ms(),
2039                        Some(m.metrics.loss_rate()),
2040                        m.receiver.ecn_ce_count() as u64,
2041                    ),
2042                    None => (None, None, 0),
2043                };
2044                stats_history::PeerSnapshot {
2045                    node_addr: *p.node_addr(),
2046                    last_seen: now,
2047                    srtt_ms,
2048                    loss_rate,
2049                    bytes_in_total: stats.bytes_recv,
2050                    bytes_out_total: stats.bytes_sent,
2051                    packets_in_total: stats.packets_recv,
2052                    packets_out_total: stats.packets_sent,
2053                    ecn_ce_total: ecn_ce,
2054                }
2055            })
2056            .collect();
2057
2058        self.stats_history.tick(now, &snap, &peer_snaps);
2059    }
2060
2061    // === TUN Interface ===
2062
    /// Return the current [`TunState`] by value.
    pub fn tun_state(&self) -> TunState {
        self.tun_state
    }
2067
    /// Borrow the TUN interface name, or `None` when no name is recorded
    /// (presumably because the TUN is not active — confirm against the
    /// lifecycle code).
    pub fn tun_name(&self) -> Option<&str> {
        self.tun_name.as_deref()
    }
2072
2073    // === Resource Limits ===
2074
    /// Set the maximum number of connections (handshake phase).
    ///
    /// A value of `0` disables the cap: the admission checks and
    /// `add_connection` treat `0` as uncapped.
    pub fn set_max_connections(&mut self, max: usize) {
        self.max_connections = max;
    }
2079
    /// Set the maximum number of peers (authenticated).
    ///
    /// A value of `0` disables the cap: `outbound_admission_check` treats
    /// `0` as uncapped.
    pub fn set_max_peers(&mut self, max: usize) {
        self.max_peers = max;
    }
2084
2085    /// Returns false when starting more outbound work would exceed a resource
2086    /// cap. A cap of `0` means uncapped.
2087    pub(crate) fn outbound_admission_check(&self) -> bool {
2088        let connection_used = self
2089            .connections
2090            .len()
2091            .saturating_add(self.pending_connects.len());
2092        let peer_allowed = self.max_peers == 0 || self.peers.len() < self.max_peers;
2093        let connection_allowed =
2094            self.max_connections == 0 || connection_used < self.max_connections;
2095        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2096        peer_allowed && connection_allowed && link_allowed
2097    }
2098
2099    /// Admission for public/open-discovery outbound work. This includes the
2100    /// general connection/link caps and, when open Nostr discovery is enabled,
2101    /// the configured non-peer budget.
2102    pub(crate) fn open_discovery_outbound_admission_check(&self) -> bool {
2103        if !self.outbound_admission_check() {
2104            return false;
2105        }
2106
2107        let nostr = &self.config.node.discovery.nostr;
2108        if !nostr.enabled || nostr.policy != NostrDiscoveryPolicy::Open {
2109            return true;
2110        }
2111
2112        let configured_npubs = self
2113            .config
2114            .peers()
2115            .iter()
2116            .map(|peer| peer.npub.clone())
2117            .collect::<HashSet<_>>();
2118        self.open_discovery_enqueue_budget(&configured_npubs) > 0
2119    }
2120
2121    /// Like `outbound_admission_check`, but for racing a better path to a
2122    /// peer that is already authenticated. This may temporarily add a
2123    /// connection/link, but it does not consume a new peer slot.
2124    pub(crate) fn outbound_direct_refresh_admission_check(&self) -> bool {
2125        let connection_used = self
2126            .connections
2127            .len()
2128            .saturating_add(self.pending_connects.len());
2129        let connection_allowed =
2130            self.max_connections == 0 || connection_used < self.max_connections;
2131        let link_allowed = self.max_links == 0 || self.links.len() < self.max_links;
2132        connection_allowed && link_allowed
2133    }
2134
    /// Set the maximum number of links.
    ///
    /// A value of `0` disables the cap: `add_link` and the admission checks
    /// treat `0` as uncapped.
    pub fn set_max_links(&mut self, max: usize) {
        self.max_links = max;
    }
2139
2140    // === Counts ===
2141
    /// Number of pending connections (handshake in progress).
    ///
    /// Counts entries in `connections` only; queued `pending_connects` are
    /// not included.
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
2146
    /// Number of authenticated peers (entries in `peers`), whether or not
    /// they can currently send.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
2151
    /// Number of links currently tracked in `links`.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }
2156
    /// Number of registered transports.
    ///
    /// Counts every entry in `transports`; no `is_operational()` filter is
    /// applied, so non-operational transports are included.
    pub fn transport_count(&self) -> usize {
        self.transports.len()
    }
2161
2162    // === Transport Management ===
2163
2164    /// Allocate a new transport ID.
2165    pub fn allocate_transport_id(&mut self) -> TransportId {
2166        let id = TransportId::new(self.next_transport_id);
2167        self.next_transport_id += 1;
2168        id
2169    }
2170
    /// Look up a transport by id; `None` when the id is not registered.
    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
        self.transports.get(id)
    }
2175
    /// Look up a transport by id for mutation; `None` when the id is not
    /// registered.
    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
        self.transports.get_mut(id)
    }
2180
    /// Iterate over the ids of all registered transports, in the map's
    /// iteration order.
    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
        self.transports.keys()
    }
2185
    /// Get the packet receiver for the event loop.
    ///
    /// Returns `None` when the receiver slot is empty (presumably because it
    /// was never created or has already been taken — confirm against the
    /// lifecycle code).
    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
        self.packet_rx.as_mut()
    }
2190
2191    // === Link Management ===
2192
2193    /// Allocate a new link ID.
2194    pub fn allocate_link_id(&mut self) -> LinkId {
2195        let id = LinkId::new(self.next_link_id);
2196        self.next_link_id += 1;
2197        id
2198    }
2199
2200    /// Add a link.
2201    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
2202        if self.max_links > 0 && self.links.len() >= self.max_links {
2203            return Err(NodeError::MaxLinksExceeded {
2204                max: self.max_links,
2205            });
2206        }
2207        let link_id = link.link_id();
2208        let transport_id = link.transport_id();
2209        let remote_addr = link.remote_addr().clone();
2210
2211        self.links.insert(link_id, link);
2212        self.addr_to_link
2213            .insert((transport_id, remote_addr), link_id);
2214        Ok(())
2215    }
2216
    /// Look up a link by id; `None` when the id is not tracked.
    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
        self.links.get(link_id)
    }
2221
    /// Look up a link by id for mutation; `None` when the id is not tracked.
    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
        self.links.get_mut(link_id)
    }
2226
    /// Find the link id registered for a `(transport, remote address)` pair
    /// via the `addr_to_link` reverse lookup.
    ///
    /// Returns `None` when no link is registered for that pair.
    pub fn find_link_by_addr(
        &self,
        transport_id: TransportId,
        addr: &TransportAddr,
    ) -> Option<LinkId> {
        // The map is keyed by an owned tuple, so the address is cloned to
        // build the lookup key.
        self.addr_to_link
            .get(&(transport_id, addr.clone()))
            .copied()
    }
2237
2238    /// Remove a link.
2239    ///
2240    /// Only removes the addr_to_link reverse lookup if it still points to this
2241    /// link. In cross-connection scenarios, a newer link may have replaced the
2242    /// entry for the same address.
2243    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
2244        if let Some(link) = self.links.remove(link_id) {
2245            // Clean up reverse lookup only if it still maps to this link
2246            let key = (link.transport_id(), link.remote_addr().clone());
2247            if self.addr_to_link.get(&key) == Some(link_id) {
2248                self.addr_to_link.remove(&key);
2249            }
2250            Some(link)
2251        } else {
2252            None
2253        }
2254    }
2255
2256    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
2257        if !self.bootstrap_transports.contains(&transport_id) {
2258            return;
2259        }
2260
2261        let transport_in_use = self
2262            .links
2263            .values()
2264            .any(|link| link.transport_id() == transport_id)
2265            || self
2266                .connections
2267                .values()
2268                .any(|conn| conn.transport_id() == Some(transport_id))
2269            || self
2270                .peers
2271                .values()
2272                .any(|peer| peer.transport_id() == Some(transport_id))
2273            || self
2274                .pending_connects
2275                .iter()
2276                .any(|pending| pending.transport_id == transport_id);
2277
2278        if transport_in_use {
2279            return;
2280        }
2281
2282        tracing::debug!(
2283            transport_id = %transport_id,
2284            "bootstrap transport has no remaining references; dropping"
2285        );
2286
2287        self.bootstrap_transports.remove(&transport_id);
2288        self.bootstrap_transport_npubs.remove(&transport_id);
2289        self.transport_drops.remove(&transport_id);
2290        self.transports.remove(&transport_id);
2291    }
2292
    /// Iterate over all tracked links, in the map's iteration order.
    pub fn links(&self) -> impl Iterator<Item = &Link> {
        self.links.values()
    }
2297
2298    // === Connection Management (Handshake Phase) ===
2299
2300    /// Add a pending connection.
2301    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
2302        let link_id = connection.link_id();
2303
2304        if self.connections.contains_key(&link_id) {
2305            return Err(NodeError::ConnectionAlreadyExists(link_id));
2306        }
2307
2308        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
2309            return Err(NodeError::MaxConnectionsExceeded {
2310                max: self.max_connections,
2311            });
2312        }
2313
2314        self.connections.insert(link_id, connection);
2315        Ok(())
2316    }
2317
    /// Look up a pending connection by its link id; `None` when there is no
    /// connection for that link.
    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
        self.connections.get(link_id)
    }
2322
    /// Look up a pending connection by its link id for mutation; `None` when
    /// there is no connection for that link.
    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
        self.connections.get_mut(link_id)
    }
2327
    /// Remove and return the pending connection for `link_id`, or `None`
    /// when there is none.
    ///
    /// Only the `connections` map is touched; the link itself stays
    /// registered.
    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
        self.connections.remove(link_id)
    }
2332
    /// Iterate over all pending connections, in the map's iteration order.
    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
        self.connections.values()
    }
2337
2338    // === Peer Management (Active Phase) ===
2339
2340    /// Get a peer by NodeAddr.
2341    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
2342        self.peers.get(node_addr)
2343    }
2344
2345    /// Get a mutable peer by NodeAddr.
2346    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
2347        self.peers.get_mut(node_addr)
2348    }
2349
2350    /// Remove a peer.
2351    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
2352        self.peers.remove(node_addr)
2353    }
2354
2355    /// Iterate over all peers.
2356    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
2357        self.peers.values()
2358    }
2359
2360    /// Reference to the Nostr discovery handle if discovery is enabled.
2361    /// Used by control queries (`show_peers` per-peer Nostr-traversal
2362    /// state) to read failure-state without taking shared ownership.
2363    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
2364        self.nostr_discovery.as_deref()
2365    }
2366
2367    /// Iterate over all peer node IDs.
2368    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
2369        self.peers.keys()
2370    }
2371
2372    /// Iterate over peers that can send traffic.
2373    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
2374        self.peers.values().filter(|p| p.can_send())
2375    }
2376
2377    /// Number of peers that can send traffic.
2378    pub fn sendable_peer_count(&self) -> usize {
2379        self.peers.values().filter(|p| p.can_send()).count()
2380    }
2381
2382    pub(crate) fn set_discovery_fallback_transit_allowed(
2383        &mut self,
2384        peer_addr: NodeAddr,
2385        allowed: bool,
2386    ) {
2387        if allowed {
2388            self.discovery_fallback_transit_blocked_peers
2389                .remove(&peer_addr);
2390        } else {
2391            self.discovery_fallback_transit_blocked_peers
2392                .insert(peer_addr);
2393        }
2394    }
2395
2396    pub(crate) fn configured_discovery_fallback_transit(
2397        &self,
2398        peer_addr: &NodeAddr,
2399    ) -> Option<bool> {
2400        self.configured_peer(peer_addr)
2401            .map(|peer| peer.discovery_fallback_transit)
2402    }
2403
2404    pub(crate) fn configured_peer(&self, peer_addr: &NodeAddr) -> Option<&PeerConfig> {
2405        self.config.peers().iter().find(|peer| {
2406            PeerIdentity::from_npub(&peer.npub)
2407                .ok()
2408                .is_some_and(|identity| identity.node_addr() == peer_addr)
2409        })
2410    }
2411
2412    pub(in crate::node) fn active_peer_uses_configured_static_udp_path(
2413        &self,
2414        peer_addr: &NodeAddr,
2415    ) -> bool {
2416        let Some(peer_config) = self.configured_peer(peer_addr) else {
2417            return false;
2418        };
2419
2420        peer_config.addresses.iter().any(|candidate| {
2421            candidate.seen_at_ms.is_none()
2422                && candidate.transport.eq_ignore_ascii_case("udp")
2423                && self.active_peer_matches_candidate(peer_addr, candidate)
2424        })
2425    }
2426
2427    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2428        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2429            return retry_state.peer_config.discovery_fallback_transit;
2430        }
2431
2432        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2433            return allowed;
2434        }
2435
2436        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2437    }
2438
2439    // === End-to-End Sessions ===
2440
2441    /// Get a session by remote NodeAddr.
2442    /// Disable the discovery forward rate limiter (for tests).
2443    #[cfg(test)]
2444    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2445        self.discovery_forward_limiter
2446            .set_interval(std::time::Duration::ZERO);
2447    }
2448
2449    #[cfg(test)]
2450    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2451        self.sessions.get(remote)
2452    }
2453
2454    /// Get a mutable session by remote NodeAddr.
2455    #[cfg(test)]
2456    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2457        self.sessions.get_mut(remote)
2458    }
2459
2460    /// Remove a session.
2461    #[cfg(test)]
2462    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2463        self.sessions.remove(remote)
2464    }
2465
2466    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2467    #[cfg(test)]
2468    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2469        self.path_mtu_lookup
2470            .read()
2471            .ok()
2472            .and_then(|map| map.get(fips_addr).copied())
2473    }
2474
2475    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2476    #[cfg(test)]
2477    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2478        if let Ok(mut map) = self.path_mtu_lookup.write() {
2479            map.insert(fips_addr, mtu);
2480        }
2481    }
2482
2483    /// Number of end-to-end sessions.
2484    pub fn session_count(&self) -> usize {
2485        self.sessions.len()
2486    }
2487
2488    /// Iterate over all session entries (for control queries).
2489    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2490        self.sessions.iter()
2491    }
2492
2493    // === Identity Cache ===
2494
2495    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2496    pub(crate) fn register_identity(
2497        &mut self,
2498        node_addr: NodeAddr,
2499        pubkey: secp256k1::PublicKey,
2500    ) -> bool {
2501        let mut prefix = [0u8; 15];
2502        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2503        if let Some(entry) = self.identity_cache.get(&prefix)
2504            && entry.node_addr == node_addr
2505            && entry.pubkey == pubkey
2506        {
2507            // Endpoint sends pass the same PeerIdentity on every packet. Once
2508            // validated, avoid re-deriving NodeAddr from the public key in the
2509            // data path; that hash showed up in macOS sender profiles.
2510            return true;
2511        }
2512
2513        let (xonly, _) = pubkey.x_only_public_key();
2514        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2515        if derived_node_addr != node_addr {
2516            debug!(
2517                claimed_node_addr = %node_addr,
2518                derived_node_addr = %derived_node_addr,
2519                "Rejected identity cache entry with mismatched public key"
2520            );
2521            return false;
2522        }
2523
2524        let now_ms = Self::now_ms();
2525        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2526            && entry.node_addr == node_addr
2527        {
2528            entry.pubkey = pubkey;
2529            entry.last_seen_ms = now_ms;
2530            return true;
2531        }
2532
2533        let npub = encode_npub(&xonly);
2534        self.identity_cache.insert(
2535            prefix,
2536            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2537        );
2538        // LRU eviction
2539        let max = self.config.node.cache.identity_size;
2540        if self.identity_cache.len() > max
2541            && let Some(oldest_key) = self
2542                .identity_cache
2543                .iter()
2544                .min_by_key(|(_, entry)| entry.last_seen_ms)
2545                .map(|(k, _)| *k)
2546        {
2547            self.identity_cache.remove(&oldest_key);
2548        }
2549        true
2550    }
2551
2552    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2553    pub(crate) fn lookup_by_fips_prefix(
2554        &mut self,
2555        prefix: &[u8; 15],
2556    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2557        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2558            entry.last_seen_ms = Self::now_ms(); // LRU touch
2559            Some((entry.node_addr, entry.pubkey))
2560        } else {
2561            None
2562        }
2563    }
2564
2565    /// Check if a node's identity is in the cache (without LRU touch).
2566    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2567        let mut prefix = [0u8; 15];
2568        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2569        self.identity_cache.contains_key(&prefix)
2570    }
2571
2572    /// Number of identity cache entries.
2573    pub fn identity_cache_len(&self) -> usize {
2574        self.identity_cache.len()
2575    }
2576
2577    /// Iterate over identity cache entries.
2578    ///
2579    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2580    /// Used by the `show_identity_cache` control query.
2581    pub fn identity_cache_iter(
2582        &self,
2583    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2584        self.identity_cache
2585            .values()
2586            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2587    }
2588
2589    /// Configured maximum identity cache size.
2590    pub fn identity_cache_max(&self) -> usize {
2591        self.config.node.cache.identity_size
2592    }
2593
2594    /// Number of pending discovery lookups.
2595    pub fn pending_lookup_count(&self) -> usize {
2596        self.pending_lookups.len()
2597    }
2598
2599    /// Iterate over pending discovery lookups for diagnostics.
2600    pub fn pending_lookups_iter(
2601        &self,
2602    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2603        self.pending_lookups.iter()
2604    }
2605
2606    /// Number of recent discovery requests tracked.
2607    pub fn recent_request_count(&self) -> usize {
2608        self.recent_requests.len()
2609    }
2610
2611    /// Count of destinations with queued TUN packets awaiting session setup.
2612    pub fn pending_tun_destinations(&self) -> usize {
2613        self.pending_tun_packets.len()
2614    }
2615
2616    /// Total TUN packets queued across all destinations.
2617    pub fn pending_tun_total_packets(&self) -> usize {
2618        self.pending_tun_packets.values().map(|q| q.len()).sum()
2619    }
2620
2621    /// Iterate over retry state for diagnostics.
2622    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2623        self.retry_pending.iter()
2624    }
2625
2626    // === Routing ===
2627
2628    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2629    ///
2630    /// Returns true if the peer is our current tree parent, or if the peer
2631    /// has declared us as their parent (making them our child).
2632    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2633        // Peer is our parent
2634        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2635            return true;
2636        }
2637        // Peer is our child (their declaration names us as parent)
2638        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2639            && decl.parent_id() == self.node_addr()
2640        {
2641            return true;
2642        }
2643        false
2644    }
2645
    /// Find next hop for a destination node address.
    ///
    /// Routing priority:
    /// 1. Destination is self → `None` (local delivery)
    /// 2. Destination is a healthy direct peer → that peer, unless a known
    ///    fallback next-hop has a meaningful link-quality advantage.
    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
    ///    observed reverse paths, selected with weighted multipath plus
    ///    periodic coordinate/tree exploration.
    /// 4. Bloom filter candidates with cached dest coords → among peers whose
    ///    bloom filter contains the destination, pick the one that minimizes
    ///    tree distance to the destination, with
    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
    ///    The self-distance check ensures only peers strictly closer to the
    ///    destination than us are considered (prevents routing loops).
    /// 5. Greedy tree routing fallback (requires cached dest coords)
    /// 6. Last resorts, in order: a learned route when this call is an
    ///    exploration round, the healthy direct peer, any learned route.
    /// 7. No route → `None`
    ///
    /// Both the bloom filter and tree routing paths require cached destination
    /// coordinates (checked in `coord_cache`). Without coordinates, the node
    /// cannot make loop-free forwarding decisions. The caller should signal
    /// `CoordsRequired` back to the source when `None` is returned for a
    /// non-local destination.
    ///
    /// When coordinates are missing, only learned routes and the healthy
    /// direct peer are considered.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // 1. Local delivery
        if dest_node_addr == self.node_addr() {
            return None;
        }
        let now_ms = Self::now_ms();
        // True while the session-level direct path is marked degraded and the
        // peer is not on a configured static UDP path.
        let direct_session_degraded =
            self.session_direct_path_blocks_direct_payload(dest_node_addr, now_ms);

        // 2. Healthy direct peer. Kept as an address (not a reference) so
        //    later stages can still borrow `self`.
        let healthy_direct_route = self
            .peers
            .get(dest_node_addr)
            .filter(|peer| peer.is_healthy() && !direct_session_degraded)
            .map(|_| *dest_node_addr);
        // Short-circuit: a direct link this cheap cannot be beaten by the
        // required cost advantage, so skip the fallback search entirely.
        if let Some(direct_addr) = healthy_direct_route
            && self
                .peers
                .get(&direct_addr)
                .is_some_and(|peer| peer.link_cost() <= 1.0 + ROUTING_FALLBACK_MIN_COST_ADVANTAGE)
        {
            return self.peers.get(&direct_addr);
        }
        let direct_payload_eligible = healthy_direct_route.is_some();
        // The destination peer itself is a candidate only when the direct
        // route is eligible; every other peer only needs to be healthy.
        let payload_candidate_can_send = |addr: &NodeAddr, peer: &ActivePeer| {
            if addr == dest_node_addr {
                direct_payload_eligible
            } else {
                peer.is_healthy()
            }
        };

        // A healthy direct path is not automatically the best path. A
        // hotspot/NAT hairpin can remain sendable with high RTT or mild loss;
        // in that case a lower-cost mesh next-hop should carry traffic while
        // direct probes continue in the background.
        let fallback_beats_direct = |node: &Self, fallback_addr: NodeAddr| {
            node.route_candidate_beats_direct(healthy_direct_route, fallback_addr)
        };

        // Snapshot of peers eligible to carry payload; only built in
        // reply-learned mode (`None` otherwise).
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(addr, peer)| payload_candidate_can_send(addr, peer))
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // 3. Optional reply-learned routing. These entries are not peer
        // claims; they are local observations of which peer carried traffic
        // or a verified lookup response back from the destination. Most
        // packets use weighted multipath over learned routes, but periodic
        // fallback exploration lets coord/bloom/tree routes discover better
        // candidates.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
        {
            // Restrict learned next hops to those that beat the direct path.
            let eligible = sendable
                .iter()
                .copied()
                .filter(|addr| fallback_beats_direct(self, *addr))
                .collect::<HashSet<_>>();
            if !eligible.is_empty()
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| eligible.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
        }

        // Look up cached destination coordinates (required by both bloom and tree paths).
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: prefer a learned route when there is no healthy
            // direct peer (or we are exploring), then the direct peer.
            if (healthy_direct_route.is_none() || explore_fallback)
                && let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            if let Some(direct_addr) = healthy_direct_route {
                return self.peers.get(&direct_addr);
            }
            return None;
        };

        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
        //    If no candidate is strictly closer, fall through to tree routing.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self
                .peers
                .iter()
                .filter(|(addr, peer)| {
                    payload_candidate_can_send(addr, peer) && peer.may_reach(dest_node_addr)
                })
                .map(|(_, peer)| peer)
                .collect();
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr
            && fallback_beats_direct(self, next_hop_addr)
        {
            return self.peers.get(&next_hop_addr);
        }

        // 5. Greedy tree routing fallback
        let tree_route_addr = self.select_tree_payload_candidate(
            &dest_coords,
            dest_node_addr,
            direct_payload_eligible,
        );
        if let Some(next_hop_addr) = tree_route_addr
            && fallback_beats_direct(self, next_hop_addr)
        {
            return self.peers.get(&next_hop_addr);
        }

        // 6. Last resorts. An exploration round that found nothing better
        //    goes back to the learned routes.
        // NOTE(review): this branch returns without trying the healthy direct
        // peer when `select_next_hop` yields `None`, unlike the no-coordinates
        // branch above — confirm that is intended.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // Nothing beat the direct path: use it.
        if let Some(direct_addr) = healthy_direct_route {
            return self.peers.get(&direct_addr);
        }

        // No direct peer: accept any sendable learned route, without the
        // beats-direct filter.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // 7. No route
        None
    }
2829
2830    pub(in crate::node) fn find_transit_next_hop(
2831        &mut self,
2832        dest_node_addr: &NodeAddr,
2833        previous_hop: &NodeAddr,
2834    ) -> Option<NodeAddr> {
2835        if dest_node_addr == self.node_addr() {
2836            return None;
2837        }
2838
2839        if dest_node_addr != previous_hop
2840            && self
2841                .peers
2842                .get(dest_node_addr)
2843                .is_some_and(|peer| peer.is_healthy())
2844        {
2845            return Some(*dest_node_addr);
2846        }
2847
2848        let next_hop_addr = *self.find_next_hop(dest_node_addr)?.node_addr();
2849        if &next_hop_addr == previous_hop {
2850            self.record_route_failure(*dest_node_addr, next_hop_addr);
2851            return None;
2852        }
2853        Some(next_hop_addr)
2854    }
2855
2856    fn route_candidate_beats_direct(
2857        &self,
2858        healthy_direct_route: Option<NodeAddr>,
2859        candidate_addr: NodeAddr,
2860    ) -> bool {
2861        let Some(direct_addr) = healthy_direct_route else {
2862            return true;
2863        };
2864        if candidate_addr == direct_addr {
2865            return false;
2866        }
2867
2868        let Some(direct) = self.peers.get(&direct_addr) else {
2869            return true;
2870        };
2871        let Some(candidate) = self.peers.get(&candidate_addr) else {
2872            return false;
2873        };
2874        if !candidate.is_healthy() {
2875            return false;
2876        }
2877
2878        let direct_cost = direct.link_cost();
2879        let candidate_cost = candidate.link_cost();
2880        candidate_cost + ROUTING_FALLBACK_MIN_COST_ADVANTAGE < direct_cost
2881    }
2882
2883    fn select_tree_payload_candidate(
2884        &self,
2885        dest_coords: &crate::tree::TreeCoordinate,
2886        direct_dest: &NodeAddr,
2887        direct_payload_eligible: bool,
2888    ) -> Option<NodeAddr> {
2889        if self.tree_state.my_coords().root_id() != dest_coords.root_id() {
2890            return None;
2891        }
2892
2893        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2894        let mut best: Option<(NodeAddr, usize)> = None;
2895
2896        for (peer_addr, peer) in &self.peers {
2897            if peer_addr == direct_dest {
2898                if !direct_payload_eligible {
2899                    continue;
2900                }
2901            } else if !peer.is_healthy() {
2902                continue;
2903            }
2904
2905            let Some(peer_coords) = self.tree_state.peer_coords(peer_addr) else {
2906                continue;
2907            };
2908            let distance = peer_coords.distance_to(dest_coords);
2909            if distance >= my_distance {
2910                continue;
2911            }
2912
2913            let dominated = match &best {
2914                None => true,
2915                Some((best_id, best_dist)) => {
2916                    distance < *best_dist || (distance == *best_dist && peer_addr < best_id)
2917                }
2918            };
2919            if dominated {
2920                best = Some((*peer_addr, distance));
2921            }
2922        }
2923
2924        best.map(|(peer_addr, _)| peer_addr)
2925    }
2926
2927    pub(in crate::node) fn session_direct_path_is_degraded(
2928        &mut self,
2929        dest: &NodeAddr,
2930        now_ms: u64,
2931    ) -> bool {
2932        match self.session_direct_degraded_until_ms.get(dest).copied() {
2933            Some(until_ms) if until_ms > now_ms => true,
2934            Some(_) => {
2935                self.session_direct_degraded_until_ms.remove(dest);
2936                false
2937            }
2938            None => false,
2939        }
2940    }
2941
2942    pub(in crate::node) fn session_direct_path_blocks_direct_payload(
2943        &mut self,
2944        dest: &NodeAddr,
2945        now_ms: u64,
2946    ) -> bool {
2947        self.session_direct_path_is_degraded(dest, now_ms)
2948            && !self.active_peer_uses_configured_static_udp_path(dest)
2949    }
2950
2951    pub(in crate::node) fn mark_session_direct_path_degraded(
2952        &mut self,
2953        dest: NodeAddr,
2954        now_ms: u64,
2955    ) -> bool {
2956        let until_ms = now_ms.saturating_add(SESSION_DIRECT_DEGRADED_HOLD_MS);
2957        let entry = self
2958            .session_direct_degraded_until_ms
2959            .entry(dest)
2960            .or_insert(0);
2961        let was_degraded = *entry > now_ms;
2962        *entry = (*entry).max(until_ms);
2963        !was_degraded
2964    }
2965
2966    pub(in crate::node) fn clear_session_direct_path_degraded(&mut self, dest: &NodeAddr) -> bool {
2967        self.session_direct_degraded_until_ms.remove(dest).is_some()
2968    }
2969
2970    pub(in crate::node) fn learn_reverse_route(
2971        &mut self,
2972        destination: NodeAddr,
2973        next_hop: NodeAddr,
2974    ) {
2975        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2976            || destination == *self.node_addr()
2977        {
2978            return;
2979        }
2980        let now_ms = Self::now_ms();
2981        self.learned_routes.learn(
2982            destination,
2983            next_hop,
2984            now_ms,
2985            self.config.node.routing.learned_ttl_secs,
2986            self.config.node.routing.max_learned_routes_per_dest,
2987        );
2988    }
2989
2990    pub(in crate::node) fn record_route_failure(
2991        &mut self,
2992        destination: NodeAddr,
2993        next_hop: NodeAddr,
2994    ) {
2995        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2996            return;
2997        }
2998        self.learned_routes.record_failure(&destination, &next_hop);
2999    }
3000
3001    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
3002        self.learned_routes.snapshot(now_ms)
3003    }
3004
3005    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
3006        self.learned_routes.purge_expired(now_ms);
3007    }
3008
3009    /// Select the best peer from a set of bloom filter candidates.
3010    ///
3011    /// Uses distance from each candidate's tree coordinates to the destination
3012    /// as the primary metric (after link_cost). Only selects peers that are
3013    /// strictly closer to the destination than we are (self-distance check
3014    /// prevents routing loops).
3015    ///
3016    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
3017    fn select_best_candidate<'a>(
3018        &'a self,
3019        candidates: &[&'a ActivePeer],
3020        dest_coords: &crate::tree::TreeCoordinate,
3021    ) -> Option<&'a ActivePeer> {
3022        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
3023
3024        let mut best: Option<(&ActivePeer, f64, usize)> = None;
3025
3026        for &candidate in candidates {
3027            if !candidate.can_send() {
3028                continue;
3029            }
3030
3031            let cost = candidate.link_cost();
3032
3033            let dist = self
3034                .tree_state
3035                .peer_coords(candidate.node_addr())
3036                .map(|pc| pc.distance_to(dest_coords))
3037                .unwrap_or(usize::MAX);
3038
3039            // Self-distance check: only consider peers strictly closer
3040            // to the destination than we are (prevents routing loops)
3041            if dist >= my_distance {
3042                continue;
3043            }
3044
3045            let dominated = match &best {
3046                None => true,
3047                Some((_, best_cost, best_dist)) => {
3048                    cost < *best_cost
3049                        || (cost == *best_cost && dist < *best_dist)
3050                        || (cost == *best_cost
3051                            && dist == *best_dist
3052                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
3053                }
3054            };
3055
3056            if dominated {
3057                best = Some((candidate, cost, dist));
3058            }
3059        }
3060
3061        best.map(|(peer, _, _)| peer)
3062    }
3063
3064    /// Check if a destination is in any peer's bloom filter.
3065    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
3066        self.peers.values().filter(|p| p.may_reach(dest)).collect()
3067    }
3068
3069    /// Get the TUN packet sender channel.
3070    ///
3071    /// Returns None if TUN is not active or the node hasn't been started.
3072    pub fn tun_tx(&self) -> Option<&TunTx> {
3073        self.tun_tx.as_ref()
3074    }
3075
3076    /// Attach app-owned packet I/O for embedded operation without a system TUN.
3077    ///
3078    /// This must be called before [`Node::start`] and requires `tun.enabled =
3079    /// false`. Outbound packets sent to the returned sender are processed by the
3080    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
3081    /// sent to the returned receiver with source attribution.
3082    pub fn attach_external_packet_io(
3083        &mut self,
3084        capacity: usize,
3085    ) -> Result<ExternalPacketIo, NodeError> {
3086        if self.state != NodeState::Created {
3087            return Err(NodeError::Config(ConfigError::Validation(
3088                "external packet I/O must be attached before node start".to_string(),
3089            )));
3090        }
3091        if self.config.tun.enabled {
3092            return Err(NodeError::Config(ConfigError::Validation(
3093                "external packet I/O requires tun.enabled=false".to_string(),
3094            )));
3095        }
3096
3097        let capacity = capacity.max(1);
3098        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
3099        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
3100        self.tun_outbound_rx = Some(outbound_rx);
3101        self.external_packet_tx = Some(inbound_tx);
3102
3103        Ok(ExternalPacketIo {
3104            outbound_tx,
3105            inbound_rx,
3106        })
3107    }
3108
3109    /// Attach app-owned endpoint data I/O for embedded operation.
3110    ///
3111    /// Commands sent to the returned sender are processed by the node RX loop.
3112    /// Incoming endpoint data is emitted as source-attributed events.
3113    pub(crate) fn attach_endpoint_data_io(
3114        &mut self,
3115        capacity: usize,
3116    ) -> Result<EndpointDataIo, NodeError> {
3117        if self.state != NodeState::Created {
3118            return Err(NodeError::Config(ConfigError::Validation(
3119                "endpoint data I/O must be attached before node start".to_string(),
3120            )));
3121        }
3122
3123        let command_capacity = endpoint_data_command_capacity(capacity);
3124        let (priority_command_tx, priority_command_rx) =
3125            tokio::sync::mpsc::channel(command_capacity);
3126        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
3127        // Inbound endpoint-data events use an unbounded channel — see
3128        // `EndpointDataIo::event_rx` docs for the rationale (kills the
3129        // per-packet semaphore + the cross-task relay task that used to
3130        // sit on top of this channel).
3131        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
3132        self.endpoint_priority_command_rx = Some(priority_command_rx);
3133        self.endpoint_command_rx = Some(command_rx);
3134        self.endpoint_event_tx = Some(event_tx.clone());
3135
3136        Ok(EndpointDataIo {
3137            priority_command_tx,
3138            command_tx,
3139            event_rx,
3140            event_tx,
3141        })
3142    }
3143
3144    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
3145        let mut prefix = [0u8; 15];
3146        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3147        self.identity_cache
3148            .get(&prefix)
3149            .filter(|entry| &entry.node_addr == addr)
3150            .map(|entry| entry.pubkey)
3151    }
3152
3153    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
3154        let mut prefix = [0u8; 15];
3155        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
3156        self.identity_cache
3157            .get(&prefix)
3158            .filter(|entry| &entry.node_addr == addr)
3159            .map(|entry| entry.npub.clone())
3160    }
3161
3162    pub(in crate::node) fn deliver_external_ipv6_packet(
3163        &self,
3164        src_addr: &NodeAddr,
3165        packet: Vec<u8>,
3166    ) {
3167        let Some(external_packet_tx) = &self.external_packet_tx else {
3168            return;
3169        };
3170        if packet.len() < 40 {
3171            return;
3172        }
3173        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
3174            return;
3175        };
3176        let delivered = NodeDeliveredPacket {
3177            source_node_addr: *src_addr,
3178            source_npub: self.npub_for_node_addr(src_addr),
3179            destination,
3180            packet,
3181        };
3182        if let Err(error) = external_packet_tx.try_send(delivered) {
3183            debug!(error = %error, "Failed to deliver packet to external app sink");
3184        }
3185    }
3186
3187    // === Sending ===
3188
3189    /// Encrypt and send a link-layer message to an authenticated peer.
3190    ///
3191    /// The plaintext should include the message type byte followed by the
3192    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
3193    ///
3194    /// The send path prepends a 4-byte session-relative timestamp (inner
3195    /// header) before encryption. The full 16-byte outer header is used
3196    /// as AAD for the AEAD construction.
3197    ///
3198    /// This is the standard path for sending any link-layer control message
3199    /// to a peer over their encrypted Noise session.
3200    pub(super) async fn send_encrypted_link_message(
3201        &mut self,
3202        node_addr: &NodeAddr,
3203        plaintext: &[u8],
3204    ) -> Result<(), NodeError> {
3205        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
3206            .await
3207    }
3208
3209    /// Update one peer's local-outbound-broken signal from a `transport.send`
3210    /// outcome. Sets a per-peer timestamp on local-side io errors
3211    /// (NetworkUnreachable / HostUnreachable / AddrNotAvailable); clears that
3212    /// peer on success. The reaper consults this in `check_link_heartbeats` to
3213    /// switch only that peer to `fast_link_dead_timeout_secs`.
3214    pub(in crate::node) fn note_local_send_outcome(
3215        &mut self,
3216        node_addr: &NodeAddr,
3217        result: &Result<usize, TransportError>,
3218    ) {
3219        match result {
3220            Ok(_) => {
3221                self.local_send_failure_at_by_peer.remove(node_addr);
3222            }
3223            Err(error) if error.is_local_route_unavailable() => {
3224                self.local_send_failure_at_by_peer
3225                    .insert(*node_addr, std::time::Instant::now());
3226            }
3227            Err(_) => {}
3228        }
3229    }
3230
3231    /// Return the active dead-timeout for one peer after considering recent
3232    /// local route failures. The fast-dead signal is intentionally short-lived:
3233    /// on the UDP worker path a send call can return before the kernel result
3234    /// is observed, so a stale route error must not compress liveness for the
3235    /// whole normal dead-timeout window.
3236    pub(in crate::node) fn local_send_failure_dead_timeout_for_peer(
3237        &self,
3238        node_addr: &NodeAddr,
3239        now: std::time::Instant,
3240        dead_timeout: std::time::Duration,
3241        fast_dead_timeout: std::time::Duration,
3242    ) -> std::time::Duration {
3243        match self.local_send_failure_at_by_peer.get(node_addr).copied() {
3244            Some(t) if now.duration_since(t) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW => {
3245                fast_dead_timeout.min(dead_timeout)
3246            }
3247            None => dead_timeout,
3248            Some(_) => dead_timeout,
3249        }
3250    }
3251
3252    pub(in crate::node) fn purge_expired_local_send_failures(&mut self, now: std::time::Instant) {
3253        self.local_send_failure_at_by_peer
3254            .retain(|_, at| now.duration_since(*at) <= LOCAL_SEND_FAILURE_FAST_DEAD_WINDOW);
3255    }
3256
3257    pub(in crate::node) fn mark_rx_loop_maintenance_timeout(&mut self) {
3258        self.last_rx_loop_maintenance_timeout_at = Some(std::time::Instant::now());
3259    }
3260
3261    pub(in crate::node) fn rx_loop_maintenance_timed_out_recently(&self) -> bool {
3262        let Some(t) = self.last_rx_loop_maintenance_timeout_at else {
3263            return false;
3264        };
3265        let grace = std::time::Duration::from_secs(self.config.node.link_dead_timeout_secs.max(1));
3266        std::time::Instant::now().duration_since(t) <= grace
3267    }
3268
3269    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
3270    ///
3271    /// Used by the forwarding path to relay congestion signals hop-by-hop.
3272    pub(super) async fn send_encrypted_link_message_with_ce(
3273        &mut self,
3274        node_addr: &NodeAddr,
3275        plaintext: &[u8],
3276        ce_flag: bool,
3277    ) -> Result<(), NodeError> {
3278        let peer = self
3279            .peers
3280            .get_mut(node_addr)
3281            .ok_or(NodeError::PeerNotFound(*node_addr))?;
3282
3283        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
3284            node_addr: *node_addr,
3285            reason: "no their_index".into(),
3286        })?;
3287        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
3288            node_addr: *node_addr,
3289            reason: "no transport_id".into(),
3290        })?;
3291        let remote_addr = peer
3292            .current_addr()
3293            .cloned()
3294            .ok_or_else(|| NodeError::SendFailed {
3295                node_addr: *node_addr,
3296                reason: "no current_addr".into(),
3297            })?;
3298        #[cfg(any(target_os = "linux", target_os = "macos"))]
3299        let connected_socket = peer.connected_udp();
3300
3301        // Prepend 4-byte session-relative timestamp (inner header)
3302        let timestamp_ms = peer.session_elapsed_ms();
3303
3304        // MMP: read spin bit value before entering session borrow
3305        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
3306        let mut flags = if sp_flag { FLAG_SP } else { 0 };
3307        if ce_flag {
3308            flags |= FLAG_CE;
3309        }
3310        if peer.current_k_bit() {
3311            flags |= FLAG_KEY_EPOCH;
3312        }
3313
3314        let session = peer
3315            .noise_session_mut()
3316            .ok_or_else(|| NodeError::SendFailed {
3317                node_addr: *node_addr,
3318                reason: "no noise session".into(),
3319            })?;
3320
3321        // Build 16-byte outer header upfront. The inner-plaintext
3322        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
3323        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
3324        // just to measure it. The worker path uses this length to size
3325        // the wire buffer directly; the legacy path below still
3326        // materialises a separate `inner_plaintext` Vec for the inline
3327        // encrypt-and-send call.
3328        const INNER_TS_LEN: usize = 4;
3329        let counter = session.current_send_counter();
3330        let inner_len = INNER_TS_LEN + plaintext.len();
3331        let payload_len = inner_len as u16;
3332        let header = build_established_header(their_index, counter, flags, payload_len);
3333
3334        // **Unix UDP send fast path.** On Unix, the encrypt-worker pool
3335        // is spawned at lifecycle start (workers = num_cpus) in
3336        // production, so this branch is taken for every authentic send on
3337        // every UDP-transported established session. The AEAD work +
3338        // sendmsg syscall run on a dedicated OS thread; the rx_loop only
3339        // builds the wire buffer + reserves the counter inline.
3340        //
3341        // Other transport kinds (BLE, TCP, sim, ethernet) fall
3342        // through to the inline encrypt + transport.send path
3343        // below — those don't have raw-fd / sendmmsg / UDP_GSO
3344        // benefits to expose through the worker pool, so the simpler
3345        // synchronous send is the right shape for them.
3346        //
3347        // Windows intentionally stays on the inline tokio UDP send path:
3348        // lifecycle::start does not spawn these raw-fd workers there, and
3349        // tests may still set `encrypt_workers` manually.
3350        //
3351        // The `encrypt_workers.is_some()` check below is true in Unix
3352        // production (lifecycle::start spawns the pool); it stays checked
3353        // rather than `expect()`-ed because unit tests construct `Node`
3354        // without calling `start()`.
3355        let transport_for_send = self
3356            .transports
3357            .get(&transport_id)
3358            .ok_or(NodeError::TransportNotFound(transport_id))?;
3359        match transport_for_send.connection_state(&remote_addr) {
3360            ConnectionState::Connected => {}
3361            other => {
3362                if matches!(other, ConnectionState::None) {
3363                    let _ = transport_for_send.connect(&remote_addr).await;
3364                }
3365                return Err(NodeError::SendFailed {
3366                    node_addr: *node_addr,
3367                    reason: format!("transport connection not ready: {:?}", other),
3368                });
3369            }
3370        }
3371        #[cfg(unix)]
3372        {
3373            let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
3374            if let Some(workers) = self.encrypt_workers.as_ref().cloned()
3375                && is_udp
3376                && let Some(cipher_clone) = session.send_cipher_clone()
3377            {
3378                // Reserve the counter on the session so subsequent
3379                // sends don't reuse it. `current_send_counter` only
3380                // peeks; we advance via `take_send_counter`.
3381                let reserved_counter =
3382                    session
3383                        .take_send_counter()
3384                        .map_err(|e| NodeError::SendFailed {
3385                            node_addr: *node_addr,
3386                            reason: format!("counter reservation failed: {}", e),
3387                        })?;
3388                debug_assert_eq!(reserved_counter, counter);
3389                // Re-derive the header with the now-locked-in counter
3390                // value (same value, but the call sequence is more
3391                // explicit).
3392                let header =
3393                    build_established_header(their_index, reserved_counter, flags, payload_len);
3394                let transport = transport_for_send;
3395                // Snapshot the per-peer connected UDP socket before
3396                // resolving the fallback address. On the established
3397                // steady-state path this socket already carries the
3398                // kernel peer address, so re-parsing the configured
3399                // transport address and touching the DNS cache on every
3400                // packet is pure overhead on the sender hot path.
3401                let send_target = {
3402                    if let TransportHandle::Udp(udp) = transport {
3403                        let socket_addr = {
3404                            #[cfg(any(target_os = "linux", target_os = "macos"))]
3405                            {
3406                                match connected_socket.as_ref() {
3407                                    Some(socket) => Some(socket.peer_addr()),
3408                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
3409                                }
3410                            }
3411                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
3412                            {
3413                                udp.resolve_for_off_task(&remote_addr).await.ok()
3414                            }
3415                        };
3416                        match (udp.async_socket(), socket_addr) {
3417                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
3418                            _ => None,
3419                        }
3420                    } else {
3421                        None
3422                    }
3423                };
3424                if let Some((socket, socket_addr)) = send_target {
3425                    // Build the wire buffer **directly** from
3426                    // `plaintext` with a single allocation:
3427                    //   `[16 header][4 ts][plaintext...]` with
3428                    // +16 trailing capacity for the AEAD tag.
3429                    // The worker seals `wire_buf[16..]` in
3430                    // place and appends the tag — no second
3431                    // alloc, no second memcpy.
3432                    //
3433                    // Previous design built `inner_plaintext`
3434                    // via `prepend_inner_header` (1 alloc + 1
3435                    // copy) and then let the worker memcpy
3436                    // header + plaintext into a fresh Vec
3437                    // (another alloc + copy). At ~100 kpps the
3438                    // saved alloc/copy is ~150 MB/sec of memory
3439                    // bandwidth on the hot rx_loop + worker.
3440                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
3441                    let mut wire_buf = Vec::with_capacity(wire_capacity);
3442                    wire_buf.extend_from_slice(&header);
3443                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
3444                    wire_buf.extend_from_slice(plaintext);
3445                    let predicted_bytes = wire_capacity;
3446                    // Stats / MMP update inline — predicted size
3447                    // is exact for ChaCha20-Poly1305 (tag is
3448                    // constant 16 bytes). When `connected_socket` is
3449                    // `Some`, the worker sends on it without a
3450                    // destination sockaddr — the kernel skips the
3451                    // per-packet sockaddr + route + neighbor resolve.
3452                    if let Some(peer) = self.peers.get_mut(node_addr) {
3453                        peer.link_stats_mut().record_sent(predicted_bytes);
3454                        if let Some(mmp) = peer.mmp_mut() {
3455                            mmp.sender
3456                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
3457                        }
3458                    }
3459                    let scheduling_weight = self.send_weight_for_peer(node_addr);
3460                    let traffic_class = classify_fmp_plaintext_traffic(plaintext);
3461                    workers.dispatch(self::encrypt_worker::FmpSendJob {
3462                        cipher: cipher_clone,
3463                        counter: reserved_counter,
3464                        wire_buf,
3465                        fsp_seal: None,
3466                        socket,
3467                        dest_addr: socket_addr,
3468                        #[cfg(any(target_os = "linux", target_os = "macos"))]
3469                        connected_socket,
3470                        bulk_endpoint_data: traffic_class.bulk_endpoint_data,
3471                        drop_on_backpressure: traffic_class.drop_on_backpressure,
3472                        scheduling_weight,
3473                        queued_at: crate::perf_profile::stamp(),
3474                    });
3475                    return Ok(());
3476                }
3477            }
3478        }
3479
3480        // Inline (legacy) path: encrypt + send on the rx_loop.
3481        // Build the inner plaintext lazily here — the worker path
3482        // above never reaches this point, so the prepend_inner_header
3483        // alloc is avoided in the fast path.
3484        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
3485        // Encrypt with AAD binding to the outer header
3486        let ciphertext = {
3487            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
3488            session
3489                .encrypt_with_aad(&inner_plaintext, &header)
3490                .map_err(|e| NodeError::SendFailed {
3491                    node_addr: *node_addr,
3492                    reason: format!("encryption failed: {}", e),
3493                })?
3494        };
3495
3496        let wire_packet = build_encrypted(&header, &ciphertext);
3497
3498        // Re-borrow peer for stats update after sending
3499        let send_result = {
3500            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
3501            let transport = self
3502                .transports
3503                .get(&transport_id)
3504                .ok_or(NodeError::TransportNotFound(transport_id))?;
3505            transport.send(&remote_addr, &wire_packet).await
3506        };
3507        self.note_local_send_outcome(node_addr, &send_result);
3508        let bytes_sent = send_result.map_err(|e| match e {
3509            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
3510                node_addr: *node_addr,
3511                packet_size,
3512                mtu,
3513            },
3514            other => NodeError::SendFailed {
3515                node_addr: *node_addr,
3516                reason: format!("transport send: {}", other),
3517            },
3518        })?;
3519
3520        // Update send statistics
3521        if let Some(peer) = self.peers.get_mut(node_addr) {
3522            peer.link_stats_mut().record_sent(bytes_sent);
3523            // MMP: record sent frame for sender report generation
3524            if let Some(mmp) = peer.mmp_mut() {
3525                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
3526            }
3527        }
3528
3529        Ok(())
3530    }
3531}
3532
3533impl fmt::Debug for Node {
3534    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3535        f.debug_struct("Node")
3536            .field("node_addr", self.node_addr())
3537            .field("state", &self.state)
3538            .field("is_leaf_only", &self.is_leaf_only)
3539            .field("connections", &self.connection_count())
3540            .field("peers", &self.peer_count())
3541            .field("links", &self.link_count())
3542            .field("transports", &self.transport_count())
3543            .finish()
3544    }
3545}