Skip to main content

fips_core/node/
mod.rs

1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45use crate::transport::{
46    Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError, TransportHandle, TransportId,
47};
48use crate::tree::TreeState;
49use crate::upper::hosts::HostMap;
50use crate::upper::icmp_rate_limit::IcmpRateLimiter;
51use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
52use crate::utils::index::IndexAllocator;
53use crate::{
54    Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
55    SessionMessageType, encode_npub,
56};
57use rand::Rng;
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt;
60use std::sync::Arc;
61use std::thread::JoinHandle;
62use thiserror::Error;
63use tracing::{debug, warn};
64
/// Maximum magnitude, in seconds, of the random offset applied to each
/// session's rekey timer.
///
/// Every FMP/FSP session picks its offset uniformly from
/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` when it is constructed and
/// again after each cutover. The range is symmetric, so the configured mean
/// interval is preserved, while symmetric-start meshes see fewer
/// dual-initiation bursts.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
72
73/// Errors related to node operations.
74#[derive(Debug, Error)]
75pub enum NodeError {
76    #[error("node not started")]
77    NotStarted,
78
79    #[error("node already started")]
80    AlreadyStarted,
81
82    #[error("node already stopped")]
83    AlreadyStopped,
84
85    #[error("transport not found: {0}")]
86    TransportNotFound(TransportId),
87
88    #[error("no transport available for type: {0}")]
89    NoTransportForType(String),
90
91    #[error("link not found: {0}")]
92    LinkNotFound(LinkId),
93
94    #[error("connection not found: {0}")]
95    ConnectionNotFound(LinkId),
96
97    #[error("peer not found: {0:?}")]
98    PeerNotFound(NodeAddr),
99
100    #[error("peer already exists: {0:?}")]
101    PeerAlreadyExists(NodeAddr),
102
103    #[error("connection already exists for link: {0}")]
104    ConnectionAlreadyExists(LinkId),
105
106    #[error("invalid peer npub '{npub}': {reason}")]
107    InvalidPeerNpub { npub: String, reason: String },
108
109    #[error("discovery error: {0}")]
110    Discovery(String),
111
112    #[error("access denied: {0}")]
113    AccessDenied(String),
114
115    #[error("max connections exceeded: {max}")]
116    MaxConnectionsExceeded { max: usize },
117
118    #[error("max peers exceeded: {max}")]
119    MaxPeersExceeded { max: usize },
120
121    #[error("max links exceeded: {max}")]
122    MaxLinksExceeded { max: usize },
123
124    #[error("handshake incomplete for link {0}")]
125    HandshakeIncomplete(LinkId),
126
127    #[error("no session available for link {0}")]
128    NoSession(LinkId),
129
130    #[error("promotion failed for link {link_id}: {reason}")]
131    PromotionFailed { link_id: LinkId, reason: String },
132
133    #[error("send failed to {node_addr}: {reason}")]
134    SendFailed { node_addr: NodeAddr, reason: String },
135
136    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
137    MtuExceeded {
138        node_addr: NodeAddr,
139        packet_size: usize,
140        mtu: u16,
141    },
142
143    #[error("config error: {0}")]
144    Config(#[from] ConfigError),
145
146    #[error("identity error: {0}")]
147    Identity(#[from] IdentityError),
148
149    #[error("TUN error: {0}")]
150    Tun(#[from] TunError),
151
152    #[error("index allocation failed: {0}")]
153    IndexAllocationFailed(String),
154
155    #[error("handshake failed: {0}")]
156    HandshakeFailed(String),
157
158    #[error("transport error: {0}")]
159    TransportError(String),
160
161    #[error("bootstrap handoff failed: {0}")]
162    BootstrapHandoff(String),
163}
164
165/// Source-attributed packet delivered by a node running without a system TUN.
166#[derive(Debug, Clone, PartialEq, Eq)]
167pub struct NodeDeliveredPacket {
168    /// FIPS node address that originated the packet.
169    pub source_node_addr: NodeAddr,
170    /// Source Nostr public key when the node has learned it.
171    pub source_npub: Option<String>,
172    /// Destination FIPS address from the IPv6 packet.
173    pub destination: FipsAddress,
174    /// Full IPv6 packet after FIPS session decapsulation.
175    pub packet: Vec<u8>,
176}
177
178#[derive(Debug, Clone)]
179struct IdentityCacheEntry {
180    node_addr: NodeAddr,
181    pubkey: secp256k1::PublicKey,
182    npub: String,
183    last_seen_ms: u64,
184}
185
186impl IdentityCacheEntry {
187    fn new(
188        node_addr: NodeAddr,
189        pubkey: secp256k1::PublicKey,
190        npub: String,
191        last_seen_ms: u64,
192    ) -> Self {
193        Self {
194            node_addr,
195            pubkey,
196            npub,
197            last_seen_ms,
198        }
199    }
200}
201
202/// App-owned packet channels for embedding FIPS without a system TUN.
203#[derive(Debug)]
204pub struct ExternalPacketIo {
205    /// Send outbound IPv6 packets into the node.
206    pub outbound_tx: crate::upper::tun::TunOutboundTx,
207    /// Receive inbound IPv6 packets delivered by FIPS sessions.
208    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
209}
210
211/// App-owned endpoint data channels for embedding FIPS without a daemon.
212#[derive(Debug)]
213pub(crate) struct EndpointDataIo {
214    /// Send endpoint data commands into the node RX loop.
215    ///
216    /// Bounded with a generous default so normal sender bursts do not
217    /// stall on semaphore acquisition. macOS pacing happens at the UDP
218    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
219    /// constraining this app queue instead caused the inner TCP flow to
220    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
221    /// default for benches.
222    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
223    /// Receive endpoint data delivered by FIPS sessions.
224    ///
225    /// Unbounded so the rx_loop's send on inbound packet delivery is a
226    /// wait-free push (no semaphore acquire), and so we can drop the
227    /// per-packet cross-task relay that previously sat between the node
228    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
229    /// naturally bounded — the rx_loop both produces here and runs the
230    /// same runtime that schedules the consumer, so a stalled consumer
231    /// stalls production too.
232    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
233    /// Clone of the event_tx exposed for in-process loopback (e.g.
234    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
235    /// event into the same queue without going through the encrypt /
236    /// decrypt path, while keeping every consumer reading from a single
237    /// channel.
238    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
239}
240
/// Choose the capacity of the bounded endpoint-data command queue.
///
/// When the `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable holds a
/// value that, after trimming surrounding whitespace, parses as a non-zero
/// `usize`, that value is returned unchanged. In every other case (variable
/// unset, not valid Unicode, unparsable, or zero) the result is `requested`
/// raised to a floor of 32 768.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    /// Smallest capacity returned when no environment override applies.
    const DEFAULT_FLOOR: usize = 32_768;

    let env_override = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|&cap| cap > 0);

    match env_override {
        Some(cap) => cap,
        // A floor of 32_768 already guarantees a non-zero capacity, so no
        // separate guard against `requested == 0` is required.
        None => requested.max(DEFAULT_FLOOR),
    }
}
251
252/// Commands accepted by the node endpoint data service.
253#[derive(Debug)]
254pub(crate) enum NodeEndpointCommand {
255    /// Send with an explicit response channel — used by callers that
256    /// care whether the local-stack handoff succeeded (e.g.
257    /// `blocking_send` waits for the runtime to accept the send).
258    Send {
259        remote: PeerIdentity,
260        payload: Vec<u8>,
261        queued_at: Option<std::time::Instant>,
262        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
263    },
264    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
265    /// no per-packet result channel. Used by the data-plane fast path
266    /// (`FipsEndpoint::send`) where the caller already discards the
267    /// result. Saves one oneshot::channel() allocation per outbound
268    /// packet on the application's send hot path.
269    SendOneway {
270        remote: PeerIdentity,
271        payload: Vec<u8>,
272        queued_at: Option<std::time::Instant>,
273    },
274    PeerSnapshot {
275        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
276    },
277    RelaySnapshot {
278        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointRelayStatus>>,
279    },
280    UpdateRelays {
281        advert_relays: Vec<String>,
282        dm_relays: Vec<String>,
283        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
284    },
285    /// Replace the runtime peer list. Newly added auto-connect peers get
286    /// `initiate_peer_connection` immediately; removed peers are dropped
287    /// from the retry queue (the regular liveness timeout reaps any active
288    /// session). Existing entries are kept and their `addresses` field is
289    /// refreshed so the next retry sees the latest hints.
290    UpdatePeers {
291        peers: Vec<crate::config::PeerConfig>,
292        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
293    },
294}
295
/// Summary of what an `UpdatePeers` command changed.
///
/// All four counters are zero in the `Default` value.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub(crate) struct UpdatePeersOutcome {
    /// Count of added peers.
    pub(crate) added: usize,
    /// Count of removed peers.
    pub(crate) removed: usize,
    /// Count of updated peers.
    pub(crate) updated: usize,
    /// Count of unchanged peers.
    pub(crate) unchanged: usize,
}
304
305/// Endpoint data events emitted by the node session receive path.
306#[derive(Debug)]
307pub(crate) enum NodeEndpointEvent {
308    Data {
309        source_node_addr: NodeAddr,
310        source_npub: Option<String>,
311        payload: Vec<u8>,
312        queued_at: Option<std::time::Instant>,
313    },
314}
315
/// Snapshot of an authenticated peer, as shown to embedded endpoint callers.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct NodeEndpointPeer {
    /// The peer's npub.
    pub(crate) npub: String,
    /// Transport address as text, when available.
    pub(crate) transport_addr: Option<String>,
    /// Transport type as text, when available.
    pub(crate) transport_type: Option<String>,
    /// Numeric id of the link to this peer.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when available.
    pub(crate) srtt_ms: Option<u64>,
    /// Packets sent counter.
    pub(crate) packets_sent: u64,
    /// Packets received counter.
    pub(crate) packets_recv: u64,
    /// Bytes sent counter.
    pub(crate) bytes_sent: u64,
    /// Bytes received counter.
    pub(crate) bytes_recv: u64,
}
329
/// Live state of one Nostr relay, as shown to embedded endpoint callers.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct NodeEndpointRelayStatus {
    /// The relay's URL.
    pub(crate) url: String,
    /// The relay's status, as text.
    pub(crate) status: String,
}
336
/// Lifecycle state of a node.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum NodeState {
    /// Constructed, not yet started.
    Created,
    /// Start-up in progress (transports are being initialized).
    Starting,
    /// Up and fully operational.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Shut down.
    Stopped,
}

impl NodeState {
    /// `true` only while the node is `Running`.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// `true` when a start is permitted: from `Created` or from `Stopped`.
    pub fn can_start(&self) -> bool {
        match *self {
            NodeState::Created | NodeState::Stopped => true,
            NodeState::Starting | NodeState::Running | NodeState::Stopping => false,
        }
    }

    /// `true` when a stop is permitted: only from `Running`.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Writes the lowercase state name (`"created"`, `"running"`, ...).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match *self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
381
382/// Recent request tracking for dedup and reverse-path forwarding.
383///
384/// When a LookupRequest is forwarded through a node, the node stores the
385/// request_id and which peer sent it. When the corresponding LookupResponse
386/// arrives, it's forwarded back to that peer (reverse-path forwarding).
387/// The `response_forwarded` flag prevents response routing loops.
388#[derive(Clone, Debug)]
389pub(crate) struct RecentRequest {
390    /// The peer who sent this request to us.
391    pub(crate) from_peer: NodeAddr,
392    /// When we received this request (Unix milliseconds).
393    pub(crate) timestamp_ms: u64,
394    /// Whether we've already forwarded a response for this request.
395    /// Prevents response routing loops when convergent request paths
396    /// create bidirectional entries in recent_requests.
397    pub(crate) response_forwarded: bool,
398}
399
400impl RecentRequest {
401    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
402        Self {
403            from_peer,
404            timestamp_ms,
405            response_forwarded: false,
406        }
407    }
408
409    /// Check if this entry has expired (older than expiry_ms).
410    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
411        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
412    }
413}
414
415/// Key for addr_to_link reverse lookup.
416type AddrKey = (TransportId, TransportAddr);
417
/// Kernel drop bookkeeping kept per transport, used for congestion
/// detection.
///
/// A sample is taken on every tick (1s). `dropping` records whether new
/// kernel drops appeared since the sample before.
#[derive(Default, Debug)]
struct TransportDropState {
    /// The cumulative `recv_drops` counter as of the previous sample.
    prev_drops: u64,
    /// Whether the drop counter grew since the last sample.
    dropping: bool,
}
429
430/// State for a link waiting for transport-level connection establishment.
431///
432/// For connection-oriented transports (TCP, Tor), the transport connect runs
433/// asynchronously. This struct holds the data needed to complete the handshake
434/// once the connection is ready.
435struct PendingConnect {
436    /// The link that was created for this connection.
437    link_id: LinkId,
438    /// Which transport is being used.
439    transport_id: TransportId,
440    /// The remote address being connected to.
441    remote_addr: TransportAddr,
442    /// The peer identity (for handshake initiation).
443    peer_identity: PeerIdentity,
444}
445
446/// A running FIPS node instance.
447///
448/// This is the top-level container holding all node state.
449///
450/// ## Peer Lifecycle
451///
452/// Peers go through two phases:
453/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
454/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
455///
456/// The `addr_to_link` map enables dispatching incoming packets to the right
457/// connection before authentication completes.
458// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
459pub struct Node {
460    // === Identity ===
461    /// This node's cryptographic identity.
462    identity: Identity,
463
464    /// Random epoch generated at startup for peer restart detection.
465    /// Exchanged inside Noise handshake messages so peers can detect restarts.
466    startup_epoch: [u8; 8],
467
468    /// Instant when the node was created, for uptime reporting.
469    started_at: std::time::Instant,
470
471    // === Configuration ===
472    /// Loaded configuration.
473    config: Config,
474
475    // === State ===
476    /// Node operational state.
477    state: NodeState,
478
479    /// Whether this is a leaf-only node.
480    is_leaf_only: bool,
481
482    // === Spanning Tree ===
483    /// Local spanning tree state.
484    tree_state: TreeState,
485
486    // === Bloom Filter ===
487    /// Local Bloom filter state.
488    bloom_state: BloomState,
489
490    // === Routing ===
491    /// Address -> coordinates cache (from session setup and discovery).
492    coord_cache: CoordCache,
493    /// Locally learned reverse-path next-hop hints.
494    learned_routes: LearnedRouteTable,
495    /// Recent discovery requests (dedup + reverse-path forwarding).
496    /// Maps request_id → RecentRequest.
497    recent_requests: HashMap<u64, RecentRequest>,
498    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
499    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
500    /// the TUN reader/writer threads at TCP MSS clamp time so the
501    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
502    /// and the learned per-destination path MTU.
503    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,
504
505    // === Transports & Links ===
506    /// Active transports (owned by Node).
507    transports: HashMap<TransportId, TransportHandle>,
508    /// Per-transport kernel drop tracking for congestion detection.
509    transport_drops: HashMap<TransportId, TransportDropState>,
510    /// Active links.
511    links: HashMap<LinkId, Link>,
512    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
513    addr_to_link: HashMap<AddrKey, LinkId>,
514
515    // === Packet Channel ===
516    /// Packet sender for transports.
517    packet_tx: Option<PacketTx>,
518    /// Packet receiver (for event loop).
519    packet_rx: Option<PacketRx>,
520
521    // === Connections (Handshake Phase) ===
522    /// Pending connections (handshake in progress).
523    /// Indexed by LinkId since we don't know the peer's identity yet.
524    connections: HashMap<LinkId, PeerConnection>,
525
526    // === Peers (Active Phase) ===
527    /// Authenticated peers.
528    /// Indexed by NodeAddr (verified identity).
529    peers: HashMap<NodeAddr, ActivePeer>,
530
531    // === End-to-End Sessions ===
532    /// Session table for end-to-end encrypted sessions.
533    /// Keyed by remote NodeAddr.
534    sessions: HashMap<NodeAddr, SessionEntry>,
535
536    // === Identity Cache ===
537    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
538    /// Enables reverse lookup from IPv6 destination to session/routing identity.
539    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,
540
541    // === Pending TUN Packets ===
542    /// Packets queued while waiting for session establishment.
543    /// Keyed by destination NodeAddr, bounded per-dest and total.
544    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
545    /// Endpoint data payloads queued while waiting for session establishment.
546    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
547    // === Pending Discovery Lookups ===
548    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
549    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
550    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,
551
552    // === Resource Limits ===
553    /// Maximum connections (0 = unlimited).
554    max_connections: usize,
555    /// Maximum peers (0 = unlimited).
556    max_peers: usize,
557    /// Maximum links (0 = unlimited).
558    max_links: usize,
559
560    // === Counters ===
561    /// Next link ID to allocate.
562    next_link_id: u64,
563    /// Next transport ID to allocate.
564    next_transport_id: u32,
565
566    // === Node Statistics ===
567    /// Routing, forwarding, discovery, and error signal counters.
568    stats: stats::NodeStats,
569
570    /// Time-series history of node-level metrics (1s/1m rings).
571    stats_history: stats_history::StatsHistory,
572
573    // === TUN Interface ===
574    /// TUN device state.
575    tun_state: TunState,
576    /// TUN interface name (for cleanup).
577    tun_name: Option<String>,
578    /// TUN packet sender channel.
579    tun_tx: Option<TunTx>,
580    /// Receiver for outbound packets from the TUN reader.
581    tun_outbound_rx: Option<TunOutboundRx>,
582    /// App-owned packet sink used by embedded/no-TUN integrations.
583    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
584    /// Endpoint data command receiver used by embedded/no-daemon integrations.
585    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
586    /// Endpoint data event sink used by embedded/no-daemon integrations.
587    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
588    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
589    /// spawned (set up in `start()` once transports are running).
590    /// `Some(pool)` once available; the pool internally holds
591    /// per-worker mpsc senders and round-robins jobs across them.
592    /// See `node::encrypt_worker` for the rationale and layout.
593    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
594    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
595    /// `encrypt_workers` for the receive side.
596    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
597    /// Set of sessions that have been registered with the decrypt
598    /// shard worker pool. Used by rx_loop to decide between fast-path
599    /// dispatch (worker owns the session) and legacy in-place decrypt
600    /// (worker doesn't have it yet). Per the data-plane restructure,
601    /// the worker owns its session state directly — there's no shared
602    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
603    /// this set tracks **whether** the worker has been told about a
604    /// given session.
605    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
606    /// Fallback channel: decrypt worker bounces non-fast-path packets
607    /// (anything that's not bulk EndpointData) back here for rx_loop
608    /// to handle via the legacy path. Drained by a new rx_loop arm.
609    decrypt_fallback_rx:
610        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
611    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
612    /// TUN reader thread handle.
613    tun_reader_handle: Option<JoinHandle<()>>,
614    /// TUN writer thread handle.
615    tun_writer_handle: Option<JoinHandle<()>>,
616    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
617    /// On Linux, deleting the interface via netlink serves the same purpose.
618    #[cfg(target_os = "macos")]
619    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,
620
621    // === DNS Responder ===
622    /// Receiver for resolved identities from the DNS responder.
623    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
624    /// DNS responder task handle.
625    dns_task: Option<tokio::task::JoinHandle<()>>,
626
627    // === Index-Based Session Dispatch ===
628    /// Allocator for session indices.
629    index_allocator: IndexAllocator,
630    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
631    /// This maps our session index to the peer that uses it.
632    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
633    /// Pending outbound handshakes by our sender_idx.
634    /// Tracks which LinkId corresponds to which session index.
635    pending_outbound: HashMap<(TransportId, u32), LinkId>,
636
637    // === Rate Limiting ===
638    /// Rate limiter for msg1 processing (DoS protection).
639    msg1_rate_limiter: HandshakeRateLimiter,
640    /// Rate limiter for ICMP Packet Too Big messages.
641    icmp_rate_limiter: IcmpRateLimiter,
642    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
643    routing_error_rate_limiter: RoutingErrorRateLimiter,
644    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
645    coords_response_rate_limiter: RoutingErrorRateLimiter,
646    /// Backoff for failed discovery lookups (originator-side).
647    discovery_backoff: DiscoveryBackoff,
648    /// Rate limiter for forwarded discovery requests (transit-side).
649    discovery_forward_limiter: DiscoveryForwardRateLimiter,
650
651    // === Pending Transport Connects ===
652    /// Links waiting for transport-level connection establishment before
653    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
654    /// the transport connect runs in the background; the tick handler polls
655    /// connection_state() and initiates the handshake when connected.
656    pending_connects: Vec<PendingConnect>,
657
658    // === Connection Retry ===
659    /// Retry state for peers whose outbound connections have failed.
660    /// Keyed by NodeAddr. Entries are created when a handshake times out
661    /// or fails, and removed on successful promotion or when max retries
662    /// are exhausted.
663    retry_pending: HashMap<NodeAddr, retry::RetryState>,
664
665    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
666    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
667    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
668    /// Identity is unverified at this layer — the Noise XX handshake
669    /// initiated against an mDNS-observed endpoint is what proves the
670    /// peer holds the matching private key.
671    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
672    /// Wall-clock ms when Nostr discovery successfully started, used to
673    /// schedule the one-shot startup advert sweep after a settle delay.
674    /// `None` until discovery comes up; remains `None` if discovery is
675    /// disabled or failed to start.
676    nostr_discovery_started_at_ms: Option<u64>,
677    /// Whether the one-shot startup advert sweep has run. Set to true
678    /// after the first sweep fires (under `policy: open`); thereafter
679    /// only the per-tick `queue_open_discovery_retries` continues.
680    startup_open_discovery_sweep_done: bool,
681    /// Per-peer UDP transports adopted from NAT traversal handoff.
682    bootstrap_transports: HashSet<TransportId>,
683    /// Originating peer npub (bech32) for each adopted bootstrap
684    /// transport, captured at `adopt_established_traversal` time.
685    /// Populated alongside `bootstrap_transports`; cleared in
686    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
687    /// route fatal-protocol-mismatch observations back to the
688    /// Nostr-discovery `failure_state` for long cooldown application.
689    bootstrap_transport_npubs: HashMap<TransportId, String>,
690    /// Peers that should not be used as reply-learned fallback transit for
691    /// other destinations. Direct lookups to the peer are still permitted.
692    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,
693
694    // === Periodic Parent Re-evaluation ===
695    /// Timestamp of last periodic parent re-evaluation (for pacing).
696    last_parent_reeval: Option<crate::time::Instant>,
697
698    // === Congestion Logging ===
699    /// Timestamp of last congestion detection log (rate-limited to 5s).
700    last_congestion_log: Option<std::time::Instant>,
701
702    // === Mesh Size Estimate ===
703    /// Cached estimated mesh size (computed once per tick from bloom filters).
704    estimated_mesh_size: Option<u64>,
705    /// Timestamp of last mesh size log emission.
706    last_mesh_size_log: Option<std::time::Instant>,
707
708    // === Bloom Self-Plausibility ===
709    /// Rate-limit state for the self-plausibility WARN. Fires at most
710    /// once per 60s globally when our own outgoing FilterAnnounce has
711    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
712    /// aggregation drift or an ingress bypass.
713    last_self_warn: Option<std::time::Instant>,
714
715    // === Local Outbound Liveness ===
716    /// Set when a `transport.send` returned a local-side io error
717    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
718    /// cleared on the next successful send. Used by
719    /// `check_link_heartbeats` to compress the dead-timeout to
720    /// `fast_link_dead_timeout_secs` while our outbound is observed
721    /// broken — direct kernel evidence beats waiting on receive-silence.
722    last_local_send_failure_at: Option<std::time::Instant>,
723
724    // === Display Names ===
725    /// Human-readable names for configured peers (alias or short npub).
726    /// Populated at startup from peer config.
727    peer_aliases: HashMap<NodeAddr, String>,
728
729    /// Reloadable peer ACL state from standard allow/deny files.
730    peer_acl: acl::PeerAclReloader,
731
732    // === Host Map ===
733    /// Static hostname → npub mapping for DNS resolution.
734    /// Built at construction from peer aliases and /etc/fips/hosts.
735    host_map: Arc<HostMap>,
736}
737
738impl Node {
739    /// Create a new node from configuration.
740    pub fn new(config: Config) -> Result<Self, NodeError> {
741        config.validate()?;
742        let identity = config.create_identity()?;
743        let node_addr = *identity.node_addr();
744        let is_leaf_only = config.is_leaf_only();
745
746        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
747        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
748
749        let mut startup_epoch = [0u8; 8];
750        rand::rng().fill_bytes(&mut startup_epoch);
751
752        let mut bloom_state = if is_leaf_only {
753            BloomState::leaf_only(node_addr)
754        } else {
755            BloomState::new(node_addr)
756        };
757        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
758
759        let tun_state = if config.tun.enabled {
760            TunState::Configured
761        } else {
762            TunState::Disabled
763        };
764
765        // Initialize tree state with signed self-declaration
766        let mut tree_state = TreeState::new(node_addr);
767        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
768        tree_state.set_hold_down(config.node.tree.hold_down_secs);
769        tree_state.set_flap_dampening(
770            config.node.tree.flap_threshold,
771            config.node.tree.flap_window_secs,
772            config.node.tree.flap_dampening_secs,
773        );
774        tree_state
775            .sign_declaration(&identity)
776            .expect("signing own declaration should never fail");
777
778        let coord_cache = CoordCache::new(
779            config.node.cache.coord_size,
780            config.node.cache.coord_ttl_secs * 1000,
781        );
782        let rl = &config.node.rate_limit;
783        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
784            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
785            config.node.limits.max_pending_inbound,
786        );
787
788        let max_connections = config.node.limits.max_connections;
789        let max_peers = config.node.limits.max_peers;
790        let max_links = config.node.limits.max_links;
791        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
792        let backoff_base_secs = config.node.discovery.backoff_base_secs;
793        let backoff_max_secs = config.node.discovery.backoff_max_secs;
794        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
795
796        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
797
798        Ok(Self {
799            identity,
800            startup_epoch,
801            started_at: std::time::Instant::now(),
802            config,
803            state: NodeState::Created,
804            is_leaf_only,
805            tree_state,
806            bloom_state,
807            coord_cache,
808            learned_routes: LearnedRouteTable::default(),
809            recent_requests: HashMap::new(),
810            transports: HashMap::new(),
811            transport_drops: HashMap::new(),
812            links: HashMap::new(),
813            addr_to_link: HashMap::new(),
814            packet_tx: None,
815            packet_rx: None,
816            connections: HashMap::new(),
817            peers: HashMap::new(),
818            sessions: HashMap::new(),
819            identity_cache: HashMap::new(),
820            pending_tun_packets: HashMap::new(),
821            pending_endpoint_data: HashMap::new(),
822            pending_lookups: HashMap::new(),
823            max_connections,
824            max_peers,
825            max_links,
826            next_link_id: 1,
827            next_transport_id: 1,
828            stats: stats::NodeStats::new(),
829            stats_history: stats_history::StatsHistory::new(),
830            tun_state,
831            tun_name: None,
832            tun_tx: None,
833            tun_outbound_rx: None,
834            external_packet_tx: None,
835            endpoint_command_rx: None,
836            endpoint_event_tx: None,
837            encrypt_workers: None,
838            decrypt_workers: None,
839            decrypt_registered_sessions: std::collections::HashSet::new(),
840            decrypt_fallback_tx,
841            decrypt_fallback_rx,
842            tun_reader_handle: None,
843            tun_writer_handle: None,
844            #[cfg(target_os = "macos")]
845            tun_shutdown_fd: None,
846            dns_identity_rx: None,
847            dns_task: None,
848            index_allocator: IndexAllocator::new(),
849            peers_by_index: HashMap::new(),
850            pending_outbound: HashMap::new(),
851            msg1_rate_limiter,
852            icmp_rate_limiter: IcmpRateLimiter::new(),
853            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
854            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
855                std::time::Duration::from_millis(coords_response_interval_ms),
856            ),
857            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
858            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
859                std::time::Duration::from_secs(forward_min_interval_secs),
860            ),
861            pending_connects: Vec::new(),
862            retry_pending: HashMap::new(),
863            nostr_discovery: None,
864            nostr_discovery_started_at_ms: None,
865            lan_discovery: None,
866            startup_open_discovery_sweep_done: false,
867            bootstrap_transports: HashSet::new(),
868            bootstrap_transport_npubs: HashMap::new(),
869            discovery_fallback_transit_blocked_peers: HashSet::new(),
870            last_parent_reeval: None,
871            last_congestion_log: None,
872            estimated_mesh_size: None,
873            last_mesh_size_log: None,
874            last_self_warn: None,
875            last_local_send_failure_at: None,
876            peer_aliases: HashMap::new(),
877            peer_acl,
878            host_map,
879            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
880        })
881    }
882
883    /// Create a node with a specific identity.
884    ///
885    /// This constructor validates cross-field config invariants before
886    /// constructing the node, same as [`Node::new`].
887    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
888        config.validate()?;
889        let node_addr = *identity.node_addr();
890
891        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
892        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
893
894        let mut startup_epoch = [0u8; 8];
895        rand::rng().fill_bytes(&mut startup_epoch);
896
897        let tun_state = if config.tun.enabled {
898            TunState::Configured
899        } else {
900            TunState::Disabled
901        };
902
903        // Initialize tree state with signed self-declaration
904        let mut tree_state = TreeState::new(node_addr);
905        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
906        tree_state.set_hold_down(config.node.tree.hold_down_secs);
907        tree_state.set_flap_dampening(
908            config.node.tree.flap_threshold,
909            config.node.tree.flap_window_secs,
910            config.node.tree.flap_dampening_secs,
911        );
912        tree_state
913            .sign_declaration(&identity)
914            .expect("signing own declaration should never fail");
915
916        let mut bloom_state = BloomState::new(node_addr);
917        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
918
919        let coord_cache = CoordCache::new(
920            config.node.cache.coord_size,
921            config.node.cache.coord_ttl_secs * 1000,
922        );
923        let rl = &config.node.rate_limit;
924        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
925            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
926            config.node.limits.max_pending_inbound,
927        );
928
929        let max_connections = config.node.limits.max_connections;
930        let max_peers = config.node.limits.max_peers;
931        let max_links = config.node.limits.max_links;
932        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
933
934        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
935
936        Ok(Self {
937            identity,
938            startup_epoch,
939            started_at: std::time::Instant::now(),
940            config,
941            state: NodeState::Created,
942            is_leaf_only: false,
943            tree_state,
944            bloom_state,
945            coord_cache,
946            learned_routes: LearnedRouteTable::default(),
947            recent_requests: HashMap::new(),
948            transports: HashMap::new(),
949            transport_drops: HashMap::new(),
950            links: HashMap::new(),
951            addr_to_link: HashMap::new(),
952            packet_tx: None,
953            packet_rx: None,
954            connections: HashMap::new(),
955            peers: HashMap::new(),
956            sessions: HashMap::new(),
957            identity_cache: HashMap::new(),
958            pending_tun_packets: HashMap::new(),
959            pending_endpoint_data: HashMap::new(),
960            pending_lookups: HashMap::new(),
961            max_connections,
962            max_peers,
963            max_links,
964            next_link_id: 1,
965            next_transport_id: 1,
966            stats: stats::NodeStats::new(),
967            stats_history: stats_history::StatsHistory::new(),
968            tun_state,
969            tun_name: None,
970            tun_tx: None,
971            tun_outbound_rx: None,
972            external_packet_tx: None,
973            endpoint_command_rx: None,
974            endpoint_event_tx: None,
975            encrypt_workers: None,
976            decrypt_workers: None,
977            decrypt_registered_sessions: std::collections::HashSet::new(),
978            decrypt_fallback_tx,
979            decrypt_fallback_rx,
980            tun_reader_handle: None,
981            tun_writer_handle: None,
982            #[cfg(target_os = "macos")]
983            tun_shutdown_fd: None,
984            dns_identity_rx: None,
985            dns_task: None,
986            index_allocator: IndexAllocator::new(),
987            peers_by_index: HashMap::new(),
988            pending_outbound: HashMap::new(),
989            msg1_rate_limiter,
990            icmp_rate_limiter: IcmpRateLimiter::new(),
991            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
992            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
993                std::time::Duration::from_millis(coords_response_interval_ms),
994            ),
995            discovery_backoff: DiscoveryBackoff::new(),
996            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
997            pending_connects: Vec::new(),
998            retry_pending: HashMap::new(),
999            nostr_discovery: None,
1000            nostr_discovery_started_at_ms: None,
1001            lan_discovery: None,
1002            startup_open_discovery_sweep_done: false,
1003            bootstrap_transports: HashSet::new(),
1004            bootstrap_transport_npubs: HashMap::new(),
1005            discovery_fallback_transit_blocked_peers: HashSet::new(),
1006            last_parent_reeval: None,
1007            last_congestion_log: None,
1008            estimated_mesh_size: None,
1009            last_mesh_size_log: None,
1010            last_self_warn: None,
1011            last_local_send_failure_at: None,
1012            peer_aliases: HashMap::new(),
1013            peer_acl,
1014            host_map,
1015            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
1016        })
1017    }
1018
1019    /// Create a leaf-only node (simplified state).
1020    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1021        let mut node = Self::new(config)?;
1022        node.is_leaf_only = true;
1023        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1024        Ok(node)
1025    }
1026
1027    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1028        let base_host_map = HostMap::from_peer_configs(config.peers());
1029        if !config.node.system_files_enabled {
1030            return (
1031                Arc::new(base_host_map.clone()),
1032                acl::PeerAclReloader::memory_only(base_host_map),
1033            );
1034        }
1035
1036        let mut host_map = base_host_map.clone();
1037        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1038        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1039            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1040        ));
1041        host_map.merge(hosts_file);
1042        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1043            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1044            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1045            base_host_map,
1046            hosts_path,
1047        );
1048        (Arc::new(host_map), peer_acl)
1049    }
1050
    /// Create transport instances from configuration.
    ///
    /// Returns a vector of TransportHandles for all configured transports.
    ///
    /// Instances are created, and transport IDs allocated, in this fixed
    /// order: UDP, simulated (only with the `sim-transport` feature),
    /// Ethernet (Linux/macOS only), TCP, Tor, then BLE (only when
    /// `bluer_available` is set). Every transport is handed a clone of
    /// `packet_tx`.
    ///
    /// A BLE adapter that fails to initialize is logged at `warn` level and
    /// skipped; it does not abort creation of the remaining transports.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Collect UDP configs with optional names to avoid borrow conflicts
        // (`allocate_transport_id` is called on `self` inside the loop below).
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        // Create UDP transport instances
        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Create simulated transport instances (feature-gated).
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Create Ethernet transport instances where raw-socket support exists.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            // Every Ethernet transport is given this node's public key.
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        // Create TCP transport instances
        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        // Create Tor transport instances
        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // Create BLE transport instances
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real adapters are only opened outside of test builds.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        // Best effort: a broken adapter must not prevent the
                        // other transports from being returned.
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): this sits inside `#[cfg(bluer_available)]`, so the
            // condition below reduces to `test`, and the warning itself is
            // compiled out under `test`. The "lacks BlueZ support" message
            // therefore looks unreachable in every build — confirm whether it
            // was meant to live outside the enclosing cfg block.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1188
1189    /// Find an operational transport that matches the given transport type name.
1190    ///
1191    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1192    /// from Nostr/STUN traversal. They must not be reused for ordinary
1193    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1194    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1195    /// with `EINVAL`, and even on platforms that allow it the packet would use
1196    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1197    /// choice deterministic by lowest transport id instead of HashMap order.
1198    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1199        self.transports
1200            .iter()
1201            .filter(|(id, handle)| {
1202                handle.transport_type().name == transport_type
1203                    && handle.is_operational()
1204                    && !self.bootstrap_transports.contains(id)
1205            })
1206            .min_by_key(|(id, _)| id.as_u32())
1207            .map(|(id, _)| *id)
1208    }
1209
1210    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1211    /// and binary TransportAddr.
1212    ///
1213    /// Finds the Ethernet transport instance bound to the named interface
1214    /// and parses the MAC portion into a 6-byte TransportAddr.
1215    #[allow(unused_variables)]
1216    fn resolve_ethernet_addr(
1217        &self,
1218        addr_str: &str,
1219    ) -> Result<(TransportId, TransportAddr), NodeError> {
1220        #[cfg(any(target_os = "linux", target_os = "macos"))]
1221        {
1222            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1223                NodeError::NoTransportForType(format!(
1224                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1225                    addr_str
1226                ))
1227            })?;
1228
1229            // Find the Ethernet transport bound to this interface
1230            let transport_id = self
1231                .transports
1232                .iter()
1233                .find(|(_, handle)| {
1234                    handle.transport_type().name == "ethernet"
1235                        && handle.is_operational()
1236                        && handle.interface_name() == Some(iface)
1237                })
1238                .map(|(id, _)| *id)
1239                .ok_or_else(|| {
1240                    NodeError::NoTransportForType(format!(
1241                        "no operational Ethernet transport for interface '{}'",
1242                        iface
1243                    ))
1244                })?;
1245
1246            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1247                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1248            })?;
1249
1250            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1251        }
1252        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1253        {
1254            Err(NodeError::NoTransportForType(
1255                "Ethernet transport is not supported on this platform".to_string(),
1256            ))
1257        }
1258    }
1259
1260    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1261    /// (TransportId, TransportAddr) pair by finding the BLE transport
1262    /// instance matching the adapter name.
1263    #[cfg(bluer_available)]
1264    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1265        let ta = TransportAddr::from_string(addr_str);
1266        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1267            NodeError::NoTransportForType(format!(
1268                "invalid BLE address format '{}': expected 'adapter/mac'",
1269                addr_str
1270            ))
1271        })?;
1272
1273        // Find the BLE transport for this adapter
1274        let transport_id = self
1275            .transports
1276            .iter()
1277            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1278            .map(|(id, _)| *id)
1279            .ok_or_else(|| {
1280                NodeError::NoTransportForType(format!(
1281                    "no operational BLE transport for adapter '{}'",
1282                    adapter
1283                ))
1284            })?;
1285
1286        // Validate the address format
1287        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1288            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1289        })?;
1290
1291        Ok((transport_id, TransportAddr::from_string(addr_str)))
1292    }
1293
1294    // === Identity Accessors ===
1295
    /// Get this node's identity.
    ///
    /// Returns a shared reference to the `Identity` stored on the node.
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1300
1301    /// Get this node's NodeAddr.
1302    pub fn node_addr(&self) -> &NodeAddr {
1303        self.identity.node_addr()
1304    }
1305
1306    /// Get this node's npub.
1307    pub fn npub(&self) -> String {
1308        self.identity.npub()
1309    }
1310
1311    /// Return a human-readable display name for a NodeAddr.
1312    ///
1313    /// Lookup order:
1314    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1315    /// 2. Configured peer alias or short npub (from startup map)
1316    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1317    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1318    /// 5. Truncated NodeAddr hex (unknown address)
1319    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1320        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1321            return hostname.to_string();
1322        }
1323        if let Some(name) = self.peer_aliases.get(addr) {
1324            return name.clone();
1325        }
1326        if let Some(peer) = self.peers.get(addr) {
1327            return peer.identity().short_npub();
1328        }
1329        if let Some(entry) = self.sessions.get(addr) {
1330            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1331            return PeerIdentity::from_pubkey(xonly).short_npub();
1332        }
1333        addr.short_hex()
1334    }
1335
1336    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1337    /// decrypt-worker state coherent: removes the same `cache_key`
1338    /// from the registered-sessions tracking set and tells the
1339    /// assigned shard worker to drop its `OwnedSessionState` entry.
1340    ///
1341    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1342    /// at every session-lifecycle teardown site (rekey cross-connection
1343    /// swap, peer disconnect, dispatch session-rotation) so the worker
1344    /// doesn't keep stale ciphers / replay windows around. The
1345    /// follow-up `RegisterSession` for the NEW key (if any) will then
1346    /// install the fresh state on the same shard.
1347    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1348        // Find the peer that owns this index BEFORE removing it from
1349        // the index map, so we can decide whether the deregistration
1350        // also tears down the peer's connected UDP socket.
1351        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1352        self.peers_by_index.remove(&cache_key);
1353        if self.decrypt_registered_sessions.remove(&cache_key)
1354            && let Some(workers) = self.decrypt_workers.as_ref()
1355        {
1356            workers.unregister_session(cache_key);
1357        }
1358        // Tear down the per-peer connected UDP socket *only* if no
1359        // other peers_by_index entry still resolves to this peer.
1360        // Rekey drain calls into this helper with the OLD session
1361        // index while the NEW index is already installed and points
1362        // at the same peer — there the connect()-ed 5-tuple is
1363        // still valid for the new session and we must not close it.
1364        // Peer-teardown sites (CrossConnection swap, stale-index
1365        // fall-through in encrypted.rs, disconnect handler) call
1366        // here when this is the peer's last index, so the connected
1367        // socket goes away with the peer.
1368        if let Some(peer_addr) = owning_peer {
1369            let peer_has_other_index = self
1370                .peers_by_index
1371                .values()
1372                .any(|other| *other == peer_addr);
1373            if !peer_has_other_index {
1374                self.clear_connected_udp_for_peer(&peer_addr);
1375            }
1376        }
1377    }
1378
1379    /// Ensure the current FMP receive index resolves to this peer.
1380    ///
1381    /// Rekey msg1/msg2 handlers pre-register the pending index before
1382    /// cutover, but losing that registration in a debug build used to
1383    /// panic in the cutover path. Repairing the map here is safe: the
1384    /// peer has already promoted the pending session, and the decrypt
1385    /// worker registration immediately after cutover depends on the
1386    /// same `(transport_id, our_index)` key.
1387    pub(in crate::node) fn ensure_current_session_index_registered(
1388        &mut self,
1389        node_addr: &NodeAddr,
1390        context: &'static str,
1391    ) -> bool {
1392        let Some(peer) = self.peers.get(node_addr) else {
1393            return false;
1394        };
1395        let Some(transport_id) = peer.transport_id() else {
1396            warn!(
1397                peer = %self.peer_display_name(node_addr),
1398                context,
1399                "Cannot register current session index without transport id"
1400            );
1401            return false;
1402        };
1403        let Some(our_index) = peer.our_index() else {
1404            warn!(
1405                peer = %self.peer_display_name(node_addr),
1406                context,
1407                "Cannot register current session index without local index"
1408            );
1409            return false;
1410        };
1411
1412        let cache_key = (transport_id, our_index.as_u32());
1413        match self.peers_by_index.get(&cache_key).copied() {
1414            Some(existing) if existing == *node_addr => true,
1415            Some(existing) => {
1416                warn!(
1417                    peer = %self.peer_display_name(node_addr),
1418                    previous_owner = %self.peer_display_name(&existing),
1419                    transport_id = %transport_id,
1420                    our_index = %our_index,
1421                    context,
1422                    "Repairing current session index with stale owner"
1423                );
1424                self.peers_by_index.insert(cache_key, *node_addr);
1425                true
1426            }
1427            None => {
1428                warn!(
1429                    peer = %self.peer_display_name(node_addr),
1430                    transport_id = %transport_id,
1431                    our_index = %our_index,
1432                    context,
1433                    "Repairing missing current session index"
1434                );
1435                self.peers_by_index.insert(cache_key, *node_addr);
1436                true
1437            }
1438        }
1439    }
1440
1441    // === Configuration ===
1442
    /// Get the configuration.
    ///
    /// Returns a shared reference to the `Config` the node was constructed with.
    pub fn config(&self) -> &Config {
        &self.config
    }
1447
1448    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1449    ///
1450    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1451    /// transport MTU. Returns the maximum IPv6 packet size (including
1452    /// IPv6 header) that can be transmitted through the FIPS mesh.
1453    pub fn effective_ipv6_mtu(&self) -> u16 {
1454        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1455    }
1456
1457    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1458    ///
1459    /// Returns the **minimum** MTU across all operational transports, or
1460    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1461    /// where a specific egress transport isn't yet known: the resulting
1462    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1463    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1464    /// configured-transport's egress, eliminating PMTU-D black holes that
1465    /// would otherwise occur when a flow's actual egress is smaller than
1466    /// the clamp ceiling assumed at TUN init.
1467    ///
1468    /// Returning the smallest (rather than the first-iterated, which used
1469    /// to vary across HashMap iteration order + async-startup race) makes
1470    /// the clamp deterministic across daemon restarts.
1471    ///
1472    /// See `ISSUE-2026-0011` for the empirical investigation.
1473    pub fn transport_mtu(&self) -> u16 {
1474        let min_operational = self
1475            .transports
1476            .values()
1477            .filter(|h| h.is_operational())
1478            .map(|h| h.mtu())
1479            .min();
1480        if let Some(mtu) = min_operational {
1481            return mtu;
1482        }
1483        // Fallback to config: try UDP first, then Ethernet
1484        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1485            return cfg.mtu();
1486        }
1487        1280
1488    }
1489
1490    // === State ===
1491
    /// Get the node state.
    ///
    /// Returns a copy of the current `NodeState`; constructors start the node
    /// in `NodeState::Created`.
    pub fn state(&self) -> NodeState {
        self.state
    }
1496
1497    /// Get the node uptime.
1498    pub fn uptime(&self) -> std::time::Duration {
1499        self.started_at.elapsed()
1500    }
1501
1502    /// Check if node is operational.
1503    pub fn is_running(&self) -> bool {
1504        self.state.is_operational()
1505    }
1506
    /// Check if this is a leaf-only node.
    ///
    /// Returns the stored `is_leaf_only` flag set at construction.
    pub fn is_leaf_only(&self) -> bool {
        self.is_leaf_only
    }
1511
1512    // === Tree State ===
1513
1514    /// Get the tree state.
1515    pub fn tree_state(&self) -> &TreeState {
1516        &self.tree_state
1517    }
1518
1519    /// Get mutable tree state.
1520    pub fn tree_state_mut(&mut self) -> &mut TreeState {
1521        &mut self.tree_state
1522    }
1523
1524    // === Bloom State ===
1525
1526    /// Get the Bloom filter state.
1527    pub fn bloom_state(&self) -> &BloomState {
1528        &self.bloom_state
1529    }
1530
1531    /// Get mutable Bloom filter state.
1532    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
1533        &mut self.bloom_state
1534    }
1535
1536    // === Mesh Size Estimate ===
1537
1538    /// Get the cached estimated mesh size.
1539    pub fn estimated_mesh_size(&self) -> Option<u64> {
1540        self.estimated_mesh_size
1541    }
1542
1543    /// Compute and cache the estimated mesh size from bloom filters.
1544    ///
1545    /// Uses the spanning tree partition: parent's filter covers nodes reachable
1546    /// upward, children's filters cover disjoint subtrees downward. The sum
1547    /// of estimated entry counts plus one (self) approximates total network size.
1548    pub(crate) fn compute_mesh_size(&mut self) {
1549        let my_addr = *self.tree_state.my_node_addr();
1550        let parent_id = *self.tree_state.my_declaration().parent_id();
1551        let is_root = self.tree_state.is_root();
1552
1553        let max_fpr = self.config.node.bloom.max_inbound_fpr;
1554        let mut total: f64 = 1.0; // count self
1555        let mut child_count: u32 = 0;
1556        let mut has_data = false;
1557
1558        // Parent's filter: nodes reachable upward through the tree.
1559        // If any contributing filter is above the FPR cap, we refuse to
1560        // estimate rather than substitute a partial/biased aggregate —
1561        // Node.estimated_mesh_size is already Option<u64> and consumers
1562        // (control socket, fipstop, periodic debug log) handle None.
1563        if !is_root
1564            && let Some(parent) = self.peers.get(&parent_id)
1565            && let Some(filter) = parent.inbound_filter()
1566        {
1567            match filter.estimated_count(max_fpr) {
1568                Some(n) => {
1569                    total += n;
1570                    has_data = true;
1571                }
1572                None => {
1573                    self.estimated_mesh_size = None;
1574                    return;
1575                }
1576            }
1577        }
1578
1579        // Children's filters: each child's subtree is disjoint
1580        for (peer_addr, peer) in &self.peers {
1581            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
1582                && *decl.parent_id() == my_addr
1583            {
1584                child_count += 1;
1585                if let Some(filter) = peer.inbound_filter() {
1586                    match filter.estimated_count(max_fpr) {
1587                        Some(n) => {
1588                            total += n;
1589                            has_data = true;
1590                        }
1591                        None => {
1592                            self.estimated_mesh_size = None;
1593                            return;
1594                        }
1595                    }
1596                }
1597            }
1598        }
1599
1600        if !has_data {
1601            self.estimated_mesh_size = None;
1602            return;
1603        }
1604
1605        let size = total.round() as u64;
1606        self.estimated_mesh_size = Some(size);
1607
1608        // Periodic logging (reuse MMP default interval: 30s)
1609        let now = std::time::Instant::now();
1610        let should_log = match self.last_mesh_size_log {
1611            None => true,
1612            Some(last) => {
1613                now.duration_since(last)
1614                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
1615            }
1616        };
1617        if should_log {
1618            tracing::debug!(
1619                estimated_mesh_size = size,
1620                peers = self.peers.len(),
1621                children = child_count,
1622                "Mesh size estimate"
1623            );
1624            self.last_mesh_size_log = Some(now);
1625        }
1626    }
1627
1628    // === Coord Cache ===
1629
1630    /// Get the coordinate cache.
1631    pub fn coord_cache(&self) -> &CoordCache {
1632        &self.coord_cache
1633    }
1634
1635    /// Get mutable coordinate cache.
1636    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
1637        &mut self.coord_cache
1638    }
1639
1640    // === Node Statistics ===
1641
1642    /// Get the node statistics.
1643    pub fn stats(&self) -> &stats::NodeStats {
1644        &self.stats
1645    }
1646
1647    /// Get mutable node statistics.
1648    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
1649        &mut self.stats
1650    }
1651
1652    /// Get the stats history collector.
1653    pub fn stats_history(&self) -> &stats_history::StatsHistory {
1654        &self.stats_history
1655    }
1656
1657    /// Sample the current node state into the stats history ring.
1658    /// Called once per tick from the RX loop.
1659    pub(crate) fn record_stats_history(&mut self) {
1660        let fwd = &self.stats.forwarding;
1661        let peers_with_mmp: Vec<f64> = self
1662            .peers
1663            .values()
1664            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1665            .collect();
1666        let loss_rate = if peers_with_mmp.is_empty() {
1667            0.0
1668        } else {
1669            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1670        };
1671
1672        let snap = stats_history::Snapshot {
1673            mesh_size: self.estimated_mesh_size,
1674            tree_depth: self.tree_state.my_coords().depth() as u32,
1675            peer_count: self.peers.len() as u64,
1676            parent_switches_total: self.stats.tree.parent_switches,
1677            bytes_in_total: fwd.received_bytes,
1678            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1679            packets_in_total: fwd.received_packets,
1680            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1681            loss_rate,
1682            active_sessions: self.sessions.len() as u64,
1683        };
1684
1685        let now = std::time::Instant::now();
1686        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1687            .peers
1688            .values()
1689            .map(|p| {
1690                let stats = p.link_stats();
1691                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1692                    Some(m) => (
1693                        m.metrics.srtt_ms(),
1694                        Some(m.metrics.loss_rate()),
1695                        m.receiver.ecn_ce_count() as u64,
1696                    ),
1697                    None => (None, None, 0),
1698                };
1699                stats_history::PeerSnapshot {
1700                    node_addr: *p.node_addr(),
1701                    last_seen: now,
1702                    srtt_ms,
1703                    loss_rate,
1704                    bytes_in_total: stats.bytes_recv,
1705                    bytes_out_total: stats.bytes_sent,
1706                    packets_in_total: stats.packets_recv,
1707                    packets_out_total: stats.packets_sent,
1708                    ecn_ce_total: ecn_ce,
1709                }
1710            })
1711            .collect();
1712
1713        self.stats_history.tick(now, &snap, &peer_snaps);
1714    }
1715
1716    // === TUN Interface ===
1717
1718    /// Get the TUN state.
1719    pub fn tun_state(&self) -> TunState {
1720        self.tun_state
1721    }
1722
1723    /// Get the TUN interface name, if active.
1724    pub fn tun_name(&self) -> Option<&str> {
1725        self.tun_name.as_deref()
1726    }
1727
1728    // === Resource Limits ===
1729
1730    /// Set the maximum number of connections (handshake phase).
1731    pub fn set_max_connections(&mut self, max: usize) {
1732        self.max_connections = max;
1733    }
1734
1735    /// Set the maximum number of peers (authenticated).
1736    pub fn set_max_peers(&mut self, max: usize) {
1737        self.max_peers = max;
1738    }
1739
1740    /// Set the maximum number of links.
1741    pub fn set_max_links(&mut self, max: usize) {
1742        self.max_links = max;
1743    }
1744
1745    // === Counts ===
1746
1747    /// Number of pending connections (handshake in progress).
1748    pub fn connection_count(&self) -> usize {
1749        self.connections.len()
1750    }
1751
1752    /// Number of authenticated peers.
1753    pub fn peer_count(&self) -> usize {
1754        self.peers.len()
1755    }
1756
1757    /// Number of active links.
1758    pub fn link_count(&self) -> usize {
1759        self.links.len()
1760    }
1761
1762    /// Number of active transports.
1763    pub fn transport_count(&self) -> usize {
1764        self.transports.len()
1765    }
1766
1767    // === Transport Management ===
1768
1769    /// Allocate a new transport ID.
1770    pub fn allocate_transport_id(&mut self) -> TransportId {
1771        let id = TransportId::new(self.next_transport_id);
1772        self.next_transport_id += 1;
1773        id
1774    }
1775
1776    /// Get a transport by ID.
1777    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
1778        self.transports.get(id)
1779    }
1780
1781    /// Get mutable transport by ID.
1782    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
1783        self.transports.get_mut(id)
1784    }
1785
1786    /// Iterate over transport IDs.
1787    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
1788        self.transports.keys()
1789    }
1790
1791    /// Get the packet receiver for the event loop.
1792    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
1793        self.packet_rx.as_mut()
1794    }
1795
1796    // === Link Management ===
1797
1798    /// Allocate a new link ID.
1799    pub fn allocate_link_id(&mut self) -> LinkId {
1800        let id = LinkId::new(self.next_link_id);
1801        self.next_link_id += 1;
1802        id
1803    }
1804
1805    /// Add a link.
1806    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1807        if self.max_links > 0 && self.links.len() >= self.max_links {
1808            return Err(NodeError::MaxLinksExceeded {
1809                max: self.max_links,
1810            });
1811        }
1812        let link_id = link.link_id();
1813        let transport_id = link.transport_id();
1814        let remote_addr = link.remote_addr().clone();
1815
1816        self.links.insert(link_id, link);
1817        self.addr_to_link
1818            .insert((transport_id, remote_addr), link_id);
1819        Ok(())
1820    }
1821
1822    /// Get a link by ID.
1823    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
1824        self.links.get(link_id)
1825    }
1826
1827    /// Get a mutable link by ID.
1828    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
1829        self.links.get_mut(link_id)
1830    }
1831
1832    /// Find link ID by transport address.
1833    pub fn find_link_by_addr(
1834        &self,
1835        transport_id: TransportId,
1836        addr: &TransportAddr,
1837    ) -> Option<LinkId> {
1838        self.addr_to_link
1839            .get(&(transport_id, addr.clone()))
1840            .copied()
1841    }
1842
1843    /// Remove a link.
1844    ///
1845    /// Only removes the addr_to_link reverse lookup if it still points to this
1846    /// link. In cross-connection scenarios, a newer link may have replaced the
1847    /// entry for the same address.
1848    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1849        if let Some(link) = self.links.remove(link_id) {
1850            // Clean up reverse lookup only if it still maps to this link
1851            let key = (link.transport_id(), link.remote_addr().clone());
1852            if self.addr_to_link.get(&key) == Some(link_id) {
1853                self.addr_to_link.remove(&key);
1854            }
1855            Some(link)
1856        } else {
1857            None
1858        }
1859    }
1860
1861    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1862        if !self.bootstrap_transports.contains(&transport_id) {
1863            return;
1864        }
1865
1866        let transport_in_use = self
1867            .links
1868            .values()
1869            .any(|link| link.transport_id() == transport_id)
1870            || self
1871                .connections
1872                .values()
1873                .any(|conn| conn.transport_id() == Some(transport_id))
1874            || self
1875                .peers
1876                .values()
1877                .any(|peer| peer.transport_id() == Some(transport_id))
1878            || self
1879                .pending_connects
1880                .iter()
1881                .any(|pending| pending.transport_id == transport_id);
1882
1883        if transport_in_use {
1884            return;
1885        }
1886
1887        tracing::debug!(
1888            transport_id = %transport_id,
1889            "bootstrap transport has no remaining references; dropping"
1890        );
1891
1892        self.bootstrap_transports.remove(&transport_id);
1893        self.bootstrap_transport_npubs.remove(&transport_id);
1894        self.transport_drops.remove(&transport_id);
1895        self.transports.remove(&transport_id);
1896    }
1897
1898    /// Iterate over all links.
1899    pub fn links(&self) -> impl Iterator<Item = &Link> {
1900        self.links.values()
1901    }
1902
1903    // === Connection Management (Handshake Phase) ===
1904
1905    /// Add a pending connection.
1906    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
1907        let link_id = connection.link_id();
1908
1909        if self.connections.contains_key(&link_id) {
1910            return Err(NodeError::ConnectionAlreadyExists(link_id));
1911        }
1912
1913        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
1914            return Err(NodeError::MaxConnectionsExceeded {
1915                max: self.max_connections,
1916            });
1917        }
1918
1919        self.connections.insert(link_id, connection);
1920        Ok(())
1921    }
1922
1923    /// Get a connection by LinkId.
1924    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
1925        self.connections.get(link_id)
1926    }
1927
1928    /// Get a mutable connection by LinkId.
1929    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
1930        self.connections.get_mut(link_id)
1931    }
1932
1933    /// Remove a connection.
1934    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
1935        self.connections.remove(link_id)
1936    }
1937
1938    /// Iterate over all connections.
1939    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
1940        self.connections.values()
1941    }
1942
1943    // === Peer Management (Active Phase) ===
1944
1945    /// Get a peer by NodeAddr.
1946    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
1947        self.peers.get(node_addr)
1948    }
1949
1950    /// Get a mutable peer by NodeAddr.
1951    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
1952        self.peers.get_mut(node_addr)
1953    }
1954
1955    /// Remove a peer.
1956    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
1957        self.peers.remove(node_addr)
1958    }
1959
1960    /// Iterate over all peers.
1961    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
1962        self.peers.values()
1963    }
1964
1965    /// Reference to the Nostr discovery handle if discovery is enabled.
1966    /// Used by control queries (`show_peers` per-peer Nostr-traversal
1967    /// state) to read failure-state without taking shared ownership.
1968    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
1969        self.nostr_discovery.as_deref()
1970    }
1971
1972    /// Iterate over all peer node IDs.
1973    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
1974        self.peers.keys()
1975    }
1976
1977    /// Iterate over peers that can send traffic.
1978    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
1979        self.peers.values().filter(|p| p.can_send())
1980    }
1981
1982    /// Number of peers that can send traffic.
1983    pub fn sendable_peer_count(&self) -> usize {
1984        self.peers.values().filter(|p| p.can_send()).count()
1985    }
1986
1987    pub(crate) fn set_discovery_fallback_transit_allowed(
1988        &mut self,
1989        peer_addr: NodeAddr,
1990        allowed: bool,
1991    ) {
1992        if allowed {
1993            self.discovery_fallback_transit_blocked_peers
1994                .remove(&peer_addr);
1995        } else {
1996            self.discovery_fallback_transit_blocked_peers
1997                .insert(peer_addr);
1998        }
1999    }
2000
2001    pub(crate) fn configured_discovery_fallback_transit(
2002        &self,
2003        peer_addr: &NodeAddr,
2004    ) -> Option<bool> {
2005        self.config.peers().iter().find_map(|peer| {
2006            PeerIdentity::from_npub(&peer.npub)
2007                .ok()
2008                .filter(|identity| identity.node_addr() == peer_addr)
2009                .map(|_| peer.discovery_fallback_transit)
2010        })
2011    }
2012
2013    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
2014        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
2015            return retry_state.peer_config.discovery_fallback_transit;
2016        }
2017
2018        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
2019            return allowed;
2020        }
2021
2022        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
2023    }
2024
2025    // === End-to-End Sessions ===
2026
2027    /// Get a session by remote NodeAddr.
2028    /// Disable the discovery forward rate limiter (for tests).
2029    #[cfg(test)]
2030    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
2031        self.discovery_forward_limiter
2032            .set_interval(std::time::Duration::ZERO);
2033    }
2034
2035    #[cfg(test)]
2036    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
2037        self.sessions.get(remote)
2038    }
2039
2040    /// Get a mutable session by remote NodeAddr.
2041    #[cfg(test)]
2042    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
2043        self.sessions.get_mut(remote)
2044    }
2045
2046    /// Remove a session.
2047    #[cfg(test)]
2048    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
2049        self.sessions.remove(remote)
2050    }
2051
2052    /// Read the path_mtu_lookup entry for a destination FipsAddress.
2053    #[cfg(test)]
2054    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
2055        self.path_mtu_lookup
2056            .read()
2057            .ok()
2058            .and_then(|map| map.get(fips_addr).copied())
2059    }
2060
2061    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
2062    #[cfg(test)]
2063    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
2064        if let Ok(mut map) = self.path_mtu_lookup.write() {
2065            map.insert(fips_addr, mtu);
2066        }
2067    }
2068
2069    /// Number of end-to-end sessions.
2070    pub fn session_count(&self) -> usize {
2071        self.sessions.len()
2072    }
2073
2074    /// Iterate over all session entries (for control queries).
2075    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
2076        self.sessions.iter()
2077    }
2078
2079    // === Identity Cache ===
2080
2081    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
2082    pub(crate) fn register_identity(
2083        &mut self,
2084        node_addr: NodeAddr,
2085        pubkey: secp256k1::PublicKey,
2086    ) -> bool {
2087        let mut prefix = [0u8; 15];
2088        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
2089        if let Some(entry) = self.identity_cache.get(&prefix)
2090            && entry.node_addr == node_addr
2091            && entry.pubkey == pubkey
2092        {
2093            // Endpoint sends pass the same PeerIdentity on every packet. Once
2094            // validated, avoid re-deriving NodeAddr from the public key in the
2095            // data path; that hash showed up in macOS sender profiles.
2096            return true;
2097        }
2098
2099        let (xonly, _) = pubkey.x_only_public_key();
2100        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
2101        if derived_node_addr != node_addr {
2102            debug!(
2103                claimed_node_addr = %node_addr,
2104                derived_node_addr = %derived_node_addr,
2105                "Rejected identity cache entry with mismatched public key"
2106            );
2107            return false;
2108        }
2109
2110        let now_ms = Self::now_ms();
2111        if let Some(entry) = self.identity_cache.get_mut(&prefix)
2112            && entry.node_addr == node_addr
2113        {
2114            entry.pubkey = pubkey;
2115            entry.last_seen_ms = now_ms;
2116            return true;
2117        }
2118
2119        let npub = encode_npub(&xonly);
2120        self.identity_cache.insert(
2121            prefix,
2122            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
2123        );
2124        // LRU eviction
2125        let max = self.config.node.cache.identity_size;
2126        if self.identity_cache.len() > max
2127            && let Some(oldest_key) = self
2128                .identity_cache
2129                .iter()
2130                .min_by_key(|(_, entry)| entry.last_seen_ms)
2131                .map(|(k, _)| *k)
2132        {
2133            self.identity_cache.remove(&oldest_key);
2134        }
2135        true
2136    }
2137
2138    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2139    pub(crate) fn lookup_by_fips_prefix(
2140        &mut self,
2141        prefix: &[u8; 15],
2142    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2143        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2144            entry.last_seen_ms = Self::now_ms(); // LRU touch
2145            Some((entry.node_addr, entry.pubkey))
2146        } else {
2147            None
2148        }
2149    }
2150
2151    /// Check if a node's identity is in the cache (without LRU touch).
2152    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2153        let mut prefix = [0u8; 15];
2154        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2155        self.identity_cache.contains_key(&prefix)
2156    }
2157
2158    /// Number of identity cache entries.
2159    pub fn identity_cache_len(&self) -> usize {
2160        self.identity_cache.len()
2161    }
2162
2163    /// Iterate over identity cache entries.
2164    ///
2165    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
2166    /// Used by the `show_identity_cache` control query.
2167    pub fn identity_cache_iter(
2168        &self,
2169    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
2170        self.identity_cache
2171            .values()
2172            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
2173    }
2174
2175    /// Configured maximum identity cache size.
2176    pub fn identity_cache_max(&self) -> usize {
2177        self.config.node.cache.identity_size
2178    }
2179
2180    /// Number of pending discovery lookups.
2181    pub fn pending_lookup_count(&self) -> usize {
2182        self.pending_lookups.len()
2183    }
2184
2185    /// Iterate over pending discovery lookups for diagnostics.
2186    pub fn pending_lookups_iter(
2187        &self,
2188    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
2189        self.pending_lookups.iter()
2190    }
2191
2192    /// Number of recent discovery requests tracked.
2193    pub fn recent_request_count(&self) -> usize {
2194        self.recent_requests.len()
2195    }
2196
2197    /// Count of destinations with queued TUN packets awaiting session setup.
2198    pub fn pending_tun_destinations(&self) -> usize {
2199        self.pending_tun_packets.len()
2200    }
2201
2202    /// Total TUN packets queued across all destinations.
2203    pub fn pending_tun_total_packets(&self) -> usize {
2204        self.pending_tun_packets.values().map(|q| q.len()).sum()
2205    }
2206
2207    /// Iterate over retry state for diagnostics.
2208    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
2209        self.retry_pending.iter()
2210    }
2211
2212    // === Routing ===
2213
2214    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2215    ///
2216    /// Returns true if the peer is our current tree parent, or if the peer
2217    /// has declared us as their parent (making them our child).
2218    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2219        // Peer is our parent
2220        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2221            return true;
2222        }
2223        // Peer is our child (their declaration names us as parent)
2224        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2225            && decl.parent_id() == self.node_addr()
2226        {
2227            return true;
2228        }
2229        false
2230    }
2231
    /// Find next hop for a destination node address.
    ///
    /// Routing priority:
    /// 1. Destination is self → `None` (local delivery)
    /// 2. Destination is a healthy direct peer → that peer
    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
    ///    observed reverse paths, selected with weighted multipath plus
    ///    periodic coordinate/tree exploration.
    /// 4. Bloom filter candidates with cached dest coords → among peers whose
    ///    bloom filter contains the destination, pick the one that minimizes
    ///    tree distance to the destination, with
    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
    ///    The self-distance check ensures only peers strictly closer to the
    ///    destination than us are considered (prevents routing loops).
    /// 5. Greedy tree routing fallback (requires cached dest coords)
    /// 6. Last resorts: a learned route (reply-learned mode only), then a
    ///    direct peer that is not healthy but can still send
    /// 7. No route → `None`
    ///
    /// Both the bloom filter and tree routing paths require cached destination
    /// coordinates (checked in `coord_cache`). Without coordinates, the node
    /// cannot make loop-free forwarding decisions. The caller should signal
    /// `CoordsRequired` back to the source when `None` is returned for a
    /// non-local destination.
    ///
    /// Takes `&mut self` because the lookup touches the coordinate cache
    /// (`get_and_touch`) and calls into the learned-route table.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // 1. Local delivery
        if dest_node_addr == self.node_addr() {
            return None;
        }

        // 2. Healthy direct peer. Stale direct links remain usable, but in
        // reply-learned mode they must not hide a live mesh route learned from
        // recent traffic or discovery.
        // `direct_peer_can_send` is remembered for the last-resort fallbacks
        // further down; only a *healthy* direct peer short-circuits here.
        let direct_peer_can_send = self
            .peers
            .get(dest_node_addr)
            .is_some_and(|peer| peer.can_send());
        if let Some(peer) = self.peers.get(dest_node_addr)
            && peer.is_healthy()
        {
            return Some(peer);
        }

        let now_ms = Self::now_ms();

        // Snapshot of peers that can send right now. It is `Some` only in
        // reply-learned mode, so it doubles as the "learned routing is
        // enabled" switch for every learned-route step below.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(_, peer)| peer.can_send())
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // 3. Optional reply-learned routing. These entries are not peer
        // claims; they are local observations of which peer carried traffic
        // or a verified lookup response back from the destination. Most
        // packets use weighted multipath over learned routes, but periodic
        // fallback exploration lets coord/bloom/tree routes discover better
        // candidates.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        // Not exploring: a learned route, if one is selectable, wins here.
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Look up cached destination coordinates (required by both bloom and tree paths).
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coordinates: bloom and tree routing are unavailable. Try a
            // learned route (this also covers the exploring case, where the
            // learned table was skipped above), then a sendable direct
            // peer, otherwise report no route.
            if let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            if direct_peer_can_send {
                return self.peers.get(dest_node_addr);
            }
            return None;
        };

        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
        //    If no candidate is strictly closer, fall through to tree routing.
        // Only the chosen address leaves this block, so the borrow of the
        // candidate list ends before the peer is looked up again.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr {
            return self.peers.get(&next_hop_addr);
        }

        // 5. Greedy tree routing fallback
        // The tree's suggestion is accepted only if that peer can send.
        let tree_route_addr = self
            .tree_state
            .find_next_hop(&dest_coords)
            .filter(|next_hop_id| {
                self.peers
                    .get(next_hop_id)
                    .is_some_and(|peer| peer.can_send())
            });
        if let Some(next_hop_addr) = tree_route_addr {
            return self.peers.get(&next_hop_addr);
        }
        // Exploration found nothing via bloom/tree: go back to the learned
        // table.
        // NOTE(review): this branch returns `None` when no learned hop is
        // selectable, without trying the sendable direct peer that the
        // non-exploring path below falls back to. Confirm this asymmetry is
        // intended.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // Not exploring: the learned table is consulted once more after
        // bloom and tree routing both failed.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Last resort: a direct peer that is not healthy but can still send.
        if direct_peer_can_send {
            return self.peers.get(dest_node_addr);
        }

        None
    }
2378
2379    pub(in crate::node) fn learn_reverse_route(
2380        &mut self,
2381        destination: NodeAddr,
2382        next_hop: NodeAddr,
2383    ) {
2384        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2385            || destination == *self.node_addr()
2386        {
2387            return;
2388        }
2389        let now_ms = Self::now_ms();
2390        self.learned_routes.learn(
2391            destination,
2392            next_hop,
2393            now_ms,
2394            self.config.node.routing.learned_ttl_secs,
2395            self.config.node.routing.max_learned_routes_per_dest,
2396        );
2397    }
2398
2399    pub(in crate::node) fn record_route_failure(
2400        &mut self,
2401        destination: NodeAddr,
2402        next_hop: NodeAddr,
2403    ) {
2404        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2405            return;
2406        }
2407        self.learned_routes.record_failure(&destination, &next_hop);
2408    }
2409
2410    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
2411        self.learned_routes.snapshot(now_ms)
2412    }
2413
2414    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
2415        self.learned_routes.purge_expired(now_ms);
2416    }
2417
2418    /// Select the best peer from a set of bloom filter candidates.
2419    ///
2420    /// Uses distance from each candidate's tree coordinates to the destination
2421    /// as the primary metric (after link_cost). Only selects peers that are
2422    /// strictly closer to the destination than we are (self-distance check
2423    /// prevents routing loops).
2424    ///
2425    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2426    fn select_best_candidate<'a>(
2427        &'a self,
2428        candidates: &[&'a ActivePeer],
2429        dest_coords: &crate::tree::TreeCoordinate,
2430    ) -> Option<&'a ActivePeer> {
2431        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2432
2433        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2434
2435        for &candidate in candidates {
2436            if !candidate.can_send() {
2437                continue;
2438            }
2439
2440            let cost = candidate.link_cost();
2441
2442            let dist = self
2443                .tree_state
2444                .peer_coords(candidate.node_addr())
2445                .map(|pc| pc.distance_to(dest_coords))
2446                .unwrap_or(usize::MAX);
2447
2448            // Self-distance check: only consider peers strictly closer
2449            // to the destination than we are (prevents routing loops)
2450            if dist >= my_distance {
2451                continue;
2452            }
2453
2454            let dominated = match &best {
2455                None => true,
2456                Some((_, best_cost, best_dist)) => {
2457                    cost < *best_cost
2458                        || (cost == *best_cost && dist < *best_dist)
2459                        || (cost == *best_cost
2460                            && dist == *best_dist
2461                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2462                }
2463            };
2464
2465            if dominated {
2466                best = Some((candidate, cost, dist));
2467            }
2468        }
2469
2470        best.map(|(peer, _, _)| peer)
2471    }
2472
2473    /// Check if a destination is in any peer's bloom filter.
2474    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2475        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2476    }
2477
2478    /// Get the TUN packet sender channel.
2479    ///
2480    /// Returns None if TUN is not active or the node hasn't been started.
2481    pub fn tun_tx(&self) -> Option<&TunTx> {
2482        self.tun_tx.as_ref()
2483    }
2484
2485    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2486    ///
2487    /// This must be called before [`Node::start`] and requires `tun.enabled =
2488    /// false`. Outbound packets sent to the returned sender are processed by the
2489    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2490    /// sent to the returned receiver with source attribution.
2491    pub fn attach_external_packet_io(
2492        &mut self,
2493        capacity: usize,
2494    ) -> Result<ExternalPacketIo, NodeError> {
2495        if self.state != NodeState::Created {
2496            return Err(NodeError::Config(ConfigError::Validation(
2497                "external packet I/O must be attached before node start".to_string(),
2498            )));
2499        }
2500        if self.config.tun.enabled {
2501            return Err(NodeError::Config(ConfigError::Validation(
2502                "external packet I/O requires tun.enabled=false".to_string(),
2503            )));
2504        }
2505
2506        let capacity = capacity.max(1);
2507        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2508        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2509        self.tun_outbound_rx = Some(outbound_rx);
2510        self.external_packet_tx = Some(inbound_tx);
2511
2512        Ok(ExternalPacketIo {
2513            outbound_tx,
2514            inbound_rx,
2515        })
2516    }
2517
2518    /// Attach app-owned endpoint data I/O for embedded operation.
2519    ///
2520    /// Commands sent to the returned sender are processed by the node RX loop.
2521    /// Incoming endpoint data is emitted as source-attributed events.
2522    pub(crate) fn attach_endpoint_data_io(
2523        &mut self,
2524        capacity: usize,
2525    ) -> Result<EndpointDataIo, NodeError> {
2526        if self.state != NodeState::Created {
2527            return Err(NodeError::Config(ConfigError::Validation(
2528                "endpoint data I/O must be attached before node start".to_string(),
2529            )));
2530        }
2531
2532        let command_capacity = endpoint_data_command_capacity(capacity);
2533        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2534        // Inbound endpoint-data events use an unbounded channel — see
2535        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2536        // per-packet semaphore + the cross-task relay task that used to
2537        // sit on top of this channel).
2538        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2539        self.endpoint_command_rx = Some(command_rx);
2540        self.endpoint_event_tx = Some(event_tx.clone());
2541
2542        Ok(EndpointDataIo {
2543            command_tx,
2544            event_rx,
2545            event_tx,
2546        })
2547    }
2548
2549    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2550        let mut prefix = [0u8; 15];
2551        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2552        self.identity_cache
2553            .get(&prefix)
2554            .filter(|entry| &entry.node_addr == addr)
2555            .map(|entry| entry.pubkey)
2556    }
2557
2558    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2559        let mut prefix = [0u8; 15];
2560        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2561        self.identity_cache
2562            .get(&prefix)
2563            .filter(|entry| &entry.node_addr == addr)
2564            .map(|entry| entry.npub.clone())
2565    }
2566
2567    pub(in crate::node) fn deliver_external_ipv6_packet(
2568        &self,
2569        src_addr: &NodeAddr,
2570        packet: Vec<u8>,
2571    ) {
2572        let Some(external_packet_tx) = &self.external_packet_tx else {
2573            return;
2574        };
2575        if packet.len() < 40 {
2576            return;
2577        }
2578        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2579            return;
2580        };
2581        let delivered = NodeDeliveredPacket {
2582            source_node_addr: *src_addr,
2583            source_npub: self.npub_for_node_addr(src_addr),
2584            destination,
2585            packet,
2586        };
2587        if let Err(error) = external_packet_tx.try_send(delivered) {
2588            debug!(error = %error, "Failed to deliver packet to external app sink");
2589        }
2590    }
2591
2592    // === Sending ===
2593
2594    /// Encrypt and send a link-layer message to an authenticated peer.
2595    ///
2596    /// The plaintext should include the message type byte followed by the
2597    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
2598    ///
2599    /// The send path prepends a 4-byte session-relative timestamp (inner
2600    /// header) before encryption. The full 16-byte outer header is used
2601    /// as AAD for the AEAD construction.
2602    ///
2603    /// This is the standard path for sending any link-layer control message
2604    /// to a peer over their encrypted Noise session.
2605    pub(super) async fn send_encrypted_link_message(
2606        &mut self,
2607        node_addr: &NodeAddr,
2608        plaintext: &[u8],
2609    ) -> Result<(), NodeError> {
2610        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2611            .await
2612    }
2613
2614    /// Update the local-outbound-broken signal from a `transport.send`
2615    /// outcome. Sets `last_local_send_failure_at` on local-side io
2616    /// errors (NetworkUnreachable / HostUnreachable / AddrNotAvailable);
2617    /// clears it on success. The reaper consults this in
2618    /// `check_link_heartbeats` to switch to `fast_link_dead_timeout_secs`.
2619    pub(in crate::node) fn note_local_send_outcome(
2620        &mut self,
2621        result: &Result<usize, TransportError>,
2622    ) {
2623        match result {
2624            Ok(_) => {
2625                if self.last_local_send_failure_at.is_some() {
2626                    self.last_local_send_failure_at = None;
2627                }
2628            }
2629            Err(TransportError::Io(e))
2630                if matches!(
2631                    e.kind(),
2632                    std::io::ErrorKind::NetworkUnreachable
2633                        | std::io::ErrorKind::HostUnreachable
2634                        | std::io::ErrorKind::AddrNotAvailable
2635                ) =>
2636            {
2637                self.last_local_send_failure_at = Some(std::time::Instant::now());
2638            }
2639            Err(_) => {}
2640        }
2641    }
2642
2643    /// Returns the wall-clock instant of the most recent observed local
2644    /// outbound failure, if any. Used by the link-dead reaper.
2645    pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
2646        self.last_local_send_failure_at
2647    }
2648
2649    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
2650    ///
2651    /// Used by the forwarding path to relay congestion signals hop-by-hop.
2652    pub(super) async fn send_encrypted_link_message_with_ce(
2653        &mut self,
2654        node_addr: &NodeAddr,
2655        plaintext: &[u8],
2656        ce_flag: bool,
2657    ) -> Result<(), NodeError> {
2658        let peer = self
2659            .peers
2660            .get_mut(node_addr)
2661            .ok_or(NodeError::PeerNotFound(*node_addr))?;
2662
2663        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
2664            node_addr: *node_addr,
2665            reason: "no their_index".into(),
2666        })?;
2667        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
2668            node_addr: *node_addr,
2669            reason: "no transport_id".into(),
2670        })?;
2671        let remote_addr = peer
2672            .current_addr()
2673            .cloned()
2674            .ok_or_else(|| NodeError::SendFailed {
2675                node_addr: *node_addr,
2676                reason: "no current_addr".into(),
2677            })?;
2678        #[cfg(any(target_os = "linux", target_os = "macos"))]
2679        let connected_socket = peer.connected_udp();
2680
2681        // Prepend 4-byte session-relative timestamp (inner header)
2682        let timestamp_ms = peer.session_elapsed_ms();
2683
2684        // MMP: read spin bit value before entering session borrow
2685        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
2686        let mut flags = if sp_flag { FLAG_SP } else { 0 };
2687        if ce_flag {
2688            flags |= FLAG_CE;
2689        }
2690        if peer.current_k_bit() {
2691            flags |= FLAG_KEY_EPOCH;
2692        }
2693
2694        let session = peer
2695            .noise_session_mut()
2696            .ok_or_else(|| NodeError::SendFailed {
2697                node_addr: *node_addr,
2698                reason: "no noise session".into(),
2699            })?;
2700
2701        // Build 16-byte outer header upfront. The inner-plaintext
2702        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
2703        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
2704        // just to measure it. The worker path uses this length to size
2705        // the wire buffer directly; the legacy path below still
2706        // materialises a separate `inner_plaintext` Vec for the inline
2707        // encrypt-and-send call.
2708        const INNER_TS_LEN: usize = 4;
2709        let counter = session.current_send_counter();
2710        let inner_len = INNER_TS_LEN + plaintext.len();
2711        let payload_len = inner_len as u16;
2712        let header = build_established_header(their_index, counter, flags, payload_len);
2713
2714        // **UDP send fast path.** The encrypt-worker pool is always
2715        // spawned at lifecycle start (workers = num_cpus) in
2716        // production, so this branch is taken for every authentic
2717        // send on every UDP-transported established session. The
2718        // AEAD work + sendmsg syscall run on a dedicated OS thread;
2719        // the rx_loop only builds the wire buffer + reserves the
2720        // counter inline.
2721        //
2722        // Other transport kinds (BLE, TCP, sim, ethernet) fall
2723        // through to the inline encrypt + transport.send path
2724        // below — those don't have raw-fd / sendmmsg / UDP_GSO
2725        // benefits to expose through the worker pool, so the simpler
2726        // synchronous send is the right shape for them.
2727        //
2728        // The `encrypt_workers.is_some()` check below is true in
2729        // production (lifecycle::start spawns the pool); it stays
2730        // checked rather than `expect()`-ed because unit tests
2731        // construct `Node` without calling `start()`.
2732        let transport_for_send = self
2733            .transports
2734            .get(&transport_id)
2735            .ok_or(NodeError::TransportNotFound(transport_id))?;
2736        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
2737        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
2738            && is_udp
2739            && let Some(cipher_clone) = session.send_cipher_clone()
2740        {
2741            {
2742                // Reserve the counter on the session so subsequent
2743                // sends don't reuse it. `current_send_counter` only
2744                // peeks; we advance via `take_send_counter`.
2745                let reserved_counter =
2746                    session
2747                        .take_send_counter()
2748                        .map_err(|e| NodeError::SendFailed {
2749                            node_addr: *node_addr,
2750                            reason: format!("counter reservation failed: {}", e),
2751                        })?;
2752                debug_assert_eq!(reserved_counter, counter);
2753                // Re-derive the header with the now-locked-in counter
2754                // value (same value, but the call sequence is more
2755                // explicit).
2756                let header =
2757                    build_established_header(their_index, reserved_counter, flags, payload_len);
2758                let transport = transport_for_send;
2759                // Snapshot the per-peer connected UDP socket before
2760                // resolving the fallback address. On the established
2761                // steady-state path this socket already carries the
2762                // kernel peer address, so re-parsing the configured
2763                // transport address and touching the DNS cache on every
2764                // packet is pure overhead on the sender hot path.
2765                let send_target = {
2766                    if let TransportHandle::Udp(udp) = transport {
2767                        let socket_addr = {
2768                            #[cfg(any(target_os = "linux", target_os = "macos"))]
2769                            {
2770                                match connected_socket.as_ref() {
2771                                    Some(socket) => Some(socket.peer_addr()),
2772                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
2773                                }
2774                            }
2775                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
2776                            {
2777                                udp.resolve_for_off_task(&remote_addr).await.ok()
2778                            }
2779                        };
2780                        match (udp.async_socket(), socket_addr) {
2781                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
2782                            _ => None,
2783                        }
2784                    } else {
2785                        None
2786                    }
2787                };
2788                if let Some((socket, socket_addr)) = send_target {
2789                    // Build the wire buffer **directly** from
2790                    // `plaintext` with a single allocation:
2791                    //   `[16 header][4 ts][plaintext...]` with
2792                    // +16 trailing capacity for the AEAD tag.
2793                    // The worker seals `wire_buf[16..]` in
2794                    // place and appends the tag — no second
2795                    // alloc, no second memcpy.
2796                    //
2797                    // Previous design built `inner_plaintext`
2798                    // via `prepend_inner_header` (1 alloc + 1
2799                    // copy) and then let the worker memcpy
2800                    // header + plaintext into a fresh Vec
2801                    // (another alloc + copy). At ~100 kpps the
2802                    // saved alloc/copy is ~150 MB/sec of memory
2803                    // bandwidth on the hot rx_loop + worker.
2804                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
2805                    let mut wire_buf = Vec::with_capacity(wire_capacity);
2806                    wire_buf.extend_from_slice(&header);
2807                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
2808                    wire_buf.extend_from_slice(plaintext);
2809                    let predicted_bytes = wire_capacity;
2810                    // Stats / MMP update inline — predicted size
2811                    // is exact for ChaCha20-Poly1305 (tag is
2812                    // constant 16 bytes). When `connected_socket` is
2813                    // `Some`, the worker sends on it without a
2814                    // destination sockaddr — the kernel skips the
2815                    // per-packet sockaddr + route + neighbor resolve.
2816                    if let Some(peer) = self.peers.get_mut(node_addr) {
2817                        peer.link_stats_mut().record_sent(predicted_bytes);
2818                        if let Some(mmp) = peer.mmp_mut() {
2819                            mmp.sender
2820                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
2821                        }
2822                    }
2823                    workers.dispatch(self::encrypt_worker::FmpSendJob {
2824                        cipher: cipher_clone,
2825                        counter: reserved_counter,
2826                        wire_buf,
2827                        fsp_seal: None,
2828                        socket,
2829                        dest_addr: socket_addr,
2830                        #[cfg(any(target_os = "linux", target_os = "macos"))]
2831                        connected_socket,
2832                        drop_on_backpressure: plaintext
2833                            .first()
2834                            .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
2835                        queued_at: crate::perf_profile::stamp(),
2836                    });
2837                    return Ok(());
2838                }
2839            }
2840        }
2841
2842        // Inline (legacy) path: encrypt + send on the rx_loop.
2843        // Build the inner plaintext lazily here — the worker path
2844        // above never reaches this point, so the prepend_inner_header
2845        // alloc is avoided in the fast path.
2846        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
2847        // Encrypt with AAD binding to the outer header
2848        let ciphertext = {
2849            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
2850            session
2851                .encrypt_with_aad(&inner_plaintext, &header)
2852                .map_err(|e| NodeError::SendFailed {
2853                    node_addr: *node_addr,
2854                    reason: format!("encryption failed: {}", e),
2855                })?
2856        };
2857
2858        let wire_packet = build_encrypted(&header, &ciphertext);
2859
2860        // Re-borrow peer for stats update after sending
2861        let send_result = {
2862            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
2863            let transport = self
2864                .transports
2865                .get(&transport_id)
2866                .ok_or(NodeError::TransportNotFound(transport_id))?;
2867            transport.send(&remote_addr, &wire_packet).await
2868        };
2869        self.note_local_send_outcome(&send_result);
2870        let bytes_sent = send_result.map_err(|e| match e {
2871            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
2872                node_addr: *node_addr,
2873                packet_size,
2874                mtu,
2875            },
2876            other => NodeError::SendFailed {
2877                node_addr: *node_addr,
2878                reason: format!("transport send: {}", other),
2879            },
2880        })?;
2881
2882        // Update send statistics
2883        if let Some(peer) = self.peers.get_mut(node_addr) {
2884            peer.link_stats_mut().record_sent(bytes_sent);
2885            // MMP: record sent frame for sender report generation
2886            if let Some(mmp) = peer.mmp_mut() {
2887                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
2888            }
2889        }
2890
2891        Ok(())
2892    }
2893}
2894
2895impl fmt::Debug for Node {
2896    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2897        f.debug_struct("Node")
2898            .field("node_addr", self.node_addr())
2899            .field("state", &self.state)
2900            .field("is_leaf_only", &self.is_leaf_only)
2901            .field("connections", &self.connection_count())
2902            .field("peers", &self.peer_count())
2903            .field("links", &self.link_count())
2904            .field("transports", &self.transport_count())
2905            .finish()
2906    }
2907}