// fips_core/node/mod.rs
1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45use crate::transport::{
46    Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError, TransportHandle, TransportId,
47};
48use crate::tree::TreeState;
49use crate::upper::hosts::HostMap;
50use crate::upper::icmp_rate_limit::IcmpRateLimiter;
51use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
52use crate::utils::index::IndexAllocator;
53use crate::{
54    Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
55    SessionMessageType, encode_npub,
56};
57use rand::Rng;
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt;
60use std::sync::Arc;
61use std::thread::JoinHandle;
62use thiserror::Error;
63use tracing::{debug, warn};
64
/// Errors related to node operations.
///
/// Covers node lifecycle, resource lookup, resource limits, handshake
/// and session failures, and wrapped errors from the config, identity,
/// and TUN subsystems.
#[derive(Debug, Error)]
pub enum NodeError {
    // --- Lifecycle ---
    #[error("node not started")]
    NotStarted,

    #[error("node already started")]
    AlreadyStarted,

    #[error("node already stopped")]
    AlreadyStopped,

    // --- Resource lookup ---
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    // --- Duplicates ---
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// Peer npub string failed parsing/validation.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    #[error("access denied: {0}")]
    AccessDenied(String),

    // --- Resource limits ---
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    // --- Handshake / session ---
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    #[error("no session available for link {0}")]
    NoSession(LinkId),

    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// Packet exceeds the link MTU while forwarding toward `node_addr`.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    // --- Wrapped subsystem errors ---
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    #[error("transport error: {0}")]
    TransportError(String),

    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
153
/// Source-attributed packet delivered by a node running without a system TUN.
///
/// Emitted on the app-owned `external_packet_tx` channel instead of being
/// written to a TUN device.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// FIPS node address that originated the packet.
    pub source_node_addr: NodeAddr,
    /// Source Nostr public key when the node has learned it; `None` otherwise.
    pub source_npub: Option<String>,
    /// Destination FIPS address from the IPv6 packet.
    pub destination: FipsAddress,
    /// Full IPv6 packet after FIPS session decapsulation.
    pub packet: Vec<u8>,
}
166
/// Cached identity data for a remote peer.
///
/// Stored in `Node::identity_cache`, keyed by the 15-byte FipsAddress
/// prefix (bytes 1-15), enabling reverse lookup from an IPv6
/// destination to the peer's session/routing identity.
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// The peer's FIPS node address.
    node_addr: NodeAddr,
    /// The peer's secp256k1 public key.
    pubkey: secp256k1::PublicKey,
    /// Bech32-encoded public key (npub).
    npub: String,
    /// Last time this entry was refreshed, in milliseconds
    /// (presumably Unix epoch, matching `RecentRequest::timestamp_ms` — confirm).
    last_seen_ms: u64,
}
174
175impl IdentityCacheEntry {
176    fn new(
177        node_addr: NodeAddr,
178        pubkey: secp256k1::PublicKey,
179        npub: String,
180        last_seen_ms: u64,
181    ) -> Self {
182        Self {
183            node_addr,
184            pubkey,
185            npub,
186            last_seen_ms,
187        }
188    }
189}
190
/// App-owned packet channels for embedding FIPS without a system TUN.
///
/// The embedding application holds both halves; the node keeps the
/// matching sender/receiver ends (see `Node::external_packet_tx`).
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Send outbound IPv6 packets into the node.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receive inbound IPv6 packets delivered by FIPS sessions.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
199
/// App-owned endpoint data channels for embedding FIPS without a daemon.
///
/// Command channel is bounded; event channel is intentionally unbounded
/// (see per-field rationale below).
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Send endpoint data commands into the node RX loop.
    ///
    /// Bounded with a generous default so normal sender bursts do not
    /// stall on semaphore acquisition. macOS pacing happens at the UDP
    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
    /// constraining this app queue instead caused the inner TCP flow to
    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
    /// default for benches.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receive endpoint data delivered by FIPS sessions.
    ///
    /// Unbounded so the rx_loop's send on inbound packet delivery is a
    /// wait-free push (no semaphore acquire), and so we can drop the
    /// per-packet cross-task relay that previously sat between the node
    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
    /// naturally bounded — the rx_loop both produces here and runs the
    /// same runtime that schedules the consumer, so a stalled consumer
    /// stalls production too.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Clone of the event_tx exposed for in-process loopback (e.g.
    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
    /// event into the same queue without going through the encrypt /
    /// decrypt path, while keeping every consumer reading from a single
    /// channel.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
229
/// Resolve the bounded capacity of the endpoint-data command channel.
///
/// Precedence:
/// 1. `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable, when set to
///    a positive integer (bench/testing override).
/// 2. Otherwise the caller's `requested` size, raised to a 32 768 floor
///    so normal sender bursts do not stall on channel capacity (see
///    `EndpointDataIo::command_tx`).
fn endpoint_data_command_capacity(requested: usize) -> usize {
    if let Ok(raw) = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP") {
        if let Ok(value) = raw.trim().parse::<usize>() {
            if value > 0 {
                return value;
            }
        }
    }

    // The 32_768 floor already guarantees a non-zero result, so the
    // previous extra `.max(1)` clamp was redundant and is dropped.
    requested.max(32_768)
}
240
/// Commands accepted by the node endpoint data service.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send with an explicit response channel — used by callers that
    /// care whether the local-stack handoff succeeded (e.g.
    /// `blocking_send` waits for the runtime to accept the send).
    Send {
        remote: PeerIdentity,
        payload: Vec<u8>,
        /// Enqueue timestamp — presumably for latency accounting; confirm
        /// against the consumer in the RX loop.
        queued_at: Option<std::time::Instant>,
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
    /// no per-packet result channel. Used by the data-plane fast path
    /// (`FipsEndpoint::send`) where the caller already discards the
    /// result. Saves one oneshot::channel() allocation per outbound
    /// packet on the application's send hot path.
    SendOneway {
        remote: PeerIdentity,
        payload: Vec<u8>,
        queued_at: Option<std::time::Instant>,
    },
    /// Request a snapshot of authenticated peers; the reply is sent
    /// on `response_tx` as a `Vec<NodeEndpointPeer>`.
    PeerSnapshot {
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
}
267
/// Endpoint data events emitted by the node session receive path.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A data payload delivered for the embedding application.
    Data {
        /// FIPS node address the payload came from.
        source_node_addr: NodeAddr,
        /// Sender npub when the node has learned it; `None` otherwise.
        source_npub: Option<String>,
        /// Application payload bytes.
        payload: Vec<u8>,
        /// Enqueue timestamp — presumably for latency accounting; confirm
        /// against the consumer.
        queued_at: Option<std::time::Instant>,
    },
}
278
/// Authenticated peer state exposed to embedded endpoint callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// Peer identity as a bech32 npub string.
    pub(crate) npub: String,
    /// Remote transport address in string form, when known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type label, when known — NOTE(review): exact format
    /// (e.g. "udp"/"tcp") not visible here; confirm at the producer.
    pub(crate) transport_type: Option<String>,
    /// Identifier of the peer's link.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Cumulative packets sent to the peer.
    pub(crate) packets_sent: u64,
    /// Cumulative packets received from the peer.
    pub(crate) packets_recv: u64,
    /// Cumulative bytes sent to the peer.
    pub(crate) bytes_sent: u64,
    /// Cumulative bytes received from the peer.
    pub(crate) bytes_recv: u64,
}
292
/// Node operational state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not started.
    Created,
    /// Starting up (initializing transports).
    Starting,
    /// Fully operational.
    Running,
    /// Shutting down.
    Stopping,
    /// Stopped.
    Stopped,
}

impl NodeState {
    /// Check if node is operational — true only while `Running`.
    pub fn is_operational(&self) -> bool {
        *self == NodeState::Running
    }

    /// Check if node can be started: a fresh or fully stopped node.
    pub fn can_start(&self) -> bool {
        *self == NodeState::Created || *self == NodeState::Stopped
    }

    /// Check if node can be stopped — only a `Running` node.
    pub fn can_stop(&self) -> bool {
        *self == NodeState::Running
    }
}

impl fmt::Display for NodeState {
    /// Render the state as a lowercase status word.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeState::Created => "created",
            NodeState::Starting => "starting",
            NodeState::Running => "running",
            NodeState::Stopping => "stopping",
            NodeState::Stopped => "stopped",
        })
    }
}
337
/// Recent request tracking for dedup and reverse-path forwarding.
///
/// When a LookupRequest is forwarded through a node, the node stores the
/// request_id and which peer sent it. When the corresponding LookupResponse
/// arrives, it's forwarded back to that peer (reverse-path forwarding).
/// The `response_forwarded` flag prevents response routing loops.
/// Entries are aged out via `is_expired`.
#[derive(Clone, Debug)]
pub(crate) struct RecentRequest {
    /// The peer who sent this request to us.
    pub(crate) from_peer: NodeAddr,
    /// When we received this request (Unix milliseconds).
    pub(crate) timestamp_ms: u64,
    /// Whether we've already forwarded a response for this request.
    /// Prevents response routing loops when convergent request paths
    /// create bidirectional entries in recent_requests.
    pub(crate) response_forwarded: bool,
}
355
356impl RecentRequest {
357    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
358        Self {
359            from_peer,
360            timestamp_ms,
361            response_forwarded: false,
362        }
363    }
364
365    /// Check if this entry has expired (older than expiry_ms).
366    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
367        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
368    }
369}
370
/// Key for addr_to_link reverse lookup: a link endpoint is identified
/// by the transport it belongs to plus the remote transport address.
type AddrKey = (TransportId, TransportAddr);
373
/// Per-transport kernel drop tracking for congestion detection.
///
/// Sampled every tick (1s). The `dropping` flag indicates whether new
/// kernel drops were observed since the previous sample. `Default`
/// yields a zeroed, non-dropping state for freshly added transports.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previous `recv_drops` sample (cumulative counter).
    prev_drops: u64,
    /// True if drops increased since the last sample.
    dropping: bool,
}
385
/// State for a link waiting for transport-level connection establishment.
///
/// For connection-oriented transports (TCP, Tor), the transport connect runs
/// asynchronously. This struct holds the data needed to complete the handshake
/// once the connection is ready. Queued in `Node::pending_connects` and
/// polled by the tick handler.
struct PendingConnect {
    /// The link that was created for this connection.
    link_id: LinkId,
    /// Which transport is being used.
    transport_id: TransportId,
    /// The remote address being connected to.
    remote_addr: TransportAddr,
    /// The peer identity (for handshake initiation).
    peer_identity: PeerIdentity,
}
401
/// A running FIPS node instance.
///
/// This is the top-level container holding all node state.
///
/// ## Peer Lifecycle
///
/// Peers go through two phases:
/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
///
/// The `addr_to_link` map enables dispatching incoming packets to the right
/// connection before authentication completes.
// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
pub struct Node {
    // === Identity ===
    /// This node's cryptographic identity.
    identity: Identity,

    /// Random epoch generated at startup for peer restart detection.
    /// Exchanged inside Noise handshake messages so peers can detect restarts.
    startup_epoch: [u8; 8],

    /// Instant when the node was created, for uptime reporting.
    started_at: std::time::Instant,

    // === Configuration ===
    /// Loaded configuration.
    config: Config,

    // === State ===
    /// Node operational state.
    state: NodeState,

    /// Whether this is a leaf-only node.
    is_leaf_only: bool,

    // === Spanning Tree ===
    /// Local spanning tree state.
    tree_state: TreeState,

    // === Bloom Filter ===
    /// Local Bloom filter state.
    bloom_state: BloomState,

    // === Routing ===
    /// Address -> coordinates cache (from session setup and discovery).
    coord_cache: CoordCache,
    /// Locally learned reverse-path next-hop hints.
    learned_routes: LearnedRouteTable,
    /// Recent discovery requests (dedup + reverse-path forwarding).
    /// Maps request_id → RecentRequest.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
    /// the TUN reader/writer threads at TCP MSS clamp time so the
    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
    /// and the learned per-destination path MTU.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // === Transports & Links ===
    /// Active transports (owned by Node).
    transports: HashMap<TransportId, TransportHandle>,
    /// Per-transport kernel drop tracking for congestion detection.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Active links.
    links: HashMap<LinkId, Link>,
    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    // === Packet Channel ===
    /// Packet sender for transports.
    packet_tx: Option<PacketTx>,
    /// Packet receiver (for event loop).
    packet_rx: Option<PacketRx>,

    // === Connections (Handshake Phase) ===
    /// Pending connections (handshake in progress).
    /// Indexed by LinkId since we don't know the peer's identity yet.
    connections: HashMap<LinkId, PeerConnection>,

    // === Peers (Active Phase) ===
    /// Authenticated peers.
    /// Indexed by NodeAddr (verified identity).
    peers: HashMap<NodeAddr, ActivePeer>,

    // === End-to-End Sessions ===
    /// Session table for end-to-end encrypted sessions.
    /// Keyed by remote NodeAddr.
    sessions: HashMap<NodeAddr, SessionEntry>,

    // === Identity Cache ===
    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
    /// Enables reverse lookup from IPv6 destination to session/routing identity.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    // === Pending TUN Packets ===
    /// Packets queued while waiting for session establishment.
    /// Keyed by destination NodeAddr, bounded per-dest and total.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Endpoint data payloads queued while waiting for session establishment.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    // === Pending Discovery Lookups ===
    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // === Resource Limits ===
    /// Maximum connections (0 = unlimited).
    max_connections: usize,
    /// Maximum peers (0 = unlimited).
    max_peers: usize,
    /// Maximum links (0 = unlimited).
    max_links: usize,

    // === Counters ===
    /// Next link ID to allocate.
    next_link_id: u64,
    /// Next transport ID to allocate.
    next_transport_id: u32,

    // === Node Statistics ===
    /// Routing, forwarding, discovery, and error signal counters.
    stats: stats::NodeStats,

    /// Time-series history of node-level metrics (1s/1m rings).
    stats_history: stats_history::StatsHistory,

    // === TUN Interface ===
    /// TUN device state.
    tun_state: TunState,
    /// TUN interface name (for cleanup).
    tun_name: Option<String>,
    /// TUN packet sender channel.
    tun_tx: Option<TunTx>,
    /// Receiver for outbound packets from the TUN reader.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// App-owned packet sink used by embedded/no-TUN integrations.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Endpoint data command receiver used by embedded/no-daemon integrations.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Endpoint data event sink used by embedded/no-daemon integrations.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
    /// spawned (set up in `start()` once transports are running).
    /// `Some(pool)` once available; the pool internally holds
    /// per-worker mpsc senders and round-robins jobs across them.
    /// See `node::encrypt_worker` for the rationale and layout.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
    /// `encrypt_workers` for the receive side.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Set of sessions that have been registered with the decrypt
    /// shard worker pool. Used by rx_loop to decide between fast-path
    /// dispatch (worker owns the session) and legacy in-place decrypt
    /// (worker doesn't have it yet). Per the data-plane restructure,
    /// the worker owns its session state directly — there's no shared
    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
    /// this set tracks **whether** the worker has been told about a
    /// given session.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Fallback channel: decrypt worker bounces non-fast-path packets
    /// (anything that's not bulk EndpointData) back here for rx_loop
    /// to handle via the legacy path. Drained by a new rx_loop arm.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptFallback>>,
    /// Sender half of the decrypt fallback channel, created alongside
    /// `decrypt_fallback_rx` in `new()`; retained for wiring into the
    /// decrypt workers (NOTE(review): confirm at the worker spawn site).
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptFallback>,
    /// TUN reader thread handle.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// TUN writer thread handle.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
    /// On Linux, deleting the interface via netlink serves the same purpose.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // === DNS Responder ===
    /// Receiver for resolved identities from the DNS responder.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// DNS responder task handle.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // === Index-Based Session Dispatch ===
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
    /// This maps our session index to the peer that uses it.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Pending outbound handshakes by our sender_idx.
    /// Tracks which LinkId corresponds to which session index.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // === Rate Limiting ===
    /// Rate limiter for msg1 processing (DoS protection).
    msg1_rate_limiter: HandshakeRateLimiter,
    /// Rate limiter for ICMP Packet Too Big messages.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Backoff for failed discovery lookups (originator-side).
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery requests (transit-side).
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // === Pending Transport Connects ===
    /// Links waiting for transport-level connection establishment before
    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
    /// the transport connect runs in the background; the tick handler polls
    /// connection_state() and initiates the handshake when connected.
    pending_connects: Vec<PendingConnect>,

    // === Connection Retry ===
    /// Retry state for peers whose outbound connections have failed.
    /// Keyed by NodeAddr. Entries are created when a handshake times out
    /// or fails, and removed on successful promotion or when max retries
    /// are exhausted.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
    /// Identity is unverified at this layer — the Noise XX handshake
    /// initiated against an mDNS-observed endpoint is what proves the
    /// peer holds the matching private key.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Wall-clock ms when Nostr discovery successfully started, used to
    /// schedule the one-shot startup advert sweep after a settle delay.
    /// `None` until discovery comes up; remains `None` if discovery is
    /// disabled or failed to start.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Whether the one-shot startup advert sweep has run. Set to true
    /// after the first sweep fires (under `policy: open`); thereafter
    /// only the per-tick `queue_open_discovery_retries` continues.
    startup_open_discovery_sweep_done: bool,
    /// Per-peer UDP transports adopted from NAT traversal handoff.
    bootstrap_transports: HashSet<TransportId>,
    /// Originating peer npub (bech32) for each adopted bootstrap
    /// transport, captured at `adopt_established_traversal` time.
    /// Populated alongside `bootstrap_transports`; cleared in
    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
    /// route fatal-protocol-mismatch observations back to the
    /// Nostr-discovery `failure_state` for long cooldown application.
    bootstrap_transport_npubs: HashMap<TransportId, String>,

    // === Periodic Parent Re-evaluation ===
    /// Timestamp of last periodic parent re-evaluation (for pacing).
    last_parent_reeval: Option<crate::time::Instant>,

    // === Congestion Logging ===
    /// Timestamp of last congestion detection log (rate-limited to 5s).
    last_congestion_log: Option<std::time::Instant>,

    // === Mesh Size Estimate ===
    /// Cached estimated mesh size (computed once per tick from bloom filters).
    estimated_mesh_size: Option<u64>,
    /// Timestamp of last mesh size log emission.
    last_mesh_size_log: Option<std::time::Instant>,

    // === Bloom Self-Plausibility ===
    /// Rate-limit state for the self-plausibility WARN. Fires at most
    /// once per 60s globally when our own outgoing FilterAnnounce has
    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
    /// aggregation drift or an ingress bypass.
    last_self_warn: Option<std::time::Instant>,

    // === Local Outbound Liveness ===
    /// Set when a `transport.send` returned a local-side io error
    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
    /// cleared on the next successful send. Used by
    /// `check_link_heartbeats` to compress the dead-timeout to
    /// `fast_link_dead_timeout_secs` while our outbound is observed
    /// broken — direct kernel evidence beats waiting on receive-silence.
    last_local_send_failure_at: Option<std::time::Instant>,

    // === Display Names ===
    /// Human-readable names for configured peers (alias or short npub).
    /// Populated at startup from peer config.
    peer_aliases: HashMap<NodeAddr, String>,

    /// Reloadable peer ACL state from standard allow/deny files.
    peer_acl: acl::PeerAclReloader,

    // === Host Map ===
    /// Static hostname → npub mapping for DNS resolution.
    /// Built at construction from peer aliases and /etc/fips/hosts.
    host_map: Arc<HostMap>,
}
690
691impl Node {
692    /// Create a new node from configuration.
693    pub fn new(config: Config) -> Result<Self, NodeError> {
694        config.validate()?;
695        let identity = config.create_identity()?;
696        let node_addr = *identity.node_addr();
697        let is_leaf_only = config.is_leaf_only();
698
699        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
700        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
701
702        let mut startup_epoch = [0u8; 8];
703        rand::rng().fill_bytes(&mut startup_epoch);
704
705        let mut bloom_state = if is_leaf_only {
706            BloomState::leaf_only(node_addr)
707        } else {
708            BloomState::new(node_addr)
709        };
710        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
711
712        let tun_state = if config.tun.enabled {
713            TunState::Configured
714        } else {
715            TunState::Disabled
716        };
717
718        // Initialize tree state with signed self-declaration
719        let mut tree_state = TreeState::new(node_addr);
720        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
721        tree_state.set_hold_down(config.node.tree.hold_down_secs);
722        tree_state.set_flap_dampening(
723            config.node.tree.flap_threshold,
724            config.node.tree.flap_window_secs,
725            config.node.tree.flap_dampening_secs,
726        );
727        tree_state
728            .sign_declaration(&identity)
729            .expect("signing own declaration should never fail");
730
731        let coord_cache = CoordCache::new(
732            config.node.cache.coord_size,
733            config.node.cache.coord_ttl_secs * 1000,
734        );
735        let rl = &config.node.rate_limit;
736        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
737            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
738            config.node.limits.max_pending_inbound,
739        );
740
741        let max_connections = config.node.limits.max_connections;
742        let max_peers = config.node.limits.max_peers;
743        let max_links = config.node.limits.max_links;
744        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
745        let backoff_base_secs = config.node.discovery.backoff_base_secs;
746        let backoff_max_secs = config.node.discovery.backoff_max_secs;
747        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
748
749        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
750
751        Ok(Self {
752            identity,
753            startup_epoch,
754            started_at: std::time::Instant::now(),
755            config,
756            state: NodeState::Created,
757            is_leaf_only,
758            tree_state,
759            bloom_state,
760            coord_cache,
761            learned_routes: LearnedRouteTable::default(),
762            recent_requests: HashMap::new(),
763            transports: HashMap::new(),
764            transport_drops: HashMap::new(),
765            links: HashMap::new(),
766            addr_to_link: HashMap::new(),
767            packet_tx: None,
768            packet_rx: None,
769            connections: HashMap::new(),
770            peers: HashMap::new(),
771            sessions: HashMap::new(),
772            identity_cache: HashMap::new(),
773            pending_tun_packets: HashMap::new(),
774            pending_endpoint_data: HashMap::new(),
775            pending_lookups: HashMap::new(),
776            max_connections,
777            max_peers,
778            max_links,
779            next_link_id: 1,
780            next_transport_id: 1,
781            stats: stats::NodeStats::new(),
782            stats_history: stats_history::StatsHistory::new(),
783            tun_state,
784            tun_name: None,
785            tun_tx: None,
786            tun_outbound_rx: None,
787            external_packet_tx: None,
788            endpoint_command_rx: None,
789            endpoint_event_tx: None,
790            encrypt_workers: None,
791            decrypt_workers: None,
792            decrypt_registered_sessions: std::collections::HashSet::new(),
793            decrypt_fallback_tx,
794            decrypt_fallback_rx,
795            tun_reader_handle: None,
796            tun_writer_handle: None,
797            #[cfg(target_os = "macos")]
798            tun_shutdown_fd: None,
799            dns_identity_rx: None,
800            dns_task: None,
801            index_allocator: IndexAllocator::new(),
802            peers_by_index: HashMap::new(),
803            pending_outbound: HashMap::new(),
804            msg1_rate_limiter,
805            icmp_rate_limiter: IcmpRateLimiter::new(),
806            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
807            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
808                std::time::Duration::from_millis(coords_response_interval_ms),
809            ),
810            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
811            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
812                std::time::Duration::from_secs(forward_min_interval_secs),
813            ),
814            pending_connects: Vec::new(),
815            retry_pending: HashMap::new(),
816            nostr_discovery: None,
817            nostr_discovery_started_at_ms: None,
818            lan_discovery: None,
819            startup_open_discovery_sweep_done: false,
820            bootstrap_transports: HashSet::new(),
821            bootstrap_transport_npubs: HashMap::new(),
822            last_parent_reeval: None,
823            last_congestion_log: None,
824            estimated_mesh_size: None,
825            last_mesh_size_log: None,
826            last_self_warn: None,
827            last_local_send_failure_at: None,
828            peer_aliases: HashMap::new(),
829            peer_acl,
830            host_map,
831            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
832        })
833    }
834
835    /// Create a node with a specific identity.
836    ///
837    /// This constructor validates cross-field config invariants before
838    /// constructing the node, same as [`Node::new`].
839    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
840        config.validate()?;
841        let node_addr = *identity.node_addr();
842
843        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
844        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
845
846        let mut startup_epoch = [0u8; 8];
847        rand::rng().fill_bytes(&mut startup_epoch);
848
849        let tun_state = if config.tun.enabled {
850            TunState::Configured
851        } else {
852            TunState::Disabled
853        };
854
855        // Initialize tree state with signed self-declaration
856        let mut tree_state = TreeState::new(node_addr);
857        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
858        tree_state.set_hold_down(config.node.tree.hold_down_secs);
859        tree_state.set_flap_dampening(
860            config.node.tree.flap_threshold,
861            config.node.tree.flap_window_secs,
862            config.node.tree.flap_dampening_secs,
863        );
864        tree_state
865            .sign_declaration(&identity)
866            .expect("signing own declaration should never fail");
867
868        let mut bloom_state = BloomState::new(node_addr);
869        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
870
871        let coord_cache = CoordCache::new(
872            config.node.cache.coord_size,
873            config.node.cache.coord_ttl_secs * 1000,
874        );
875        let rl = &config.node.rate_limit;
876        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
877            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
878            config.node.limits.max_pending_inbound,
879        );
880
881        let max_connections = config.node.limits.max_connections;
882        let max_peers = config.node.limits.max_peers;
883        let max_links = config.node.limits.max_links;
884        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
885
886        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
887
888        Ok(Self {
889            identity,
890            startup_epoch,
891            started_at: std::time::Instant::now(),
892            config,
893            state: NodeState::Created,
894            is_leaf_only: false,
895            tree_state,
896            bloom_state,
897            coord_cache,
898            learned_routes: LearnedRouteTable::default(),
899            recent_requests: HashMap::new(),
900            transports: HashMap::new(),
901            transport_drops: HashMap::new(),
902            links: HashMap::new(),
903            addr_to_link: HashMap::new(),
904            packet_tx: None,
905            packet_rx: None,
906            connections: HashMap::new(),
907            peers: HashMap::new(),
908            sessions: HashMap::new(),
909            identity_cache: HashMap::new(),
910            pending_tun_packets: HashMap::new(),
911            pending_endpoint_data: HashMap::new(),
912            pending_lookups: HashMap::new(),
913            max_connections,
914            max_peers,
915            max_links,
916            next_link_id: 1,
917            next_transport_id: 1,
918            stats: stats::NodeStats::new(),
919            stats_history: stats_history::StatsHistory::new(),
920            tun_state,
921            tun_name: None,
922            tun_tx: None,
923            tun_outbound_rx: None,
924            external_packet_tx: None,
925            endpoint_command_rx: None,
926            endpoint_event_tx: None,
927            encrypt_workers: None,
928            decrypt_workers: None,
929            decrypt_registered_sessions: std::collections::HashSet::new(),
930            decrypt_fallback_tx,
931            decrypt_fallback_rx,
932            tun_reader_handle: None,
933            tun_writer_handle: None,
934            #[cfg(target_os = "macos")]
935            tun_shutdown_fd: None,
936            dns_identity_rx: None,
937            dns_task: None,
938            index_allocator: IndexAllocator::new(),
939            peers_by_index: HashMap::new(),
940            pending_outbound: HashMap::new(),
941            msg1_rate_limiter,
942            icmp_rate_limiter: IcmpRateLimiter::new(),
943            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
944            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
945                std::time::Duration::from_millis(coords_response_interval_ms),
946            ),
947            discovery_backoff: DiscoveryBackoff::new(),
948            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
949            pending_connects: Vec::new(),
950            retry_pending: HashMap::new(),
951            nostr_discovery: None,
952            nostr_discovery_started_at_ms: None,
953            lan_discovery: None,
954            startup_open_discovery_sweep_done: false,
955            bootstrap_transports: HashSet::new(),
956            bootstrap_transport_npubs: HashMap::new(),
957            last_parent_reeval: None,
958            last_congestion_log: None,
959            estimated_mesh_size: None,
960            last_mesh_size_log: None,
961            last_self_warn: None,
962            last_local_send_failure_at: None,
963            peer_aliases: HashMap::new(),
964            peer_acl,
965            host_map,
966            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
967        })
968    }
969
970    /// Create a leaf-only node (simplified state).
971    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
972        let mut node = Self::new(config)?;
973        node.is_leaf_only = true;
974        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
975        Ok(node)
976    }
977
978    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
979        let base_host_map = HostMap::from_peer_configs(config.peers());
980        if !config.node.system_files_enabled {
981            return (
982                Arc::new(base_host_map.clone()),
983                acl::PeerAclReloader::memory_only(base_host_map),
984            );
985        }
986
987        let mut host_map = base_host_map.clone();
988        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
989        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
990            crate::upper::hosts::DEFAULT_HOSTS_PATH,
991        ));
992        host_map.merge(hosts_file);
993        let peer_acl = acl::PeerAclReloader::with_alias_sources(
994            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
995            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
996            base_host_map,
997            hosts_path,
998        );
999        (Arc::new(host_map), peer_acl)
1000    }
1001
    /// Create transport instances from configuration.
    ///
    /// Returns a vector of TransportHandles for all configured transports.
    ///
    /// Each transport receives a freshly allocated transport id and a clone
    /// of `packet_tx`, so all inbound packets funnel into the node's single
    /// receive channel. For every transport family the config map is first
    /// snapshotted into an owned `Vec` — `allocate_transport_id` needs
    /// `&mut self`, which would conflict with a live borrow of `self.config`.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Collect UDP configs with optional names to avoid borrow conflicts
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        // Create UDP transport instances
        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Simulated transports are only compiled in for test/sim builds.
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Create Ethernet transport instances where raw-socket support exists.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            // All Ethernet instances advertise the same local x-only pubkey.
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        // Create TCP transport instances
        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        // Create Tor transport instances
        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // Create BLE transport instances
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // BLE adapter init is async and can fail per-adapter; a failed
            // adapter is logged and skipped rather than aborting startup.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // NOTE(review): inside the outer `#[cfg(bluer_available)]` block
            // this arm reduces to `#[cfg(test)]`, and the inner
            // `#[cfg(not(test))]` then makes the warn unreachable in every
            // build — confirm whether this fallback was meant to live
            // OUTSIDE the bluer_available block.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1139
1140    /// Find an operational transport that matches the given transport type name.
1141    ///
1142    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1143    /// from Nostr/STUN traversal. They must not be reused for ordinary
1144    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1145    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1146    /// with `EINVAL`, and even on platforms that allow it the packet would use
1147    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1148    /// choice deterministic by lowest transport id instead of HashMap order.
1149    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1150        self.transports
1151            .iter()
1152            .filter(|(id, handle)| {
1153                handle.transport_type().name == transport_type
1154                    && handle.is_operational()
1155                    && !self.bootstrap_transports.contains(id)
1156            })
1157            .min_by_key(|(id, _)| id.as_u32())
1158            .map(|(id, _)| *id)
1159    }
1160
1161    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1162    /// and binary TransportAddr.
1163    ///
1164    /// Finds the Ethernet transport instance bound to the named interface
1165    /// and parses the MAC portion into a 6-byte TransportAddr.
1166    #[allow(unused_variables)]
1167    fn resolve_ethernet_addr(
1168        &self,
1169        addr_str: &str,
1170    ) -> Result<(TransportId, TransportAddr), NodeError> {
1171        #[cfg(any(target_os = "linux", target_os = "macos"))]
1172        {
1173            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1174                NodeError::NoTransportForType(format!(
1175                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1176                    addr_str
1177                ))
1178            })?;
1179
1180            // Find the Ethernet transport bound to this interface
1181            let transport_id = self
1182                .transports
1183                .iter()
1184                .find(|(_, handle)| {
1185                    handle.transport_type().name == "ethernet"
1186                        && handle.is_operational()
1187                        && handle.interface_name() == Some(iface)
1188                })
1189                .map(|(id, _)| *id)
1190                .ok_or_else(|| {
1191                    NodeError::NoTransportForType(format!(
1192                        "no operational Ethernet transport for interface '{}'",
1193                        iface
1194                    ))
1195                })?;
1196
1197            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1198                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1199            })?;
1200
1201            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1202        }
1203        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1204        {
1205            Err(NodeError::NoTransportForType(
1206                "Ethernet transport is not supported on this platform".to_string(),
1207            ))
1208        }
1209    }
1210
    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
    /// (TransportId, TransportAddr) pair by finding the BLE transport
    /// instance matching the adapter name.
    #[cfg(bluer_available)]
    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
        // Extract the adapter segment first so a malformed address fails with
        // a format error before any transport lookup.
        let ta = TransportAddr::from_string(addr_str);
        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
            NodeError::NoTransportForType(format!(
                "invalid BLE address format '{}': expected 'adapter/mac'",
                addr_str
            ))
        })?;

        // Find the BLE transport for this adapter
        // NOTE(review): the predicate only checks transport type and
        // operational state — it never compares `adapter` against the
        // handle, so with multiple BLE adapters configured this may pick a
        // transport bound to a different adapter. Confirm whether the
        // handle exposes its adapter name and should be matched here, the
        // way resolve_ethernet_addr matches `interface_name()`.
        let transport_id = self
            .transports
            .iter()
            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
            .map(|(id, _)| *id)
            .ok_or_else(|| {
                NodeError::NoTransportForType(format!(
                    "no operational BLE transport for adapter '{}'",
                    adapter
                ))
            })?;

        // Validate the address format
        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
        })?;

        Ok((transport_id, TransportAddr::from_string(addr_str)))
    }
1244
    // === Identity Accessors ===

    /// Get this node's identity (the keypair this node signs and
    /// handshakes with).
    pub fn identity(&self) -> &Identity {
        &self.identity
    }

    /// Get this node's NodeAddr, as derived from the identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }

    /// Get this node's npub. Delegates to the identity's encoding.
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1261
1262    /// Return a human-readable display name for a NodeAddr.
1263    ///
1264    /// Lookup order:
1265    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1266    /// 2. Configured peer alias or short npub (from startup map)
1267    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1268    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1269    /// 5. Truncated NodeAddr hex (unknown address)
1270    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1271        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1272            return hostname.to_string();
1273        }
1274        if let Some(name) = self.peer_aliases.get(addr) {
1275            return name.clone();
1276        }
1277        if let Some(peer) = self.peers.get(addr) {
1278            return peer.identity().short_npub();
1279        }
1280        if let Some(entry) = self.sessions.get(addr) {
1281            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1282            return PeerIdentity::from_pubkey(xonly).short_npub();
1283        }
1284        addr.short_hex()
1285    }
1286
1287    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1288    /// decrypt-worker state coherent: removes the same `cache_key`
1289    /// from the registered-sessions tracking set and tells the
1290    /// assigned shard worker to drop its `OwnedSessionState` entry.
1291    ///
1292    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1293    /// at every session-lifecycle teardown site (rekey cross-connection
1294    /// swap, peer disconnect, dispatch session-rotation) so the worker
1295    /// doesn't keep stale ciphers / replay windows around. The
1296    /// follow-up `RegisterSession` for the NEW key (if any) will then
1297    /// install the fresh state on the same shard.
1298    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1299        // Find the peer that owns this index BEFORE removing it from
1300        // the index map, so we can decide whether the deregistration
1301        // also tears down the peer's connected UDP socket.
1302        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1303        self.peers_by_index.remove(&cache_key);
1304        if self.decrypt_registered_sessions.remove(&cache_key)
1305            && let Some(workers) = self.decrypt_workers.as_ref()
1306        {
1307            workers.unregister_session(cache_key);
1308        }
1309        // Tear down the per-peer connected UDP socket *only* if no
1310        // other peers_by_index entry still resolves to this peer.
1311        // Rekey drain calls into this helper with the OLD session
1312        // index while the NEW index is already installed and points
1313        // at the same peer — there the connect()-ed 5-tuple is
1314        // still valid for the new session and we must not close it.
1315        // Peer-teardown sites (CrossConnection swap, stale-index
1316        // fall-through in encrypted.rs, disconnect handler) call
1317        // here when this is the peer's last index, so the connected
1318        // socket goes away with the peer.
1319        if let Some(peer_addr) = owning_peer {
1320            let peer_has_other_index = self
1321                .peers_by_index
1322                .values()
1323                .any(|other| *other == peer_addr);
1324            if !peer_has_other_index {
1325                self.clear_connected_udp_for_peer(&peer_addr);
1326            }
1327        }
1328    }
1329
1330    /// Ensure the current FMP receive index resolves to this peer.
1331    ///
1332    /// Rekey msg1/msg2 handlers pre-register the pending index before
1333    /// cutover, but losing that registration in a debug build used to
1334    /// panic in the cutover path. Repairing the map here is safe: the
1335    /// peer has already promoted the pending session, and the decrypt
1336    /// worker registration immediately after cutover depends on the
1337    /// same `(transport_id, our_index)` key.
1338    pub(in crate::node) fn ensure_current_session_index_registered(
1339        &mut self,
1340        node_addr: &NodeAddr,
1341        context: &'static str,
1342    ) -> bool {
1343        let Some(peer) = self.peers.get(node_addr) else {
1344            return false;
1345        };
1346        let Some(transport_id) = peer.transport_id() else {
1347            warn!(
1348                peer = %self.peer_display_name(node_addr),
1349                context,
1350                "Cannot register current session index without transport id"
1351            );
1352            return false;
1353        };
1354        let Some(our_index) = peer.our_index() else {
1355            warn!(
1356                peer = %self.peer_display_name(node_addr),
1357                context,
1358                "Cannot register current session index without local index"
1359            );
1360            return false;
1361        };
1362
1363        let cache_key = (transport_id, our_index.as_u32());
1364        match self.peers_by_index.get(&cache_key).copied() {
1365            Some(existing) if existing == *node_addr => true,
1366            Some(existing) => {
1367                warn!(
1368                    peer = %self.peer_display_name(node_addr),
1369                    previous_owner = %self.peer_display_name(&existing),
1370                    transport_id = %transport_id,
1371                    our_index = %our_index,
1372                    context,
1373                    "Repairing current session index with stale owner"
1374                );
1375                self.peers_by_index.insert(cache_key, *node_addr);
1376                true
1377            }
1378            None => {
1379                warn!(
1380                    peer = %self.peer_display_name(node_addr),
1381                    transport_id = %transport_id,
1382                    our_index = %our_index,
1383                    context,
1384                    "Repairing missing current session index"
1385                );
1386                self.peers_by_index.insert(cache_key, *node_addr);
1387                true
1388            }
1389        }
1390    }
1391
    // === Configuration ===

    /// Get the configuration (immutable borrow of the `Config` the node
    /// was constructed with).
    pub fn config(&self) -> &Config {
        &self.config
    }
1398
    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
    ///
    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
    /// transport MTU. Returns the maximum IPv6 packet size (including
    /// IPv6 header) that can be transmitted through the FIPS mesh.
    ///
    /// See [`Self::transport_mtu`] for how the underlying transport MTU
    /// is chosen (minimum across operational transports).
    pub fn effective_ipv6_mtu(&self) -> u16 {
        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
    }
1407
1408    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1409    ///
1410    /// Returns the **minimum** MTU across all operational transports, or
1411    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1412    /// where a specific egress transport isn't yet known: the resulting
1413    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1414    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1415    /// configured-transport's egress, eliminating PMTU-D black holes that
1416    /// would otherwise occur when a flow's actual egress is smaller than
1417    /// the clamp ceiling assumed at TUN init.
1418    ///
1419    /// Returning the smallest (rather than the first-iterated, which used
1420    /// to vary across HashMap iteration order + async-startup race) makes
1421    /// the clamp deterministic across daemon restarts.
1422    ///
1423    /// See `ISSUE-2026-0011` for the empirical investigation.
1424    pub fn transport_mtu(&self) -> u16 {
1425        let min_operational = self
1426            .transports
1427            .values()
1428            .filter(|h| h.is_operational())
1429            .map(|h| h.mtu())
1430            .min();
1431        if let Some(mtu) = min_operational {
1432            return mtu;
1433        }
1434        // Fallback to config: try UDP first, then Ethernet
1435        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1436            return cfg.mtu();
1437        }
1438        1280
1439    }
1440
    // === State ===

    /// Get the node state (lifecycle phase; starts as `Created`).
    pub fn state(&self) -> NodeState {
        self.state
    }

    /// Get the node uptime (elapsed time since construction).
    pub fn uptime(&self) -> std::time::Duration {
        self.started_at.elapsed()
    }

    /// Check if node is operational. Delegates to `NodeState::is_operational`.
    pub fn is_running(&self) -> bool {
        self.state.is_operational()
    }

    /// Check if this is a leaf-only node (set by [`Node::leaf_only`]).
    pub fn is_leaf_only(&self) -> bool {
        self.is_leaf_only
    }
1462
    // === Tree State ===

    /// Get the tree state (read-only view of spanning-tree routing state).
    pub fn tree_state(&self) -> &TreeState {
        &self.tree_state
    }

    /// Get mutable tree state.
    pub fn tree_state_mut(&mut self) -> &mut TreeState {
        &mut self.tree_state
    }
1474
    // === Bloom State ===

    /// Get the Bloom filter state (read-only).
    pub fn bloom_state(&self) -> &BloomState {
        &self.bloom_state
    }

    /// Get mutable Bloom filter state.
    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
        &mut self.bloom_state
    }
1486
    // === Mesh Size Estimate ===

    /// Get the cached estimated mesh size. `None` when no estimate has
    /// been computed yet (or the last computation declined to estimate).
    pub fn estimated_mesh_size(&self) -> Option<u64> {
        self.estimated_mesh_size
    }
1493
    /// Compute and cache the estimated mesh size from bloom filters.
    ///
    /// Uses the spanning tree partition: parent's filter covers nodes reachable
    /// upward, children's filters cover disjoint subtrees downward. The sum
    /// of estimated entry counts plus one (self) approximates total network size.
    ///
    /// Writes the result into `self.estimated_mesh_size`: `Some(size)` on
    /// success, `None` when any contributing filter exceeds the FPR cap or
    /// when no filter contributed data at all.
    pub(crate) fn compute_mesh_size(&mut self) {
        let my_addr = *self.tree_state.my_node_addr();
        let parent_id = *self.tree_state.my_declaration().parent_id();
        let is_root = self.tree_state.is_root();

        let max_fpr = self.config.node.bloom.max_inbound_fpr;
        let mut total: f64 = 1.0; // count self
        let mut child_count: u32 = 0;
        let mut has_data = false; // set once any filter contributes an estimate

        // Parent's filter: nodes reachable upward through the tree.
        // If any contributing filter is above the FPR cap, we refuse to
        // estimate rather than substitute a partial/biased aggregate —
        // Node.estimated_mesh_size is already Option<u64> and consumers
        // (control socket, fipstop, periodic debug log) handle None.
        if !is_root
            && let Some(parent) = self.peers.get(&parent_id)
            && let Some(filter) = parent.inbound_filter()
        {
            match filter.estimated_count(max_fpr) {
                Some(n) => {
                    total += n;
                    has_data = true;
                }
                None => {
                    // Parent filter above FPR cap: abandon the whole estimate.
                    self.estimated_mesh_size = None;
                    return;
                }
            }
        }

        // Children's filters: each child's subtree is disjoint
        for (peer_addr, peer) in &self.peers {
            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
                && *decl.parent_id() == my_addr
            {
                child_count += 1;
                // NOTE(review): a child with no inbound filter yet is counted
                // in child_count but adds nothing to `total`, so the estimate
                // can undercount while filters are still syncing — confirm
                // this is the intended behavior.
                if let Some(filter) = peer.inbound_filter() {
                    match filter.estimated_count(max_fpr) {
                        Some(n) => {
                            total += n;
                            has_data = true;
                        }
                        None => {
                            self.estimated_mesh_size = None;
                            return;
                        }
                    }
                }
            }
        }

        // No parent or child filter produced a count: report "unknown"
        // rather than a bare 1 (self only).
        if !has_data {
            self.estimated_mesh_size = None;
            return;
        }

        let size = total.round() as u64;
        self.estimated_mesh_size = Some(size);

        // Periodic logging (reuse MMP default interval: 30s)
        let now = std::time::Instant::now();
        let should_log = match self.last_mesh_size_log {
            None => true,
            Some(last) => {
                now.duration_since(last)
                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
            }
        };
        if should_log {
            tracing::debug!(
                estimated_mesh_size = size,
                peers = self.peers.len(),
                children = child_count,
                "Mesh size estimate"
            );
            self.last_mesh_size_log = Some(now);
        }
    }
1578
    // === Coord Cache ===

    /// Get the coordinate cache.
    pub fn coord_cache(&self) -> &CoordCache {
        &self.coord_cache
    }

    /// Get mutable coordinate cache.
    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
        &mut self.coord_cache
    }

    // === Node Statistics ===

    /// Get the node statistics.
    pub fn stats(&self) -> &stats::NodeStats {
        &self.stats
    }

    /// Get mutable node statistics (crate-internal; counters are bumped by
    /// the packet handlers).
    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
        &mut self.stats
    }

    /// Get the stats history collector.
    pub fn stats_history(&self) -> &stats_history::StatsHistory {
        &self.stats_history
    }
1607
    /// Sample the current node state into the stats history ring.
    /// Called once per tick from the RX loop.
    pub(crate) fn record_stats_history(&mut self) {
        let fwd = &self.stats.forwarding;
        // Mean loss rate across peers that have MMP metrics; peers without
        // MMP are excluded from the average rather than counted as zero.
        let peers_with_mmp: Vec<f64> = self
            .peers
            .values()
            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
            .collect();
        let loss_rate = if peers_with_mmp.is_empty() {
            0.0
        } else {
            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
        };

        // Node-level snapshot. The `_total` fields carry cumulative counters
        // taken straight from NodeStats, not per-tick deltas.
        let snap = stats_history::Snapshot {
            mesh_size: self.estimated_mesh_size,
            tree_depth: self.tree_state.my_coords().depth() as u32,
            peer_count: self.peers.len() as u64,
            parent_switches_total: self.stats.tree.parent_switches,
            bytes_in_total: fwd.received_bytes,
            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
            packets_in_total: fwd.received_packets,
            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
            loss_rate,
            active_sessions: self.sessions.len() as u64,
        };

        // Per-peer snapshots; peers without MMP report no SRTT/loss and a
        // zero CE count.
        let now = std::time::Instant::now();
        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
            .peers
            .values()
            .map(|p| {
                let stats = p.link_stats();
                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
                    Some(m) => (
                        m.metrics.srtt_ms(),
                        Some(m.metrics.loss_rate()),
                        m.receiver.ecn_ce_count() as u64,
                    ),
                    None => (None, None, 0),
                };
                stats_history::PeerSnapshot {
                    node_addr: *p.node_addr(),
                    last_seen: now,
                    srtt_ms,
                    loss_rate,
                    bytes_in_total: stats.bytes_recv,
                    bytes_out_total: stats.bytes_sent,
                    packets_in_total: stats.packets_recv,
                    packets_out_total: stats.packets_sent,
                    ecn_ce_total: ecn_ce,
                }
            })
            .collect();

        self.stats_history.tick(now, &snap, &peer_snaps);
    }
1666
    // === TUN Interface ===

    /// Get the TUN state.
    pub fn tun_state(&self) -> TunState {
        self.tun_state
    }

    /// Get the TUN interface name, if active.
    pub fn tun_name(&self) -> Option<&str> {
        self.tun_name.as_deref()
    }

    // === Resource Limits ===

    /// Set the maximum number of connections (handshake phase).
    ///
    /// A value of 0 disables the limit (see `add_connection`).
    pub fn set_max_connections(&mut self, max: usize) {
        self.max_connections = max;
    }

    /// Set the maximum number of peers (authenticated).
    ///
    /// NOTE(review): presumably 0 disables the limit, matching the
    /// connection/link caps — confirm where `max_peers` is enforced.
    pub fn set_max_peers(&mut self, max: usize) {
        self.max_peers = max;
    }

    /// Set the maximum number of links.
    ///
    /// A value of 0 disables the limit (see `add_link`).
    pub fn set_max_links(&mut self, max: usize) {
        self.max_links = max;
    }
1695
    // === Counts ===

    /// Number of pending connections (handshake in progress).
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }

    /// Number of authenticated peers.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }

    /// Number of active links.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }

    /// Number of active transports.
    pub fn transport_count(&self) -> usize {
        self.transports.len()
    }

    // === Transport Management ===

    /// Allocate a new transport ID.
    ///
    /// IDs come from a monotonically increasing counter and are never reused
    /// within the lifetime of this node.
    pub fn allocate_transport_id(&mut self) -> TransportId {
        let id = TransportId::new(self.next_transport_id);
        self.next_transport_id += 1;
        id
    }

    /// Get a transport by ID.
    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
        self.transports.get(id)
    }

    /// Get mutable transport by ID.
    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
        self.transports.get_mut(id)
    }

    /// Iterate over transport IDs.
    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
        self.transports.keys()
    }

    /// Get the packet receiver for the event loop.
    ///
    /// `None` before the receiver has been installed or after it was taken.
    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
        self.packet_rx.as_mut()
    }

    // === Link Management ===

    /// Allocate a new link ID.
    ///
    /// Same monotonic-counter scheme as `allocate_transport_id`.
    pub fn allocate_link_id(&mut self) -> LinkId {
        let id = LinkId::new(self.next_link_id);
        self.next_link_id += 1;
        id
    }
1755
1756    /// Add a link.
1757    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
1758        if self.max_links > 0 && self.links.len() >= self.max_links {
1759            return Err(NodeError::MaxLinksExceeded {
1760                max: self.max_links,
1761            });
1762        }
1763        let link_id = link.link_id();
1764        let transport_id = link.transport_id();
1765        let remote_addr = link.remote_addr().clone();
1766
1767        self.links.insert(link_id, link);
1768        self.addr_to_link
1769            .insert((transport_id, remote_addr), link_id);
1770        Ok(())
1771    }
1772
    /// Get a link by ID.
    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
        self.links.get(link_id)
    }

    /// Get a mutable link by ID.
    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
        self.links.get_mut(link_id)
    }

    /// Find link ID by transport address.
    ///
    /// Consults the `(transport, remote address)` reverse index maintained
    /// by `add_link` / `remove_link`.
    pub fn find_link_by_addr(
        &self,
        transport_id: TransportId,
        addr: &TransportAddr,
    ) -> Option<LinkId> {
        self.addr_to_link
            .get(&(transport_id, addr.clone()))
            .copied()
    }
1793
1794    /// Remove a link.
1795    ///
1796    /// Only removes the addr_to_link reverse lookup if it still points to this
1797    /// link. In cross-connection scenarios, a newer link may have replaced the
1798    /// entry for the same address.
1799    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
1800        if let Some(link) = self.links.remove(link_id) {
1801            // Clean up reverse lookup only if it still maps to this link
1802            let key = (link.transport_id(), link.remote_addr().clone());
1803            if self.addr_to_link.get(&key) == Some(link_id) {
1804                self.addr_to_link.remove(&key);
1805            }
1806            Some(link)
1807        } else {
1808            None
1809        }
1810    }
1811
1812    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
1813        if !self.bootstrap_transports.contains(&transport_id) {
1814            return;
1815        }
1816
1817        let transport_in_use = self
1818            .links
1819            .values()
1820            .any(|link| link.transport_id() == transport_id)
1821            || self
1822                .connections
1823                .values()
1824                .any(|conn| conn.transport_id() == Some(transport_id))
1825            || self
1826                .peers
1827                .values()
1828                .any(|peer| peer.transport_id() == Some(transport_id))
1829            || self
1830                .pending_connects
1831                .iter()
1832                .any(|pending| pending.transport_id == transport_id);
1833
1834        if transport_in_use {
1835            return;
1836        }
1837
1838        tracing::debug!(
1839            transport_id = %transport_id,
1840            "bootstrap transport has no remaining references; dropping"
1841        );
1842
1843        self.bootstrap_transports.remove(&transport_id);
1844        self.bootstrap_transport_npubs.remove(&transport_id);
1845        self.transport_drops.remove(&transport_id);
1846        self.transports.remove(&transport_id);
1847    }
1848
    /// Iterate over all links.
    ///
    /// Yields every link currently tracked by the node.
    pub fn links(&self) -> impl Iterator<Item = &Link> {
        self.links.values()
    }
1853
1854    // === Connection Management (Handshake Phase) ===
1855
1856    /// Add a pending connection.
1857    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
1858        let link_id = connection.link_id();
1859
1860        if self.connections.contains_key(&link_id) {
1861            return Err(NodeError::ConnectionAlreadyExists(link_id));
1862        }
1863
1864        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
1865            return Err(NodeError::MaxConnectionsExceeded {
1866                max: self.max_connections,
1867            });
1868        }
1869
1870        self.connections.insert(link_id, connection);
1871        Ok(())
1872    }
1873
    /// Get a connection by LinkId.
    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
        self.connections.get(link_id)
    }

    /// Get a mutable connection by LinkId.
    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
        self.connections.get_mut(link_id)
    }

    /// Remove a connection.
    ///
    /// Returns the removed connection so the caller can inspect or tear it
    /// down; `None` if no connection was keyed by this link.
    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
        self.connections.remove(link_id)
    }

    /// Iterate over all connections.
    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
        self.connections.values()
    }
1893
    // === Peer Management (Active Phase) ===

    /// Get a peer by NodeAddr.
    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
        self.peers.get(node_addr)
    }

    /// Get a mutable peer by NodeAddr.
    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
        self.peers.get_mut(node_addr)
    }

    /// Remove a peer.
    ///
    /// Returns the removed peer so the caller can finish teardown.
    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
        self.peers.remove(node_addr)
    }

    /// Iterate over all peers.
    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values()
    }

    /// Reference to the Nostr discovery handle if discovery is enabled.
    /// Used by control queries (`show_peers` per-peer Nostr-traversal
    /// state) to read failure-state without taking shared ownership.
    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
        self.nostr_discovery.as_deref()
    }

    /// Iterate over all peer node IDs.
    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
        self.peers.keys()
    }

    /// Iterate over peers that can send traffic.
    ///
    /// Filters on `ActivePeer::can_send`, the same predicate routing uses.
    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values().filter(|p| p.can_send())
    }

    /// Number of peers that can send traffic.
    pub fn sendable_peer_count(&self) -> usize {
        self.peers.values().filter(|p| p.can_send()).count()
    }
1937
1938    // === End-to-End Sessions ===
1939
    /// Disable the discovery forward rate limiter (for tests).
    ///
    /// Sets the forward interval to zero so tests can exercise discovery
    /// forwarding without waiting out the limiter. (A stray doc line for
    /// `get_session` previously sat above this one; removed.)
    #[cfg(test)]
    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
        self.discovery_forward_limiter
            .set_interval(std::time::Duration::ZERO);
    }
1947
    /// Get a session by remote NodeAddr (test-only accessor).
    #[cfg(test)]
    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
        self.sessions.get(remote)
    }

    /// Get a mutable session by remote NodeAddr.
    #[cfg(test)]
    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
        self.sessions.get_mut(remote)
    }

    /// Remove a session.
    #[cfg(test)]
    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
        self.sessions.remove(remote)
    }
1964
    /// Read the path_mtu_lookup entry for a destination FipsAddress.
    #[cfg(test)]
    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
        // A poisoned lock reads as "no entry" rather than panicking.
        self.path_mtu_lookup
            .read()
            .ok()
            .and_then(|map| map.get(fips_addr).copied())
    }

    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
    #[cfg(test)]
    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
        // Best-effort: silently a no-op if the lock is poisoned.
        if let Ok(mut map) = self.path_mtu_lookup.write() {
            map.insert(fips_addr, mtu);
        }
    }

    /// Number of end-to-end sessions.
    pub fn session_count(&self) -> usize {
        self.sessions.len()
    }

    /// Iterate over all session entries (for control queries).
    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
        self.sessions.iter()
    }
1991
1992    // === Identity Cache ===
1993
    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
    ///
    /// Returns `true` if the entry is (now) cached, `false` if the claimed
    /// `node_addr` does not match the address derived from `pubkey`.
    /// The cache is keyed by the first 15 bytes of the NodeAddr and evicts
    /// the least-recently-seen entry when the configured size is exceeded.
    pub(crate) fn register_identity(
        &mut self,
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
    ) -> bool {
        let mut prefix = [0u8; 15];
        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
        if let Some(entry) = self.identity_cache.get(&prefix)
            && entry.node_addr == node_addr
            && entry.pubkey == pubkey
        {
            // Endpoint sends pass the same PeerIdentity on every packet. Once
            // validated, avoid re-deriving NodeAddr from the public key in the
            // data path; that hash showed up in macOS sender profiles.
            return true;
        }

        // Verify the claimed NodeAddr actually derives from the public key
        // before caching anything.
        let (xonly, _) = pubkey.x_only_public_key();
        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
        if derived_node_addr != node_addr {
            debug!(
                claimed_node_addr = %node_addr,
                derived_node_addr = %derived_node_addr,
                "Rejected identity cache entry with mismatched public key"
            );
            return false;
        }

        // Same NodeAddr with a different (but valid) pubkey object: refresh
        // in place. (Second lookup is needed to get a mutable borrow.)
        let now_ms = Self::now_ms();
        if let Some(entry) = self.identity_cache.get_mut(&prefix)
            && entry.node_addr == node_addr
        {
            entry.pubkey = pubkey;
            entry.last_seen_ms = now_ms;
            return true;
        }

        let npub = encode_npub(&xonly);
        self.identity_cache.insert(
            prefix,
            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
        );
        // LRU eviction
        let max = self.config.node.cache.identity_size;
        if self.identity_cache.len() > max
            && let Some(oldest_key) = self
                .identity_cache
                .iter()
                .min_by_key(|(_, entry)| entry.last_seen_ms)
                .map(|(k, _)| *k)
        {
            self.identity_cache.remove(&oldest_key);
        }
        true
    }
2050
2051    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
2052    pub(crate) fn lookup_by_fips_prefix(
2053        &mut self,
2054        prefix: &[u8; 15],
2055    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
2056        if let Some(entry) = self.identity_cache.get_mut(prefix) {
2057            entry.last_seen_ms = Self::now_ms(); // LRU touch
2058            Some((entry.node_addr, entry.pubkey))
2059        } else {
2060            None
2061        }
2062    }
2063
2064    /// Check if a node's identity is in the cache (without LRU touch).
2065    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
2066        let mut prefix = [0u8; 15];
2067        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2068        self.identity_cache.contains_key(&prefix)
2069    }
2070
    /// Number of identity cache entries.
    pub fn identity_cache_len(&self) -> usize {
        self.identity_cache.len()
    }

    /// Iterate over identity cache entries.
    ///
    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
    /// Used by the `show_identity_cache` control query.
    pub fn identity_cache_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
        self.identity_cache
            .values()
            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
    }

    /// Configured maximum identity cache size.
    pub fn identity_cache_max(&self) -> usize {
        self.config.node.cache.identity_size
    }

    /// Number of pending discovery lookups.
    pub fn pending_lookup_count(&self) -> usize {
        self.pending_lookups.len()
    }

    /// Iterate over pending discovery lookups for diagnostics.
    pub fn pending_lookups_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
        self.pending_lookups.iter()
    }

    /// Number of recent discovery requests tracked.
    pub fn recent_request_count(&self) -> usize {
        self.recent_requests.len()
    }

    /// Count of destinations with queued TUN packets awaiting session setup.
    pub fn pending_tun_destinations(&self) -> usize {
        self.pending_tun_packets.len()
    }

    /// Total TUN packets queued across all destinations.
    ///
    /// Sums per-destination queue lengths.
    pub fn pending_tun_total_packets(&self) -> usize {
        self.pending_tun_packets.values().map(|q| q.len()).sum()
    }

    /// Iterate over retry state for diagnostics.
    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
        self.retry_pending.iter()
    }
2124
2125    // === Routing ===
2126
2127    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2128    ///
2129    /// Returns true if the peer is our current tree parent, or if the peer
2130    /// has declared us as their parent (making them our child).
2131    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2132        // Peer is our parent
2133        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2134            return true;
2135        }
2136        // Peer is our child (their declaration names us as parent)
2137        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2138            && decl.parent_id() == self.node_addr()
2139        {
2140            return true;
2141        }
2142        false
2143    }
2144
    /// Find next hop for a destination node address.
    ///
    /// Routing priority:
    /// 1. Destination is self → `None` (local delivery)
    /// 2. Destination is a direct peer → that peer
    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
    ///    observed reverse paths, selected with weighted multipath plus
    ///    periodic coordinate/tree exploration.
    /// 4. Bloom filter candidates with cached dest coords → among peers whose
    ///    bloom filter contains the destination, pick the one that minimizes
    ///    tree distance to the destination, with
    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
    ///    The self-distance check ensures only peers strictly closer to the
    ///    destination than us are considered (prevents routing loops).
    /// 5. Greedy tree routing fallback (requires cached dest coords)
    /// 6. No route → `None`
    ///
    /// Both the bloom filter and tree routing paths require cached destination
    /// coordinates (checked in `coord_cache`). Without coordinates, the node
    /// cannot make loop-free forwarding decisions. The caller should signal
    /// `CoordsRequired` back to the source when `None` is returned for a
    /// non-local destination.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // 1. Local delivery
        if dest_node_addr == self.node_addr() {
            return None;
        }

        // 2. Direct peer
        if let Some(peer) = self.peers.get(dest_node_addr)
            && peer.can_send()
        {
            return Some(peer);
        }

        let now_ms = Self::now_ms();

        // Pre-compute the sendable-peer set once; it is reused by every
        // learned-route selection below. Only built in reply_learned mode.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(_, peer)| peer.can_send())
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // 3. Optional reply-learned routing. These entries are not peer
        // claims; they are local observations of which peer carried traffic
        // or a verified lookup response back from the destination. Most
        // packets use weighted multipath over learned routes, but periodic
        // fallback exploration lets coord/bloom/tree routes discover better
        // candidates.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Look up cached destination coordinates (required by both bloom and tree paths).
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No coords: a learned route (even during an exploration tick)
            // is still better than returning no route at all.
            if let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            return None;
        };

        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
        //    If no candidate is strictly closer, fall through to tree routing.
        //    (Address is extracted first to end the immutable borrow of peers.)
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr {
            return self.peers.get(&next_hop_addr);
        }

        // 5. Greedy tree routing fallback
        let tree_route_addr = self
            .tree_state
            .find_next_hop(&dest_coords)
            .filter(|next_hop_id| {
                self.peers
                    .get(next_hop_id)
                    .is_some_and(|peer| peer.can_send())
            });
        if let Some(next_hop_addr) = tree_route_addr {
            return self.peers.get(&next_hop_addr);
        }
        // Exploration tick found nothing better: fall back to learned routes.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // NOTE(review): when explore_fallback is false this retries the same
        // select_next_hop call that already returned None in step 3 with
        // unchanged state — likely redundant unless selection is randomized
        // over routes that appeared mid-call; confirm against
        // LearnedRouteTable::select_next_hop.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        None
    }
2278
2279    pub(in crate::node) fn learn_reverse_route(
2280        &mut self,
2281        destination: NodeAddr,
2282        next_hop: NodeAddr,
2283    ) {
2284        if self.config.node.routing.mode != RoutingMode::ReplyLearned
2285            || destination == *self.node_addr()
2286        {
2287            return;
2288        }
2289        let now_ms = Self::now_ms();
2290        self.learned_routes.learn(
2291            destination,
2292            next_hop,
2293            now_ms,
2294            self.config.node.routing.learned_ttl_secs,
2295            self.config.node.routing.max_learned_routes_per_dest,
2296        );
2297    }
2298
2299    pub(in crate::node) fn record_route_failure(
2300        &mut self,
2301        destination: NodeAddr,
2302        next_hop: NodeAddr,
2303    ) {
2304        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
2305            return;
2306        }
2307        self.learned_routes.record_failure(&destination, &next_hop);
2308    }
2309
    /// Snapshot the learned-route table (for control queries/diagnostics).
    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
        self.learned_routes.snapshot(now_ms)
    }

    /// Drop expired learned routes (periodic maintenance).
    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
        self.learned_routes.purge_expired(now_ms);
    }
2317
2318    /// Select the best peer from a set of bloom filter candidates.
2319    ///
2320    /// Uses distance from each candidate's tree coordinates to the destination
2321    /// as the primary metric (after link_cost). Only selects peers that are
2322    /// strictly closer to the destination than we are (self-distance check
2323    /// prevents routing loops).
2324    ///
2325    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2326    fn select_best_candidate<'a>(
2327        &'a self,
2328        candidates: &[&'a ActivePeer],
2329        dest_coords: &crate::tree::TreeCoordinate,
2330    ) -> Option<&'a ActivePeer> {
2331        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2332
2333        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2334
2335        for &candidate in candidates {
2336            if !candidate.can_send() {
2337                continue;
2338            }
2339
2340            let cost = candidate.link_cost();
2341
2342            let dist = self
2343                .tree_state
2344                .peer_coords(candidate.node_addr())
2345                .map(|pc| pc.distance_to(dest_coords))
2346                .unwrap_or(usize::MAX);
2347
2348            // Self-distance check: only consider peers strictly closer
2349            // to the destination than we are (prevents routing loops)
2350            if dist >= my_distance {
2351                continue;
2352            }
2353
2354            let dominated = match &best {
2355                None => true,
2356                Some((_, best_cost, best_dist)) => {
2357                    cost < *best_cost
2358                        || (cost == *best_cost && dist < *best_dist)
2359                        || (cost == *best_cost
2360                            && dist == *best_dist
2361                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2362                }
2363            };
2364
2365            if dominated {
2366                best = Some((candidate, cost, dist));
2367            }
2368        }
2369
2370        best.map(|(peer, _, _)| peer)
2371    }
2372
2373    /// Check if a destination is in any peer's bloom filter.
2374    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2375        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2376    }
2377
2378    /// Get the TUN packet sender channel.
2379    ///
2380    /// Returns None if TUN is not active or the node hasn't been started.
2381    pub fn tun_tx(&self) -> Option<&TunTx> {
2382        self.tun_tx.as_ref()
2383    }
2384
2385    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2386    ///
2387    /// This must be called before [`Node::start`] and requires `tun.enabled =
2388    /// false`. Outbound packets sent to the returned sender are processed by the
2389    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2390    /// sent to the returned receiver with source attribution.
2391    pub fn attach_external_packet_io(
2392        &mut self,
2393        capacity: usize,
2394    ) -> Result<ExternalPacketIo, NodeError> {
2395        if self.state != NodeState::Created {
2396            return Err(NodeError::Config(ConfigError::Validation(
2397                "external packet I/O must be attached before node start".to_string(),
2398            )));
2399        }
2400        if self.config.tun.enabled {
2401            return Err(NodeError::Config(ConfigError::Validation(
2402                "external packet I/O requires tun.enabled=false".to_string(),
2403            )));
2404        }
2405
2406        let capacity = capacity.max(1);
2407        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2408        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2409        self.tun_outbound_rx = Some(outbound_rx);
2410        self.external_packet_tx = Some(inbound_tx);
2411
2412        Ok(ExternalPacketIo {
2413            outbound_tx,
2414            inbound_rx,
2415        })
2416    }
2417
2418    /// Attach app-owned endpoint data I/O for embedded operation.
2419    ///
2420    /// Commands sent to the returned sender are processed by the node RX loop.
2421    /// Incoming endpoint data is emitted as source-attributed events.
2422    pub(crate) fn attach_endpoint_data_io(
2423        &mut self,
2424        capacity: usize,
2425    ) -> Result<EndpointDataIo, NodeError> {
2426        if self.state != NodeState::Created {
2427            return Err(NodeError::Config(ConfigError::Validation(
2428                "endpoint data I/O must be attached before node start".to_string(),
2429            )));
2430        }
2431
2432        let command_capacity = endpoint_data_command_capacity(capacity);
2433        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2434        // Inbound endpoint-data events use an unbounded channel — see
2435        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2436        // per-packet semaphore + the cross-task relay task that used to
2437        // sit on top of this channel).
2438        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2439        self.endpoint_command_rx = Some(command_rx);
2440        self.endpoint_event_tx = Some(event_tx.clone());
2441
2442        Ok(EndpointDataIo {
2443            command_tx,
2444            event_rx,
2445            event_tx,
2446        })
2447    }
2448
2449    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2450        let mut prefix = [0u8; 15];
2451        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2452        self.identity_cache
2453            .get(&prefix)
2454            .filter(|entry| &entry.node_addr == addr)
2455            .map(|entry| entry.pubkey)
2456    }
2457
2458    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2459        let mut prefix = [0u8; 15];
2460        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2461        self.identity_cache
2462            .get(&prefix)
2463            .filter(|entry| &entry.node_addr == addr)
2464            .map(|entry| entry.npub.clone())
2465    }
2466
2467    pub(in crate::node) fn deliver_external_ipv6_packet(
2468        &self,
2469        src_addr: &NodeAddr,
2470        packet: Vec<u8>,
2471    ) {
2472        let Some(external_packet_tx) = &self.external_packet_tx else {
2473            return;
2474        };
2475        if packet.len() < 40 {
2476            return;
2477        }
2478        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2479            return;
2480        };
2481        let delivered = NodeDeliveredPacket {
2482            source_node_addr: *src_addr,
2483            source_npub: self.npub_for_node_addr(src_addr),
2484            destination,
2485            packet,
2486        };
2487        if let Err(error) = external_packet_tx.try_send(delivered) {
2488            debug!(error = %error, "Failed to deliver packet to external app sink");
2489        }
2490    }
2491
2492    // === Sending ===
2493
2494    /// Encrypt and send a link-layer message to an authenticated peer.
2495    ///
2496    /// The plaintext should include the message type byte followed by the
2497    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
2498    ///
2499    /// The send path prepends a 4-byte session-relative timestamp (inner
2500    /// header) before encryption. The full 16-byte outer header is used
2501    /// as AAD for the AEAD construction.
2502    ///
2503    /// This is the standard path for sending any link-layer control message
2504    /// to a peer over their encrypted Noise session.
2505    pub(super) async fn send_encrypted_link_message(
2506        &mut self,
2507        node_addr: &NodeAddr,
2508        plaintext: &[u8],
2509    ) -> Result<(), NodeError> {
2510        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
2511            .await
2512    }
2513
2514    /// Update the local-outbound-broken signal from a `transport.send`
2515    /// outcome. Sets `last_local_send_failure_at` on local-side io
2516    /// errors (NetworkUnreachable / HostUnreachable / AddrNotAvailable);
2517    /// clears it on success. The reaper consults this in
2518    /// `check_link_heartbeats` to switch to `fast_link_dead_timeout_secs`.
2519    pub(in crate::node) fn note_local_send_outcome(
2520        &mut self,
2521        result: &Result<usize, TransportError>,
2522    ) {
2523        match result {
2524            Ok(_) => {
2525                if self.last_local_send_failure_at.is_some() {
2526                    self.last_local_send_failure_at = None;
2527                }
2528            }
2529            Err(TransportError::Io(e))
2530                if matches!(
2531                    e.kind(),
2532                    std::io::ErrorKind::NetworkUnreachable
2533                        | std::io::ErrorKind::HostUnreachable
2534                        | std::io::ErrorKind::AddrNotAvailable
2535                ) =>
2536            {
2537                self.last_local_send_failure_at = Some(std::time::Instant::now());
2538            }
2539            Err(_) => {}
2540        }
2541    }
2542
    /// Returns the monotonic instant (`std::time::Instant`, not wall
    /// clock) of the most recent observed local outbound failure, if
    /// any. Used by the link-dead reaper.
    ///
    /// Cleared back to `None` as soon as any later send succeeds; see
    /// `note_local_send_outcome`.
    pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
        self.last_local_send_failure_at
    }
2548
    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
    ///
    /// Used by the forwarding path to relay congestion signals hop-by-hop.
    ///
    /// # Errors
    ///
    /// Fails when the peer is unknown, its session/transport metadata is
    /// incomplete, counter reservation or encryption fails, or the
    /// transport send fails (MTU errors map to `NodeError::MtuExceeded`).
    pub(super) async fn send_encrypted_link_message_with_ce(
        &mut self,
        node_addr: &NodeAddr,
        plaintext: &[u8],
        ce_flag: bool,
    ) -> Result<(), NodeError> {
        let peer = self
            .peers
            .get_mut(node_addr)
            .ok_or(NodeError::PeerNotFound(*node_addr))?;

        // Snapshot the session/transport metadata needed to address the
        // wire packet; each missing piece is a distinct SendFailed reason.
        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
            node_addr: *node_addr,
            reason: "no their_index".into(),
        })?;
        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
            node_addr: *node_addr,
            reason: "no transport_id".into(),
        })?;
        let remote_addr = peer
            .current_addr()
            .cloned()
            .ok_or_else(|| NodeError::SendFailed {
                node_addr: *node_addr,
                reason: "no current_addr".into(),
            })?;
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        let connected_socket = peer.connected_udp();

        // Prepend 4-byte session-relative timestamp (inner header)
        let timestamp_ms = peer.session_elapsed_ms();

        // MMP: read spin bit value before entering session borrow
        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
        let mut flags = if sp_flag { FLAG_SP } else { 0 };
        if ce_flag {
            flags |= FLAG_CE;
        }
        if peer.current_k_bit() {
            flags |= FLAG_KEY_EPOCH;
        }

        let session = peer
            .noise_session_mut()
            .ok_or_else(|| NodeError::SendFailed {
                node_addr: *node_addr,
                reason: "no noise session".into(),
            })?;

        // Build 16-byte outer header upfront. The inner-plaintext
        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
        // just to measure it. The worker path uses this length to size
        // the wire buffer directly; the legacy path below still
        // materialises a separate `inner_plaintext` Vec for the inline
        // encrypt-and-send call.
        const INNER_TS_LEN: usize = 4;
        let counter = session.current_send_counter();
        let inner_len = INNER_TS_LEN + plaintext.len();
        let payload_len = inner_len as u16;
        let header = build_established_header(their_index, counter, flags, payload_len);

        // **UDP send fast path.** The encrypt-worker pool is always
        // spawned at lifecycle start (workers = num_cpus) in
        // production, so this branch is taken for every authentic
        // send on every UDP-transported established session. The
        // AEAD work + sendmsg syscall run on a dedicated OS thread;
        // the rx_loop only builds the wire buffer + reserves the
        // counter inline.
        //
        // Other transport kinds (BLE, TCP, sim, ethernet) fall
        // through to the inline encrypt + transport.send path
        // below — those don't have raw-fd / sendmmsg / UDP_GSO
        // benefits to expose through the worker pool, so the simpler
        // synchronous send is the right shape for them.
        //
        // The `encrypt_workers.is_some()` check below is true in
        // production (lifecycle::start spawns the pool); it stays
        // checked rather than `expect()`-ed because unit tests
        // construct `Node` without calling `start()`.
        let transport_for_send = self
            .transports
            .get(&transport_id)
            .ok_or(NodeError::TransportNotFound(transport_id))?;
        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
            && is_udp
            && let Some(cipher_clone) = session.send_cipher_clone()
        {
            {
                // Reserve the counter on the session so subsequent
                // sends don't reuse it. `current_send_counter` only
                // peeks; we advance via `take_send_counter`.
                let reserved_counter =
                    session
                        .take_send_counter()
                        .map_err(|e| NodeError::SendFailed {
                            node_addr: *node_addr,
                            reason: format!("counter reservation failed: {}", e),
                        })?;
                debug_assert_eq!(reserved_counter, counter);
                // Re-derive the header with the now-locked-in counter
                // value (same value, but the call sequence is more
                // explicit).
                let header =
                    build_established_header(their_index, reserved_counter, flags, payload_len);
                let transport = transport_for_send;
                // Snapshot the per-peer connected UDP socket before
                // resolving the fallback address. On the established
                // steady-state path this socket already carries the
                // kernel peer address, so re-parsing the configured
                // transport address and touching the DNS cache on every
                // packet is pure overhead on the sender hot path.
                let send_target = {
                    if let TransportHandle::Udp(udp) = transport {
                        let socket_addr = {
                            #[cfg(any(target_os = "linux", target_os = "macos"))]
                            {
                                match connected_socket.as_ref() {
                                    Some(socket) => Some(socket.peer_addr()),
                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
                                }
                            }
                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
                            {
                                udp.resolve_for_off_task(&remote_addr).await.ok()
                            }
                        };
                        match (udp.async_socket(), socket_addr) {
                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
                            _ => None,
                        }
                    } else {
                        // Unreachable in practice (is_udp was checked
                        // above); falls through to the inline path.
                        None
                    }
                };
                if let Some((socket, socket_addr)) = send_target {
                    // Build the wire buffer **directly** from
                    // `plaintext` with a single allocation:
                    //   `[16 header][4 ts][plaintext...]` with
                    // +16 trailing capacity for the AEAD tag.
                    // The worker seals `wire_buf[16..]` in
                    // place and appends the tag — no second
                    // alloc, no second memcpy.
                    //
                    // Previous design built `inner_plaintext`
                    // via `prepend_inner_header` (1 alloc + 1
                    // copy) and then let the worker memcpy
                    // header + plaintext into a fresh Vec
                    // (another alloc + copy). At ~100 kpps the
                    // saved alloc/copy is ~150 MB/sec of memory
                    // bandwidth on the hot rx_loop + worker.
                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
                    let mut wire_buf = Vec::with_capacity(wire_capacity);
                    wire_buf.extend_from_slice(&header);
                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
                    wire_buf.extend_from_slice(plaintext);
                    let predicted_bytes = wire_capacity;
                    // Stats / MMP update inline — predicted size
                    // is exact for ChaCha20-Poly1305 (tag is
                    // constant 16 bytes). When `connected_socket` is
                    // `Some`, the worker sends on it without a
                    // destination sockaddr — the kernel skips the
                    // per-packet sockaddr + route + neighbor resolve.
                    if let Some(peer) = self.peers.get_mut(node_addr) {
                        peer.link_stats_mut().record_sent(predicted_bytes);
                        if let Some(mmp) = peer.mmp_mut() {
                            mmp.sender
                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
                        }
                    }
                    workers.dispatch(self::encrypt_worker::FmpSendJob {
                        cipher: cipher_clone,
                        counter: reserved_counter,
                        wire_buf,
                        fsp_seal: None,
                        socket,
                        dest_addr: socket_addr,
                        #[cfg(any(target_os = "linux", target_os = "macos"))]
                        connected_socket,
                        // EndpointData is drop-eligible under worker
                        // backpressure; control messages are not.
                        drop_on_backpressure: plaintext
                            .first()
                            .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
                        queued_at: crate::perf_profile::stamp(),
                    });
                    return Ok(());
                }
            }
        }

        // Inline (legacy) path: encrypt + send on the rx_loop.
        // Build the inner plaintext lazily here — the worker path
        // above never reaches this point, so the prepend_inner_header
        // alloc is avoided in the fast path.
        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
        // Encrypt with AAD binding to the outer header
        let ciphertext = {
            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
            session
                .encrypt_with_aad(&inner_plaintext, &header)
                .map_err(|e| NodeError::SendFailed {
                    node_addr: *node_addr,
                    reason: format!("encryption failed: {}", e),
                })?
        };

        let wire_packet = build_encrypted(&header, &ciphertext);

        // Re-borrow peer for stats update after sending
        let send_result = {
            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
            let transport = self
                .transports
                .get(&transport_id)
                .ok_or(NodeError::TransportNotFound(transport_id))?;
            transport.send(&remote_addr, &wire_packet).await
        };
        // Feed the fast link-dead heuristic before mapping the error.
        self.note_local_send_outcome(&send_result);
        let bytes_sent = send_result.map_err(|e| match e {
            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
                node_addr: *node_addr,
                packet_size,
                mtu,
            },
            other => NodeError::SendFailed {
                node_addr: *node_addr,
                reason: format!("transport send: {}", other),
            },
        })?;

        // Update send statistics
        if let Some(peer) = self.peers.get_mut(node_addr) {
            peer.link_stats_mut().record_sent(bytes_sent);
            // MMP: record sent frame for sender report generation
            if let Some(mmp) = peer.mmp_mut() {
                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
            }
        }

        Ok(())
    }
2793}
2794
2795impl fmt::Debug for Node {
2796    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2797        f.debug_struct("Node")
2798            .field("node_addr", self.node_addr())
2799            .field("state", &self.state)
2800            .field("is_leaf_only", &self.is_leaf_only)
2801            .field("connections", &self.connection_count())
2802            .field("peers", &self.peer_count())
2803            .field("links", &self.link_count())
2804            .field("transports", &self.transport_count())
2805            .finish()
2806    }
2807}