// fips_core/node/mod.rs

1//! FIPS Node Entity
2//!
3//! Top-level structure representing a running FIPS instance. The Node
4//! holds all state required for mesh routing: identity, tree state,
5//! Bloom filters, coordinate caches, transports, links, and peers.
6
7mod acl;
8mod bloom;
9mod decrypt_worker;
10mod discovery_rate_limit;
11mod encrypt_worker;
12mod handlers;
13mod lifecycle;
14mod rate_limit;
15mod retry;
16mod routing;
17mod routing_error_rate_limit;
18pub(crate) mod session;
19pub(crate) mod session_wire;
20pub(crate) mod stats;
21pub(crate) mod stats_history;
22#[cfg(test)]
23mod tests;
24mod tree;
25pub(crate) mod wire;
26
27use self::discovery_rate_limit::{DiscoveryBackoff, DiscoveryForwardRateLimiter};
28use self::rate_limit::HandshakeRateLimiter;
29use self::routing::{LearnedRouteTable, LearnedRouteTableSnapshot};
30use self::routing_error_rate_limit::RoutingErrorRateLimiter;
31use self::wire::{
32    ESTABLISHED_HEADER_SIZE, FLAG_CE, FLAG_KEY_EPOCH, FLAG_SP, build_encrypted,
33    build_established_header, prepend_inner_header,
34};
35use crate::bloom::BloomState;
36use crate::cache::CoordCache;
37use crate::config::RoutingMode;
38use crate::node::session::SessionEntry;
39use crate::peer::{ActivePeer, PeerConnection};
40#[cfg(any(target_os = "linux", target_os = "macos"))]
41use crate::transport::ethernet::EthernetTransport;
42use crate::transport::tcp::TcpTransport;
43use crate::transport::tor::TorTransport;
44use crate::transport::udp::UdpTransport;
45use crate::transport::{
46    Link, LinkId, PacketRx, PacketTx, TransportAddr, TransportError, TransportHandle, TransportId,
47};
48use crate::tree::TreeState;
49use crate::upper::hosts::HostMap;
50use crate::upper::icmp_rate_limit::IcmpRateLimiter;
51use crate::upper::tun::{TunError, TunOutboundRx, TunState, TunTx};
52use crate::utils::index::IndexAllocator;
53use crate::{
54    Config, ConfigError, FipsAddress, Identity, IdentityError, NodeAddr, PeerIdentity,
55    SessionMessageType, encode_npub,
56};
57use rand::Rng;
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fmt;
60use std::sync::Arc;
61use std::thread::JoinHandle;
62use thiserror::Error;
63use tracing::{debug, warn};
64
/// Half-range of the symmetric jitter applied to per-session rekey timers.
///
/// Each FMP/FSP session draws an offset uniformly from
/// `[-REKEY_JITTER_SECS, +REKEY_JITTER_SECS]` seconds at construction and
/// after each cutover. This preserves the configured mean interval while
/// reducing dual-initiation bursts in symmetric-start meshes.
/// Signed (`i64`) because the drawn offset can be negative.
pub(crate) const REKEY_JITTER_SECS: i64 = 15;
72
/// Errors related to node operations.
#[derive(Debug, Error)]
pub enum NodeError {
    /// Operation requires a started node.
    #[error("node not started")]
    NotStarted,

    /// Start requested while the node is already running.
    #[error("node already started")]
    AlreadyStarted,

    /// Stop requested on a node that has already stopped.
    #[error("node already stopped")]
    AlreadyStopped,

    /// No transport registered under the given id.
    #[error("transport not found: {0}")]
    TransportNotFound(TransportId),

    /// No transport of the requested type is available.
    #[error("no transport available for type: {0}")]
    NoTransportForType(String),

    /// No link with the given id.
    #[error("link not found: {0}")]
    LinkNotFound(LinkId),

    /// No in-progress connection associated with the given link.
    #[error("connection not found: {0}")]
    ConnectionNotFound(LinkId),

    /// No active peer with the given address.
    #[error("peer not found: {0:?}")]
    PeerNotFound(NodeAddr),

    /// An active peer with this address already exists.
    #[error("peer already exists: {0:?}")]
    PeerAlreadyExists(NodeAddr),

    /// A connection is already in progress on this link.
    #[error("connection already exists for link: {0}")]
    ConnectionAlreadyExists(LinkId),

    /// A configured peer npub failed to decode or validate.
    #[error("invalid peer npub '{npub}': {reason}")]
    InvalidPeerNpub { npub: String, reason: String },

    /// Peer rejected by access control.
    #[error("access denied: {0}")]
    AccessDenied(String),

    /// `max_connections` resource limit reached.
    #[error("max connections exceeded: {max}")]
    MaxConnectionsExceeded { max: usize },

    /// `max_peers` resource limit reached.
    #[error("max peers exceeded: {max}")]
    MaxPeersExceeded { max: usize },

    /// `max_links` resource limit reached.
    #[error("max links exceeded: {max}")]
    MaxLinksExceeded { max: usize },

    /// Operation requires a completed handshake on this link.
    #[error("handshake incomplete for link {0}")]
    HandshakeIncomplete(LinkId),

    /// No established session is available on this link.
    #[error("no session available for link {0}")]
    NoSession(LinkId),

    /// A connection could not be promoted to an active peer.
    #[error("promotion failed for link {link_id}: {reason}")]
    PromotionFailed { link_id: LinkId, reason: String },

    /// Sending to the given peer failed.
    #[error("send failed to {node_addr}: {reason}")]
    SendFailed { node_addr: NodeAddr, reason: String },

    /// Packet is larger than the MTU toward the given peer.
    #[error("mtu exceeded forwarding to {node_addr}: packet {packet_size} > mtu {mtu}")]
    MtuExceeded {
        node_addr: NodeAddr,
        packet_size: usize,
        mtu: u16,
    },

    /// Configuration load/validation error.
    #[error("config error: {0}")]
    Config(#[from] ConfigError),

    /// Identity creation or parsing error.
    #[error("identity error: {0}")]
    Identity(#[from] IdentityError),

    /// TUN device error.
    #[error("TUN error: {0}")]
    Tun(#[from] TunError),

    /// Session index allocation failed.
    #[error("index allocation failed: {0}")]
    IndexAllocationFailed(String),

    /// Handshake processing failed.
    #[error("handshake failed: {0}")]
    HandshakeFailed(String),

    /// Underlying transport error.
    #[error("transport error: {0}")]
    TransportError(String),

    /// Adopting a NAT-traversal bootstrap transport failed.
    #[error("bootstrap handoff failed: {0}")]
    BootstrapHandoff(String),
}
161
/// Source-attributed packet delivered by a node running without a system TUN.
///
/// Pushed through the node's `external_packet_tx` channel to the embedding
/// application (see `ExternalPacketIo::inbound_rx`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeDeliveredPacket {
    /// FIPS node address that originated the packet.
    pub source_node_addr: NodeAddr,
    /// Source Nostr public key when the node has learned it.
    pub source_npub: Option<String>,
    /// Destination FIPS address from the IPv6 packet.
    pub destination: FipsAddress,
    /// Full IPv6 packet after FIPS session decapsulation.
    pub packet: Vec<u8>,
}
174
/// Cached peer identity data, stored in `Node::identity_cache` keyed by
/// the 15-byte FipsAddress prefix (bytes 1-15).
#[derive(Debug, Clone)]
struct IdentityCacheEntry {
    /// Routing address of the peer.
    node_addr: NodeAddr,
    /// Peer's secp256k1 public key.
    pubkey: secp256k1::PublicKey,
    /// Peer's bech32-encoded Nostr public key.
    npub: String,
    /// Wall-clock ms when this identity was last observed.
    last_seen_ms: u64,
}
182
183impl IdentityCacheEntry {
184    fn new(
185        node_addr: NodeAddr,
186        pubkey: secp256k1::PublicKey,
187        npub: String,
188        last_seen_ms: u64,
189    ) -> Self {
190        Self {
191            node_addr,
192            pubkey,
193            npub,
194            last_seen_ms,
195        }
196    }
197}
198
/// App-owned packet channels for embedding FIPS without a system TUN.
///
/// The embedding application keeps both ends: it pushes outbound IPv6
/// packets via `outbound_tx` and drains inbound packets from `inbound_rx`.
#[derive(Debug)]
pub struct ExternalPacketIo {
    /// Send outbound IPv6 packets into the node.
    pub outbound_tx: crate::upper::tun::TunOutboundTx,
    /// Receive inbound IPv6 packets delivered by FIPS sessions.
    pub inbound_rx: tokio::sync::mpsc::Receiver<NodeDeliveredPacket>,
}
207
/// App-owned endpoint data channels for embedding FIPS without a daemon.
///
/// Bundles the three channel endpoints the embedding side needs: the
/// command sender, the event receiver, and a loopback clone of the event
/// sender.
#[derive(Debug)]
pub(crate) struct EndpointDataIo {
    /// Send endpoint data commands into the node RX loop.
    ///
    /// Bounded with a generous default so normal sender bursts do not
    /// stall on semaphore acquisition. macOS pacing happens at the UDP
    /// egress thread where the real Wi-Fi/interface bottleneck is visible;
    /// constraining this app queue instead caused the inner TCP flow to
    /// collapse under iperf. `FIPS_ENDPOINT_DATA_QUEUE_CAP` overrides the
    /// default for benches.
    pub(crate) command_tx: tokio::sync::mpsc::Sender<NodeEndpointCommand>,
    /// Receive endpoint data delivered by FIPS sessions.
    ///
    /// Unbounded so the rx_loop's send on inbound packet delivery is a
    /// wait-free push (no semaphore acquire), and so we can drop the
    /// per-packet cross-task relay that previously sat between the node
    /// task and the `FipsEndpoint::recv()` consumer. Backpressure is
    /// naturally bounded — the rx_loop both produces here and runs the
    /// same runtime that schedules the consumer, so a stalled consumer
    /// stalls production too.
    pub(crate) event_rx: tokio::sync::mpsc::UnboundedReceiver<NodeEndpointEvent>,
    /// Clone of the event_tx exposed for in-process loopback (e.g.
    /// `FipsEndpoint::send` to self_npub). Lets the endpoint inject an
    /// event into the same queue without going through the encrypt /
    /// decrypt path, while keeping every consumer reading from a single
    /// channel.
    pub(crate) event_tx: tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>,
}
237
/// Resolve the bounded capacity for the endpoint data command queue.
///
/// The `FIPS_ENDPOINT_DATA_QUEUE_CAP` environment variable, when set to a
/// positive integer, overrides everything (intended for benches). Otherwise
/// the requested capacity is used, floored at 32,768 so normal sender
/// bursts do not stall on semaphore acquisition.
fn endpoint_data_command_capacity(requested: usize) -> usize {
    if let Ok(raw) = std::env::var("FIPS_ENDPOINT_DATA_QUEUE_CAP") {
        if let Ok(value) = raw.trim().parse::<usize>() {
            if value > 0 {
                return value;
            }
        }
    }

    // The floor of 32_768 already guarantees a non-zero result, so the
    // original's extra `.max(1)` was redundant and has been dropped.
    requested.max(32_768)
}
248
/// Commands accepted by the node endpoint data service.
#[derive(Debug)]
pub(crate) enum NodeEndpointCommand {
    /// Send with an explicit response channel — used by callers that
    /// care whether the local-stack handoff succeeded (e.g.
    /// `blocking_send` waits for the runtime to accept the send).
    Send {
        /// Destination peer identity.
        remote: PeerIdentity,
        /// Application payload bytes to deliver.
        payload: Vec<u8>,
        /// When the caller queued the payload. NOTE(review): presumably
        /// used for latency accounting — confirm against the rx/tx loops.
        queued_at: Option<std::time::Instant>,
        /// Receives the result of the local-stack handoff.
        response_tx: tokio::sync::oneshot::Sender<Result<(), NodeError>>,
    },
    /// **Fire-and-forget** variant of `Send` — no oneshot allocation,
    /// no per-packet result channel. Used by the data-plane fast path
    /// (`FipsEndpoint::send`) where the caller already discards the
    /// result. Saves one oneshot::channel() allocation per outbound
    /// packet on the application's send hot path.
    SendOneway {
        /// Destination peer identity.
        remote: PeerIdentity,
        /// Application payload bytes to deliver.
        payload: Vec<u8>,
        /// When the caller queued the payload (same semantics as `Send`).
        queued_at: Option<std::time::Instant>,
    },
    /// Request a snapshot of the authenticated peer set.
    PeerSnapshot {
        /// Receives the current peer list.
        response_tx: tokio::sync::oneshot::Sender<Vec<NodeEndpointPeer>>,
    },
    /// Replace the runtime peer list. Newly added auto-connect peers get
    /// `initiate_peer_connection` immediately; removed peers are dropped
    /// from the retry queue (the regular liveness timeout reaps any active
    /// session). Existing entries are kept and their `addresses` field is
    /// refreshed so the next retry sees the latest hints.
    UpdatePeers {
        /// Full replacement peer list.
        peers: Vec<crate::config::PeerConfig>,
        /// Receives a summary of what changed.
        response_tx: tokio::sync::oneshot::Sender<Result<UpdatePeersOutcome, NodeError>>,
    },
}
284
/// Reports what changed in response to `UpdatePeers`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub(crate) struct UpdatePeersOutcome {
    /// Peers newly added by the update.
    pub(crate) added: usize,
    /// Peers removed by the update.
    pub(crate) removed: usize,
    /// Existing peers whose configuration was refreshed.
    pub(crate) updated: usize,
    /// Peers left untouched.
    pub(crate) unchanged: usize,
}
293
/// Endpoint data events emitted by the node session receive path.
#[derive(Debug)]
pub(crate) enum NodeEndpointEvent {
    /// A decapsulated application payload arriving from a remote session.
    Data {
        /// FIPS node address of the sender.
        source_node_addr: NodeAddr,
        /// Sender's npub when the node has learned it.
        source_npub: Option<String>,
        /// Application payload bytes.
        payload: Vec<u8>,
        /// Enqueue time, when tracked. NOTE(review): presumably for
        /// latency measurement — confirm against the consumer.
        queued_at: Option<std::time::Instant>,
    },
}
304
/// Authenticated peer state exposed to embedded endpoint callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NodeEndpointPeer {
    /// Peer's bech32 npub.
    pub(crate) npub: String,
    /// Remote transport address, when known.
    pub(crate) transport_addr: Option<String>,
    /// Transport type name, when known.
    pub(crate) transport_type: Option<String>,
    /// Link carrying this peer's connection.
    pub(crate) link_id: u64,
    /// Smoothed round-trip time in milliseconds, when measured.
    pub(crate) srtt_ms: Option<u64>,
    /// Cumulative packets sent to the peer.
    pub(crate) packets_sent: u64,
    /// Cumulative packets received from the peer.
    pub(crate) packets_recv: u64,
    /// Cumulative bytes sent to the peer.
    pub(crate) bytes_sent: u64,
    /// Cumulative bytes received from the peer.
    pub(crate) bytes_recv: u64,
}
318
/// Node operational state.
///
/// Allowed transitions are enforced by `can_start` / `can_stop`
/// rather than by the type itself.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not started.
    Created,
    /// Starting up (initializing transports).
    Starting,
    /// Fully operational.
    Running,
    /// Shutting down.
    Stopping,
    /// Stopped.
    Stopped,
}
333
334impl NodeState {
335    /// Check if node is operational.
336    pub fn is_operational(&self) -> bool {
337        matches!(self, NodeState::Running)
338    }
339
340    /// Check if node can be started.
341    pub fn can_start(&self) -> bool {
342        matches!(self, NodeState::Created | NodeState::Stopped)
343    }
344
345    /// Check if node can be stopped.
346    pub fn can_stop(&self) -> bool {
347        matches!(self, NodeState::Running)
348    }
349}
350
351impl fmt::Display for NodeState {
352    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353        let s = match self {
354            NodeState::Created => "created",
355            NodeState::Starting => "starting",
356            NodeState::Running => "running",
357            NodeState::Stopping => "stopping",
358            NodeState::Stopped => "stopped",
359        };
360        write!(f, "{}", s)
361    }
362}
363
/// Recent request tracking for dedup and reverse-path forwarding.
///
/// When a LookupRequest is forwarded through a node, the node stores the
/// request_id and which peer sent it. When the corresponding LookupResponse
/// arrives, it's forwarded back to that peer (reverse-path forwarding).
/// The `response_forwarded` flag prevents response routing loops.
///
/// Entries live in `Node::recent_requests` keyed by request_id and are
/// reaped via `is_expired`.
#[derive(Clone, Debug)]
pub(crate) struct RecentRequest {
    /// The peer who sent this request to us.
    pub(crate) from_peer: NodeAddr,
    /// When we received this request (Unix milliseconds).
    pub(crate) timestamp_ms: u64,
    /// Whether we've already forwarded a response for this request.
    /// Prevents response routing loops when convergent request paths
    /// create bidirectional entries in recent_requests.
    pub(crate) response_forwarded: bool,
}
381
382impl RecentRequest {
383    pub(crate) fn new(from_peer: NodeAddr, timestamp_ms: u64) -> Self {
384        Self {
385            from_peer,
386            timestamp_ms,
387            response_forwarded: false,
388        }
389    }
390
391    /// Check if this entry has expired (older than expiry_ms).
392    pub(crate) fn is_expired(&self, current_time_ms: u64, expiry_ms: u64) -> bool {
393        current_time_ms.saturating_sub(self.timestamp_ms) > expiry_ms
394    }
395}
396
/// Key for addr_to_link reverse lookup.
/// Keyed by both transport and remote address, since the map spans
/// multiple transports.
type AddrKey = (TransportId, TransportAddr);
399
/// Per-transport kernel drop tracking for congestion detection.
///
/// Sampled every tick (1s). The `dropping` flag indicates whether new
/// kernel drops were observed since the previous sample.
///
/// `Default` yields a zeroed sample with `dropping == false`.
#[derive(Debug, Default)]
struct TransportDropState {
    /// Previous `recv_drops` sample (cumulative counter).
    prev_drops: u64,
    /// True if drops increased since the last sample.
    dropping: bool,
}
411
/// State for a link waiting for transport-level connection establishment.
///
/// For connection-oriented transports (TCP, Tor), the transport connect runs
/// asynchronously. This struct holds the data needed to complete the handshake
/// once the connection is ready. Instances live in `Node::pending_connects`
/// and are polled by the tick handler.
struct PendingConnect {
    /// The link that was created for this connection.
    link_id: LinkId,
    /// Which transport is being used.
    transport_id: TransportId,
    /// The remote address being connected to.
    remote_addr: TransportAddr,
    /// The peer identity (for handshake initiation).
    peer_identity: PeerIdentity,
}
427
/// A running FIPS node instance.
///
/// This is the top-level container holding all node state.
///
/// ## Peer Lifecycle
///
/// Peers go through two phases:
/// 1. **Connection phase** (`connections`): Handshake in progress, indexed by LinkId
/// 2. **Active phase** (`peers`): Authenticated, indexed by NodeAddr
///
/// The `addr_to_link` map enables dispatching incoming packets to the right
/// connection before authentication completes.
// Discovery lookup constants moved to config: node.discovery.attempt_timeouts_secs, node.discovery.ttl
pub struct Node {
    // === Identity ===
    /// This node's cryptographic identity.
    identity: Identity,

    /// Random epoch generated at startup for peer restart detection.
    /// Exchanged inside Noise handshake messages so peers can detect restarts.
    startup_epoch: [u8; 8],

    /// Instant when the node was created, for uptime reporting.
    started_at: std::time::Instant,

    // === Configuration ===
    /// Loaded configuration.
    config: Config,

    // === State ===
    /// Node operational state.
    state: NodeState,

    /// Whether this is a leaf-only node.
    is_leaf_only: bool,

    // === Spanning Tree ===
    /// Local spanning tree state.
    tree_state: TreeState,

    // === Bloom Filter ===
    /// Local Bloom filter state.
    bloom_state: BloomState,

    // === Routing ===
    /// Address -> coordinates cache (from session setup and discovery).
    coord_cache: CoordCache,
    /// Locally learned reverse-path next-hop hints.
    learned_routes: LearnedRouteTable,
    /// Recent discovery requests (dedup + reverse-path forwarding).
    /// Maps request_id → RecentRequest.
    recent_requests: HashMap<u64, RecentRequest>,
    /// Per-destination path MTU lookup, keyed by FipsAddress (mirrors
    /// `coord_cache.entries[*].path_mtu`). Sync read-only access from
    /// the TUN reader/writer threads at TCP MSS clamp time so the
    /// SYN/SYN-ACK clamp can use the smaller of the local-egress floor
    /// and the learned per-destination path MTU.
    path_mtu_lookup: Arc<std::sync::RwLock<HashMap<crate::FipsAddress, u16>>>,

    // === Transports & Links ===
    /// Active transports (owned by Node).
    transports: HashMap<TransportId, TransportHandle>,
    /// Per-transport kernel drop tracking for congestion detection.
    transport_drops: HashMap<TransportId, TransportDropState>,
    /// Active links.
    links: HashMap<LinkId, Link>,
    /// Reverse lookup: (transport_id, remote_addr) -> link_id.
    addr_to_link: HashMap<AddrKey, LinkId>,

    // === Packet Channel ===
    /// Packet sender for transports.
    packet_tx: Option<PacketTx>,
    /// Packet receiver (for event loop).
    packet_rx: Option<PacketRx>,

    // === Connections (Handshake Phase) ===
    /// Pending connections (handshake in progress).
    /// Indexed by LinkId since we don't know the peer's identity yet.
    connections: HashMap<LinkId, PeerConnection>,

    // === Peers (Active Phase) ===
    /// Authenticated peers.
    /// Indexed by NodeAddr (verified identity).
    peers: HashMap<NodeAddr, ActivePeer>,

    // === End-to-End Sessions ===
    /// Session table for end-to-end encrypted sessions.
    /// Keyed by remote NodeAddr.
    sessions: HashMap<NodeAddr, SessionEntry>,

    // === Identity Cache ===
    /// Maps FipsAddress prefix bytes (bytes 1-15) to cached peer identity data.
    /// Enables reverse lookup from IPv6 destination to session/routing identity.
    identity_cache: HashMap<[u8; 15], IdentityCacheEntry>,

    // === Pending TUN Packets ===
    /// Packets queued while waiting for session establishment.
    /// Keyed by destination NodeAddr, bounded per-dest and total.
    pending_tun_packets: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    /// Endpoint data payloads queued while waiting for session establishment.
    pending_endpoint_data: HashMap<NodeAddr, VecDeque<Vec<u8>>>,
    // === Pending Discovery Lookups ===
    /// Tracks in-flight discovery lookups. Maps target NodeAddr to the
    /// initiation timestamp (Unix ms). Prevents duplicate flood queries.
    pending_lookups: HashMap<NodeAddr, handlers::discovery::PendingLookup>,

    // === Resource Limits ===
    /// Maximum connections (0 = unlimited).
    max_connections: usize,
    /// Maximum peers (0 = unlimited).
    max_peers: usize,
    /// Maximum links (0 = unlimited).
    max_links: usize,

    // === Counters ===
    /// Next link ID to allocate.
    next_link_id: u64,
    /// Next transport ID to allocate.
    next_transport_id: u32,

    // === Node Statistics ===
    /// Routing, forwarding, discovery, and error signal counters.
    stats: stats::NodeStats,

    /// Time-series history of node-level metrics (1s/1m rings).
    stats_history: stats_history::StatsHistory,

    // === TUN Interface ===
    /// TUN device state.
    tun_state: TunState,
    /// TUN interface name (for cleanup).
    tun_name: Option<String>,
    /// TUN packet sender channel.
    tun_tx: Option<TunTx>,
    /// Receiver for outbound packets from the TUN reader.
    tun_outbound_rx: Option<TunOutboundRx>,
    /// App-owned packet sink used by embedded/no-TUN integrations.
    external_packet_tx: Option<tokio::sync::mpsc::Sender<NodeDeliveredPacket>>,
    /// Endpoint data command receiver used by embedded/no-daemon integrations.
    endpoint_command_rx: Option<tokio::sync::mpsc::Receiver<NodeEndpointCommand>>,
    /// Endpoint data event sink used by embedded/no-daemon integrations.
    endpoint_event_tx: Option<tokio::sync::mpsc::UnboundedSender<NodeEndpointEvent>>,
    /// Off-task FMP-encrypt + UDP-send worker pool. `None` if not yet
    /// spawned (set up in `start()` once transports are running).
    /// `Some(pool)` once available; the pool internally holds
    /// per-worker mpsc senders and round-robins jobs across them.
    /// See `node::encrypt_worker` for the rationale and layout.
    encrypt_workers: Option<encrypt_worker::EncryptWorkerPool>,
    /// Off-task FMP + FSP decrypt + delivery worker pool. Mirror of
    /// `encrypt_workers` for the receive side.
    decrypt_workers: Option<decrypt_worker::DecryptWorkerPool>,
    /// Set of sessions that have been registered with the decrypt
    /// shard worker pool. Used by rx_loop to decide between fast-path
    /// dispatch (worker owns the session) and legacy in-place decrypt
    /// (worker doesn't have it yet). Per the data-plane restructure,
    /// the worker owns its session state directly — there's no shared
    /// `Arc<RwLock<HashMap>>` of cipher / replay state anymore, only
    /// this set tracks **whether** the worker has been told about a
    /// given session.
    decrypt_registered_sessions: std::collections::HashSet<(TransportId, u32)>,
    /// Fallback channel: decrypt worker bounces non-fast-path packets
    /// (anything that's not bulk EndpointData) back here for rx_loop
    /// to handle via the legacy path. Drained by a new rx_loop arm.
    decrypt_fallback_rx:
        Option<tokio::sync::mpsc::UnboundedReceiver<decrypt_worker::DecryptWorkerEvent>>,
    /// Sender half of the fallback channel above; created as a pair
    /// with `decrypt_fallback_rx` at construction time.
    decrypt_fallback_tx: tokio::sync::mpsc::UnboundedSender<decrypt_worker::DecryptWorkerEvent>,
    /// TUN reader thread handle.
    tun_reader_handle: Option<JoinHandle<()>>,
    /// TUN writer thread handle.
    tun_writer_handle: Option<JoinHandle<()>>,
    /// Shutdown pipe: writing to this fd unblocks the TUN reader thread on macOS.
    /// On Linux, deleting the interface via netlink serves the same purpose.
    #[cfg(target_os = "macos")]
    tun_shutdown_fd: Option<std::os::unix::io::RawFd>,

    // === DNS Responder ===
    /// Receiver for resolved identities from the DNS responder.
    dns_identity_rx: Option<crate::upper::dns::DnsIdentityRx>,
    /// DNS responder task handle.
    dns_task: Option<tokio::task::JoinHandle<()>>,

    // === Index-Based Session Dispatch ===
    /// Allocator for session indices.
    index_allocator: IndexAllocator,
    /// O(1) lookup: (transport_id, our_index) → NodeAddr.
    /// This maps our session index to the peer that uses it.
    peers_by_index: HashMap<(TransportId, u32), NodeAddr>,
    /// Pending outbound handshakes by our sender_idx.
    /// Tracks which LinkId corresponds to which session index.
    pending_outbound: HashMap<(TransportId, u32), LinkId>,

    // === Rate Limiting ===
    /// Rate limiter for msg1 processing (DoS protection).
    msg1_rate_limiter: HandshakeRateLimiter,
    /// Rate limiter for ICMP Packet Too Big messages.
    icmp_rate_limiter: IcmpRateLimiter,
    /// Rate limiter for routing error signals (CoordsRequired / PathBroken).
    routing_error_rate_limiter: RoutingErrorRateLimiter,
    /// Rate limiter for source-side CoordsRequired/PathBroken responses.
    coords_response_rate_limiter: RoutingErrorRateLimiter,
    /// Backoff for failed discovery lookups (originator-side).
    discovery_backoff: DiscoveryBackoff,
    /// Rate limiter for forwarded discovery requests (transit-side).
    discovery_forward_limiter: DiscoveryForwardRateLimiter,

    // === Pending Transport Connects ===
    /// Links waiting for transport-level connection establishment before
    /// sending handshake msg1. For connection-oriented transports (TCP, Tor),
    /// the transport connect runs in the background; the tick handler polls
    /// connection_state() and initiates the handshake when connected.
    pending_connects: Vec<PendingConnect>,

    // === Connection Retry ===
    /// Retry state for peers whose outbound connections have failed.
    /// Keyed by NodeAddr. Entries are created when a handshake times out
    /// or fails, and removed on successful promotion or when max retries
    /// are exhausted.
    retry_pending: HashMap<NodeAddr, retry::RetryState>,

    /// Optional Nostr/STUN overlay discovery coordinator for `udp:nat` peers.
    nostr_discovery: Option<Arc<crate::discovery::nostr::NostrDiscovery>>,
    /// mDNS / DNS-SD responder + browser for local-link peer discovery.
    /// Identity is unverified at this layer — the Noise XX handshake
    /// initiated against an mDNS-observed endpoint is what proves the
    /// peer holds the matching private key.
    lan_discovery: Option<Arc<crate::discovery::lan::LanDiscovery>>,
    /// Wall-clock ms when Nostr discovery successfully started, used to
    /// schedule the one-shot startup advert sweep after a settle delay.
    /// `None` until discovery comes up; remains `None` if discovery is
    /// disabled or failed to start.
    nostr_discovery_started_at_ms: Option<u64>,
    /// Whether the one-shot startup advert sweep has run. Set to true
    /// after the first sweep fires (under `policy: open`); thereafter
    /// only the per-tick `queue_open_discovery_retries` continues.
    startup_open_discovery_sweep_done: bool,
    /// Per-peer UDP transports adopted from NAT traversal handoff.
    bootstrap_transports: HashSet<TransportId>,
    /// Originating peer npub (bech32) for each adopted bootstrap
    /// transport, captured at `adopt_established_traversal` time.
    /// Populated alongside `bootstrap_transports`; cleared in
    /// `cleanup_bootstrap_transport_if_unused`. Used by the rx loop to
    /// route fatal-protocol-mismatch observations back to the
    /// Nostr-discovery `failure_state` for long cooldown application.
    bootstrap_transport_npubs: HashMap<TransportId, String>,
    /// Peers that should not be used as reply-learned fallback transit for
    /// other destinations. Direct lookups to the peer are still permitted.
    discovery_fallback_transit_blocked_peers: HashSet<NodeAddr>,

    // === Periodic Parent Re-evaluation ===
    /// Timestamp of last periodic parent re-evaluation (for pacing).
    last_parent_reeval: Option<crate::time::Instant>,

    // === Congestion Logging ===
    /// Timestamp of last congestion detection log (rate-limited to 5s).
    last_congestion_log: Option<std::time::Instant>,

    // === Mesh Size Estimate ===
    /// Cached estimated mesh size (computed once per tick from bloom filters).
    estimated_mesh_size: Option<u64>,
    /// Timestamp of last mesh size log emission.
    last_mesh_size_log: Option<std::time::Instant>,

    // === Bloom Self-Plausibility ===
    /// Rate-limit state for the self-plausibility WARN. Fires at most
    /// once per 60s globally when our own outgoing FilterAnnounce has
    /// an FPR above `node.bloom.max_inbound_fpr`, signalling either
    /// aggregation drift or an ingress bypass.
    last_self_warn: Option<std::time::Instant>,

    // === Local Outbound Liveness ===
    /// Set when a `transport.send` returned a local-side io error
    /// (`NetworkUnreachable` / `HostUnreachable` / `AddrNotAvailable`),
    /// cleared on the next successful send. Used by
    /// `check_link_heartbeats` to compress the dead-timeout to
    /// `fast_link_dead_timeout_secs` while our outbound is observed
    /// broken — direct kernel evidence beats waiting on receive-silence.
    last_local_send_failure_at: Option<std::time::Instant>,

    // === Display Names ===
    /// Human-readable names for configured peers (alias or short npub).
    /// Populated at startup from peer config.
    peer_aliases: HashMap<NodeAddr, String>,

    /// Reloadable peer ACL state from standard allow/deny files.
    peer_acl: acl::PeerAclReloader,

    // === Host Map ===
    /// Static hostname → npub mapping for DNS resolution.
    /// Built at construction from peer aliases and /etc/fips/hosts.
    host_map: Arc<HostMap>,
}
719
720impl Node {
721    /// Create a new node from configuration.
722    pub fn new(config: Config) -> Result<Self, NodeError> {
723        config.validate()?;
724        let identity = config.create_identity()?;
725        let node_addr = *identity.node_addr();
726        let is_leaf_only = config.is_leaf_only();
727
728        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
729        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
730
731        let mut startup_epoch = [0u8; 8];
732        rand::rng().fill_bytes(&mut startup_epoch);
733
734        let mut bloom_state = if is_leaf_only {
735            BloomState::leaf_only(node_addr)
736        } else {
737            BloomState::new(node_addr)
738        };
739        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
740
741        let tun_state = if config.tun.enabled {
742            TunState::Configured
743        } else {
744            TunState::Disabled
745        };
746
747        // Initialize tree state with signed self-declaration
748        let mut tree_state = TreeState::new(node_addr);
749        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
750        tree_state.set_hold_down(config.node.tree.hold_down_secs);
751        tree_state.set_flap_dampening(
752            config.node.tree.flap_threshold,
753            config.node.tree.flap_window_secs,
754            config.node.tree.flap_dampening_secs,
755        );
756        tree_state
757            .sign_declaration(&identity)
758            .expect("signing own declaration should never fail");
759
760        let coord_cache = CoordCache::new(
761            config.node.cache.coord_size,
762            config.node.cache.coord_ttl_secs * 1000,
763        );
764        let rl = &config.node.rate_limit;
765        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
766            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
767            config.node.limits.max_pending_inbound,
768        );
769
770        let max_connections = config.node.limits.max_connections;
771        let max_peers = config.node.limits.max_peers;
772        let max_links = config.node.limits.max_links;
773        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
774        let backoff_base_secs = config.node.discovery.backoff_base_secs;
775        let backoff_max_secs = config.node.discovery.backoff_max_secs;
776        let forward_min_interval_secs = config.node.discovery.forward_min_interval_secs;
777
778        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
779
780        Ok(Self {
781            identity,
782            startup_epoch,
783            started_at: std::time::Instant::now(),
784            config,
785            state: NodeState::Created,
786            is_leaf_only,
787            tree_state,
788            bloom_state,
789            coord_cache,
790            learned_routes: LearnedRouteTable::default(),
791            recent_requests: HashMap::new(),
792            transports: HashMap::new(),
793            transport_drops: HashMap::new(),
794            links: HashMap::new(),
795            addr_to_link: HashMap::new(),
796            packet_tx: None,
797            packet_rx: None,
798            connections: HashMap::new(),
799            peers: HashMap::new(),
800            sessions: HashMap::new(),
801            identity_cache: HashMap::new(),
802            pending_tun_packets: HashMap::new(),
803            pending_endpoint_data: HashMap::new(),
804            pending_lookups: HashMap::new(),
805            max_connections,
806            max_peers,
807            max_links,
808            next_link_id: 1,
809            next_transport_id: 1,
810            stats: stats::NodeStats::new(),
811            stats_history: stats_history::StatsHistory::new(),
812            tun_state,
813            tun_name: None,
814            tun_tx: None,
815            tun_outbound_rx: None,
816            external_packet_tx: None,
817            endpoint_command_rx: None,
818            endpoint_event_tx: None,
819            encrypt_workers: None,
820            decrypt_workers: None,
821            decrypt_registered_sessions: std::collections::HashSet::new(),
822            decrypt_fallback_tx,
823            decrypt_fallback_rx,
824            tun_reader_handle: None,
825            tun_writer_handle: None,
826            #[cfg(target_os = "macos")]
827            tun_shutdown_fd: None,
828            dns_identity_rx: None,
829            dns_task: None,
830            index_allocator: IndexAllocator::new(),
831            peers_by_index: HashMap::new(),
832            pending_outbound: HashMap::new(),
833            msg1_rate_limiter,
834            icmp_rate_limiter: IcmpRateLimiter::new(),
835            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
836            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
837                std::time::Duration::from_millis(coords_response_interval_ms),
838            ),
839            discovery_backoff: DiscoveryBackoff::with_params(backoff_base_secs, backoff_max_secs),
840            discovery_forward_limiter: DiscoveryForwardRateLimiter::with_interval(
841                std::time::Duration::from_secs(forward_min_interval_secs),
842            ),
843            pending_connects: Vec::new(),
844            retry_pending: HashMap::new(),
845            nostr_discovery: None,
846            nostr_discovery_started_at_ms: None,
847            lan_discovery: None,
848            startup_open_discovery_sweep_done: false,
849            bootstrap_transports: HashSet::new(),
850            bootstrap_transport_npubs: HashMap::new(),
851            discovery_fallback_transit_blocked_peers: HashSet::new(),
852            last_parent_reeval: None,
853            last_congestion_log: None,
854            estimated_mesh_size: None,
855            last_mesh_size_log: None,
856            last_self_warn: None,
857            last_local_send_failure_at: None,
858            peer_aliases: HashMap::new(),
859            peer_acl,
860            host_map,
861            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
862        })
863    }
864
865    /// Create a node with a specific identity.
866    ///
867    /// This constructor validates cross-field config invariants before
868    /// constructing the node, same as [`Node::new`].
869    pub fn with_identity(identity: Identity, config: Config) -> Result<Self, NodeError> {
870        config.validate()?;
871        let node_addr = *identity.node_addr();
872
873        let (decrypt_fallback_tx, decrypt_fallback_rx) = tokio::sync::mpsc::unbounded_channel();
874        let decrypt_fallback_rx = Some(decrypt_fallback_rx);
875
876        let mut startup_epoch = [0u8; 8];
877        rand::rng().fill_bytes(&mut startup_epoch);
878
879        let tun_state = if config.tun.enabled {
880            TunState::Configured
881        } else {
882            TunState::Disabled
883        };
884
885        // Initialize tree state with signed self-declaration
886        let mut tree_state = TreeState::new(node_addr);
887        tree_state.set_parent_hysteresis(config.node.tree.parent_hysteresis);
888        tree_state.set_hold_down(config.node.tree.hold_down_secs);
889        tree_state.set_flap_dampening(
890            config.node.tree.flap_threshold,
891            config.node.tree.flap_window_secs,
892            config.node.tree.flap_dampening_secs,
893        );
894        tree_state
895            .sign_declaration(&identity)
896            .expect("signing own declaration should never fail");
897
898        let mut bloom_state = BloomState::new(node_addr);
899        bloom_state.set_update_debounce_ms(config.node.bloom.update_debounce_ms);
900
901        let coord_cache = CoordCache::new(
902            config.node.cache.coord_size,
903            config.node.cache.coord_ttl_secs * 1000,
904        );
905        let rl = &config.node.rate_limit;
906        let msg1_rate_limiter = HandshakeRateLimiter::with_params(
907            rate_limit::TokenBucket::with_params(rl.handshake_burst, rl.handshake_rate),
908            config.node.limits.max_pending_inbound,
909        );
910
911        let max_connections = config.node.limits.max_connections;
912        let max_peers = config.node.limits.max_peers;
913        let max_links = config.node.limits.max_links;
914        let coords_response_interval_ms = config.node.session.coords_response_interval_ms;
915
916        let (host_map, peer_acl) = Self::host_map_and_peer_acl(&config);
917
918        Ok(Self {
919            identity,
920            startup_epoch,
921            started_at: std::time::Instant::now(),
922            config,
923            state: NodeState::Created,
924            is_leaf_only: false,
925            tree_state,
926            bloom_state,
927            coord_cache,
928            learned_routes: LearnedRouteTable::default(),
929            recent_requests: HashMap::new(),
930            transports: HashMap::new(),
931            transport_drops: HashMap::new(),
932            links: HashMap::new(),
933            addr_to_link: HashMap::new(),
934            packet_tx: None,
935            packet_rx: None,
936            connections: HashMap::new(),
937            peers: HashMap::new(),
938            sessions: HashMap::new(),
939            identity_cache: HashMap::new(),
940            pending_tun_packets: HashMap::new(),
941            pending_endpoint_data: HashMap::new(),
942            pending_lookups: HashMap::new(),
943            max_connections,
944            max_peers,
945            max_links,
946            next_link_id: 1,
947            next_transport_id: 1,
948            stats: stats::NodeStats::new(),
949            stats_history: stats_history::StatsHistory::new(),
950            tun_state,
951            tun_name: None,
952            tun_tx: None,
953            tun_outbound_rx: None,
954            external_packet_tx: None,
955            endpoint_command_rx: None,
956            endpoint_event_tx: None,
957            encrypt_workers: None,
958            decrypt_workers: None,
959            decrypt_registered_sessions: std::collections::HashSet::new(),
960            decrypt_fallback_tx,
961            decrypt_fallback_rx,
962            tun_reader_handle: None,
963            tun_writer_handle: None,
964            #[cfg(target_os = "macos")]
965            tun_shutdown_fd: None,
966            dns_identity_rx: None,
967            dns_task: None,
968            index_allocator: IndexAllocator::new(),
969            peers_by_index: HashMap::new(),
970            pending_outbound: HashMap::new(),
971            msg1_rate_limiter,
972            icmp_rate_limiter: IcmpRateLimiter::new(),
973            routing_error_rate_limiter: RoutingErrorRateLimiter::new(),
974            coords_response_rate_limiter: RoutingErrorRateLimiter::with_interval(
975                std::time::Duration::from_millis(coords_response_interval_ms),
976            ),
977            discovery_backoff: DiscoveryBackoff::new(),
978            discovery_forward_limiter: DiscoveryForwardRateLimiter::new(),
979            pending_connects: Vec::new(),
980            retry_pending: HashMap::new(),
981            nostr_discovery: None,
982            nostr_discovery_started_at_ms: None,
983            lan_discovery: None,
984            startup_open_discovery_sweep_done: false,
985            bootstrap_transports: HashSet::new(),
986            bootstrap_transport_npubs: HashMap::new(),
987            discovery_fallback_transit_blocked_peers: HashSet::new(),
988            last_parent_reeval: None,
989            last_congestion_log: None,
990            estimated_mesh_size: None,
991            last_mesh_size_log: None,
992            last_self_warn: None,
993            last_local_send_failure_at: None,
994            peer_aliases: HashMap::new(),
995            peer_acl,
996            host_map,
997            path_mtu_lookup: Arc::new(std::sync::RwLock::new(HashMap::new())),
998        })
999    }
1000
1001    /// Create a leaf-only node (simplified state).
1002    pub fn leaf_only(config: Config) -> Result<Self, NodeError> {
1003        let mut node = Self::new(config)?;
1004        node.is_leaf_only = true;
1005        node.bloom_state = BloomState::leaf_only(*node.identity.node_addr());
1006        Ok(node)
1007    }
1008
1009    fn host_map_and_peer_acl(config: &Config) -> (Arc<HostMap>, acl::PeerAclReloader) {
1010        let base_host_map = HostMap::from_peer_configs(config.peers());
1011        if !config.node.system_files_enabled {
1012            return (
1013                Arc::new(base_host_map.clone()),
1014                acl::PeerAclReloader::memory_only(base_host_map),
1015            );
1016        }
1017
1018        let mut host_map = base_host_map.clone();
1019        let hosts_path = std::path::PathBuf::from(crate::upper::hosts::DEFAULT_HOSTS_PATH);
1020        let hosts_file = HostMap::load_hosts_file(std::path::Path::new(
1021            crate::upper::hosts::DEFAULT_HOSTS_PATH,
1022        ));
1023        host_map.merge(hosts_file);
1024        let peer_acl = acl::PeerAclReloader::with_alias_sources(
1025            std::path::PathBuf::from(acl::DEFAULT_PEERS_ALLOW_PATH),
1026            std::path::PathBuf::from(acl::DEFAULT_PEERS_DENY_PATH),
1027            base_host_map,
1028            hosts_path,
1029        );
1030        (Arc::new(host_map), peer_acl)
1031    }
1032
    /// Create transport instances from configuration.
    ///
    /// Returns a vector of TransportHandles for all configured transports.
    ///
    /// Each transport family (UDP, sim, Ethernet, TCP, Tor, BLE) follows the
    /// same two-step shape: collect the configured instances into an owned
    /// Vec first — ending the borrow of `self.config` so that
    /// `self.allocate_transport_id()`, which takes `&mut self`, can be called
    /// in the construction loop — then build one transport per instance with
    /// a fresh transport id and a clone of `packet_tx` for inbound packets.
    async fn create_transports(&mut self, packet_tx: &PacketTx) -> Vec<TransportHandle> {
        let mut transports = Vec::new();

        // Collect UDP configs with optional names to avoid borrow conflicts
        let udp_instances: Vec<_> = self
            .config
            .transports
            .udp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        // Create UDP transport instances
        for (name, udp_config) in udp_instances {
            let transport_id = self.allocate_transport_id();
            let udp = UdpTransport::new(transport_id, name, udp_config, packet_tx.clone());
            transports.push(TransportHandle::Udp(udp));
        }

        // Simulated transports only exist behind the `sim-transport` feature
        // (testing/simulation builds).
        #[cfg(feature = "sim-transport")]
        {
            let sim_instances: Vec<_> = self
                .config
                .transports
                .sim
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            for (name, sim_config) in sim_instances {
                let transport_id = self.allocate_transport_id();
                let sim = crate::transport::sim::SimTransport::new(
                    transport_id,
                    name,
                    sim_config,
                    packet_tx.clone(),
                );
                transports.push(TransportHandle::Sim(sim));
            }
        }

        // Create Ethernet transport instances where raw-socket support exists.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            let eth_instances: Vec<_> = self
                .config
                .transports
                .ethernet
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();
            // Each Ethernet transport is seeded with our x-only pubkey.
            let xonly = self.identity.pubkey();
            for (name, eth_config) in eth_instances {
                let transport_id = self.allocate_transport_id();
                let mut eth =
                    EthernetTransport::new(transport_id, name, eth_config, packet_tx.clone());
                eth.set_local_pubkey(xonly);
                transports.push(TransportHandle::Ethernet(eth));
            }
        }

        // Create TCP transport instances
        let tcp_instances: Vec<_> = self
            .config
            .transports
            .tcp
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tcp_config) in tcp_instances {
            let transport_id = self.allocate_transport_id();
            let tcp = TcpTransport::new(transport_id, name, tcp_config, packet_tx.clone());
            transports.push(TransportHandle::Tcp(tcp));
        }

        // Create Tor transport instances
        let tor_instances: Vec<_> = self
            .config
            .transports
            .tor
            .iter()
            .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
            .collect();

        for (name, tor_config) in tor_instances {
            let transport_id = self.allocate_transport_id();
            let tor = TorTransport::new(transport_id, name, tor_config, packet_tx.clone());
            transports.push(TransportHandle::Tor(tor));
        }

        // Create BLE transport instances
        #[cfg(bluer_available)]
        {
            let ble_instances: Vec<_> = self
                .config
                .transports
                .ble
                .iter()
                .map(|(name, config)| (name.map(|s| s.to_string()), config.clone()))
                .collect();

            // Real adapter I/O is compiled out under `test`; a failed adapter
            // init is logged and skipped rather than aborting the remaining
            // transports.
            #[cfg(all(bluer_available, not(test)))]
            for (name, ble_config) in ble_instances {
                let transport_id = self.allocate_transport_id();
                let adapter = ble_config.adapter().to_string();
                let mtu = ble_config.mtu();
                match crate::transport::ble::io::BluerIo::new(&adapter, mtu).await {
                    Ok(io) => {
                        let mut ble = crate::transport::ble::BleTransport::new(
                            transport_id,
                            name,
                            ble_config,
                            io,
                            packet_tx.clone(),
                        );
                        ble.set_local_pubkey(self.identity.pubkey().serialize());
                        transports.push(TransportHandle::Ble(ble));
                    }
                    Err(e) => {
                        tracing::warn!(adapter = %adapter, error = %e, "failed to initialize BLE adapter");
                    }
                }
            }

            // Inside this `bluer_available` block the cfg below reduces to
            // `test`; the warn inside is itself `not(test)`, so in test
            // builds this branch compiles (consuming `ble_instances`) but
            // emits nothing.
            #[cfg(any(not(bluer_available), test))]
            if !ble_instances.is_empty() {
                #[cfg(not(test))]
                tracing::warn!("BLE transport configured but this build lacks BlueZ support");
            }
        }

        transports
    }
1170
1171    /// Find an operational transport that matches the given transport type name.
1172    ///
1173    /// Adopted UDP bootstrap transports are point-to-point sockets handed off
1174    /// from Nostr/STUN traversal. They must not be reused for ordinary
1175    /// `udp host:port` dials discovered through static config, mDNS, or overlay
1176    /// adverts: on macOS a `send_to` through the wrong adopted socket can fail
1177    /// with `EINVAL`, and even on platforms that allow it the packet would use
1178    /// the wrong 5-tuple/NAT mapping. Prefer configured transports and make the
1179    /// choice deterministic by lowest transport id instead of HashMap order.
1180    fn find_transport_for_type(&self, transport_type: &str) -> Option<TransportId> {
1181        self.transports
1182            .iter()
1183            .filter(|(id, handle)| {
1184                handle.transport_type().name == transport_type
1185                    && handle.is_operational()
1186                    && !self.bootstrap_transports.contains(id)
1187            })
1188            .min_by_key(|(id, _)| id.as_u32())
1189            .map(|(id, _)| *id)
1190    }
1191
1192    /// Resolve an Ethernet peer address ("interface/mac") to a transport ID
1193    /// and binary TransportAddr.
1194    ///
1195    /// Finds the Ethernet transport instance bound to the named interface
1196    /// and parses the MAC portion into a 6-byte TransportAddr.
1197    #[allow(unused_variables)]
1198    fn resolve_ethernet_addr(
1199        &self,
1200        addr_str: &str,
1201    ) -> Result<(TransportId, TransportAddr), NodeError> {
1202        #[cfg(any(target_os = "linux", target_os = "macos"))]
1203        {
1204            let (iface, mac_str) = addr_str.split_once('/').ok_or_else(|| {
1205                NodeError::NoTransportForType(format!(
1206                    "invalid Ethernet address format '{}': expected 'interface/mac'",
1207                    addr_str
1208                ))
1209            })?;
1210
1211            // Find the Ethernet transport bound to this interface
1212            let transport_id = self
1213                .transports
1214                .iter()
1215                .find(|(_, handle)| {
1216                    handle.transport_type().name == "ethernet"
1217                        && handle.is_operational()
1218                        && handle.interface_name() == Some(iface)
1219                })
1220                .map(|(id, _)| *id)
1221                .ok_or_else(|| {
1222                    NodeError::NoTransportForType(format!(
1223                        "no operational Ethernet transport for interface '{}'",
1224                        iface
1225                    ))
1226                })?;
1227
1228            let mac = crate::transport::ethernet::parse_mac_string(mac_str).map_err(|e| {
1229                NodeError::NoTransportForType(format!("invalid MAC in '{}': {}", addr_str, e))
1230            })?;
1231
1232            Ok((transport_id, TransportAddr::from_bytes(&mac)))
1233        }
1234        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
1235        {
1236            Err(NodeError::NoTransportForType(
1237                "Ethernet transport is not supported on this platform".to_string(),
1238            ))
1239        }
1240    }
1241
1242    /// Resolve a BLE address string (`"adapter/AA:BB:CC:DD:EE:FF"`) to a
1243    /// (TransportId, TransportAddr) pair by finding the BLE transport
1244    /// instance matching the adapter name.
1245    #[cfg(bluer_available)]
1246    fn resolve_ble_addr(&self, addr_str: &str) -> Result<(TransportId, TransportAddr), NodeError> {
1247        let ta = TransportAddr::from_string(addr_str);
1248        let adapter = crate::transport::ble::addr::adapter_from_addr(&ta).ok_or_else(|| {
1249            NodeError::NoTransportForType(format!(
1250                "invalid BLE address format '{}': expected 'adapter/mac'",
1251                addr_str
1252            ))
1253        })?;
1254
1255        // Find the BLE transport for this adapter
1256        let transport_id = self
1257            .transports
1258            .iter()
1259            .find(|(_, handle)| handle.transport_type().name == "ble" && handle.is_operational())
1260            .map(|(id, _)| *id)
1261            .ok_or_else(|| {
1262                NodeError::NoTransportForType(format!(
1263                    "no operational BLE transport for adapter '{}'",
1264                    adapter
1265                ))
1266            })?;
1267
1268        // Validate the address format
1269        crate::transport::ble::addr::BleAddr::parse(addr_str).map_err(|e| {
1270            NodeError::NoTransportForType(format!("invalid BLE address '{}': {}", addr_str, e))
1271        })?;
1272
1273        Ok((transport_id, TransportAddr::from_string(addr_str)))
1274    }
1275
1276    // === Identity Accessors ===
1277
    /// Get this node's [`Identity`] (used to sign tree declarations and to
    /// derive the node's address and npub).
    pub fn identity(&self) -> &Identity {
        &self.identity
    }
1282
    /// Get this node's [`NodeAddr`], as derived from its identity.
    pub fn node_addr(&self) -> &NodeAddr {
        self.identity.node_addr()
    }
1287
    /// Get this node's npub, as reported by its [`Identity`].
    pub fn npub(&self) -> String {
        self.identity.npub()
    }
1292
1293    /// Return a human-readable display name for a NodeAddr.
1294    ///
1295    /// Lookup order:
1296    /// 1. Host map hostname (from peer aliases + /etc/fips/hosts)
1297    /// 2. Configured peer alias or short npub (from startup map)
1298    /// 3. Active peer's short npub (e.g., inbound peer not in config)
1299    /// 4. Session endpoint's short npub (end-to-end, may not be direct peer)
1300    /// 5. Truncated NodeAddr hex (unknown address)
1301    pub(crate) fn peer_display_name(&self, addr: &NodeAddr) -> String {
1302        if let Some(hostname) = self.host_map.lookup_hostname(addr) {
1303            return hostname.to_string();
1304        }
1305        if let Some(name) = self.peer_aliases.get(addr) {
1306            return name.clone();
1307        }
1308        if let Some(peer) = self.peers.get(addr) {
1309            return peer.identity().short_npub();
1310        }
1311        if let Some(entry) = self.sessions.get(addr) {
1312            let (xonly, _) = entry.remote_pubkey().x_only_public_key();
1313            return PeerIdentity::from_pubkey(xonly).short_npub();
1314        }
1315        addr.short_hex()
1316    }
1317
1318    /// Tear down a `peers_by_index` entry **and** keep the shard-owned
1319    /// decrypt-worker state coherent: removes the same `cache_key`
1320    /// from the registered-sessions tracking set and tells the
1321    /// assigned shard worker to drop its `OwnedSessionState` entry.
1322    ///
1323    /// Use this instead of a bare `self.peers_by_index.remove(&key)`
1324    /// at every session-lifecycle teardown site (rekey cross-connection
1325    /// swap, peer disconnect, dispatch session-rotation) so the worker
1326    /// doesn't keep stale ciphers / replay windows around. The
1327    /// follow-up `RegisterSession` for the NEW key (if any) will then
1328    /// install the fresh state on the same shard.
1329    pub(in crate::node) fn deregister_session_index(&mut self, cache_key: (TransportId, u32)) {
1330        // Find the peer that owns this index BEFORE removing it from
1331        // the index map, so we can decide whether the deregistration
1332        // also tears down the peer's connected UDP socket.
1333        let owning_peer = self.peers_by_index.get(&cache_key).copied();
1334        self.peers_by_index.remove(&cache_key);
1335        if self.decrypt_registered_sessions.remove(&cache_key)
1336            && let Some(workers) = self.decrypt_workers.as_ref()
1337        {
1338            workers.unregister_session(cache_key);
1339        }
1340        // Tear down the per-peer connected UDP socket *only* if no
1341        // other peers_by_index entry still resolves to this peer.
1342        // Rekey drain calls into this helper with the OLD session
1343        // index while the NEW index is already installed and points
1344        // at the same peer — there the connect()-ed 5-tuple is
1345        // still valid for the new session and we must not close it.
1346        // Peer-teardown sites (CrossConnection swap, stale-index
1347        // fall-through in encrypted.rs, disconnect handler) call
1348        // here when this is the peer's last index, so the connected
1349        // socket goes away with the peer.
1350        if let Some(peer_addr) = owning_peer {
1351            let peer_has_other_index = self
1352                .peers_by_index
1353                .values()
1354                .any(|other| *other == peer_addr);
1355            if !peer_has_other_index {
1356                self.clear_connected_udp_for_peer(&peer_addr);
1357            }
1358        }
1359    }
1360
1361    /// Ensure the current FMP receive index resolves to this peer.
1362    ///
1363    /// Rekey msg1/msg2 handlers pre-register the pending index before
1364    /// cutover, but losing that registration in a debug build used to
1365    /// panic in the cutover path. Repairing the map here is safe: the
1366    /// peer has already promoted the pending session, and the decrypt
1367    /// worker registration immediately after cutover depends on the
1368    /// same `(transport_id, our_index)` key.
1369    pub(in crate::node) fn ensure_current_session_index_registered(
1370        &mut self,
1371        node_addr: &NodeAddr,
1372        context: &'static str,
1373    ) -> bool {
1374        let Some(peer) = self.peers.get(node_addr) else {
1375            return false;
1376        };
1377        let Some(transport_id) = peer.transport_id() else {
1378            warn!(
1379                peer = %self.peer_display_name(node_addr),
1380                context,
1381                "Cannot register current session index without transport id"
1382            );
1383            return false;
1384        };
1385        let Some(our_index) = peer.our_index() else {
1386            warn!(
1387                peer = %self.peer_display_name(node_addr),
1388                context,
1389                "Cannot register current session index without local index"
1390            );
1391            return false;
1392        };
1393
1394        let cache_key = (transport_id, our_index.as_u32());
1395        match self.peers_by_index.get(&cache_key).copied() {
1396            Some(existing) if existing == *node_addr => true,
1397            Some(existing) => {
1398                warn!(
1399                    peer = %self.peer_display_name(node_addr),
1400                    previous_owner = %self.peer_display_name(&existing),
1401                    transport_id = %transport_id,
1402                    our_index = %our_index,
1403                    context,
1404                    "Repairing current session index with stale owner"
1405                );
1406                self.peers_by_index.insert(cache_key, *node_addr);
1407                true
1408            }
1409            None => {
1410                warn!(
1411                    peer = %self.peer_display_name(node_addr),
1412                    transport_id = %transport_id,
1413                    our_index = %our_index,
1414                    context,
1415                    "Repairing missing current session index"
1416                );
1417                self.peers_by_index.insert(cache_key, *node_addr);
1418                true
1419            }
1420        }
1421    }
1422
1423    // === Configuration ===
1424
    /// Get the node's [`Config`] as supplied at construction time.
    pub fn config(&self) -> &Config {
        &self.config
    }
1429
1430    /// Calculate the effective IPv6 MTU that can be sent over FIPS.
1431    ///
1432    /// Delegates to `upper::icmp::effective_ipv6_mtu()` with this node's
1433    /// transport MTU. Returns the maximum IPv6 packet size (including
1434    /// IPv6 header) that can be transmitted through the FIPS mesh.
1435    pub fn effective_ipv6_mtu(&self) -> u16 {
1436        crate::upper::icmp::effective_ipv6_mtu(self.transport_mtu())
1437    }
1438
1439    /// Get the transport MTU governing the global TUN-boundary MSS clamp.
1440    ///
1441    /// Returns the **minimum** MTU across all operational transports, or
1442    /// 1280 (IPv6 minimum) as fallback. Used for initial TUN configuration
1443    /// where a specific egress transport isn't yet known: the resulting
1444    /// `effective_ipv6_mtu` (transport_mtu - 77) and `max_mss`
1445    /// (effective_mtu - 60) form a conservative ceiling that fits ANY
1446    /// configured-transport's egress, eliminating PMTU-D black holes that
1447    /// would otherwise occur when a flow's actual egress is smaller than
1448    /// the clamp ceiling assumed at TUN init.
1449    ///
1450    /// Returning the smallest (rather than the first-iterated, which used
1451    /// to vary across HashMap iteration order + async-startup race) makes
1452    /// the clamp deterministic across daemon restarts.
1453    ///
1454    /// See `ISSUE-2026-0011` for the empirical investigation.
1455    pub fn transport_mtu(&self) -> u16 {
1456        let min_operational = self
1457            .transports
1458            .values()
1459            .filter(|h| h.is_operational())
1460            .map(|h| h.mtu())
1461            .min();
1462        if let Some(mtu) = min_operational {
1463            return mtu;
1464        }
1465        // Fallback to config: try UDP first, then Ethernet
1466        if let Some((_, cfg)) = self.config.transports.udp.iter().next() {
1467            return cfg.mtu();
1468        }
1469        1280
1470    }
1471
1472    // === State ===
1473
    /// Get the current [`NodeState`].
    pub fn state(&self) -> NodeState {
        self.state
    }
1478
    /// Get the node uptime (elapsed time since the node was constructed).
    pub fn uptime(&self) -> std::time::Duration {
        self.started_at.elapsed()
    }
1483
    /// Check if the node is operational (delegates to
    /// [`NodeState::is_operational`]).
    pub fn is_running(&self) -> bool {
        self.state.is_operational()
    }
1488
    /// Check if this is a leaf-only node (constructed via [`Node::leaf_only`],
    /// which installs the leaf-only Bloom state).
    pub fn is_leaf_only(&self) -> bool {
        self.is_leaf_only
    }
1493
1494    // === Tree State ===
1495
    /// Get a shared reference to the spanning-tree state.
    pub fn tree_state(&self) -> &TreeState {
        &self.tree_state
    }
1500
    /// Get a mutable reference to the spanning-tree state.
    pub fn tree_state_mut(&mut self) -> &mut TreeState {
        &mut self.tree_state
    }
1505
1506    // === Bloom State ===
1507
    /// Get a shared reference to the Bloom filter state.
    pub fn bloom_state(&self) -> &BloomState {
        &self.bloom_state
    }
1512
    /// Get a mutable reference to the Bloom filter state.
    pub fn bloom_state_mut(&mut self) -> &mut BloomState {
        &mut self.bloom_state
    }
1517
1518    // === Mesh Size Estimate ===
1519
1520    /// Get the cached estimated mesh size.
1521    pub fn estimated_mesh_size(&self) -> Option<u64> {
1522        self.estimated_mesh_size
1523    }
1524
    /// Compute and cache the estimated mesh size from bloom filters.
    ///
    /// Uses the spanning tree partition: parent's filter covers nodes reachable
    /// upward, children's filters cover disjoint subtrees downward. The sum
    /// of estimated entry counts plus one (self) approximates total network size.
    ///
    /// Writes the result into `self.estimated_mesh_size`; stores `None` when
    /// any contributing filter exceeds the configured FPR cap or when no
    /// filter contributed data at all.
    pub(crate) fn compute_mesh_size(&mut self) {
        let my_addr = *self.tree_state.my_node_addr();
        let parent_id = *self.tree_state.my_declaration().parent_id();
        let is_root = self.tree_state.is_root();

        let max_fpr = self.config.node.bloom.max_inbound_fpr;
        let mut total: f64 = 1.0; // count self
        let mut child_count: u32 = 0; // children observed; used for logging only
        let mut has_data = false; // set once any filter contributes a count

        // Parent's filter: nodes reachable upward through the tree.
        // If any contributing filter is above the FPR cap, we refuse to
        // estimate rather than substitute a partial/biased aggregate —
        // Node.estimated_mesh_size is already Option<u64> and consumers
        // (control socket, fipstop, periodic debug log) handle None.
        if !is_root
            && let Some(parent) = self.peers.get(&parent_id)
            && let Some(filter) = parent.inbound_filter()
        {
            match filter.estimated_count(max_fpr) {
                Some(n) => {
                    total += n;
                    has_data = true;
                }
                None => {
                    // Filter above the FPR cap: abandon the whole estimate.
                    self.estimated_mesh_size = None;
                    return;
                }
            }
        }

        // Children's filters: each child's subtree is disjoint
        for (peer_addr, peer) in &self.peers {
            // A child is any peer whose declaration names us as its parent.
            if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
                && *decl.parent_id() == my_addr
            {
                child_count += 1;
                if let Some(filter) = peer.inbound_filter() {
                    match filter.estimated_count(max_fpr) {
                        Some(n) => {
                            total += n;
                            has_data = true;
                        }
                        None => {
                            // Same refusal policy as the parent filter above.
                            self.estimated_mesh_size = None;
                            return;
                        }
                    }
                }
            }
        }

        if !has_data {
            // No usable filters (e.g. isolated node): no basis for an estimate.
            self.estimated_mesh_size = None;
            return;
        }

        let size = total.round() as u64;
        self.estimated_mesh_size = Some(size);

        // Periodic logging (reuse MMP default interval: 30s)
        let now = std::time::Instant::now();
        let should_log = match self.last_mesh_size_log {
            None => true,
            Some(last) => {
                now.duration_since(last)
                    >= std::time::Duration::from_secs(self.config.node.mmp.log_interval_secs)
            }
        };
        if should_log {
            tracing::debug!(
                estimated_mesh_size = size,
                peers = self.peers.len(),
                children = child_count,
                "Mesh size estimate"
            );
            self.last_mesh_size_log = Some(now);
        }
    }
1609
    // === Coord Cache ===

    /// Get the coordinate cache.
    pub fn coord_cache(&self) -> &CoordCache {
        &self.coord_cache
    }

    /// Get mutable coordinate cache.
    pub fn coord_cache_mut(&mut self) -> &mut CoordCache {
        &mut self.coord_cache
    }

    // === Node Statistics ===

    /// Get the node statistics.
    pub fn stats(&self) -> &stats::NodeStats {
        &self.stats
    }

    /// Get mutable node statistics (crate-internal; counters are bumped by
    /// the packet-handling paths).
    pub(crate) fn stats_mut(&mut self) -> &mut stats::NodeStats {
        &mut self.stats
    }

    /// Get the stats history collector.
    pub fn stats_history(&self) -> &stats_history::StatsHistory {
        &self.stats_history
    }
1638
1639    /// Sample the current node state into the stats history ring.
1640    /// Called once per tick from the RX loop.
1641    pub(crate) fn record_stats_history(&mut self) {
1642        let fwd = &self.stats.forwarding;
1643        let peers_with_mmp: Vec<f64> = self
1644            .peers
1645            .values()
1646            .filter_map(|p| p.mmp().map(|m| m.metrics.loss_rate()))
1647            .collect();
1648        let loss_rate = if peers_with_mmp.is_empty() {
1649            0.0
1650        } else {
1651            peers_with_mmp.iter().sum::<f64>() / peers_with_mmp.len() as f64
1652        };
1653
1654        let snap = stats_history::Snapshot {
1655            mesh_size: self.estimated_mesh_size,
1656            tree_depth: self.tree_state.my_coords().depth() as u32,
1657            peer_count: self.peers.len() as u64,
1658            parent_switches_total: self.stats.tree.parent_switches,
1659            bytes_in_total: fwd.received_bytes,
1660            bytes_out_total: fwd.forwarded_bytes + fwd.originated_bytes,
1661            packets_in_total: fwd.received_packets,
1662            packets_out_total: fwd.forwarded_packets + fwd.originated_packets,
1663            loss_rate,
1664            active_sessions: self.sessions.len() as u64,
1665        };
1666
1667        let now = std::time::Instant::now();
1668        let peer_snaps: Vec<stats_history::PeerSnapshot> = self
1669            .peers
1670            .values()
1671            .map(|p| {
1672                let stats = p.link_stats();
1673                let (srtt_ms, loss_rate, ecn_ce) = match p.mmp() {
1674                    Some(m) => (
1675                        m.metrics.srtt_ms(),
1676                        Some(m.metrics.loss_rate()),
1677                        m.receiver.ecn_ce_count() as u64,
1678                    ),
1679                    None => (None, None, 0),
1680                };
1681                stats_history::PeerSnapshot {
1682                    node_addr: *p.node_addr(),
1683                    last_seen: now,
1684                    srtt_ms,
1685                    loss_rate,
1686                    bytes_in_total: stats.bytes_recv,
1687                    bytes_out_total: stats.bytes_sent,
1688                    packets_in_total: stats.packets_recv,
1689                    packets_out_total: stats.packets_sent,
1690                    ecn_ce_total: ecn_ce,
1691                }
1692            })
1693            .collect();
1694
1695        self.stats_history.tick(now, &snap, &peer_snaps);
1696    }
1697
    // === TUN Interface ===

    /// Get the TUN state.
    pub fn tun_state(&self) -> TunState {
        self.tun_state
    }

    /// Get the TUN interface name, if active.
    pub fn tun_name(&self) -> Option<&str> {
        self.tun_name.as_deref()
    }

    // === Resource Limits ===

    /// Set the maximum number of connections (handshake phase).
    /// A value of 0 disables the cap (see `add_connection`).
    pub fn set_max_connections(&mut self, max: usize) {
        self.max_connections = max;
    }

    /// Set the maximum number of peers (authenticated).
    // NOTE(review): the enforcement point for max_peers is not in this
    // section of the file — confirm where it is checked.
    pub fn set_max_peers(&mut self, max: usize) {
        self.max_peers = max;
    }

    /// Set the maximum number of links.
    /// A value of 0 disables the cap (see `add_link`).
    pub fn set_max_links(&mut self, max: usize) {
        self.max_links = max;
    }

    // === Counts ===

    /// Number of pending connections (handshake in progress).
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }

    /// Number of authenticated peers.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }

    /// Number of active links.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }

    /// Number of active transports.
    pub fn transport_count(&self) -> usize {
        self.transports.len()
    }
1748
    // === Transport Management ===

    /// Allocate a new transport ID.
    ///
    /// IDs come from a monotonically increasing counter, so they are never
    /// reused within the lifetime of this Node.
    pub fn allocate_transport_id(&mut self) -> TransportId {
        let id = TransportId::new(self.next_transport_id);
        self.next_transport_id += 1;
        id
    }

    /// Get a transport by ID.
    pub fn get_transport(&self, id: &TransportId) -> Option<&TransportHandle> {
        self.transports.get(id)
    }

    /// Get mutable transport by ID.
    pub fn get_transport_mut(&mut self, id: &TransportId) -> Option<&mut TransportHandle> {
        self.transports.get_mut(id)
    }

    /// Iterate over transport IDs.
    pub fn transport_ids(&self) -> impl Iterator<Item = &TransportId> {
        self.transports.keys()
    }

    /// Get the packet receiver for the event loop.
    pub fn packet_rx(&mut self) -> Option<&mut PacketRx> {
        self.packet_rx.as_mut()
    }

    // === Link Management ===

    /// Allocate a new link ID.
    ///
    /// Same monotonic, never-reused scheme as `allocate_transport_id`.
    pub fn allocate_link_id(&mut self) -> LinkId {
        let id = LinkId::new(self.next_link_id);
        self.next_link_id += 1;
        id
    }
1786
    /// Add a link.
    ///
    /// Fails with `MaxLinksExceeded` when the link cap is reached (a cap of 0
    /// means unlimited). On success the link is indexed both by `LinkId` and by
    /// `(transport_id, remote_addr)` for reverse lookup; any pre-existing
    /// reverse-lookup entry for the same address is replaced (see
    /// `remove_link` for why that clobbering is intentional).
    pub fn add_link(&mut self, link: Link) -> Result<(), NodeError> {
        if self.max_links > 0 && self.links.len() >= self.max_links {
            return Err(NodeError::MaxLinksExceeded {
                max: self.max_links,
            });
        }
        let link_id = link.link_id();
        let transport_id = link.transport_id();
        let remote_addr = link.remote_addr().clone();

        self.links.insert(link_id, link);
        self.addr_to_link
            .insert((transport_id, remote_addr), link_id);
        Ok(())
    }

    /// Get a link by ID.
    pub fn get_link(&self, link_id: &LinkId) -> Option<&Link> {
        self.links.get(link_id)
    }

    /// Get a mutable link by ID.
    pub fn get_link_mut(&mut self, link_id: &LinkId) -> Option<&mut Link> {
        self.links.get_mut(link_id)
    }

    /// Find link ID by transport address.
    pub fn find_link_by_addr(
        &self,
        transport_id: TransportId,
        addr: &TransportAddr,
    ) -> Option<LinkId> {
        self.addr_to_link
            .get(&(transport_id, addr.clone()))
            .copied()
    }

    /// Remove a link.
    ///
    /// Only removes the addr_to_link reverse lookup if it still points to this
    /// link. In cross-connection scenarios, a newer link may have replaced the
    /// entry for the same address.
    pub fn remove_link(&mut self, link_id: &LinkId) -> Option<Link> {
        if let Some(link) = self.links.remove(link_id) {
            // Clean up reverse lookup only if it still maps to this link
            let key = (link.transport_id(), link.remote_addr().clone());
            if self.addr_to_link.get(&key) == Some(link_id) {
                self.addr_to_link.remove(&key);
            }
            Some(link)
        } else {
            None
        }
    }
1842
    /// Drop a bootstrap-only transport once nothing references it.
    ///
    /// No-op unless `transport_id` is tracked in `bootstrap_transports`. The
    /// transport counts as in use while any link, pending connection,
    /// authenticated peer, or pending connect still points at it; once all
    /// references are gone, every bookkeeping entry and the transport handle
    /// itself are removed.
    pub(crate) fn cleanup_bootstrap_transport_if_unused(&mut self, transport_id: TransportId) {
        if !self.bootstrap_transports.contains(&transport_id) {
            return;
        }

        // Any of these four collections holding a reference keeps it alive.
        let transport_in_use = self
            .links
            .values()
            .any(|link| link.transport_id() == transport_id)
            || self
                .connections
                .values()
                .any(|conn| conn.transport_id() == Some(transport_id))
            || self
                .peers
                .values()
                .any(|peer| peer.transport_id() == Some(transport_id))
            || self
                .pending_connects
                .iter()
                .any(|pending| pending.transport_id == transport_id);

        if transport_in_use {
            return;
        }

        tracing::debug!(
            transport_id = %transport_id,
            "bootstrap transport has no remaining references; dropping"
        );

        // Remove all bookkeeping before dropping the handle itself.
        self.bootstrap_transports.remove(&transport_id);
        self.bootstrap_transport_npubs.remove(&transport_id);
        self.transport_drops.remove(&transport_id);
        self.transports.remove(&transport_id);
    }
1879
    /// Iterate over all links.
    pub fn links(&self) -> impl Iterator<Item = &Link> {
        self.links.values()
    }

    // === Connection Management (Handshake Phase) ===

    /// Add a pending connection.
    ///
    /// Errors with `ConnectionAlreadyExists` if a connection is already
    /// registered for the same `LinkId`, or `MaxConnectionsExceeded` when the
    /// connection cap is reached (a cap of 0 means unlimited).
    pub fn add_connection(&mut self, connection: PeerConnection) -> Result<(), NodeError> {
        let link_id = connection.link_id();

        if self.connections.contains_key(&link_id) {
            return Err(NodeError::ConnectionAlreadyExists(link_id));
        }

        if self.max_connections > 0 && self.connections.len() >= self.max_connections {
            return Err(NodeError::MaxConnectionsExceeded {
                max: self.max_connections,
            });
        }

        self.connections.insert(link_id, connection);
        Ok(())
    }

    /// Get a connection by LinkId.
    pub fn get_connection(&self, link_id: &LinkId) -> Option<&PeerConnection> {
        self.connections.get(link_id)
    }

    /// Get a mutable connection by LinkId.
    pub fn get_connection_mut(&mut self, link_id: &LinkId) -> Option<&mut PeerConnection> {
        self.connections.get_mut(link_id)
    }

    /// Remove a connection.
    pub fn remove_connection(&mut self, link_id: &LinkId) -> Option<PeerConnection> {
        self.connections.remove(link_id)
    }

    /// Iterate over all connections.
    pub fn connections(&self) -> impl Iterator<Item = &PeerConnection> {
        self.connections.values()
    }
1924
    // === Peer Management (Active Phase) ===

    /// Get a peer by NodeAddr.
    pub fn get_peer(&self, node_addr: &NodeAddr) -> Option<&ActivePeer> {
        self.peers.get(node_addr)
    }

    /// Get a mutable peer by NodeAddr.
    pub fn get_peer_mut(&mut self, node_addr: &NodeAddr) -> Option<&mut ActivePeer> {
        self.peers.get_mut(node_addr)
    }

    /// Remove a peer.
    pub fn remove_peer(&mut self, node_addr: &NodeAddr) -> Option<ActivePeer> {
        self.peers.remove(node_addr)
    }

    /// Iterate over all peers.
    pub fn peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values()
    }

    /// Reference to the Nostr discovery handle if discovery is enabled.
    /// Used by control queries (`show_peers` per-peer Nostr-traversal
    /// state) to read failure-state without taking shared ownership.
    pub fn nostr_discovery_handle(&self) -> Option<&crate::discovery::nostr::NostrDiscovery> {
        self.nostr_discovery.as_deref()
    }

    /// Iterate over all peer node IDs.
    pub fn peer_ids(&self) -> impl Iterator<Item = &NodeAddr> {
        self.peers.keys()
    }

    /// Iterate over peers that can send traffic (filtered by
    /// `ActivePeer::can_send`).
    pub fn sendable_peers(&self) -> impl Iterator<Item = &ActivePeer> {
        self.peers.values().filter(|p| p.can_send())
    }

    /// Number of peers that can send traffic.
    pub fn sendable_peer_count(&self) -> usize {
        self.peers.values().filter(|p| p.can_send()).count()
    }
1968
    /// Record whether a peer may carry discovery-fallback transit traffic.
    ///
    /// `allowed == false` inserts the peer into the blocked set;
    /// `allowed == true` removes it.
    pub(crate) fn set_discovery_fallback_transit_allowed(
        &mut self,
        peer_addr: NodeAddr,
        allowed: bool,
    ) {
        if allowed {
            self.discovery_fallback_transit_blocked_peers
                .remove(&peer_addr);
        } else {
            self.discovery_fallback_transit_blocked_peers
                .insert(peer_addr);
        }
    }

    /// Statically configured `discovery_fallback_transit` flag for a peer.
    ///
    /// Matches `peer_addr` against the NodeAddr derived from each configured
    /// peer's npub. Returns `None` when the peer is not in the config (or its
    /// npub fails to parse).
    pub(crate) fn configured_discovery_fallback_transit(
        &self,
        peer_addr: &NodeAddr,
    ) -> Option<bool> {
        self.config.peers().iter().find_map(|peer| {
            PeerIdentity::from_npub(&peer.npub)
                .ok()
                .filter(|identity| identity.node_addr() == peer_addr)
                .map(|_| peer.discovery_fallback_transit)
        })
    }

    /// Resolve the `discovery_fallback_transit` setting to apply when
    /// promoting a peer.
    ///
    /// Resolution order: pending-retry peer config → static config →
    /// policy default (allowed unless the Nostr discovery policy is `Open`).
    pub(crate) fn discovery_fallback_transit_for_promotion(&self, peer_addr: &NodeAddr) -> bool {
        if let Some(retry_state) = self.retry_pending.get(peer_addr) {
            return retry_state.peer_config.discovery_fallback_transit;
        }

        if let Some(allowed) = self.configured_discovery_fallback_transit(peer_addr) {
            return allowed;
        }

        self.config.node.discovery.nostr.policy != crate::config::NostrDiscoveryPolicy::Open
    }
2006
    // === End-to-End Sessions ===

    /// Disable the discovery forward rate limiter (for tests).
    #[cfg(test)]
    pub(crate) fn disable_discovery_forward_rate_limit(&mut self) {
        self.discovery_forward_limiter
            .set_interval(std::time::Duration::ZERO);
    }

    /// Get a session by remote NodeAddr.
    #[cfg(test)]
    pub(crate) fn get_session(&self, remote: &NodeAddr) -> Option<&SessionEntry> {
        self.sessions.get(remote)
    }

    /// Get a mutable session by remote NodeAddr.
    #[cfg(test)]
    pub(crate) fn get_session_mut(&mut self, remote: &NodeAddr) -> Option<&mut SessionEntry> {
        self.sessions.get_mut(remote)
    }

    /// Remove a session.
    #[cfg(test)]
    pub(crate) fn remove_session(&mut self, remote: &NodeAddr) -> Option<SessionEntry> {
        self.sessions.remove(remote)
    }

    /// Read the path_mtu_lookup entry for a destination FipsAddress.
    /// Returns `None` both on a missing entry and on a poisoned lock.
    #[cfg(test)]
    pub(crate) fn path_mtu_lookup_get(&self, fips_addr: &crate::FipsAddress) -> Option<u16> {
        self.path_mtu_lookup
            .read()
            .ok()
            .and_then(|map| map.get(fips_addr).copied())
    }

    /// Write a path_mtu_lookup entry directly (for tests that pre-seed the map).
    /// Silently does nothing if the lock is poisoned.
    #[cfg(test)]
    pub(crate) fn path_mtu_lookup_insert(&self, fips_addr: crate::FipsAddress, mtu: u16) {
        if let Ok(mut map) = self.path_mtu_lookup.write() {
            map.insert(fips_addr, mtu);
        }
    }

    /// Number of end-to-end sessions.
    pub fn session_count(&self) -> usize {
        self.sessions.len()
    }

    /// Iterate over all session entries (for control queries).
    pub(crate) fn session_entries(&self) -> impl Iterator<Item = (&NodeAddr, &SessionEntry)> {
        self.sessions.iter()
    }
2060
    // === Identity Cache ===

    /// Register a node in the identity cache for FipsAddress → NodeAddr lookup.
    ///
    /// Returns `true` when the identity is (now) cached, `false` when the
    /// claimed `node_addr` cannot be derived from `pubkey`.
    pub(crate) fn register_identity(
        &mut self,
        node_addr: NodeAddr,
        pubkey: secp256k1::PublicKey,
    ) -> bool {
        // Cache key: the leading 15 bytes of the NodeAddr.
        let mut prefix = [0u8; 15];
        prefix.copy_from_slice(&node_addr.as_bytes()[0..15]);
        // Fast path: exact entry already cached, nothing to verify or update.
        if let Some(entry) = self.identity_cache.get(&prefix)
            && entry.node_addr == node_addr
            && entry.pubkey == pubkey
        {
            // Endpoint sends pass the same PeerIdentity on every packet. Once
            // validated, avoid re-deriving NodeAddr from the public key in the
            // data path; that hash showed up in macOS sender profiles.
            return true;
        }

        // Validate the claim: the NodeAddr must derive from the public key.
        let (xonly, _) = pubkey.x_only_public_key();
        let derived_node_addr = NodeAddr::from_pubkey(&xonly);
        if derived_node_addr != node_addr {
            debug!(
                claimed_node_addr = %node_addr,
                derived_node_addr = %derived_node_addr,
                "Rejected identity cache entry with mismatched public key"
            );
            return false;
        }

        let now_ms = Self::now_ms();
        // Same NodeAddr already cached: update key material and LRU timestamp
        // in place rather than inserting a fresh entry.
        if let Some(entry) = self.identity_cache.get_mut(&prefix)
            && entry.node_addr == node_addr
        {
            entry.pubkey = pubkey;
            entry.last_seen_ms = now_ms;
            return true;
        }

        let npub = encode_npub(&xonly);
        self.identity_cache.insert(
            prefix,
            IdentityCacheEntry::new(node_addr, pubkey, npub, now_ms),
        );
        // LRU eviction: linear scan for the least-recently-seen entry when
        // over capacity (cache size is config-bounded, so O(n) is acceptable).
        let max = self.config.node.cache.identity_size;
        if self.identity_cache.len() > max
            && let Some(oldest_key) = self
                .identity_cache
                .iter()
                .min_by_key(|(_, entry)| entry.last_seen_ms)
                .map(|(k, _)| *k)
        {
            self.identity_cache.remove(&oldest_key);
        }
        true
    }
2119
    /// Look up a destination by FipsAddress prefix (bytes 1-15 of the IPv6 address).
    ///
    /// Refreshes the entry's `last_seen_ms` so the LRU eviction in
    /// `register_identity` treats it as recently used.
    pub(crate) fn lookup_by_fips_prefix(
        &mut self,
        prefix: &[u8; 15],
    ) -> Option<(NodeAddr, secp256k1::PublicKey)> {
        if let Some(entry) = self.identity_cache.get_mut(prefix) {
            entry.last_seen_ms = Self::now_ms(); // LRU touch
            Some((entry.node_addr, entry.pubkey))
        } else {
            None
        }
    }

    /// Check if a node's identity is in the cache (without LRU touch).
    pub(crate) fn has_cached_identity(&self, addr: &NodeAddr) -> bool {
        // Same 15-byte prefix key as register_identity.
        let mut prefix = [0u8; 15];
        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
        self.identity_cache.contains_key(&prefix)
    }

    /// Number of identity cache entries.
    pub fn identity_cache_len(&self) -> usize {
        self.identity_cache.len()
    }

    /// Iterate over identity cache entries.
    ///
    /// Returns `(NodeAddr, PublicKey, last_seen_ms)` for each cached identity.
    /// Used by the `show_identity_cache` control query.
    pub fn identity_cache_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &secp256k1::PublicKey, u64)> {
        self.identity_cache
            .values()
            .map(|entry| (&entry.node_addr, &entry.pubkey, entry.last_seen_ms))
    }

    /// Configured maximum identity cache size.
    pub fn identity_cache_max(&self) -> usize {
        self.config.node.cache.identity_size
    }

    /// Number of pending discovery lookups.
    pub fn pending_lookup_count(&self) -> usize {
        self.pending_lookups.len()
    }

    /// Iterate over pending discovery lookups for diagnostics.
    pub fn pending_lookups_iter(
        &self,
    ) -> impl Iterator<Item = (&NodeAddr, &handlers::discovery::PendingLookup)> {
        self.pending_lookups.iter()
    }

    /// Number of recent discovery requests tracked.
    pub fn recent_request_count(&self) -> usize {
        self.recent_requests.len()
    }

    /// Count of destinations with queued TUN packets awaiting session setup.
    pub fn pending_tun_destinations(&self) -> usize {
        self.pending_tun_packets.len()
    }

    /// Total TUN packets queued across all destinations.
    pub fn pending_tun_total_packets(&self) -> usize {
        self.pending_tun_packets.values().map(|q| q.len()).sum()
    }

    /// Iterate over retry state for diagnostics.
    pub fn retry_state_iter(&self) -> impl Iterator<Item = (&NodeAddr, &retry::RetryState)> {
        self.retry_pending.iter()
    }
2193
2194    // === Routing ===
2195
2196    /// Check if a peer is a tree neighbor (parent or child in the spanning tree).
2197    ///
2198    /// Returns true if the peer is our current tree parent, or if the peer
2199    /// has declared us as their parent (making them our child).
2200    pub(crate) fn is_tree_peer(&self, peer_addr: &NodeAddr) -> bool {
2201        // Peer is our parent
2202        if !self.tree_state.is_root() && self.tree_state.my_declaration().parent_id() == peer_addr {
2203            return true;
2204        }
2205        // Peer is our child (their declaration names us as parent)
2206        if let Some(decl) = self.tree_state.peer_declaration(peer_addr)
2207            && decl.parent_id() == self.node_addr()
2208        {
2209            return true;
2210        }
2211        false
2212    }
2213
    /// Find next hop for a destination node address.
    ///
    /// Routing priority:
    /// 1. Destination is self → `None` (local delivery)
    /// 2. Destination is a direct peer → that peer
    /// 3. Reply-learned routes in `reply_learned` mode. These are locally
    ///    observed reverse paths, selected with weighted multipath plus
    ///    periodic coordinate/tree exploration.
    /// 4. Bloom filter candidates with cached dest coords → among peers whose
    ///    bloom filter contains the destination, pick the one that minimizes
    ///    tree distance to the destination, with
    ///    `(link_cost, tree_distance_to_dest, node_addr)` tie-breaking.
    ///    The self-distance check ensures only peers strictly closer to the
    ///    destination than us are considered (prevents routing loops).
    /// 5. Greedy tree routing fallback (requires cached dest coords)
    /// 6. No route → `None`
    ///
    /// Both the bloom filter and tree routing paths require cached destination
    /// coordinates (checked in `coord_cache`). Without coordinates, the node
    /// cannot make loop-free forwarding decisions. The caller should signal
    /// `CoordsRequired` back to the source when `None` is returned for a
    /// non-local destination.
    pub fn find_next_hop(&mut self, dest_node_addr: &NodeAddr) -> Option<&ActivePeer> {
        // 1. Local delivery
        if dest_node_addr == self.node_addr() {
            return None;
        }

        // 2. Direct peer
        if let Some(peer) = self.peers.get(dest_node_addr)
            && peer.can_send()
        {
            return Some(peer);
        }

        let now_ms = Self::now_ms();

        // Snapshot of sendable peer addresses; only built in reply_learned
        // mode, where it gates learned-route selection below.
        let sendable_learned_peers = if self.config.node.routing.mode == RoutingMode::ReplyLearned {
            Some(
                self.peers
                    .iter()
                    .filter(|(_, peer)| peer.can_send())
                    .map(|(addr, _)| *addr)
                    .collect::<HashSet<_>>(),
            )
        } else {
            None
        };

        // 3. Optional reply-learned routing. These entries are not peer
        // claims; they are local observations of which peer carried traffic
        // or a verified lookup response back from the destination. Most
        // packets use weighted multipath over learned routes, but periodic
        // fallback exploration lets coord/bloom/tree routes discover better
        // candidates.
        let explore_fallback = sendable_learned_peers.as_ref().is_some_and(|sendable| {
            self.learned_routes.should_explore_fallback(
                dest_node_addr,
                now_ms,
                self.config.node.routing.learned_fallback_explore_interval,
                |addr| sendable.contains(addr),
            )
        });
        // When not exploring, a learned route wins outright.
        if let Some(sendable) = &sendable_learned_peers
            && !explore_fallback
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // Look up cached destination coordinates (required by both bloom and tree paths).
        let Some(dest_coords) = self
            .coord_cache
            .get_and_touch(dest_node_addr, now_ms)
            .cloned()
        else {
            // No cached coords: a learned route is the only remaining option.
            if let Some(sendable) = &sendable_learned_peers
                && let Some(next_hop_addr) =
                    self.learned_routes
                        .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
            {
                return self.peers.get(&next_hop_addr);
            }
            return None;
        };

        // 4. Bloom filter candidates — requires dest_coords for loop-free selection.
        //    If no candidate is strictly closer, fall through to tree routing.
        let coordinate_route_addr = {
            let candidates: Vec<&ActivePeer> = self.destination_in_filters(dest_node_addr);
            if !candidates.is_empty() {
                self.select_best_candidate(&candidates, &dest_coords)
                    .map(|peer| *peer.node_addr())
            } else {
                None
            }
        };
        if let Some(next_hop_addr) = coordinate_route_addr {
            return self.peers.get(&next_hop_addr);
        }

        // 5. Greedy tree routing fallback
        let tree_route_addr = self
            .tree_state
            .find_next_hop(&dest_coords)
            .filter(|next_hop_id| {
                self.peers
                    .get(next_hop_id)
                    .is_some_and(|peer| peer.can_send())
            });
        if let Some(next_hop_addr) = tree_route_addr {
            return self.peers.get(&next_hop_addr);
        }
        // Exploration found nothing better: fall back to the learned route we
        // deliberately skipped at step 3.
        if explore_fallback {
            return sendable_learned_peers.as_ref().and_then(|sendable| {
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
                    .and_then(|next_hop_addr| self.peers.get(&next_hop_addr))
            });
        }

        // Last-resort learned-route attempt for the non-exploring path.
        if let Some(sendable) = &sendable_learned_peers
            && let Some(next_hop_addr) =
                self.learned_routes
                    .select_next_hop(dest_node_addr, now_ms, |addr| sendable.contains(addr))
        {
            return self.peers.get(&next_hop_addr);
        }

        // 6. No route.
        None
    }
2347
    /// Learn a reverse route: future packets destined for `destination`
    /// may be forwarded via `next_hop`.
    ///
    /// No-op unless the routing mode is `ReplyLearned`; routes to our own
    /// address are never learned. Entry TTL and the per-destination cap
    /// come from the routing configuration.
    pub(in crate::node) fn learn_reverse_route(
        &mut self,
        destination: NodeAddr,
        next_hop: NodeAddr,
    ) {
        if self.config.node.routing.mode != RoutingMode::ReplyLearned
            || destination == *self.node_addr()
        {
            return;
        }
        let now_ms = Self::now_ms();
        self.learned_routes.learn(
            destination,
            next_hop,
            now_ms,
            self.config.node.routing.learned_ttl_secs,
            self.config.node.routing.max_learned_routes_per_dest,
        );
    }
2367
    /// Report that forwarding toward `destination` via `next_hop` failed,
    /// forwarding the failure to the learned-route table.
    ///
    /// No-op unless the routing mode is `ReplyLearned`.
    pub(in crate::node) fn record_route_failure(
        &mut self,
        destination: NodeAddr,
        next_hop: NodeAddr,
    ) {
        if self.config.node.routing.mode != RoutingMode::ReplyLearned {
            return;
        }
        self.learned_routes.record_failure(&destination, &next_hop);
    }
2378
    /// Snapshot the learned-route table as of `now_ms` for inspection.
    pub(crate) fn learned_route_table_snapshot(&self, now_ms: u64) -> LearnedRouteTableSnapshot {
        self.learned_routes.snapshot(now_ms)
    }
2382
    /// Evict learned routes that have expired as of `now_ms`.
    pub(in crate::node) fn purge_learned_routes(&mut self, now_ms: u64) {
        self.learned_routes.purge_expired(now_ms);
    }
2386
2387    /// Select the best peer from a set of bloom filter candidates.
2388    ///
2389    /// Uses distance from each candidate's tree coordinates to the destination
2390    /// as the primary metric (after link_cost). Only selects peers that are
2391    /// strictly closer to the destination than we are (self-distance check
2392    /// prevents routing loops).
2393    ///
2394    /// Ordering: `(link_cost, distance_to_dest, node_addr)`.
2395    fn select_best_candidate<'a>(
2396        &'a self,
2397        candidates: &[&'a ActivePeer],
2398        dest_coords: &crate::tree::TreeCoordinate,
2399    ) -> Option<&'a ActivePeer> {
2400        let my_distance = self.tree_state.my_coords().distance_to(dest_coords);
2401
2402        let mut best: Option<(&ActivePeer, f64, usize)> = None;
2403
2404        for &candidate in candidates {
2405            if !candidate.can_send() {
2406                continue;
2407            }
2408
2409            let cost = candidate.link_cost();
2410
2411            let dist = self
2412                .tree_state
2413                .peer_coords(candidate.node_addr())
2414                .map(|pc| pc.distance_to(dest_coords))
2415                .unwrap_or(usize::MAX);
2416
2417            // Self-distance check: only consider peers strictly closer
2418            // to the destination than we are (prevents routing loops)
2419            if dist >= my_distance {
2420                continue;
2421            }
2422
2423            let dominated = match &best {
2424                None => true,
2425                Some((_, best_cost, best_dist)) => {
2426                    cost < *best_cost
2427                        || (cost == *best_cost && dist < *best_dist)
2428                        || (cost == *best_cost
2429                            && dist == *best_dist
2430                            && candidate.node_addr() < best.as_ref().unwrap().0.node_addr())
2431                }
2432            };
2433
2434            if dominated {
2435                best = Some((candidate, cost, dist));
2436            }
2437        }
2438
2439        best.map(|(peer, _, _)| peer)
2440    }
2441
2442    /// Check if a destination is in any peer's bloom filter.
2443    pub fn destination_in_filters(&self, dest: &NodeAddr) -> Vec<&ActivePeer> {
2444        self.peers.values().filter(|p| p.may_reach(dest)).collect()
2445    }
2446
    /// Get the TUN packet sender channel.
    ///
    /// Returns None if TUN is not active or the node hasn't been started
    /// (i.e. `self.tun_tx` has not been populated yet).
    pub fn tun_tx(&self) -> Option<&TunTx> {
        self.tun_tx.as_ref()
    }
2453
2454    /// Attach app-owned packet I/O for embedded operation without a system TUN.
2455    ///
2456    /// This must be called before [`Node::start`] and requires `tun.enabled =
2457    /// false`. Outbound packets sent to the returned sender are processed by the
2458    /// normal session pipeline. Inbound packets delivered by FIPS sessions are
2459    /// sent to the returned receiver with source attribution.
2460    pub fn attach_external_packet_io(
2461        &mut self,
2462        capacity: usize,
2463    ) -> Result<ExternalPacketIo, NodeError> {
2464        if self.state != NodeState::Created {
2465            return Err(NodeError::Config(ConfigError::Validation(
2466                "external packet I/O must be attached before node start".to_string(),
2467            )));
2468        }
2469        if self.config.tun.enabled {
2470            return Err(NodeError::Config(ConfigError::Validation(
2471                "external packet I/O requires tun.enabled=false".to_string(),
2472            )));
2473        }
2474
2475        let capacity = capacity.max(1);
2476        let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(capacity);
2477        let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(capacity);
2478        self.tun_outbound_rx = Some(outbound_rx);
2479        self.external_packet_tx = Some(inbound_tx);
2480
2481        Ok(ExternalPacketIo {
2482            outbound_tx,
2483            inbound_rx,
2484        })
2485    }
2486
2487    /// Attach app-owned endpoint data I/O for embedded operation.
2488    ///
2489    /// Commands sent to the returned sender are processed by the node RX loop.
2490    /// Incoming endpoint data is emitted as source-attributed events.
2491    pub(crate) fn attach_endpoint_data_io(
2492        &mut self,
2493        capacity: usize,
2494    ) -> Result<EndpointDataIo, NodeError> {
2495        if self.state != NodeState::Created {
2496            return Err(NodeError::Config(ConfigError::Validation(
2497                "endpoint data I/O must be attached before node start".to_string(),
2498            )));
2499        }
2500
2501        let command_capacity = endpoint_data_command_capacity(capacity);
2502        let (command_tx, command_rx) = tokio::sync::mpsc::channel(command_capacity);
2503        // Inbound endpoint-data events use an unbounded channel — see
2504        // `EndpointDataIo::event_rx` docs for the rationale (kills the
2505        // per-packet semaphore + the cross-task relay task that used to
2506        // sit on top of this channel).
2507        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
2508        self.endpoint_command_rx = Some(command_rx);
2509        self.endpoint_event_tx = Some(event_tx.clone());
2510
2511        Ok(EndpointDataIo {
2512            command_tx,
2513            event_rx,
2514            event_tx,
2515        })
2516    }
2517
2518    pub(crate) fn pubkey_for_node_addr(&self, addr: &NodeAddr) -> Option<secp256k1::PublicKey> {
2519        let mut prefix = [0u8; 15];
2520        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2521        self.identity_cache
2522            .get(&prefix)
2523            .filter(|entry| &entry.node_addr == addr)
2524            .map(|entry| entry.pubkey)
2525    }
2526
2527    pub(crate) fn npub_for_node_addr(&self, addr: &NodeAddr) -> Option<String> {
2528        let mut prefix = [0u8; 15];
2529        prefix.copy_from_slice(&addr.as_bytes()[0..15]);
2530        self.identity_cache
2531            .get(&prefix)
2532            .filter(|entry| &entry.node_addr == addr)
2533            .map(|entry| entry.npub.clone())
2534    }
2535
2536    pub(in crate::node) fn deliver_external_ipv6_packet(
2537        &self,
2538        src_addr: &NodeAddr,
2539        packet: Vec<u8>,
2540    ) {
2541        let Some(external_packet_tx) = &self.external_packet_tx else {
2542            return;
2543        };
2544        if packet.len() < 40 {
2545            return;
2546        }
2547        let Ok(destination) = FipsAddress::from_slice(&packet[24..40]) else {
2548            return;
2549        };
2550        let delivered = NodeDeliveredPacket {
2551            source_node_addr: *src_addr,
2552            source_npub: self.npub_for_node_addr(src_addr),
2553            destination,
2554            packet,
2555        };
2556        if let Err(error) = external_packet_tx.try_send(delivered) {
2557            debug!(error = %error, "Failed to deliver packet to external app sink");
2558        }
2559    }
2560
    // === Sending ===

    /// Encrypt and send a link-layer message to an authenticated peer.
    ///
    /// The plaintext should include the message type byte followed by the
    /// message-specific payload (e.g., `[0x50, reason]` for Disconnect).
    ///
    /// The send path prepends a 4-byte session-relative timestamp (inner
    /// header) before encryption. The full 16-byte outer header is used
    /// as AAD for the AEAD construction.
    ///
    /// This is the standard path for sending any link-layer control message
    /// to a peer over their encrypted Noise session.
    ///
    /// Equivalent to `send_encrypted_link_message_with_ce` with the CE
    /// flag cleared.
    pub(super) async fn send_encrypted_link_message(
        &mut self,
        node_addr: &NodeAddr,
        plaintext: &[u8],
    ) -> Result<(), NodeError> {
        self.send_encrypted_link_message_with_ce(node_addr, plaintext, false)
            .await
    }
2582
2583    /// Update the local-outbound-broken signal from a `transport.send`
2584    /// outcome. Sets `last_local_send_failure_at` on local-side io
2585    /// errors (NetworkUnreachable / HostUnreachable / AddrNotAvailable);
2586    /// clears it on success. The reaper consults this in
2587    /// `check_link_heartbeats` to switch to `fast_link_dead_timeout_secs`.
2588    pub(in crate::node) fn note_local_send_outcome(
2589        &mut self,
2590        result: &Result<usize, TransportError>,
2591    ) {
2592        match result {
2593            Ok(_) => {
2594                if self.last_local_send_failure_at.is_some() {
2595                    self.last_local_send_failure_at = None;
2596                }
2597            }
2598            Err(TransportError::Io(e))
2599                if matches!(
2600                    e.kind(),
2601                    std::io::ErrorKind::NetworkUnreachable
2602                        | std::io::ErrorKind::HostUnreachable
2603                        | std::io::ErrorKind::AddrNotAvailable
2604                ) =>
2605            {
2606                self.last_local_send_failure_at = Some(std::time::Instant::now());
2607            }
2608            Err(_) => {}
2609        }
2610    }
2611
    /// Returns the instant of the most recent observed local outbound
    /// failure, if any (`None` once a send has succeeded again). Used by
    /// the link-dead reaper.
    pub(in crate::node) fn last_local_send_failure_at(&self) -> Option<std::time::Instant> {
        self.last_local_send_failure_at
    }
2617
    /// Like `send_encrypted_link_message` but allows setting the FMP CE flag.
    ///
    /// Used by the forwarding path to relay congestion signals hop-by-hop.
    ///
    /// # Errors
    ///
    /// Returns `NodeError::PeerNotFound` for an unknown peer, and
    /// `NodeError::SendFailed` when the peer lacks the state needed to
    /// send (remote index, transport id, current address, or Noise
    /// session), when counter reservation or encryption fails, or when
    /// the transport send fails. MTU violations map to
    /// `NodeError::MtuExceeded`.
    pub(super) async fn send_encrypted_link_message_with_ce(
        &mut self,
        node_addr: &NodeAddr,
        plaintext: &[u8],
        ce_flag: bool,
    ) -> Result<(), NodeError> {
        let peer = self
            .peers
            .get_mut(node_addr)
            .ok_or(NodeError::PeerNotFound(*node_addr))?;

        // Gather everything we need from the peer up front, so the later
        // mutable borrow of the Noise session doesn't conflict.
        let their_index = peer.their_index().ok_or_else(|| NodeError::SendFailed {
            node_addr: *node_addr,
            reason: "no their_index".into(),
        })?;
        let transport_id = peer.transport_id().ok_or_else(|| NodeError::SendFailed {
            node_addr: *node_addr,
            reason: "no transport_id".into(),
        })?;
        let remote_addr = peer
            .current_addr()
            .cloned()
            .ok_or_else(|| NodeError::SendFailed {
                node_addr: *node_addr,
                reason: "no current_addr".into(),
            })?;
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        let connected_socket = peer.connected_udp();

        // Prepend 4-byte session-relative timestamp (inner header)
        let timestamp_ms = peer.session_elapsed_ms();

        // MMP: read spin bit value before entering session borrow
        let sp_flag = peer.mmp().map(|mmp| mmp.spin_bit.tx_bit()).unwrap_or(false);
        let mut flags = if sp_flag { FLAG_SP } else { 0 };
        if ce_flag {
            flags |= FLAG_CE;
        }
        if peer.current_k_bit() {
            flags |= FLAG_KEY_EPOCH;
        }

        let session = peer
            .noise_session_mut()
            .ok_or_else(|| NodeError::SendFailed {
                node_addr: *node_addr,
                reason: "no noise session".into(),
            })?;

        // Build 16-byte outer header upfront. The inner-plaintext
        // layout is `[ts:4 LE][plaintext...]`, so its length is exactly
        // `INNER_TS_LEN + plaintext.len()` — no need to build the Vec
        // just to measure it. The worker path uses this length to size
        // the wire buffer directly; the legacy path below still
        // materialises a separate `inner_plaintext` Vec for the inline
        // encrypt-and-send call.
        const INNER_TS_LEN: usize = 4;
        let counter = session.current_send_counter();
        let inner_len = INNER_TS_LEN + plaintext.len();
        let payload_len = inner_len as u16;
        let header = build_established_header(their_index, counter, flags, payload_len);

        // **UDP send fast path.** The encrypt-worker pool is always
        // spawned at lifecycle start (workers = num_cpus) in
        // production, so this branch is taken for every authentic
        // send on every UDP-transported established session. The
        // AEAD work + sendmsg syscall run on a dedicated OS thread;
        // the rx_loop only builds the wire buffer + reserves the
        // counter inline.
        //
        // Other transport kinds (BLE, TCP, sim, ethernet) fall
        // through to the inline encrypt + transport.send path
        // below — those don't have raw-fd / sendmmsg / UDP_GSO
        // benefits to expose through the worker pool, so the simpler
        // synchronous send is the right shape for them.
        //
        // The `encrypt_workers.is_some()` check below is true in
        // production (lifecycle::start spawns the pool); it stays
        // checked rather than `expect()`-ed because unit tests
        // construct `Node` without calling `start()`.
        let transport_for_send = self
            .transports
            .get(&transport_id)
            .ok_or(NodeError::TransportNotFound(transport_id))?;
        let is_udp = matches!(transport_for_send, TransportHandle::Udp(_));
        if let Some(workers) = self.encrypt_workers.as_ref().cloned()
            && is_udp
            && let Some(cipher_clone) = session.send_cipher_clone()
        {
            {
                // Reserve the counter on the session so subsequent
                // sends don't reuse it. `current_send_counter` only
                // peeks; we advance via `take_send_counter`.
                let reserved_counter =
                    session
                        .take_send_counter()
                        .map_err(|e| NodeError::SendFailed {
                            node_addr: *node_addr,
                            reason: format!("counter reservation failed: {}", e),
                        })?;
                debug_assert_eq!(reserved_counter, counter);
                // Re-derive the header with the now-locked-in counter
                // value (same value, but the call sequence is more
                // explicit).
                let header =
                    build_established_header(their_index, reserved_counter, flags, payload_len);
                let transport = transport_for_send;
                // Snapshot the per-peer connected UDP socket before
                // resolving the fallback address. On the established
                // steady-state path this socket already carries the
                // kernel peer address, so re-parsing the configured
                // transport address and touching the DNS cache on every
                // packet is pure overhead on the sender hot path.
                let send_target = {
                    if let TransportHandle::Udp(udp) = transport {
                        let socket_addr = {
                            #[cfg(any(target_os = "linux", target_os = "macos"))]
                            {
                                match connected_socket.as_ref() {
                                    Some(socket) => Some(socket.peer_addr()),
                                    None => udp.resolve_for_off_task(&remote_addr).await.ok(),
                                }
                            }
                            #[cfg(not(any(target_os = "linux", target_os = "macos")))]
                            {
                                udp.resolve_for_off_task(&remote_addr).await.ok()
                            }
                        };
                        match (udp.async_socket(), socket_addr) {
                            (Some(socket), Some(socket_addr)) => Some((socket, socket_addr)),
                            _ => None,
                        }
                    } else {
                        None
                    }
                };
                if let Some((socket, socket_addr)) = send_target {
                    // Build the wire buffer **directly** from
                    // `plaintext` with a single allocation:
                    //   `[16 header][4 ts][plaintext...]` with
                    // +16 trailing capacity for the AEAD tag.
                    // The worker seals `wire_buf[16..]` in
                    // place and appends the tag — no second
                    // alloc, no second memcpy.
                    //
                    // Previous design built `inner_plaintext`
                    // via `prepend_inner_header` (1 alloc + 1
                    // copy) and then let the worker memcpy
                    // header + plaintext into a fresh Vec
                    // (another alloc + copy). At ~100 kpps the
                    // saved alloc/copy is ~150 MB/sec of memory
                    // bandwidth on the hot rx_loop + worker.
                    let wire_capacity = ESTABLISHED_HEADER_SIZE + inner_len + 16;
                    let mut wire_buf = Vec::with_capacity(wire_capacity);
                    wire_buf.extend_from_slice(&header);
                    wire_buf.extend_from_slice(&timestamp_ms.to_le_bytes());
                    wire_buf.extend_from_slice(plaintext);
                    let predicted_bytes = wire_capacity;
                    // Stats / MMP update inline — predicted size
                    // is exact for ChaCha20-Poly1305 (tag is
                    // constant 16 bytes). When `connected_socket` is
                    // `Some`, the worker sends on it without a
                    // destination sockaddr — the kernel skips the
                    // per-packet sockaddr + route + neighbor resolve.
                    if let Some(peer) = self.peers.get_mut(node_addr) {
                        peer.link_stats_mut().record_sent(predicted_bytes);
                        if let Some(mmp) = peer.mmp_mut() {
                            mmp.sender
                                .record_sent(reserved_counter, timestamp_ms, predicted_bytes);
                        }
                    }
                    workers.dispatch(self::encrypt_worker::FmpSendJob {
                        cipher: cipher_clone,
                        counter: reserved_counter,
                        wire_buf,
                        fsp_seal: None,
                        socket,
                        dest_addr: socket_addr,
                        #[cfg(any(target_os = "linux", target_os = "macos"))]
                        connected_socket,
                        drop_on_backpressure: plaintext
                            .first()
                            .is_some_and(|ty| *ty == SessionMessageType::EndpointData.to_byte()),
                        queued_at: crate::perf_profile::stamp(),
                    });
                    return Ok(());
                }
            }
        }

        // NOTE(review): if the worker branch above reserved the counter via
        // `take_send_counter` but `send_target` resolved to `None` (no async
        // socket / address resolution failure), control falls through to
        // this inline path with the session counter already advanced, while
        // `header` below was built from the earlier peeked `counter` value.
        // Confirm that `encrypt_with_aad` still produces a nonce consistent
        // with that header in this case — TODO verify against the session
        // implementation.

        // Inline (legacy) path: encrypt + send on the rx_loop.
        // Build the inner plaintext lazily here — the worker path
        // above never reaches this point, so the prepend_inner_header
        // alloc is avoided in the fast path.
        let inner_plaintext = prepend_inner_header(timestamp_ms, plaintext);
        // Encrypt with AAD binding to the outer header
        let ciphertext = {
            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::FmpEncrypt);
            session
                .encrypt_with_aad(&inner_plaintext, &header)
                .map_err(|e| NodeError::SendFailed {
                    node_addr: *node_addr,
                    reason: format!("encryption failed: {}", e),
                })?
        };

        let wire_packet = build_encrypted(&header, &ciphertext);

        // Re-borrow peer for stats update after sending
        let send_result = {
            let _t = crate::perf_profile::Timer::start(crate::perf_profile::Stage::UdpSend);
            let transport = self
                .transports
                .get(&transport_id)
                .ok_or(NodeError::TransportNotFound(transport_id))?;
            transport.send(&remote_addr, &wire_packet).await
        };
        self.note_local_send_outcome(&send_result);
        let bytes_sent = send_result.map_err(|e| match e {
            TransportError::MtuExceeded { packet_size, mtu } => NodeError::MtuExceeded {
                node_addr: *node_addr,
                packet_size,
                mtu,
            },
            other => NodeError::SendFailed {
                node_addr: *node_addr,
                reason: format!("transport send: {}", other),
            },
        })?;

        // Update send statistics
        if let Some(peer) = self.peers.get_mut(node_addr) {
            peer.link_stats_mut().record_sent(bytes_sent);
            // MMP: record sent frame for sender report generation
            if let Some(mmp) = peer.mmp_mut() {
                mmp.sender.record_sent(counter, timestamp_ms, bytes_sent);
            }
        }

        Ok(())
    }
2862}
2863
2864impl fmt::Debug for Node {
2865    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2866        f.debug_struct("Node")
2867            .field("node_addr", self.node_addr())
2868            .field("state", &self.state)
2869            .field("is_leaf_only", &self.is_leaf_only)
2870            .field("connections", &self.connection_count())
2871            .field("peers", &self.peer_count())
2872            .field("links", &self.link_count())
2873            .field("transports", &self.transport_count())
2874            .finish()
2875    }
2876}